Dubbo生产者一次请求的过程 (Dubbo源码三)

server/2024/12/16 22:40:46/

Dubbo生产者一次请求的过程 (Dubbo源码三)

https://www.bilibili.com/video/BV1FJSCY9E85


相较于Dubbo消费者一次请求的过程,生产者的流程相对复杂一些,主要是因为触发点不好找。


这篇文章通过解决以下三个问题来学习源码

  1. 请求的触发点(消费者发送一个请求,生产者如何收到并解析?如何找到对应的服务)
  2. 再详细了解一下@DubboService生成的代理对象
  3. 基于生成的代理对象,看看执行流程是怎样的

一、请求的触发点


Dubbo底层通讯是基于Netty,请求第一步肯定是从Netty收到消息开始的。Netty收到消息也肯定是不是明文的,这一节要解决如下问题

  1. Netty接收消息的入口
  2. 如何把消息解析成明文
  3. 怎么通过消息找到对应的服务

1-1、接收消息的入口


io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)

暂且把这里理解为入口(它前面还有一些流程,个人认为这里好点),从方法名和参数,很容易理解,这里的代码很简单,循环从管道去读取数据,并处理

可以看到msg此时并非一个明文的数据

在这里插入图片描述


这里并不是线性执行的,从上面的图片可以看到有一个循环(next不停的去执行下一个操作),每一个节点只做一件事。和Dubbo消费者一次请求的过程和invoker的执行流程类似,一层一层的


1-2、解码入口


上面循环处理操作很多,有一个是解码

在这里插入图片描述


调用的链路大致如下,可以看到从Netty到Dubbo

  1. io.netty.handler.codec.ByteToMessageDecoder#channelRead
  2. org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalDecoder#decode
  3. org.apache.dubbo.rpc.protocol.dubbo.DubboCountCodec#decode
  4. org.apache.dubbo.remoting.exchange.codec.ExchangeCodec#decodeBody
  5. org.apache.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody

org.apache.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody 方法很长参数是流,这里面会把流解析成明文数据,下一步仔细看如何解析

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {// ...req.setData(data);// ...
}

1-2-1、如何把消息解析成明文

解析的本质就是反序列化,Dubbo中支持很多序列化(Hessian2、Java、FastJson2等),消费者会把序列化方式放在header中,这样生产者就知道该用哪种序列化了

step 1

org.apache.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBodyprotected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {// 获取序列化idbyte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);// 获取 请求idlong id = Bytes.bytes2long(header, 4);if ((flag & FLAG_REQUEST) == 0) {} else {// decode request.Request req;try {Object data;if ((flag & FLAG_EVENT) != 0) {// ...} else {req = new Request(id);req.setVersion(Version.getProtocolVersion());req.setTwoWay((flag & FLAG_TWOWAY) != 0);// get data length.int len = Bytes.bytes2int(header, 12);req.setPayload(len);DecodeableRpcInvocation inv;if (isDecodeDataInIoThread(channel)) {if (customByteAccessor != null) {// ... } else {// 这里会把 proto(这个就是指定序列化的方式) 传递过去,赋值给 serializationType, 下一步可以看到inv = new DecodeableRpcInvocation(frameworkModel, channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);}// 开始序列化的inv.decode();} else {// ... }data = inv;}// 设置序列化的结果req.setData(data);} catch (Throwable t) {// ... }return req;}
}

step 2

org.apache.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#decode(org.apache.dubbo.remoting.Channel, java.io.InputStream)

public Object decode(Channel channel, InputStream input) throws IOException {int contentLength = input.available();getAttributes().put(Constants.CONTENT_LENGTH_KEY, contentLength);// 找到序列化的方式,然后反序列化数据ObjectInput in = CodecSupport.getSerialization(serializationType).deserialize(channel.getUrl(), input);this.put(SERIALIZATION_ID_KEY, serializationType);// ... 这下面就是反序列化各种数据return this;
}

step 3

org.apache.dubbo.remoting.transport.CodecSupport#getSerialization(java.lang.Byte)

