Netty基础—6.Netty实现RPC服务二

news/2025/3/20 9:17:00/

大纲

1.RPC的相关概念

2.RPC服务调用端动态代理实现

3.Netty客户端之RPC远程调用过程分析

4.RPC网络通信中的编码解码器

5.Netty服务端之RPC服务提供端的处理

6.RPC服务调用端实现超时功能

3.Netty客户端之RPC远程调用过程分析

NettyRpcClient.remoteCall()方法的执行逻辑:

说明一:Netty的客户端和服务端通过connect()方法建立连接后,就会通过sync()方法进行同步阻塞。

说明二:RPC调用其实就是通过调用remoteCall()方法,往Netty客户端的Channel的writeAndFlush()方法写入请求数据,同时也通过sync()方法进行同步阻塞,以便可以等到Netty服务端的响应,从而获得RPC调用结果。

说明三:writeAndFlush()所写的请求数据会经过客户端Channel的pipeline进行处理如编码成二进制字节数组,然后传输给服务端的Channel。

说明四:服务端的Channel收到请求数据后,会经过其pipeline处理如解码二进制字节数据成对象来反射调用对应的方法,然后服务端将反射调用的结果作为响应数据编码后发送回客户端,最后客户端的Channel收到数据解码后获取的响应对象便是RPC调用结果。

public class NettyRpcClient {...//如果要实现超时功能,需要在remoteCall()方法被执行时设置该请求的发起时间//然后在NettyRpcClientHandler的channelRead()中计算是否超时public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {channelFuture.channel().writeAndFlush(rpcRequest).sync();RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse();logger.info("receives response from netty rpc server.");if (rpcResponse.isSuccess()) {return rpcResponse;}throw rpcResponse.getException();}
}public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcClientHandler.class);private RpcResponse rpcResponse;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {this.rpcResponse = (RpcResponse) msg;logger.error("Netty RPC client receives the response: " + rpcResponse);}public RpcResponse getRpcResponse() {while (rpcResponse == null) {try {Thread.sleep(5);} catch (InterruptedException e) {logger.error("wait for response interrupted", e);}}return rpcResponse;}
}

4.RPC网络通信中的编码解码器

(1)RPC的请求响应通信协议

(2)使用Hessian进行序列化与反序列化

(3)RPC的编码器

(4)RPC的解码器

(5)如何解决粘包拆包问题

(1)RPC的请求响应通信协议

//RPC请求
public class RpcRequest {private String requestId;private String className;private String methodName;private String[] parameterClasses;//参数类型private Object[] parameters;//参数值private String invokerApplicationName;//调用方的服务名称private String invokerIp;//调用方的IP地址...
}//RPC响应
public class RpcResponse {private String requestId;private boolean isSuccess;private Object result;private Throwable exception;...
}

(2)Hessian序列化与反序列化

需要将请求对象和响应对象序列化成二进制字节数组,以及将获取到的二进制字节数组反序列化成请求对象和响应对象,这里使用Hessian框架来实现序列化与反序列化。

public class HessianSerialization {//序列化:将对象序列化成字节数组public static byte[] serialize(Object object) throws IOException {//new一个字节数组输出流ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();//根据字节数组输出流new一个Hessian序列化输出流HessianOutput hessianOutput = new HessianOutput(byteArrayOutputStream);//用Hessian序列化输出流去写objecthessianOutput.writeObject(object);//将Hessian序列化输出流转化为字节数组byte[] bytes = byteArrayOutputStream.toByteArray();return bytes;}//反序列化:将字节数组还原成对象public static Object deserialize(byte[] bytes, Class clazz) throws IOException {//先封装一个字节数组输入流ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);//将字节数组输入流封装到Hessian序列化输入流里去HessianInput hessianInput = new HessianInput(byteArrayInputStream);//从Hessian序列化输入流读出一个对象Object object = hessianInput.readObject(clazz);return object;}
}

下面对RpcRequest和RpcResponse进行Hessian序列化测试。注意:RpcRequest和RpcResponse这两个类必须要实现Serializable。

