Netty系列-7 Netty编解码器

devtools/2024/10/18 14:18:14/

背景

netty框架中,自定义解码器的起点是ByteBuf类型的消息, 自定义编码器的终点是ByteBuf类型。

1.解码器

业务解码器的起点是ByteBuf类型

netty中可以通过继承MessageToMessageEncoder类自定义解码器类。MessageToMessageEncoder继承自ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter使用默认方式(不处理,向下传递事件)实现了所有的Inbound接口。因此,MessageToMessageEncoder只需要重写channelRead方法,并在该方法中提取消息、转换消息、通过ChannelInvoker将转换后的消息以channelRead事件发向pipeline即可。
MessageToMessageEncoder抽象类的实现如下:

java">public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter {private final TypeParameterMatcher matcher;protected MessageToMessageDecoder() {matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");}protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {matcher = TypeParameterMatcher.get(inboundMessageType);}public boolean acceptInboundMessage(Object msg) throws Exception {return matcher.match(msg);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {CodecOutputList out = CodecOutputList.newInstance();try {if (acceptInboundMessage(msg)) {I cast = (I) msg;try {decode(ctx, cast, out);} finally {ReferenceCountUtil.release(cast);}} else {out.add(msg);}} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {try {int size = out.size();for (int i = 0; i < size; i++) {ctx.fireChannelRead(out.getUnsafe(i));}} finally {out.recycle();}}}protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

1.1 类型的匹配器

MessageToMessageDecoder内部维护了一个TypeParameterMatcher类型的匹配器对象matcher,用于指定解码器可以处理的消息类型。可通过构造函数为其设置类型,也可通过泛型指定:

java">// 使用泛型类型
protected MessageToMessageDecoder() {matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
}// 子类调用MessageToMessageDecoder构造器时,传入类型
protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {matcher = TypeParameterMatcher.get(inboundMessageType);
}

一般,通过泛型指定解码器处理的消息对象,即使用MessageToMessageDecoder的无参构造函数。
acceptInboundMessage方法封装matcher的实现,返回布尔值,表示是否支持处理msg消息类型:

java">public boolean acceptInboundMessage(Object msg) throws Exception {return matcher.match(msg);
}

根据matcher的match方法:

java">private static final class ReflectiveMatcher extends TypeParameterMatcher {private final Class<?> type;//...@Overridepublic boolean match(Object msg) {// msg消息是否为type类型或者其子类型return type.isInstance(msg);}
}

1.2 channelRead方法

java">public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 构造List列表对象,存储解码后的对象CodecOutputList out = CodecOutputList.newInstance();try {// 判断是否支持处理消息if (acceptInboundMessage(msg)) {I cast = (I) msg;try {// 处理消息,将cast对象解码后的结果存放到out列表中decode(ctx, cast, out);} finally {ReferenceCountUtil.release(cast);}} else {// 不处理消息,以原样保存out.add(msg);}} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {try {int size = out.size();// 遍历列表,依次向pipeline触发解码后的对象for (int i = 0; i < size; i++) {ctx.fireChannelRead(out.getUnsafe(i));}} finally {out.recycle();}}
}

逻辑较为清晰:
[1] 构造列表对象out,用于临时存放解码后的消息;
[2] 判断当前解码器是否可以处理该消息,不可以处理,直接添加到out中;可以处理,调用decode方法解码消息,解码结果都添加到out中;
[3] 遍历out列表,将消息以ChannelRead事件传递给向pipeline;
[4] out清理、回收再利用;

1.3 decode方法

decode方法是实际进行消息转换的逻辑,由子类根据业务具体实现:

java">protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;

将msg解码,解码后的对象存放在out中;由于out是数组,因此可以从msg中解码出一个对象,也可以解码出多个。如下所示:

java">protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {out.add(msg.toString(charset));
}

将ByteBuf类型的msg消息转为一个String类型的对象;

java">protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {String[] decodedMsgs = msg.toString(charset).split(";");for (String decodedMsg: decodedMsgs) {out.add(decodedMsg);}
}

将ByteBuf转为String,并按照;分隔符进行拆分,每个字符串作为一个消息对象。

2.解码器案例

案例的结构图如下所示,消息流入解码器和流出时的消息类型会发生变化:
在这里插入图片描述
引入三个解码器和一个业务Handler:
[1] 编码器1实现ByteBuf->String类型的转换;
[2] 编码器2实现String->Message1类型的转换;
[3] 编码器3实现Message1->Message2类型的转换;
[4] 业务Handler打印消息类型和消息;
实现类依次为:

