netty十八罗汉之——挖耳罗汉(Decoder)

devtools/2025/2/25 9:32:59/

佛教中除不听各种淫邪声音之外,更不可听别人的秘密。因他论耳根最到家,故取挖耳之形,以示耳根清净。

来看看netty的核心组件解码器Decoder

  •  Decoder的作用
  • 半包,粘包问题
  • 从模板和装饰器模式看Decoder解码原理

1.Decoder作用

 最根本的就是将网络中的二进制数据,解析成为目标对象(Object)。

抽象出来本质就是上图,其它任何动作都是在完成这个目的。

对吧,学习一定要抓住本质。

先来看看网络数据再网络和操作系统底层是怎么传输的

a.网络数据是以二进制的数据包进行传输的,在netty中是以bytefuf数据包进行传递。这个概念之     后  章节会讲到。

b.会涉及tcp/ipc传输协议。

 先来看看TCP/IP协议的数据封装和分用过程,大致如下图所示:

 

可以看到没往下传输一层,会加上一个每层的首部。这个可以理解为,唐僧西天取经的过程中,通关文牒都要盖一个章一样。不加这个人家不认你。用的时候在解析出来报文体。

这里就不得不提一个概念MSS(TCP[数据包]每次能够传输的最大数据分段)

后面在传输过程中,超过这个值,数据包会进行拆分;小于这个值,数据包会进行合并。

c.dma复制

简单讲 就是绕过cpu,直接操作内存在设备间传输数据。

具体流程如下:

dam传输数据时,cpu不参与,dma处理完之后通知cpu进行后续处理。

理解这几个点之后,我们再来看一看正真的数据传播流程:

1.首先发送端,在用户缓存区里的bytebuf数据包会进行一次cpu复制;

2.cpu复制之后数据会缓存在发送缓冲区的内核空间里,这时候数据是完整的;

3.内核缓冲区进行一次DMA复制,数据被写入网卡设备中,此时网卡里面的数据包会进行重组;

4.写入网卡的数据通过TCP/IP协议进行传输,组装成新的二进制数据包,有最大数据限制;

5.接收端通过TCP/IP协议接收到二进制数据包,此时数据是完整的;

6.接收端在内核缓冲区进行一次DMA复制,形成新的bytebuf数据包;

7.接收端进行一次cpu复制,bytebuf数据包被写入用户缓冲区。

具体流程如下图:

这个过程会涉及两次分割二进制包:

a.传输过程中的分割;

b.系统复制过程中的分割;

所以粘包,和半包问题的出现就发生在这两个过程中,数据包少了,多个合在一起就会造成粘包。数据包多了,就会进行分割,分割就会打破原来的数据结构,出现半包问题。

来看看netty Decoder是怎么解决这个问题的

1.通过ByteToMessageDecoder 将二进制数据转换成对象

2.通过MessageToMessageDecoder将对象数据转换成对象

先来看看ByteToMessageDecoder 它是一个基类,主要使用模板方法接受管理bytebuf,子类通过实现钩子方法,处理业务逻辑,处理完业务逻辑将写入List<Object>中,之后在流水线上发送到下一站。流水线概念会在之后一章单独讲解。

java">public class Byte2IntegerDecoder extends ByteToMessageDecoder {//钩子实现@Overridepublic void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {while (in.readableBytes() >= 4) {Integer anInt = in.readInt();Logger.info("解码出一个整数: " + anInt);out.add(anInt);}}
}

来看看它的具体使用,继承ByteToMessageDecoder,钩子方法里实现业务逻辑。

回顾一下什么是模板方法:

核心点:抽象类会实现一系列公共方法共子类使用。

1.抽象类会定义一系列的模板方法,子类自动继承。

2.子类通过钩子方法处理具体业务逻辑。

好,我们再看一个它的核心子类ReplayingDecoder(回放解码器)。

什么是回放呢?

我们读取数据的时候是在操作一个二进制数组,数组元素完整的话,指针移动没有问题。但是数据元素不完整,再次读取的时候记数指针会回到开头重新执行。

netty通过checkpoint指针来完成指针回放

再来看看ReplayingDecoder怎么结合模板模式和装饰器模式完成整个解码操作的:

使用模板模式主要是它继承ByteToMessageDecoder父类解码器,实现decode方法。重点看看包装器的使用:

回放解码器在解码的时候会对传进来的bytebuf进行一次包装:

就是这个 replayable.setCumulation(in);

netty 实现了一个 ReplayingDecoderByteBuf,它继承了ByteBuf,所以可以对对传进来的bytebuf进行操作,这个过程就是包装器在起作用。

java">protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {replayable.setCumulation(in);try {while (in.isReadable()) {int oldReaderIndex = checkpoint = in.readerIndex();int outSize = out.size();if (outSize > 0) {fireChannelRead(ctx, out, outSize);out.clear();

简化一下看个简单的包装器的例子

java">public class WrapperModelDemo {// Pojo -- 被包装的类型static class Pojo implements Sth {public void a() {System.out.println("a");}public void b() {System.out.println("b");}public void c() {System.out.println("c");}public void X() {System.out.println("ALL - X");}}// Wrapper模式static class PojoWrapper implements Sth {private Sth inner;protected PojoWrapper() {}public void setInner(Sth inner) {this.inner = inner;}@Overridepublic void a() {System.out.println("PojoWrapper - a");inner.a();}@Overridepublic void b() {System.out.println("PojoWrapper - a");inner.b();}@Overridepublic void c() {throw new UnsupportedOperationException("... c  ");}@Overridepublic void X() {throw new UnsupportedOperationException("... X");}}@Testpublic void testWrapper() throws Exception {Sth pojo = new Pojo();pojo.a();pojo.b();pojo.c();PojoWrapper pojoWrapper = new PojoWrapper();pojoWrapper.setInner(pojo);// 可以尝试注释掉上面的某一行代码, 查看输出结果pojoWrapper.a();pojoWrapper.b();pojoWrapper.c();}}

运行结果:

核心是包装类在实现sth接口时,又把它作为了一个属性设置进来了。

这样结合着看,对回放解码器的解码过程理解起来就会更加清楚。

现在来看看 MessageToMessageDecoder解码器,它的传入对象不再是一个bytebuf,而是一个具体的对象

java">public class Integer2StringDecoder extends MessageToMessageDecoder<Integer> {@Overridepublic void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {String target = String.valueOf(msg);out.add(target);}
}

它有几个重要的子类:

LineBasedFrameDecoder:基于行的解码器,通过换行符(\n 或者 \r\n)来作为帧的分隔符。当接收到数据时,它会在数据中查找换行符,一旦找到,就将从上次解码位置到换行符之间的数据作为一个完整的帧进行解码并返回。如果在接收到的数据中没有找到换行符,那么数据会被缓存起来,直到换行符出现或者缓冲区满了才进行处理。
DelimiterBasedFrameDecoder:基于分隔符的解码器,它允许用户自定义帧的分隔符。在接收到数据后,它会根据用户提供的分隔符(一个或多个字节数组)在数据中进行查找。一旦找到分隔符,就将从上次解码位置到分隔符之前的数据作为一个完整的帧返回。同样,如果没有找到分隔符,数据会被缓存,直到分隔符出现或缓冲区达到一定条件。
LengthFieldBasedFrameDecoder:通过消息中定义的长度字段来确定帧的长度。解码器会首先读取指定长度的字节,这些字节表示后续帧数据的长度。然后,根据这个长度值,从数据流中读取相应长度的数据作为一个完整的帧。这种方式可以精确地定位每个帧的边界,即使数据存在粘包和拆包的情况。

 下面是几个具体实例,可以跑跑看看效果