public class HessianSerializationTest {public static void main(String[] args) throws Exception {RpcRequest rpcRequest = new RpcRequest();//先new一个RpcRequest对象rpcRequest.setRequestId(UUID.randomUUID().toString().replace("-", ""));rpcRequest.setClassName("TestClass");rpcRequest.setMethodName("sayHello");rpcRequest.setParameterClasses(new String[]{"String"});rpcRequest.setParameters(new Object[]{"wjunt"});rpcRequest.setInvokerApplicationName("RpcClient");rpcRequest.setInvokerIp("127.0.0.1");byte[] bytes = HessianSerialization.serialize(rpcRequest);//进行序列化System.out.println(bytes.length);RpcRequest deSerializedRpcRequest = (RpcRequest) HessianSerialization.deserialize(bytes, RpcRequest.class);System.out.println(deSerializedRpcRequest);}
}

(3)RPC的编码器

编码可以理解为进行序列化操作,解码可以理解为进行反序列化操作。

编码器RpcEncoder需要继承Netty的MessageToByteEncoder类,解码器RpcDecoder需要继承Netty的ByteToMessageDecoder类。

反序列化的逻辑需要根据序列化时数据的封装逻辑来处理,比如下面编码后的一条数据是由字节数组长度 + 字节数组组成的,因此反序列化需要根据此来写对应的逻辑。

public class RpcEncoder extends MessageToByteEncoder {//要进行序列化的目标类private Class<?> targetClass;public RpcEncoder(Class<?> targetClass) {this.targetClass = targetClass;}protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {//传入的对象o是否是Encoder所指定的类的实例对象if (targetClass.isInstance(o)) {byte[] bytes = HessianSerialization.serialize(o);//将序列化好的字节数组写到byteBuf里去//先写数据长度到byteBuf,这个长度就是4个字节的bytes的lengthbyteBuf.writeInt(bytes.length);//然后再写完整的bytes数组到byteBufbyteBuf.writeBytes(bytes);}}
}

(4)RPC的解码器

解码器的主要步骤如下:

步骤一:消息长度校验与读索引标记

步骤二:消息长度负值校验与拆包校验

步骤三:拆包问题处理与读索引复位

步骤四:将字节数组反序列化为指定类

public class RpcDecoder extends ByteToMessageDecoder {private static final int MESSAGE_LENGTH_BYTES = 4;private static final int MESSAGE_LENGTH_VALID_MINIMUM_VALUE = 0;private Class<?> targetClass;public RpcDecoder(Class<?> targetClass) {this.targetClass = targetClass;}protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {//1.消息长度校验//首先校验消息长度的字节数,也就是byteBuf当前可读的字节数,必须达到4个字节,此时才可以继续往下走if (byteBuf.readableBytes() < MESSAGE_LENGTH_BYTES) {return;}//2.读索引标记//对于byteBuf当前可以读的readerIndex,进行mark标记,也就是进行读索引标记//后续可以通过这个mark标记,找回来重新发起read读取之前的一个readerIndex位置byteBuf.markReaderIndex();//3.读取消息长度//读取4个字节的int,int代表了消息bytes的长度int messageLength = byteBuf.readInt();//4.消息长度负值校验//如果此时消息长度是小于0,说明此时通信已经出现了故障if (messageLength < MESSAGE_LENGTH_VALID_MINIMUM_VALUE) {channelHandlerContext.close();}//5.拆包校验//判断可读字节数是否小于消息长度,若是则出现了拆包,需要对byteBuf的读索引进行复位,下次再读//byteBuf.readableBytes()读完4个字节后继续读byteBuf.readableBytes()//如果此时消息字节数据没有接收完整,那么可以读的字节数是比消息字节长度小的,这就是检查经典的拆包问题//这时需要进行读索引进行复位,本次不再进行数据处理if (byteBuf.readableBytes() < messageLength) {byteBuf.resetReaderIndex();//出现拆包后,等待下次数据输入时再进行分析//EventLoop里有个for循环会不断监听Channel的读事件;//当数据还在传输时,由于传输是一个持续的过程,所以在传输数据过程中,Channel会一直产生读事件;//这个过程中,只要循环回来执行判断,就肯定满足监听到Channel的读事件;//因此在数据还没传输完成时,for循环执行到去判断是否有Channel的读事件,就会出现这种拆包问题;//所以只要返回不处理并且复位读索引,那么下次for循环到来又可重新处理该Channel的读事件了;return;}//6.将字节数组反序列化为指定类byte[] bytes = new byte[messageLength];byteBuf.readBytes(bytes);Object object = HessianSerialization.deserialize(bytes, targetClass);list.add(object);}
}

(5)如何解决粘包拆包问题

首先在编码数据包时,需要在数据包开头添加4个字节的int类型的bytes.length,之后任何一个数据包的读取,都必须从4个字节的int(bytes.length)值开始读取,再按照int值读取后续指定数量的字节数,都读取完才能证明读取到一个完整的字节数组。从而解决粘包半包问题,其原理类似于基于长度域的解码器LengthFieldBasedDecoder。


http://www.ppmy.cn/news/1580559.html

相关文章

【JavaEE】-- SpringBoot快速上手

文章目录 1. Maven1.1 什么是Maven1.2 为什么要学Maven1.3 创建一个Maven项目1.4 Maven核心功能1.4.1 项目创建1.4.2 依赖管理1.4.3 Maven Help插件 1.5 Maven仓库1.5.1 本地仓库1.5.2 中央仓库1.5.3 私有服务器&#xff08;私服&#xff09; 1.6 Maven设置国内源1.6.1 配置当前…

sqlite mmap

https://www.sqlite.org/mmap.html 1. 内存映射 I/O 的基本原理 默认机制&#xff08;传统 I/O&#xff09; SQLite 默认通过 xRead() 和 xWrite() 方法&#xff08;对应 read()/write() 系统调用&#xff09;访问数据库文件。这些方法需要将数据从内核缓冲区复制到用户空间&am…

图莫斯TOOMOSS上位机TCANLINPro使用CAN UDS功能时 编写、加载27服务dll解锁算法文件

【本文发布于https://blog.csdn.net/Stack_/article/details/146303690&#xff0c;未经许可不得转载&#xff0c;转载须注明出处】 软件安装目录下找到如下压缩包&#xff0c;此为dll文件示例工程 使用VisualStudio打开工程GenerateKeyExImpl.vcxproj&#xff0c;可能会提示版…

常用的pdf技术有哪些?--笔记

常用的pdf技术有哪些&#xff1f; 1.iText PDF&#xff1a;iText 是著名的开放项目&#xff0c;是用于生成 PDF 文档的一个 java 类库。通过 iText 不仅可以生成 PDF 或 rtf 的文档&#xff0c;而且可以将 XML、Html 文件转化为 PDF 文件。 Openoffice&#xff1a;openoffice 是…

【数据分析】数据筛选(布尔索引:多个判断条件)

在pandas模块中&#xff0c;逻辑运算符有以下几种&#xff1a; 1.&表示并且 2.|表示或者 3.~表示非 1. &&#xff08;并且&#xff09;运算符 当且仅当运算符两边的布尔值均为 True 时&#xff0c;运算结果才为 True&#xff0c;其他情况为 False。 a True b Tr…

【go】如何处理可选配置

问题背景&#xff1a; 在设计API 时&#xff0c;如何处理可选配置&#xff1f; 1. 配置结构体 好处&#xff1a;解决兼容性&#xff0c;但问题是0值&#xff0c;和可读性差 如何解决0值&#xff1f; ——使用指针&#xff0c;将nil和类型0值做区分 但是入参包含结构体&#x…

计算机考研-数据结构2.2

顺序表的查找 各位同学大家好, 在这个小节中, 我们会学习顺序表的查找操作怎么实现, 那分为两种查找, 一种是按位查找, 一种是按值查找, 那首先来看按位查找怎么实现 对一个线性表进行按位查找, 就是要从这个线性表l当中, 取得第二个元素, 那如果这个线性表示用顺序表的方式实…

硬件驱动——51单片机:独立按键、中断、定时器/计数器

目录 一、独立按键 1.原理 2.封装函数 3.按键控制点灯 数码管 二、中断 1.原理 2.步骤 3.中断寄存器IE 4.控制寄存器TCON 5.打开外部中断0和1 三、定时器/计数器 1.原理 2.控制寄存器TCON 3.工作模式寄存器TMOD 4.按键控制频率的动态闪烁 一、独立按键 1…