java">// MyMessageDecoder1
public class MyMessageDecoder1 extends MessageToMessageDecoder<ByteBuf> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {out.add(msg.toString(Charset.defaultCharset()));}
}// MyMessageDecoder2
class MyMessageDecoder2 extends MessageToMessageDecoder<String> {@Overrideprotected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) {String[] decodedMsgs = msg.split(";");for (String decodedMsg : decodedMsgs) {out.add(new Message1(decodedMsg));}}
}// MyMessageDecoder3
class MyMessageDecoder3 extends MessageToMessageDecoder<Message1> {@Overrideprotected void decode(ChannelHandlerContext ctx, Message1 msg, List<Object> out) {out.add(new Message2(msg.getContent()));}
}

业务Handler定义如下:

java">private static class MyHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println(msg);}
}

Message1和Message2消息定义如下:

java">@Data
@RequiredArgsConstructor
pulic class Message1 {private final String content;
}@Data
@RequiredArgsConstructor
pulic class Message2 {private final String content;
}

客户端发送消息:"test1;test2;test3"时:

Microsoft Telnet> send test1;test2;test3
发送字符串 test1;test2;test3
Microsoft Telnet>

服务器日志如下所示:

Message2(content=test1)
Message2(content=test2)
Message2(content=test3)

注意:解码的顺序沿着pipeline进行,因此需要注意调整netty解码器在pipeline中的位置。

如果将3和解码器2的顺序调整一下:

java">protected void initChannel(NioSocketChannel channel) {channel.pipeline().addLast(new MyMessageDecoder1());channel.pipeline().addLast(new MyMessageDecoder3());channel.pipeline().addLast(new MyMessageDecoder2());channel.pipeline().addLast(new MyHandler());
}

重复上述操作,服务器日志如下:

Message1(content=test1)
Message1(content=test2)
Message1(content=test3)

此时,解码器1流出的数据为String类型,流入解码器2时-类型校验不通过直接以流入的String类型流出,流入解码器3时,将String类型转为Message1类型,流入业务Handler进行打印。

3.编码器

业务编码器的终点是ByteBuf类型

netty中可以通过继承MessageToMessageEncoder类自定义解码器类。MessageToMessageEncoder继承自ChannelOutboundHandlerAdapter,ChannelOutboundHandlerAdapter使用默认方式实现(不处理,向前传递事件)了所有的Outbound接口。因此,MessageToMessageEncoder只需要重写write方法,并在该方法中编码消息、并通过ChannelInvoker将编码后的消息发送到pipeline即可。
MessageToMessageEncoder抽象类的实现如下:

java">public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {private final TypeParameterMatcher matcher;protected MessageToMessageEncoder() {matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");}protected MessageToMessageEncoder(Class<? extends I> outboundMessageType) {matcher = TypeParameterMatcher.get(outboundMessageType);}public boolean acceptOutboundMessage(Object msg) throws Exception {return matcher.match(msg);}@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {CodecOutputList out = null;try {if (acceptOutboundMessage(msg)) {out = CodecOutputList.newInstance();@SuppressWarnings("unchecked")I cast = (I) msg;try {encode(ctx, cast, out);} finally {ReferenceCountUtil.release(cast);}if (out.isEmpty()) {throw new EncoderException(StringUtil.simpleClassName(this) + " must produce at least one message.");}} else {ctx.write(msg, promise);}} catch (EncoderException e) {throw e;} catch (Throwable t) {throw new EncoderException(t);} finally {if (out != null) {try {final int sizeMinusOne = out.size() - 1;if (sizeMinusOne == 0) {ctx.write(out.getUnsafe(0), promise);} else if (sizeMinusOne > 0) {if (promise == ctx.voidPromise()) {writeVoidPromise(ctx, out);} else {writePromiseCombiner(ctx, out, promise);}}} finally {out.recycle();}}}}private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {final ChannelPromise voidPromise = ctx.voidPromise();for (int i = 0; i < out.size(); i++) {ctx.write(out.getUnsafe(i), voidPromise);}}private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());for (int i = 0; i < out.size(); i++) {combiner.add(ctx.write(out.getUnsafe(i)));}combiner.finish(promise);}protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}

3.1 类型的匹配器

