Java基础之《netty(29)—自定义协议解决TCP粘包拆包》

news/2024/11/19 20:40:51/

一、TCP粘包和拆包解决方案

1、使用自定义协议 + 编解码器,来解决。
2、关键就是要解决,服务器端每次读取数据长度的问题。这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免TCP粘包、拆包。

二、具体示例

1、要求客户端发送5个message对象,客户端每次发送一个message对象。
2、服务器端每次接收一个message,分5次进行解码,每读取到一个message,会回复一个message对象给客户端。

三、客户端发送给服务端

1、编解码器和POJO类
MessageProtocol.java

package netty.tcpStickPackage2;/*** 协议包对象类* @author user**/
public class MessageProtocol {//数据发送的长度private int len;//数据本体,一般是放在byte数组中private byte[] content;public int getLen() {return len;}public void setLen(int len) {this.len = len;}public byte[] getContent() {return content;}public void setContent(byte[] content) {this.content = content;}}

MyMessageDecoder.java

package netty.tcpStickPackage2;import java.util.List;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;public class MyMessageDecoder extends ReplayingDecoder<Void> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println("MyMessageDecoder decode 被调用");//需要将二进制字节码转成MessageProtocol对象int len = in.readInt();//创建一个len长度的字节数组byte[] content = new byte[len];in.readBytes(content);//封装成MessageProtocol对象,放入out,传递给下一个handler处理MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(len);messageProtocol.setContent(content);out.add(messageProtocol);}}

MyMessageEncoder.java

package netty.tcpStickPackage2;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {@Overrideprotected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {System.out.println("MyMessageEncoder encode 方法被调用");out.writeInt(msg.getLen());out.writeBytes(msg.getContent());}}

2、服务端
NettyServer.java

package netty.tcpStickPackage2;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;public class NettyServer {public static void main(String[] args) {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(8);try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new NettyServerInitializer()); //自定义一个初始化类ChannelFuture cf = bootstrap.bind(7000).sync();cf.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

NettyServerInitializer.java

package netty.tcpStickPackage2;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//加入解码器pipeline.addLast(new MyMessageDecoder());//加入一个自定义handlerpipeline.addLast(new NettyChannelHandler());}}

NettyChannelHandler.java

package netty.tcpStickPackage2;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;public class NettyChannelHandler extends SimpleChannelInboundHandler<MessageProtocol> {private int count;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {//接收到数据并处理int len = msg.getLen();byte[] content = msg.getContent();System.out.println("服务端接收到信息如下");System.out.println("长度:" + len);System.out.println("内容:" + new String(content, CharsetUtil.UTF_8));System.out.println("服务器接收到消息包数量:" + (++this.count));//回复消息}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.channel().close();}}

3、客户端
NettyClient.java

package netty.tcpStickPackage2;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;public class NettyClient {public static void main(String[] args) {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group) //设置线程组.channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).handler(new NettyClientInitializer()); //自定义一个初始化对象ChannelFuture cf = bootstrap.connect("127.0.0.1", 7000).sync();cf.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {group.shutdownGracefully();}}
}

NettyClientInitializer.java

package netty.tcpStickPackage2;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//加入编码器pipeline.addLast(new MyMessageEncoder());//加入一个自定义handlerpipeline.addLast(new NettyClientHandler());}}

NettyClientHandler.java

package netty.tcpStickPackage2;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;public class NettyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {//private int count;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//使用客户端发送5条数据for (int i=0; i<5; i++) {String msg = "今天天气冷,吃火锅";byte[] content = msg.getBytes(CharsetUtil.UTF_8);int length = content.length;//创建协议包MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(length);messageProtocol.setContent(content);ctx.writeAndFlush(messageProtocol);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.channel().close();}}

四、执行结果

1、服务端

MyMessageDecoder decode 被调用
服务端接收到信息如下
长度:27
内容:今天天气冷,吃火锅
服务器接收到消息包数量:1
MyMessageDecoder decode 被调用
服务端接收到信息如下
长度:27
内容:今天天气冷,吃火锅
服务器接收到消息包数量:2
MyMessageDecoder decode 被调用
服务端接收到信息如下
长度:27
内容:今天天气冷,吃火锅
服务器接收到消息包数量:3
MyMessageDecoder decode 被调用
服务端接收到信息如下
长度:27
内容:今天天气冷,吃火锅
服务器接收到消息包数量:4
MyMessageDecoder decode 被调用
服务端接收到信息如下
长度:27
内容:今天天气冷,吃火锅
服务器接收到消息包数量:5

2、客户端

MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用

五、服务端回复消息给客户端

1、修改服务端代码
NettyChannelHandler.java
添加回复消息

package netty.tcpStickPackage2;import java.util.UUID;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;public class NettyChannelHandler extends SimpleChannelInboundHandler<MessageProtocol> {private int count;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {//接收到数据并处理int len = msg.getLen();byte[] content = msg.getContent();System.out.println("服务端接收到信息如下");System.out.println("长度:" + len);System.out.println("内容:" + new String(content, CharsetUtil.UTF_8));System.out.println("服务器接收到消息包数量:" + (++this.count));//回复消息String response = UUID.randomUUID().toString();byte[] responseContent = response.getBytes(CharsetUtil.UTF_8);int responseLen = responseContent.length;//构建一个协议包MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(responseLen);messageProtocol.setContent(responseContent);ctx.writeAndFlush(messageProtocol);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.channel().close();}}

NettyServerInitializer.java
加入编码器

package netty.tcpStickPackage2;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//加入解码器pipeline.addLast(new MyMessageDecoder());//加入编码器pipeline.addLast(new MyMessageEncoder());//加入一个自定义handlerpipeline.addLast(new NettyChannelHandler());}}

2、修改客户端代码
NettyClientHandler.java
添加接收服务端信息channelRead0接口

package netty.tcpStickPackage2;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;public class NettyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {private int count;@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {int len = msg.getLen();byte[] content = msg.getContent();System.out.println("客户端接收到消息如下");System.out.println("长度:" + len);System.out.println("内容:" + new String(content, CharsetUtil.UTF_8));System.out.println("客户端接收到消息包数量:" + (++this.count));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//使用客户端发送5条数据for (int i=0; i<5; i++) {String msg = "今天天气冷,吃火锅";byte[] content = msg.getBytes(CharsetUtil.UTF_8);int length = content.length;//创建协议包MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(length);messageProtocol.setContent(content);ctx.writeAndFlush(messageProtocol);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.channel().close();}}

NettyClientInitializer.java
添加解码器

package netty.tcpStickPackage2;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//加入编码器pipeline.addLast(new MyMessageEncoder());//加入解码器pipeline.addLast(new MyMessageDecoder());//加入一个自定义handlerpipeline.addLast(new NettyClientHandler());}}

六、执行结果

1、服务端

MyMessageDecoder decode 被调用
服务端接收到信息如下
长度:27
内容:今天天气冷,吃火锅
服务器接收到消息包数量:1
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务端接收到信息如下
长度:27
内容:今天天气冷,吃火锅
服务器接收到消息包数量:2
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务端接收到信息如下
长度:27
内容:今天天气冷,吃火锅
服务器接收到消息包数量:3
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务端接收到信息如下
长度:27
内容:今天天气冷,吃火锅
服务器接收到消息包数量:4
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务端接收到信息如下
长度:27
内容:今天天气冷,吃火锅
服务器接收到消息包数量:5
MyMessageEncoder encode 方法被调用

2、客户端

MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
客户端接收到消息如下
长度:36
内容:6a488e60-a67f-49ef-9783-cbcb048d82ea
客户端接收到消息包数量:1
MyMessageDecoder decode 被调用
客户端接收到消息如下
长度:36
内容:160ab489-09b9-4786-ac57-74b82aed7150
客户端接收到消息包数量:2
MyMessageDecoder decode 被调用
客户端接收到消息如下
长度:36
内容:d9fd2087-1d6f-474d-93d4-6af1c1ced736
客户端接收到消息包数量:3
MyMessageDecoder decode 被调用
客户端接收到消息如下
长度:36
内容:06b6b40e-6ab4-4374-9e5a-2fcaca8dbc4d
客户端接收到消息包数量:4
MyMessageDecoder decode 被调用
客户端接收到消息如下
长度:36
内容:40891ed6-a790-4660-962f-4e3ec9d3a99b
客户端接收到消息包数量:5

七、其他

1、因为在encoder中先writeInt再writeByte,然后decoder中先readInt再readByte,这样每次接收数据的数据长度都是一个int+一个content的长度。
2、这个decode如果发生拆包的现象的时候会抛出一个Relay的Error,然后等待下一个请求进来,只有长度符合之后才会进行read,源码的位置是ReplayingDecoder的388行左右。
3、粘包就是通过定长来获取足够的字节码。拆包就是通过readBytes的时候需要的length和in的长度进行比较,短了就抛出error。


http://www.ppmy.cn/news/18825.html

相关文章

Kettle(4):excel数据抽取到mysql

1 准备工作 1.1 准备Excel文件 我这边直接使用上一篇导出的excel:file_user.xls 1.2 创建数据库 在mysql中创建数据库 1.3 在kettle中加载MySQL驱动 Kettle要想连接到MySQL&#xff0c;必须要安装一个MySQL的驱动&#xff0c;就好比我们装完操作系统要安装显卡驱动一样。加…

Python技能树-推导式

Python 列表推导式(1) Python 独步天下的推导式表达式&#xff0c;使用列表推导式过滤出偶数列表 # -*- coding: UTF-8 -*- if __name__ __main__:list [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]print()print("# 使用列表推导式过滤出偶数")# TODO(you): 请在此实现过滤代…

Day867.事务隔离 -MySQL实战

事务隔离 Hi&#xff0c;我是阿昌&#xff0c;今天学习记录的是关于事务隔离的内容。 提到事务&#xff0c;肯定不陌生&#xff0c;和数据库打交道的时候&#xff0c;总是会用到事务。 最经典的例子就是转账&#xff0c;你要给朋友小王转 100 块钱&#xff0c;而此时你的银行…

操作系统真相还原_第5章第2节:内存分页机制

文章目录分段机制分页机制一级页表二级页表启用分页机制的过程启用分页机制(二级页表)详解程序include.incmbr.sloader.s写入硬盘启动bochs执行分段机制 分页机制 一级页表 二级页表 启用分页机制的过程 1、准备好页目录项及页表 2、将页表地址写入控制寄存器cr3 3、寄存器cr0…

Elasticsearch7.8.0版本高级查询—— 聚合查询文档

目录一、初始化文档数据二、聚合查询文档2.1、概述2.2、对某个字段取最大值 max 示例2.3、对某个字段取最小值 min 示例2.4、对某个字段求和sum 示例2.5、对某个字段取平均值 avg 示例2.6、对某个字段的值进行去重之后再取总数 示例三、State 聚合查询文档3.1、概述3.2、示例一…

Spark RDD算子

文章目录Spark RDD算子一、RDD 转换算子1、Value 类型(1) map(2) mapPartitions1&#xff09;函数说明2&#xff09;小案例获取每个分区的最大值(3) map 和 mapParitions 的区别(4) mapParitionsWithIndex1&#xff09;小案例只获取第二个分区的最大值2&#xff09;小案例获取每…

CPU缓存架构缓存一致性协议详解

一、CPU高速缓存&#xff08;Cache Memory&#xff09;1.1 CPU高速缓存CPU缓存即高速缓冲存储器&#xff0c;是位于CPU与主内存间的一种容量较小但速度很高的存储器。由于CPU的速度远高于主内存&#xff0c;CPU直接从内存中存取数据要等待一定时间周期&#xff0c;Cache中保存着…

2023122日记

新年伊始&#xff0c;却又是无聊的一天。话说送完姐姐去相亲&#xff0c;回来逛了一圈&#xff0c;终究很困&#xff0c;回来睡了。相亲太残酷了&#xff0c;这里只有没有感情的索取者&#xff0c;评价者。 按照惯例&#xff0c;农村的各种价值观的膨胀&#xff0c;指挥的碰撞…