netty组件详解-中

news/2024/12/21 22:07:15/

接着之前的博客netty组件详解-上,我们继续深入到源码层面,来探究netty的各个组件和其设计思想:

  1. netty内置的通讯模式
    我们在编写netty代码时,经常使用NioServerSocketChannel 作为通讯模式。
    例如下面的简单netty客户端示例:
 private void start() throws InterruptedException {// 客户端采用java NIO 的通讯模型EventLoopGroup group = new NioEventLoopGroup();try{Bootstrap client = new Bootstrap();client.group(group).channel(NioSocketChannel.class) // 客户端采用java NIO 的通讯模型.remoteAddress(new InetSocketAddress(host,port)) .handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new EchoClientHandler());}});ChannelFuture sync = client.connect().sync();sync.channel().closeFuture().sync();}finally {group.shutdownGracefully().sync();}}

但是除此之外,netty也内置了其他方式的通讯模型:
(1) Epoll 模型,此方法底层原理是由JNI调用linux的epoll()实现的,因此此方法只能在Linux上调试,使用方法就是替换下面两个类:

private void start() throws InterruptedException {EventLoopGroup group = new EpollEventLoopGroup(); // 客户端采用Epoll模型try{// 客户端启动类必备Bootstrap client = new Bootstrap();client.group(group).channel(EpollSocketChannel.class) // 客户端采用Epoll模型.remoteAddress(new InetSocketAddress(host,port)) .handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new EchoClientHandler());}});ChannelFuture sync = client.connect().sync();sync.channel().closeFuture().sync();}finally {group.shutdownGracefully().sync();}}

但是改为Epoll模式后,我们在windows上是无法调试的,会报错:
在这里插入图片描述
注:Epoll模式和NIO模式,都是基于Reactor模型来实现的,不同点在与,Epoll整合很多linux系统独有的特性,如零拷贝,SO_REUSEPORT这个些特性,而NIO是JAVA在JVM层面上做的优化,Epoll性能比NIO更好,关于两者的对比,我单独出一篇博客详细分析两者的区别。
(2)OIO io.netty.channel.socket.oio 使用 java.net 包作为基础——使用阻塞流即BIO模式,但这个组件目前基本不会使用了,我们看netty关于OIO的方法都已经标注为过时:
在这里插入图片描述
(3)Local io.netty.channel.local 可以在 VM 内部通过管道进行通信的本地传输,这个通讯模型也很少使用了,因为已经在一个JVM内部的话,可以用直接内存的方式来实现通信,完全没必要进行socket调用。
(4)Embedded io.netty.channel.embedded Embedded 传输,允许使用 ChannelHandler 而又不需要一个真正的基于网络的传输。多用于测试 ChannelHandler 。
下面是一个测试用例:

  • 我们定义一个编码的handler,EmbeddTestHandler

public class EmbeddTestHandler extends MessageToMessageEncoder<ByteBuf> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {// 取字节数组中首位进行编码byte[] array = byteBuf.array();String s = Arrays.toString(array);String s1 = new String(array, StandardCharsets.UTF_8);System.out.println(s);System.out.println("=================");System.out.println(s1);list.add(array[0]);}
}
  • 再定义一个基于Embedded的测试类
public class EmbeddTestHandlerTest {@Testpublic void testEmbedded(){ByteBuf byteBuf = Unpooled.buffer();String msg = "北京欢迎您";byteBuf.writeBytes(msg.getBytes(StandardCharsets.UTF_8));//(2) 创建一个EmbeddedChannel,并安装一个测试的EmbeddTestHandlerEmbeddedChannel channel = new EmbeddedChannel(new EmbeddTestHandler());//(3) 写入 ByteBuf,并断言调用 readOutbound()方法将会产生数据assertTrue(channel.writeOutbound(byteBuf));//(4) 将该 Channel 标记为已完成状态assertTrue(channel.finish());// read bytes//(5) 读取所产生的消息,并断言它包含了编码的值Byte code = channel.readOutbound();Byte checkCode = msg.getBytes(StandardCharsets.UTF_8)[0];assertEquals(code,checkCode);assertNull(channel.readOutbound());}
}

无需进行真实网络传输的一系列定义,即可进行handler的测试,下面是测试结果:
在这里插入图片描述
2. BootStrap引导类
netty中客户端和服务端各有一个BootStrap,其中客户端为Bootstrap,服务端为ServerBootstrap。
其中服务端的ServerBootstrap在监听端口和处理socketChannel可以使用两组线程模型:
这里定义了boss,work两组线程模型,其底层原理就是Reactor模型的主从模式,关于Reactor模型我将在零拷贝及NIO机制博文中深入探究

public void start() throws InterruptedException {final MessageCountHandler messageCountHandler = new MessageCountHandler();/*使用两个线程组*/EventLoopGroup boss  = new NioEventLoopGroup();EventLoopGroup work  = new NioEventLoopGroup();try {/*服务端启动必备*/ServerBootstrap b = new ServerBootstrap();b.group(boss,work) // 采用Reactor主从线程模型.channel(NioServerSocketChannel.class)/*指定使用NIO的通信模式*///.option(ChannelOption.SO_BACKLOG).localAddress(new InetSocketAddress(port))/*指定监听端口*/// .childOption(ChannelOption.SO_RCVBUF)//.childOption()//.handler();.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(messageCountHandler); // 添加一个共享的hander到pipeline中ch.pipeline().addLast(new EchoServerMCHandler());}});ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞到完成*/LOG.info("服务器启动完成");f.channel().closeFuture().sync();/*阻塞当前线程,直到服务器的ServerChannel被关闭*/} finally {boss.shutdownGracefully().sync();work.shutdownGracefully().sync();}
  1. ChannelInitializer
    在上面的BootStrap引导类示例中,我们来看这个逻辑:
  .childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(messageCountHandler); // 添加一个共享的hander到pipeline中ch.pipeline().addLast(new EchoServerMCHandler());}});

ChannelInitializer类主要用于向socketChannel的pipeline中添加Handler,我们来看下源码:
在这里插入图片描述
可以发现,ChannelInitializer本身也是一个handler,并且提供了一个对外的方法,来初始化pipeline:
在这里插入图片描述
这里涉及到了netty设计的一个细节,既然是handler那么一定自己的生命周期,我们来看下源码:
在这里插入图片描述
这个Handler在将其他handler添加到pipeLine中之后,会将自己从pipeline中移除,这个是netty编程的常见的一个细节
我们可以参照这个编程细节,用在自己的业务上,比如:
在我们自己的应用程序中,如果存在着某个 handler 只使用一次的情况,也可以仿造 ChannelInitializer,用完以后将自己从ChannelPipeline 中移除自己,比如授权 handler,某客户端第一次连接登录以后,进行授权检查,检查通过后就可以把这个授权 handler 移除了。如果客户端关闭连接下线,下次再连接的时候,就是一个新的连接,授权 handler 依然会被安装到 ChannelPipeline ,依然会进行授权检查。
4. ChannelOption
ChannelOption属性主要对应套接字中的参数:
首先看用法示例:

 private void doStart() throws InterruptedException {System.out.println("netty服务已启动");// 线程组EventLoopGroup group = new NioEventLoopGroup();try {// 创建服务器端引导类ServerBootstrap server = new ServerBootstrap();// 初始化服务器配置server.group(group) // 配置处理客户端的连接线程组.channel(NioServerSocketChannel.class) // 指定channel为 NioServerSocketChannel// 为socketChannel配置TCP参数.option(ChannelOption.SO_LINGER,100).option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT).option(ChannelOption.SO_BACKLOG,100).option(ChannelOption.SO_REUSEADDR,true).option(ChannelOption.SO_KEEPALIVE,true).localAddress(port) // 配置服务端口号// 为每个handler配置TCP参数.childOption(ChannelOption.SO_SNDBUF,1024).childOption(ChannelOption.SO_RCVBUF,1024).childHandler(new ChannelInitializer<SocketChannel>() { // 指定客户端通信的处理类,添加到pipline中,进行初始化@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new EchoServerHandler());}});// 绑定端口,sync()会阻塞到完成ChannelFuture sync = server.bind().sync();// 阻塞当前线程,直到服务器的ServerChannel被关闭sync.channel().closeFuture().sync();}finally {// 关闭资源group.shutdownGracefully().sync();}}

其中介绍比较重要的几个参数:
(1)ChannelOption.SO_REUSEADDR:
ChanneOption.SO_REUSEADDR 对应于套接字选项中的 SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,
比如,多网卡(IP)绑定相同端口,比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置 SO_REUSEADDR 就无法正常使用该端口。
但是注意,这个参数无法做到让应用绑定完全相同 IP + Port 来重复启动。
(2)ChannelOption.SO_KEEPALIVE
Channeloption.SO_KEEPALIVE 参数对应于套接字选项中的 SO_KEEPALIVE,该参数用于设置 TCP 连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP 会自动发送一个活动探测数据报文。
(3)ChannelOption.SO_SNDBUF 和 ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF 参数对应于套接字选项中的 SO_SNDBUF,ChannelOption.SO_RCVBUF 参数对应于套接字选项中的 SO_RCVBUF 这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。
(4)ChannelOption.SO_LINGER
ChannelOption.SO_LINGER 参数对应于套接字选项中的 SO_LINGER,Linux 内核默认的处理方式是当用户调用 close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用 SO_LINGER 可以阻塞 close()的调用时间,直到数据完全发送
(5)ChannelOption.TCP_NODELAY
ChannelOption.TCP_NODELAY 参数对应于套接字选项中的 TCP_NODELAY,该参数的使用与 Nagle 算法有关,Nagle 算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用 Nagle 算法,使用于小数据即时传输,于TCP_NODELAY 相对应的是 TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。

  1. TCP的粘包和半包:
    网路传输过程中,客户端给服务端发送了报文a,b,c,但服务端收到报文时,是a+b的上半部分,b的下半部分+c两个包,并没有按照完整报文a,b,c来接收,这个问题称为半包,报文不完整。再比如a,b的报文比较小,则服务器收到的包为a+b,c两个包,其中a,b合在了一个包里,这个称之为粘包。
    发生的原因,在于TCP对于网络数据传输的处理优化,如果发送的网络数据包太小,那么他本身会启用 Nagle 算法,对较小的数据包进行合并然后再发送,这样就发生了粘包/半包的问题。即报文在传输过程中发生了拆包或合并。
    怎么防止这个现象出现呢,解决方案就是在报文中添加标识符或分隔符,让服务器端知道,一个完整的包的状态。
    (1)文本报文通用换行分隔符(适用于一行形式的报文)
  • 客户端处理:每个报文加回车换行符
 @Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ByteBuf msg = null;String request = "apple,pear,orange"+ System.getProperty("line.separator");// 为每个报文末尾添加回车换行符for(int i=0;i<10;i++){msg = Unpooled.buffer(request.length());msg.writeBytes(request.getBytes());ctx.writeAndFlush(msg);}}

服务端处理,加回车换行符处理
LineBasedFrameDecoder是netty已经为我们实现好的,处理回车换行符的handler

 private static class ChannelInitializerImp extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {// 添加回车换行符处理的handler,校验报文的完整性 ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new LineBaseServerHandler());}}

(2)文本报文自定义分隔符(适用于一段形式的报文)

  • 服务端处理,在服务端约定一个自定义的分隔符:
    DelimiterBasedFrameDecoder 是netty已经为我们实现好的,处理自定义分隔符的handler
 // 在服务端约定一个自定义的分隔符public static final String My_SYMBOL = "#";private static class ChannelInitializerImp extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {ByteBuf delimiter = Unpooled.copiedBuffer(My_SYMBOL .getBytes());// 服务端添加一个自定义的分隔符处理handlerch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));ch.pipeline().addLast(new DelimiterServerHandler());}}
  • 客户端处理,使用和服务端约定的分隔符:
public static final String My_SYMBOL = "#";private static class ChannelInitializerImp extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {ByteBuf delimiter = Unpooled.copiedBuffer(My_SYMBOL.getBytes());ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));ch.pipeline().addLast(new DelimiterClientHandler());}}

(3)二进制定长标识(适用于二进制报文)约定好每条报文的长度:

  • 客户端处理,每次发送给服务端报文前,将报文固定长度进行编码,一并发送过去:
public final static String REQUEST = "apple.orange,pear";@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ByteBuf msg = null;for(int i=0;i<10;i++){// 申请固定长度的buffermsg = Unpooled.buffer(REQUEST.length());msg.writeBytes(REQUEST.getBytes());ctx.writeAndFlush(msg);}}
  • 服务端处理,添加二进制长度域解码器来识别完整报文:
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {// 添加二进制长度域解码器来识别完整报文ch.pipeline().addLast(new FixedLengthFrameDecoder(FixedLengthEchoClient.REQUEST.length()));ch.pipeline().addLast(new FixedLengthServerHandler());}}

(4)二进制基于长度域解码
channelRead 和 channelReadComplate的区别
channelRead 方法用于处理每次从通道中读取到的数据。
channelReadComplete 方法用于通知数据读取操作完成后进行后续处理。
在 Netty 中,channelRead 和 channelReadComplete 是 ChannelInboundHandler 接口中的两个重要方法,用于处理入站数据(从远程对等方传入的数据)。
channelRead 方法是在每次从通道中读取到数据时被调用的。当有数据从远程对等方传入时,Netty 会自动将数据包装成一个 ByteBuf 对象,并传递给相应的 ChannelInboundHandler 的 channelRead 方法进行处理。在这个方法中,你可以对接收到的数据进行解码、处理、转换等操作。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {// 在这里处理接收到的数据(msg),通常是 ByteBuf 对象// 例如,解码、处理数据等
}

channelReadComplete 方法是在一个通道的数据读取操作完成时被调用的。在 channelRead 方法中处理完数据后,Netty 会自动调用 channelReadComplete 方法,以通知处理器数据已经读取完成,可以进行后续的操作,例如回送响应或释放资源等。

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {// 本次数据读取完成后的后续处理// 例如,回送响应或释放资源等
}

http://www.ppmy.cn/news/973787.html

相关文章

HTML与XHTML的不同和各自特点

HTML和XHTML都是用于创建Web页面的标记语言。HTML是一种被广泛使用的标记语言&#xff0c;而XHTML是HTML的严格规范化版本。在本文中&#xff0c;我们将探讨HTML与XHTML之间的不同之处&#xff0c;以及它们各自的特点。 HTML与XHTML的不同之处 HTML和XHTML之间最大的不同在于它…

多模态版ChatGPT,拿下视觉语言新SOTA, 代码已开源

点击上方“AI遇见机器学习”&#xff0c;选择“星标”公众号 重磅干货&#xff0c;第一时间送 文&#xff5c;羿阁 发自 凹非寺源&#xff5c;量子位 2022年流行“文生图”模型&#xff0c;那2023年流行什么&#xff1f; 机器学习工程师Daniel Bourke的答案是&#xff1a;反过来…

从地图获取5A风景区位置

以下是利用Python从高德地图获取5A风景区位置并将坐标转换为WGS84的步骤&#xff1a; 1. 安装requests库和pyproj库。您可以使用pip命令在终端或命令提示符中安装这些库。 2. 导入requests和pyproj库&#xff1a; import requests import pyproj 3. 设置高德地图API密钥和…

插入,选择,堆,快速排序算法思想与复杂度

目录 插入排序 思想 算法步骤 代码 复杂度 选择排序 思想 算法步骤 代码 复杂度 堆排序 思想 算法步骤 代码 复杂度 快速排序 思想 算法步骤 代码 复杂度 稳定性 插入排序 思想 插入排序是一种简单直观的排序算法。它的工作原理是将数组分为已排序和未排序…

使用 Flask 快速构建 基于langchain 和 chatGPT的 PDF摘要总结

简介 这里不对 langchain 和 chatGPT 进行介绍&#xff0c;仅对实现过程进行整理 环境 Python >3.8 Flask2.2.3 Jinja23.1.2 langchain0.0.143 openai0.27.4 实现 总结功能 使用 langchain 和 openai 接口实现总结功能 实现逻辑&#xff1a;通过text_splitter 将pdf 分…

pytorch如何混合进度训练transformer【各种不同方式】

Trainer&#xff0c; no trainer&#xff0c; accelerator 用huggingface 的Trainer Hugging Face 的 Transformers 库为我们提供了大量预训练的 Transformer 模型&#xff0c;以及一个易于使用的训练和微调工具——Trainer。在 Trainer 中&#xff0c;我们可以很容易地启用混…

PAT乙题1007

答案 #include <iostream> #include<cstdio> #include<string> #include<vector> using namespace std; int ans; vector<int> ve; bool check(int x) {for (int i 2; i < x / i; i){if (x % i 0) return false;}return true; } int main(…

【算法基础:数学知识】4.2 约数

文章目录 约数介绍例题列表AcWing 869. 试除法求约数 &#xff08;求一个数的所有约数&#xff09;AcWing 870. 约数个数&#xff08;求一些数相乘之后的结果有几个约数&#xff0c;答案可能很大&#xff09;约数个数定理⭐解法代码 AcWing 871. 约数之和解法公式⭐ AcWing 872…