public static Serialization getSerialization(Byte id) throws IOException {Serialization result = getSerializationById(id);if (result == null) {throw new IOException("Unrecognized serialize type from consumer: " + id);}return result;
}// 这个Map里面存了各种的序列化方式
public static Serialization getSerializationById(Byte id) {return ID_SERIALIZATION_MAP.get(id);
}

序列化方式的加载是通过SPI的方式

感兴趣的可以参看 Dubbo自定义过滤器,过滤器源码详解 里面介绍了SPI

org.apache.dubbo.common.serialize.Serialization


通过上面的一系列解析操作,可以得到请求的明文 【Request】

  1. 包含这次请求的方法,参数等等
  2. 但request里面的 invoker还是为空,真正执行的肯定是这个 invoker

在这里插入图片描述


1-3、找到此次请求的 invoker


当解析完请求的Request的时候,又回到了最开始解析数据的时候,这时候会发现有一个 NettyServerHandler,在这里会通过此次请求的参数找到对应的 invoker

在这里插入图片描述


大致流程如下

  1. io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
  2. org.apache.dubbo.remoting.transport.netty4.NettyServerHandler#channelRead
  3. org.apache.dubbo.remoting.transport.AbstractPeer#received
  4. org.apache.dubbo.remoting.transport.MultiMessageHandler#received
  5. org.apache.dubbo.remoting.exchange.support.header.HeartbeatHandler#received
  6. org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
  7. org.apache.dubbo.rpc.protocol.dubbo.DubboIsolationExecutorSupport#getProviderModel
  8. org.apache.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#fillInvoker
  9. org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getInvoker

org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getInvoker

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {// ... DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);// ...Invoker<?> invoker = exporter.getInvoker();inv.setServiceModel(invoker.getUrl().getServiceModel());return invoker;
}

可以看到此次请求组装了一个唯一serviceKey。exporterMap 里面存了每个服务的数据,和每个服务的 metadata 的数据。

在这里插入图片描述


1-3-1、invoker的执行点

大概节点如下:

org.apache.dubbo.remoting.transport.DecodeHandler#received
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest
org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter#reply

org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#DubboProtocol#reply

@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {// ...Result result = invoker.invoke(inv);return result.thenApply(Function.identity());
}

在这里插入图片描述



二、@DubboService 代理对象的生成


上面的流程大概是:

  1. Netty收到请求
  2. 反序列化请求,得到明文数据
  3. 基于请求找到invoker (从exporterMap 中获取)
  4. 执行invoker

2-1、代理对象生成入口


既如此就来看看 exporterMap 中的数据是何时存入的? 通过@DubboService、@DubboReference解析原理 得知生成invoker的方法如下

org.apache.dubbo.config.ServiceConfig#doExportUrl

private void doExportUrl(URL url, boolean withMetaData, RegisterTypeEnum registerType) {// ... // 使用 javassist 生成代理对象Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);// 一个@DubboService会生成两个代理对象,一个正常的,一个 MetaDataif (withMetaData) {invoker = new DelegateProviderMetaDataInvoker(invoker, this);}// 对 invoker进行包装处理,在这一步存入 exporterMapExporter<?> exporter = protocolSPI.export(invoker);exporters.computeIfAbsent(registerType, k -> new CopyOnWriteArrayList<>()).add(exporter);
}

2-2、使用Filter包装代理对象


消费者和生产者都一样,最终的invoker都被封装到CallbackRegistrationInvoker里面,而CallbackRegistrationInvoker包含了自定义过滤器,下一节要讲的自定义过滤器这里就是起点。


step 1

org.apache.dubbo.config.ServiceConfig#doExportUrl

private void doExportUrl(URL url, boolean withMetaData, RegisterTypeEnum registerType) {// ...Exporter<?> exporter = protocolSPI.export(invoker);// ...
}

step 2

org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper#export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {if (UrlUtils.isRegistry(invoker.getUrl())) {return protocol.export(invoker);}FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}

step 3

org.apache.dubbo.rpc.cluster.filter.DefaultFilterChainBuilder#buildInvokerChain

@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {Invoker<T> last = originalInvoker;URL url = originalInvoker.getUrl();List<ModuleModel> moduleModels = getModuleModelsFromUrl(url);List<Filter> filters;// ... 找到合适的 filters// 开启地狱嵌套包装Filterif (!CollectionUtils.isEmpty(filters)) {for (int i = filters.size() - 1; i >= 0; i--) {final Filter filter = filters.get(i);final Invoker<T> next = last;last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);}return new CallbackRegistrationInvoker<>(last, filters);}return last;
}