java">public class NettyOpenBoxDecoder {public static final int MAGICCODE = 9999;public static final int VERSION = 100;static final String SPLITER = "\r\n";static final String SPLITER_3 = "\n";static final String SPLITER_2 = "\t";static final String CONTENT = "netty:18罗汉系列!";/*** LineBasedFrameDecoder 使用实例*/@Testpublic void testLineBasedFrameDecoder() {try {ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {protected void initChannel(EmbeddedChannel ch) {ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringProcessHandler());}};EmbeddedChannel channel = new EmbeddedChannel(i);for (int j = 0; j < 100; j++) {//1-3之间的随机数int random = RandomUtil.randInMod(3);ByteBuf buf = Unpooled.buffer();for (int k = 0; k < random; k++) {buf.writeBytes(CONTENT.getBytes("UTF-8"));}buf.writeBytes(SPLITER.getBytes("UTF-8"));channel.writeInbound(buf);}Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}/*** LineBasedFrameDecoder 使用实例*/@Testpublic void testDelimiterBasedFrameDecoder() {try {final ByteBuf delimiter = Unpooled.copiedBuffer(SPLITER_2.getBytes("UTF-8"));ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {protected void initChannel(EmbeddedChannel ch) {ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, true, delimiter));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringProcessHandler());}};EmbeddedChannel channel = new EmbeddedChannel(i);for (int j = 0; j < 100; j++) {//1-3之间的随机数int random = RandomUtil.randInMod(3);ByteBuf buf = Unpooled.buffer();for (int k = 0; k < random; k++) {buf.writeBytes(CONTENT.getBytes("UTF-8"));}buf.writeBytes(SPLITER_2.getBytes("UTF-8"));channel.writeInbound(buf);}Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}/*** LineBasedFrameDecoder 使用实例*/@Testpublic void testLengthFieldBasedFrameDecoder() {try {// 1、单字节(无符号):0到255;(有符号):-128到127。
//
//2、双字节(无符号):0到65535;(有符号):-32768 到 32765。
//
//3、四字节(无符号):0到42 9496 7295;(有符号):-21 4748 3648到21 4748 3647。//定义一个 基于长度域解码器final LengthFieldBasedFrameDecoder decoder =new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4);ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {protected void initChannel(EmbeddedChannel ch) {ch.pipeline().addLast(decoder);ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));ch.pipeline().addLast(new StringProcessHandler());}};EmbeddedChannel channel = new EmbeddedChannel(i);for (int j = 0; j < 100; j++) {//1-3之间的随机数int random = RandomUtil.randInMod(3);ByteBuf buf = Unpooled.buffer();byte[] bytes = CONTENT.getBytes("UTF-8");//首先 写入头部  head,也就是后面的数据长度buf.writeInt(bytes.length * random);//然后 写入contentfor (int k = 0; k < random; k++) {buf.writeBytes(bytes);}channel.writeInbound(buf);}Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}/*** LengthFieldBasedFrameDecoder 使用实例*/@Testpublic void testLengthFieldBasedFrameDecoder1() {try {final LengthFieldBasedFrameDecoder spliter =new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4);ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {protected void initChannel(EmbeddedChannel ch) {ch.pipeline().addLast(spliter);ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));ch.pipeline().addLast(new StringProcessHandler());}};EmbeddedChannel channel = new EmbeddedChannel(i);for (int j = 1; j <= 100; j++) {ByteBuf buf = Unpooled.buffer();String s = j + "次发送->" + CONTENT;byte[] bytes = s.getBytes("UTF-8");buf.writeInt(bytes.length);System.out.println("bytes length = " + bytes.length);buf.writeBytes(bytes);channel.writeInbound(buf);}Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}/*** LengthFieldBasedFrameDecoder 使用实例*/@Testpublic void testLengthFieldBasedFrameDecoder2() {try {final LengthFieldBasedFrameDecoder spliter =new LengthFieldBasedFrameDecoder(1024, 0, 4, 2, 6);ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {protected void initChannel(EmbeddedChannel ch) {ch.pipeline().addLast(spliter);ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));ch.pipeline().addLast(new StringProcessHandler());}};EmbeddedChannel channel = new EmbeddedChannel(i);for (int j = 1; j <= 100; j++) {// 分配一个bytebufByteBuf buf = Unpooled.buffer();String s = j + "次发送->" + CONTENT;byte[] bytes = s.getBytes("UTF-8");//首先 写入头部  head,包括 后面的数据长度buf.writeInt(bytes.length);buf.writeChar(VERSION);//然后 写入  contentbuf.writeBytes(bytes);channel.writeInbound(buf);}Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}/*** LengthFieldBasedFrameDecoder 使用实例 3*/@Testpublic void testLengthFieldBasedFrameDecoder3() {try {final LengthFieldBasedFrameDecoder spliter =new LengthFieldBasedFrameDecoder(1024, 2, 4, 4, 10);ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {protected void initChannel(EmbeddedChannel ch) {ch.pipeline().addLast(spliter);ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));ch.pipeline().addLast(new StringProcessHandler());}};EmbeddedChannel channel = new EmbeddedChannel(i);for (int j = 1; j <= 100; j++) {ByteBuf buf = Unpooled.buffer();String s = j + "次发送->" + CONTENT;byte[] bytes = s.getBytes("UTF-8");buf.writeChar(VERSION);buf.writeInt(bytes.length);buf.writeInt(MAGICCODE);buf.writeBytes(bytes);channel.writeInbound(buf);}Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}}

