由浅入深Netty协议设计与解析

news/2025/1/8 20:09:37/

目录

  • 1 为什么需要协议?
  • 2 redis 协议举例
  • 3 http 协议举例
  • 4 自定义协议要素
    • 4.1 编解码器
    • 4.2 什么时候可以加 @Sharable


1 为什么需要协议?

在这里插入图片描述

TCP/IP 中消息传输基于流的方式,没有边界。

协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则

例如:在网络上传输

下雨天留客天留我不留

是中文一句著名的无标点符号句子,在没有标点符号情况下,这句话有数种拆解方式,而意思却是完全不同,所以常被用作讲述标点符号的重要性

一种解读

下雨天留客,天留,我不留

另一种解读

下雨天,留客天,留我不?留

如何设计协议呢?其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好,因为分隔符本身如果用于传输,那么必须加以区分。因此,下面一种协议较为常用

定长字节表示内容长度 + 实际内容

例如,假设一个中文字符长度为 3,按照上述协议的规则,发送信息方式如下,就不会被接收方弄错意思了

0f下雨天留客06天留09我不留

小故事

很久很久以前,一位私塾先生到一家任教。双方签订了一纸协议:“无鸡鸭亦可无鱼肉亦可白菜豆腐不可少不得束修金”。此后,私塾先生虽然认真教课,但主人家则总是给私塾先生以白菜豆腐为菜,丝毫未见鸡鸭鱼肉的款待。私塾先生先是很不解,可是后来也就想通了:主人把鸡鸭鱼肉的钱都会换为束修金的,也罢。至此双方相安无事。

年关将至,一个学年段亦告结束。私塾先生临行时,也不见主人家为他交付束修金,遂与主家理论。然主家亦振振有词:“有协议为证——无鸡鸭亦可,无鱼肉亦可,白菜豆腐不可少,不得束修金。这白纸黑字明摆着的,你有什么要说的呢?”

私塾先生据理力争:“协议是这样的——无鸡,鸭亦可;无鱼,肉亦可;白菜豆腐不可,少不得束修金。”

双方唇枪舌战,你来我往,真个是不亦乐乎!

这里的束修金,也作“束脩”,应当是泛指教师应当得到的报酬

2 redis 协议举例

NioEventLoopGroup worker = new NioEventLoopGroup();
byte[] LINE = {13, 10};
try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 会在连接 channel 建立成功后,会触发 active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) {set(ctx);get(ctx);}private void get(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*2".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("get".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("aaa".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}private void set(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*3".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("set".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("aaa".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("bbb".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(buf.toString(Charset.defaultCharset()));}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {log.error("client error", e);
} finally {worker.shutdownGracefully();
}

3 http 协议举例

NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new HttpServerCodec());ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {// 获取请求log.debug(msg.uri());// 返回响应DefaultFullHttpResponse response =new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);byte[] bytes = "<h1>Hello, world!</h1>".getBytes();response.headers().setInt(CONTENT_LENGTH, bytes.length);response.content().writeBytes(bytes);// 写回响应ctx.writeAndFlush(response);}});/*ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("{}", msg.getClass());if (msg instanceof HttpRequest) { // 请求行,请求头} else if (msg instanceof HttpContent) { //请求体}}});*/}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {log.error("server error", e);
} finally {boss.shutdownGracefully();worker.shutdownGracefully();
}

4 自定义协议要素

  • 魔数,用来在第一时间判定是否是无效数据包
  • 版本号,可以支持协议的升级
  • 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
  • 指令类型,是登录、注册、单聊、群聊… 跟业务相关
  • 请求序号,为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

4.1 编解码器

根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发

@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerType = in.readByte();byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);}
}

测试

EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler(),new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),new MessageCodec()
);
// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "张三");
//        channel.writeOutbound(message);
// decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);ByteBuf s1 = buf.slice(0, 100);
ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
s1.retain(); // 引用计数 2
channel.writeInbound(s1); // release 1
channel.writeInbound(s2);

解读

在这里插入图片描述

4.2 什么时候可以加 @Sharable

  • 当 handler 不保存状态时,就可以安全地在多线程下被共享
  • 但要注意对于编解码器类,不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制
  • 如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类
@Slf4j
@ChannelHandler.Sharable
/*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerType = in.readByte();byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);}
}

http://www.ppmy.cn/news/75008.html

相关文章

ServerBoss:国产免费的Linux连接工具,服务器管理工具

在这个数字化时代&#xff0c;Linux正在成为越来越多企业的首选操作系统。但是&#xff0c;由于它复杂的命令行界面和复杂的文件系统&#xff0c;许多用户可能会认为Linux不太友好和难以驾驭。同时目前大部分Linux连接工具都是国外产品&#xff0c;且需要商业授权。在此背景下&…

Redis在线数据迁移工具redis-migrate-tool详解,轻松实现redis集群之间的数据同步

目录 redis-migrate-tool 简介 git地址 github: gitee: 特点 redis-migrate-tool安装 使用redis-migrate-tool进行迁移 rmt_redis.c:6446 ERROR: Can’t handle RDB format version 错误解决方案 redis-migrate-tool 命令详解 rmt.conf详解 source和target common …

〖Web全栈开发③〗—HTTP协议和静态web服务器

HTTP协议和静态web服务器 &#xff08;一&#xff09;三次握手和四次挥手&#xff08;二&#xff09;HTTP协议2.1 HTTP协议的定义2.2 HTTP协议的组成 &#xff08;三&#xff09;搭建python自带静态web服务器3.1 静态web服务器是什么3.2 如何搭建python自带的静态web服务器3.3 …

Android 创建线程源码分析 JavaThreadNativeThread

前言 本文分析在Android中创建线程时候的源码分析,即JavaThread和NativeThread。 java/lang/Thread.java art/runtime/native/java_lang_Thread.cc art/runtime/thread.cc 两种Java Thread 有两种可以运行Java代码的线程。有两种情况: 通过new Thread创建的java线程在Nati…

这是关于“树先生“的故事

《数据结构专栏》 文章目录 《数据结构专栏》一、认识树结构如何遍历树如何创建一个树&#xff1f;如何判断一颗树是否是完全二叉树&#xff1f; 二、树的简单算法——递归1.相同树2.镜像树3.单值二叉树 总结 一、认识树结构 树的定义&#xff1a;树是指由N&#xff08;N>0…

Ink Detection

EfficientNet-B5、MIT-B5和ResNet3D 在字迹检测中,每个backbone的效果取决于具体的应用场景、数据集和任务要求。EfficientNet-B5、MIT-B5和ResNeXt50_32x4d都是在图像分类任务中表现出色的backbone模型,而ResNet3D则主要用于视频分类和动作识别任务。因此,从字迹检测的角度…

【初识C语言】数组

【初识C语言】数组 一.一维数组1.什么是数组1.一维数组的创建和初始化1.数组如何创建2.数组如何初始化3.用sizeof来求数组的长度 2.一维数组的使用3.一维数组在内存中的存储方式 二. 二维数组1,什么是二维数组2.二维数组的创建3.二维数组的初始化4.二维数组的使用5.二维数组在内…

python 中常见变量类型

数值 a 10 b 123 … 字符串 在python中 用单引号’‘和双引号""括起来的都是字符串,不使用引号括起来的不是字符串&#xff0c;字符串是使用最多的数据类型&#xff0c;用来表示一段文本信息。 比如&#xff1a; a ‘123’ b “123” 字符串之间可以用加法运算…