MessageToMessageEncoder内部维护了一个TypeParameterMatcher类型的匹配器对象matcher,用于指定该编码器器可以处理的消息类型,与解码器中的matcher作用完全相同,不再赘述。

3.2 write方法

java">public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {CodecOutputList out = null;try {// 判断当前编码器是否可以编码消息if (acceptOutboundMessage(msg)) {out = CodecOutputList.newInstance();@SuppressWarnings("unchecked")I cast = (I) msg;try {// 编码消息,并将编码后的消息保存到out列表中encode(ctx, cast, out);} finally {ReferenceCountUtil.release(cast);}if (out.isEmpty()) {throw new EncoderException(StringUtil.simpleClassName(this) + " must produce at least one message.");}} else {// 不能编码的消息不处理,直接沿着pipeline向前传递ctx.write(msg, promise);}} catch (EncoderException e) {throw e;} catch (Throwable t) {throw new EncoderException(t);} finally {// 遍历out,依次调用ctx.write,沿着pipeline向前传递if (out != null) {try {final int sizeMinusOne = out.size() - 1;if (sizeMinusOne == 0) {ctx.write(out.getUnsafe(0), promise);} else if (sizeMinusOne > 0) {if (promise == ctx.voidPromise()) {writeVoidPromise(ctx, out);} else {writePromiseCombiner(ctx, out, promise);}}} finally {// 清理out列表,回收再利用out.recycle();}}}
}

逻辑较为清晰:
[1] 构造列表对象out,用于临时存放编码后的消息;
[2] 判断当前编码器是否可以处理该消息,不可以处理,直接通过ctx.write沿着pipeline向前传递;可以处理,调用encode方法编码消息,编码结果添加到out中;
[3] 遍历out列表,将消息以write事件传递给向pipeline;
[4] out清理、回收再利用;

3.2 encode方法

encode方法是实际进行消息转换的逻辑,由子类根据业务具体实现:

java">protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;

将msg消息进行编码,编码后的对象存放在out中;由于out是数组,因此可以从msg中编码出一个对象,也可以编码出多个,与解码器逻辑相同。

java">protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {out.add(msg.toString(charset));
}

将ByteBuf类型的msg消息转为一个String类型的对象;

java">protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {String[] decodedMsgs = msg.toString(charset).split(";");for (String decodedMsg: decodedMsgs) {out.add(decodedMsg);}
}

将ByteBuf转为String,并按照;分隔符进行拆分,每个字符串作为一个消息对象。
netty向外发送数据时,一般经过业务Handler->编码器->HeadContext的流程。
向客户端发送消息的底层实现在HeadContext的unsafe对象(NioSocketChannel的unsafe对象)中,而发送前有消息类型判断:

java">final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler{ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {unsafe.write(msg, promise);}
}

unsafe对象的write方法如下:

java">public final void write(Object msg, ChannelPromise promise) {//...msg = filterOutboundMessage(msg);//...
}

在真实写操作前,通过filterOutboundMessage进行消息类型的判断:

java">@Override
protected final Object filterOutboundMessage(Object msg) {// 要求消息必须时ByteBuf或者FileRegion类型或其子类型if (msg instanceof ByteBuf) {ByteBuf buf = (ByteBuf) msg;if (buf.isDirect()) {return msg;}return newDirectBuffer(buf);}if (msg instanceof FileRegion) {return msg;}throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

由此,编码器将消息传递给HeadContext前,需要将消息最终编码为ByteBuf类型。

4.解码器案例

案例结构图如下所示:
在这里插入图片描述

在章节2中的案例基础上新增两个编码器,并修改业务Handler:
[1] 业务Handler,接收客户端消息后,响应相同消息;
[2] 编码器1:将Message2类型的消息转为String类型;
[3] 编码器2: 将String类型消息转为ByteBuf类型;
代码实现如下:
修改业务Handler:

java">private static class MyHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println(msg);// 新增逻辑“将消息对象发送给客户端ctx.write(msg);}
}

添加编码器:

java">// 将Message2消息转为String消息
public class MyEncoder1 extends MessageToMessageEncoder<Message2> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message2 msg, List<Object> out) throws Exception {out.add(msg.getContent());}
}// 将String消息转为ByteBuf消息
public class MyEncoder2 extends MessageToMessageEncoder<String> {@Overrideprotected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) {out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), Charset.defaultCharset()));}
}

