接着之前的博客netty组件详解-上,我们继续深入到源码层面,来探究netty的各个组件和其设计思想:
- netty内置的通讯模式
我们在编写netty代码时,经常使用NioServerSocketChannel 作为通讯模式。
例如下面的简单netty客户端示例:
private void start() throws InterruptedException {// 客户端采用java NIO 的通讯模型EventLoopGroup group = new NioEventLoopGroup();try{Bootstrap client = new Bootstrap();client.group(group).channel(NioSocketChannel.class) // 客户端采用java NIO 的通讯模型.remoteAddress(new InetSocketAddress(host,port)) .handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new EchoClientHandler());}});ChannelFuture sync = client.connect().sync();sync.channel().closeFuture().sync();}finally {group.shutdownGracefully().sync();}}
但是除此之外,netty也内置了其他方式的通讯模型:
(1) Epoll 模型,此方法底层原理是由JNI调用linux的epoll()实现的,因此此方法只能在Linux上调试,使用方法就是替换下面两个类:
private void start() throws InterruptedException {EventLoopGroup group = new EpollEventLoopGroup(); // 客户端采用Epoll模型try{// 客户端启动类必备Bootstrap client = new Bootstrap();client.group(group).channel(EpollSocketChannel.class) // 客户端采用Epoll模型.remoteAddress(new InetSocketAddress(host,port)) .handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new EchoClientHandler());}});ChannelFuture sync = client.connect().sync();sync.channel().closeFuture().sync();}finally {group.shutdownGracefully().sync();}}
但是改为Epoll模式后,我们在windows上是无法调试的,会报错:
注:Epoll模式和NIO模式,都是基于Reactor模型来实现的,不同点在与,Epoll整合很多linux系统独有的特性,如零拷贝,SO_REUSEPORT这个些特性,而NIO是JAVA在JVM层面上做的优化,Epoll性能比NIO更好,关于两者的对比,我单独出一篇博客详细分析两者的区别。
(2)OIO io.netty.channel.socket.oio 使用 java.net 包作为基础——使用阻塞流即BIO模式,但这个组件目前基本不会使用了,我们看netty关于OIO的方法都已经标注为过时:
(3)Local io.netty.channel.local 可以在 VM 内部通过管道进行通信的本地传输,这个通讯模型也很少使用了,因为已经在一个JVM内部的话,可以用直接内存的方式来实现通信,完全没必要进行socket调用。
(4)Embedded io.netty.channel.embedded Embedded 传输,允许使用 ChannelHandler 而又不需要一个真正的基于网络的传输。多用于测试 ChannelHandler 。
下面是一个测试用例:
- 我们定义一个编码的handler,EmbeddTestHandler
public class EmbeddTestHandler extends MessageToMessageEncoder<ByteBuf> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {// 取字节数组中首位进行编码byte[] array = byteBuf.array();String s = Arrays.toString(array);String s1 = new String(array, StandardCharsets.UTF_8);System.out.println(s);System.out.println("=================");System.out.println(s1);list.add(array[0]);}
}
- 再定义一个基于Embedded的测试类
public class EmbeddTestHandlerTest {@Testpublic void testEmbedded(){ByteBuf byteBuf = Unpooled.buffer();String msg = "北京欢迎您";byteBuf.writeBytes(msg.getBytes(StandardCharsets.UTF_8));//(2) 创建一个EmbeddedChannel,并安装一个测试的EmbeddTestHandlerEmbeddedChannel channel = new EmbeddedChannel(new EmbeddTestHandler());//(3) 写入 ByteBuf,并断言调用 readOutbound()方法将会产生数据assertTrue(channel.writeOutbound(byteBuf));//(4) 将该 Channel 标记为已完成状态assertTrue(channel.finish());// read bytes//(5) 读取所产生的消息,并断言它包含了编码的值Byte code = channel.readOutbound();Byte checkCode = msg.getBytes(StandardCharsets.UTF_8)[0];assertEquals(code,checkCode);assertNull(channel.readOutbound());}
}
无需进行真实网络传输的一系列定义,即可进行handler的测试,下面是测试结果:
2. BootStrap引导类
netty中客户端和服务端各有一个BootStrap,其中客户端为Bootstrap,服务端为ServerBootstrap。
其中服务端的ServerBootstrap在监听端口和处理socketChannel可以使用两组线程模型:
这里定义了boss,work两组线程模型,其底层原理就是Reactor模型的主从模式,关于Reactor模型我将在零拷贝及NIO机制博文中深入探究
public void start() throws InterruptedException {final MessageCountHandler messageCountHandler = new MessageCountHandler();/*使用两个线程组*/EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup work = new NioEventLoopGroup();try {/*服务端启动必备*/ServerBootstrap b = new ServerBootstrap();b.group(boss,work) // 采用Reactor主从线程模型.channel(NioServerSocketChannel.class)/*指定使用NIO的通信模式*///.option(ChannelOption.SO_BACKLOG).localAddress(new InetSocketAddress(port))/*指定监听端口*/// .childOption(ChannelOption.SO_RCVBUF)//.childOption()//.handler();.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(messageCountHandler); // 添加一个共享的hander到pipeline中ch.pipeline().addLast(new EchoServerMCHandler());}});ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞到完成*/LOG.info("服务器启动完成");f.channel().closeFuture().sync();/*阻塞当前线程,直到服务器的ServerChannel被关闭*/} finally {boss.shutdownGracefully().sync();work.shutdownGracefully().sync();}
- ChannelInitializer
在上面的BootStrap引导类示例中,我们来看这个逻辑:
.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(messageCountHandler); // 添加一个共享的hander到pipeline中ch.pipeline().addLast(new EchoServerMCHandler());}});
ChannelInitializer类主要用于向socketChannel的pipeline中添加Handler,我们来看下源码:
可以发现,ChannelInitializer本身也是一个handler,并且提供了一个对外的方法,来初始化pipeline:
这里涉及到了netty设计的一个细节,既然是handler那么一定自己的生命周期,我们来看下源码:
这个Handler在将其他handler添加到pipeLine中之后,会将自己从pipeline中移除,这个是netty编程的常见的一个细节
我们可以参照这个编程细节,用在自己的业务上,比如:
在我们自己的应用程序中,如果存在着某个 handler 只使用一次的情况,也可以仿造 ChannelInitializer,用完以后将自己从ChannelPipeline 中移除自己,比如授权 handler,某客户端第一次连接登录以后,进行授权检查,检查通过后就可以把这个授权 handler 移除了。如果客户端关闭连接下线,下次再连接的时候,就是一个新的连接,授权 handler 依然会被安装到 ChannelPipeline ,依然会进行授权检查。
4. ChannelOption
ChannelOption属性主要对应套接字中的参数:
首先看用法示例:
private void doStart() throws InterruptedException {System.out.println("netty服务已启动");// 线程组EventLoopGroup group = new NioEventLoopGroup();try {// 创建服务器端引导类ServerBootstrap server = new ServerBootstrap();// 初始化服务器配置server.group(group) // 配置处理客户端的连接线程组.channel(NioServerSocketChannel.class) // 指定channel为 NioServerSocketChannel// 为socketChannel配置TCP参数.option(ChannelOption.SO_LINGER,100).option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT).option(ChannelOption.SO_BACKLOG,100).option(ChannelOption.SO_REUSEADDR,true).option(ChannelOption.SO_KEEPALIVE,true).localAddress(port) // 配置服务端口号// 为每个handler配置TCP参数.childOption(ChannelOption.SO_SNDBUF,1024).childOption(ChannelOption.SO_RCVBUF,1024).childHandler(new ChannelInitializer<SocketChannel>() { // 指定客户端通信的处理类,添加到pipline中,进行初始化@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new EchoServerHandler());}});// 绑定端口,sync()会阻塞到完成ChannelFuture sync = server.bind().sync();// 阻塞当前线程,直到服务器的ServerChannel被关闭sync.channel().closeFuture().sync();}finally {// 关闭资源group.shutdownGracefully().sync();}}
其中介绍比较重要的几个参数:
(1)ChannelOption.SO_REUSEADDR:
ChanneOption.SO_REUSEADDR 对应于套接字选项中的 SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,
比如,多网卡(IP)绑定相同端口,比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置 SO_REUSEADDR 就无法正常使用该端口。
但是注意,这个参数无法做到让应用绑定完全相同 IP + Port 来重复启动。
(2)ChannelOption.SO_KEEPALIVE
Channeloption.SO_KEEPALIVE 参数对应于套接字选项中的 SO_KEEPALIVE,该参数用于设置 TCP 连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP 会自动发送一个活动探测数据报文。
(3)ChannelOption.SO_SNDBUF 和 ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF 参数对应于套接字选项中的 SO_SNDBUF,ChannelOption.SO_RCVBUF 参数对应于套接字选项中的 SO_RCVBUF 这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。
(4)ChannelOption.SO_LINGER
ChannelOption.SO_LINGER 参数对应于套接字选项中的 SO_LINGER,Linux 内核默认的处理方式是当用户调用 close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用 SO_LINGER 可以阻塞 close()的调用时间,直到数据完全发送
(5)ChannelOption.TCP_NODELAY
ChannelOption.TCP_NODELAY 参数对应于套接字选项中的 TCP_NODELAY,该参数的使用与 Nagle 算法有关,Nagle 算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用 Nagle 算法,使用于小数据即时传输,于TCP_NODELAY 相对应的是 TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
- TCP的粘包和半包:
网路传输过程中,客户端给服务端发送了报文a,b,c,但服务端收到报文时,是a+b的上半部分,b的下半部分+c两个包,并没有按照完整报文a,b,c来接收,这个问题称为半包,报文不完整。再比如a,b的报文比较小,则服务器收到的包为a+b,c两个包,其中a,b合在了一个包里,这个称之为粘包。
发生的原因,在于TCP对于网络数据传输的处理优化,如果发送的网络数据包太小,那么他本身会启用 Nagle 算法,对较小的数据包进行合并然后再发送,这样就发生了粘包/半包的问题。即报文在传输过程中发生了拆包或合并。
怎么防止这个现象出现呢,解决方案就是在报文中添加标识符或分隔符,让服务器端知道,一个完整的包的状态。
(1)文本报文通用换行分隔符(适用于一行形式的报文)
- 客户端处理:每个报文加回车换行符
@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ByteBuf msg = null;String request = "apple,pear,orange"+ System.getProperty("line.separator");// 为每个报文末尾添加回车换行符for(int i=0;i<10;i++){msg = Unpooled.buffer(request.length());msg.writeBytes(request.getBytes());ctx.writeAndFlush(msg);}}
服务端处理,加回车换行符处理
LineBasedFrameDecoder是netty已经为我们实现好的,处理回车换行符的handler
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {// 添加回车换行符处理的handler,校验报文的完整性 ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new LineBaseServerHandler());}}
(2)文本报文自定义分隔符(适用于一段形式的报文)
- 服务端处理,在服务端约定一个自定义的分隔符:
DelimiterBasedFrameDecoder 是netty已经为我们实现好的,处理自定义分隔符的handler
// 在服务端约定一个自定义的分隔符public static final String My_SYMBOL = "#";private static class ChannelInitializerImp extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {ByteBuf delimiter = Unpooled.copiedBuffer(My_SYMBOL .getBytes());// 服务端添加一个自定义的分隔符处理handlerch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));ch.pipeline().addLast(new DelimiterServerHandler());}}
- 客户端处理,使用和服务端约定的分隔符:
public static final String My_SYMBOL = "#";private static class ChannelInitializerImp extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {ByteBuf delimiter = Unpooled.copiedBuffer(My_SYMBOL.getBytes());ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));ch.pipeline().addLast(new DelimiterClientHandler());}}
(3)二进制定长标识(适用于二进制报文)约定好每条报文的长度:
- 客户端处理,每次发送给服务端报文前,将报文固定长度进行编码,一并发送过去:
public final static String REQUEST = "apple.orange,pear";@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ByteBuf msg = null;for(int i=0;i<10;i++){// 申请固定长度的buffermsg = Unpooled.buffer(REQUEST.length());msg.writeBytes(REQUEST.getBytes());ctx.writeAndFlush(msg);}}
- 服务端处理,添加二进制长度域解码器来识别完整报文:
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {// 添加二进制长度域解码器来识别完整报文ch.pipeline().addLast(new FixedLengthFrameDecoder(FixedLengthEchoClient.REQUEST.length()));ch.pipeline().addLast(new FixedLengthServerHandler());}}
(4)二进制基于长度域解码
channelRead 和 channelReadComplate的区别
channelRead 方法用于处理每次从通道中读取到的数据。
channelReadComplete 方法用于通知数据读取操作完成后进行后续处理。
在 Netty 中,channelRead 和 channelReadComplete 是 ChannelInboundHandler 接口中的两个重要方法,用于处理入站数据(从远程对等方传入的数据)。
channelRead 方法是在每次从通道中读取到数据时被调用的。当有数据从远程对等方传入时,Netty 会自动将数据包装成一个 ByteBuf 对象,并传递给相应的 ChannelInboundHandler 的 channelRead 方法进行处理。在这个方法中,你可以对接收到的数据进行解码、处理、转换等操作。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {// 在这里处理接收到的数据(msg),通常是 ByteBuf 对象// 例如,解码、处理数据等
}
channelReadComplete 方法是在一个通道的数据读取操作完成时被调用的。在 channelRead 方法中处理完数据后,Netty 会自动调用 channelReadComplete 方法,以通知处理器数据已经读取完成,可以进行后续的操作,例如回送响应或释放资源等。
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {// 本次数据读取完成后的后续处理// 例如,回送响应或释放资源等
}