2-3、存入 exporterMap


默认使用的是Dubbo协议,所以走的是 DubboProtocol

org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {checkDestroyed();URL url = invoker.getUrl();// export service.String key = serviceKey(url);DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);// ...openServer(url);optimizeSerialization(url);return exporter;
}public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {super(invoker);this.key = key;this.exporterMap = exporterMap;// 存入了exporterMap.put(key, this);
}

三、 invoker的执行


它的执行可以参看 Dubbo消费者一次请求的过程,一样的,都是先走完过滤器,最终再执行对应的方法


http://www.ppmy.cn/server/150739.html

相关文章

opencv——图片矫正

图像矫正 图像矫正的原理是透视变换&#xff0c;下面来介绍一下透视变换的概念。 听名字有点熟&#xff0c;我们在图像旋转里接触过仿射变换&#xff0c;知道仿射变换是把一个二维坐标系转换到另一个二维坐标系的过程&#xff0c;转换过程坐标点的相对位置和属性不发生变换&a…

mysql的执行计划分析和索引下推以及索引长度计算

1 执行计划介绍 执行计划&#xff08;Execution Plan&#xff09;是数据库查询优化的重要工具&#xff0c;用于展示数据库如何执行 SQL 查询的详细过程。它包含了查询操作的步骤、各个步骤的执行顺序、使用的索引、访问的表、连接方式、预计的成本等信息 可以显示SQL语句最终…

腾讯云海外服务器Window切换为linux系统(从Window DD 到 Linux)

腾讯云提示&#xff1a;不支持重装为该镜像&#xff0c;非中国大陆地域不支持Linux系统和Windows系统之间互转 买了腾讯云的海外服务器&#xff0c;重装系统的时候发现无法切换&#xff0c;直接dd到linux系统&#xff0c;以下是全过程。记录一下。 主要是用到一个开源项目&…

vue3 setup语法,子组件点击一个元素打印了这个元素的下标id,怎么传递给父组件,让父组件去使用

问&#xff1a; vue3 setup语法&#xff0c;子组件点击一个元素打印了这个元素的下标id&#xff0c;怎么传递给父组件&#xff0c;让父组件去使用 回答&#xff1a; 在 Vue 3 中&#xff0c;你可以使用 setup 语法糖和组合式 API 来实现子组件向父组件传递数据。具体来说&am…

电脑win11家庭版升级专业版和企业版相关事项

我的是零刻ser9&#xff0c;自带win11家庭版&#xff0c;但是我有远程操控需求&#xff0c;想用windows系统自带的远程连接功能&#xff0c;所以需要升级为专业版。然后在系统激活页面通过更改序列号方式&#xff0c;淘宝几块钱买了个序列号升级成功专业版了。但是&#xff0c;…

Edge SCDN 边缘安全加速有什么用?

Edge SCDN是最新推出的边缘安全加速服务&#xff0c;它是一种融合了安全防护和内容分发加速功能的网络服务技术&#xff0c;通过在网络边缘部署服务器节点&#xff0c;来优化内容的传输和用户的访问体验&#xff0c;同时保障网络安全。 抵御 DDoS 攻击&#xff1a; Edge SCDN …

scala的多维数组

创建多维数组 创建多维数组可以使用Array.ofDim方法&#xff0c;该方法接受一个或多个整数参数&#xff0c;分别代表每个维度的大小。 // 创建一个3x3的二维数组&#xff0c;类型为Int val matrix Array.ofDim[Int](3, 3)// 创建一个3x3x3的三维数组&#xff0c;类型为Doubl…

蓝桥杯刷题——day5

蓝桥杯刷题——day5 题目一题干解题思路一代码解题思路二代码 题目二题干解题思路代码 题目一 题干 给定n个整数 a1,a2,⋯ ,an&#xff0c;求它们两两相乘再相加的和&#xff0c;即&#xff1a; 示例一&#xff1a; 输入&#xff1a; 4 1 3 6 9 输出&#xff1a; 117 题目链…