在MyHandler前依次添加解码器MyEncoder2和MyEncoder1:

java">protected void initChannel(NioSocketChannel channel) {channel.pipeline().addLast(new MyMessageDecoder1());channel.pipeline().addLast(new MyMessageDecoder2());channel.pipeline().addLast(new MyMessageDecoder3());channel.pipeline().addLast(new MyEncoder2());channel.pipeline().addLast(new MyEncoder1());channel.pipeline().addLast(new MyHandler());
}

可以使用Netty写一个客户端, 也可用客户端工具模拟,这里为了方便,使用SocketTool.exe,控制台日志如下:

14:36:15 发送数据:test1;test2;test3[1次]
14:36:15 收到数据:test1test2test3

注意:客户端收到了test1test2test3消息,在客户端开来是一个消息,但在服务器看来是连续发送的3个消息,消息内容分别为test1和test2和test3。这是TCP的流传输模式导致,可在业务层添加额外处理解决这个问题。将在下一篇文件介绍Netty如何处理粘包和分包问题。


http://www.ppmy.cn/devtools/122092.html

相关文章

普渡PUDU MT1:AI赋能,破解大面积场景清洁新挑战

普渡AI智能扫地机器人PUDU MT1:破解大面积场景清洁难题的新利器 在仓储物流、工业车间、交通枢纽、大型商场等大面积场景中,清洁难题一直是管理者们头疼的问题。这些区域面积广阔,清洁任务繁重,传统清洁方式难以胜任。然而,普渡机器人最新推出的AI智能扫地机器人PUDU MT1…

设置参数说明

目录 设备连接基本设置电机设置控制设置保护设置保存并重启读取参数擦除参数并重启设备重启 设备连接 驱动板接入电脑后会虚拟为串口&#xff0c;点击“刷新”&#xff0c;选择对应的com&#xff0c;点击“连接设备”&#xff0c;软件左下角会提示“连接成功”。驱动板出厂默认…

大模型时代下小模型知多少?从模型结构、预训练数据到运行时成本分析总结

今天&#xff0c;我们来谈谈小模型。《Small Language Models综述&#xff0c;Small Language Models: Survey, Measurements, and Insights》&#xff1a;https://arxiv.org/pdf/2409.15790这个工作&#xff0c;会有一些启发。 本文主要介绍三个话题&#xff0c;一个是小模型…

滚雪球学Oracle[5.6讲]:资源管理与调优

全文目录&#xff1a; 前言一、Oracle Resource Manager的配置与使用1.1 什么是Oracle Resource Manager1.2 Oracle Resource Manager的优势1.3 配置Oracle Resource Manager案例演示&#xff1a;配置Resource Manager 二、基于服务的资源分配策略2.1 基于服务的资源管理典型场…

项目-坦克大战学习-控制人机发射子弹以及玩家受到攻击

控制人机发射子弹有几个条件&#xff0c;发射子弹的间隔以及攻击对象的筛选 我们前面已经将子弹生成程序写出来了&#xff0c;在子弹类中我们定义了枚举类型用来分辨是谁发射出来的子弹 玩家发射出来的子弹定义&#xff1a; duixiangweizhi.zidan(x, y, zidanen.wanjia, Fang…

助农小程序|助农扶贫系统|基于java的助农扶贫系统小程序设计与实现(源码+数据库+文档)

助农扶贫系统小程序 目录 基于java的助农扶贫系统小程序设计与实现 一、前言 二、系统功能设计 三、系统实现 5.1.1 农户管理 5.1.2 用户管理 5.1.3 订单统计 5.2.1 商品信息管理 5.3.1 商品信息 5.3.2 订单信息 5.3.3 商品评价 5.3.4 商品退货 四、数据库设计 1、…

Hive数仓操作(二)

Hive 数据类型与连接 Hive 是一个用于处理大规模数据集的工具&#xff0c;支持多种数据类型以满足不同的需求。本文将详细介绍 Hive 的基本数据类型和集合数据类型。 一、Hive 基本数据类型 Hive 提供了多种基本数据类型&#xff0c;适用于不同的数据存储和处理需求&#xf…

KEYENCE Programming Contest 2024(AtCoder Beginner Contest 374) 题解

A - Takahashi san 2 Problem Statement KEYENCE has a culture of addressing everyone with the suffix “-san,” regardless of roles, age, or positions. You are given a string S consisting of lowercase English letters. If S ends with san, print Yes; otherwi…