最后总结一下:

本文主要是对netty解码器的理解,核心点在 数据在网络和操作系统间的传播和netty结合模板模式装饰器模式解决netty解码过程中的数据问题。

预告一下,下一篇是18罗汉的第二章,netty编码器。


http://www.ppmy.cn/devtools/161549.html

相关文章

PHP-create_function

[题目信息]&#xff1a; 题目名称题目难度PHP-create_function2 [题目考点]&#xff1a; create_function ( string args , string args , string code )[Flag格式]: SangFor{wWx5dEGHHhDUwmST4bpXwfjSzq43I6cz}[环境部署]&#xff1a; docker-compose.yml文件或者docker …

IPython 使用技巧整理

IPython 是一个增强版的 Python 交互式解释器&#xff0c;它提供了许多有用的功能&#xff0c;比如自动补全、代码历史、多行编辑、魔术命令等。 1. 自动补全功能 IPython 的自动补全功能可以大大提高编码效率。当你在编写代码时&#xff0c;只需按下 Tab 键&#xff0c;IPyt…

每天设计者模式-2:如何夯实基础

设计模式是软件开发中的重要知识点&#xff0c;无论是在面试中还是实际开发中&#xff0c;熟练掌握设计模式都能让你更高效地解决问题&#xff0c;并编写更加优雅的代码。那么&#xff0c;如何真正夯实设计模式的基础&#xff1f;本篇文章将从理论、实践、思维训练三个方面&…

Pi币与XBIT:在去中心化交易所的崛起中重塑加密市场

在加密货币市场迅猛发展的背景下&#xff0c;Pi币和XBIT正在成为投资者关注的焦点。Pi币作为一项创新的数字货币&#xff0c;通过独特的挖矿机制和广泛的用户基础&#xff0c;迅速聚集了大量追随者&#xff0c;展示了强大的市场潜力。同时&#xff0c;币应XBIT去中心化交易所的…

深入理解Zookeeper:分布式系统的协调者

引言 在现代分布式系统中&#xff0c;协调和管理多个节点之间的状态和行为是一个复杂且关键的任务。Zookeeper作为一个分布式协调服务&#xff0c;为开发者提供了一种高效、可靠的方式来处理分布式系统中的一致性问题。本文将介绍Zookeeper的基本概念、使用场景以及如何通过示…

【Python爬虫(67)】Python爬虫实战:探秘旅游网站数据宝藏

【Python爬虫】专栏简介&#xff1a;本专栏是 Python 爬虫领域的集大成之作&#xff0c;共 100 章节。从 Python 基础语法、爬虫入门知识讲起&#xff0c;深入探讨反爬虫、多线程、分布式等进阶技术。以大量实例为支撑&#xff0c;覆盖网页、图片、音频等各类数据爬取&#xff…

什么是拆分金额

在高速公路收费管理中&#xff0c;“拆分金额”是指将车辆通行费按照一定规则分配到不同路段或管理单位的过程。具体来说&#xff0c;当车辆在高速公路上行驶并跨越多个路段时&#xff0c;这些路段可能属于不同的经营管理单位&#xff0c;因此需要将通行费拆分成多个部分&#…

IP地址查询网站(此类网站失效快,动态更新)

以前记录的又失效了&#xff0c;感觉这个问题好像没有官方解决方案&#xff0c;都是一些企业和个人基于不知道什么目的做的。 my-ip.cc&#xff08;验证时间2025.2.24&#xff09; 查IPv4地址&#xff1a; https://my-ip.cc/zh-hans/ 输出为网页。 ipw.cn&#xff08;验证时间…