Dubbo生产者一次请求的过程 (Dubbo源码三)
https://www.bilibili.com/video/BV1FJSCY9E85
相较于Dubbo消费者一次请求的过程,生产者的流程相对复杂一些,主要是因为触发点不好找。
这篇文章通过解决以下三个问题来学习源码
- 请求的触发点(消费者发送一个请求,生产者如何收到并解析?如何找到对应的服务)
- 再详细了解一下@DubboService生成的代理对象
- 基于生成的代理对象,看看执行流程是怎样的
一、请求的触发点
Dubbo底层通讯是基于Netty,请求第一步肯定是从Netty收到消息开始的。Netty收到消息也肯定是不是明文的,这一节要解决如下问题
- Netty接收消息的入口
- 如何把消息解析成明文
- 怎么通过消息找到对应的服务
1-1、接收消息的入口
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
暂且把这里理解为入口(它前面还有一些流程,个人认为这里好点),从方法名和参数,很容易理解,这里的代码很简单,循环从管道去读取数据,并处理
可以看到msg此时并非一个明文的数据
这里并不是线性执行的,从上面的图片可以看到有一个循环(next不停的去执行下一个操作),每一个节点只做一件事。和Dubbo消费者一次请求的过程和invoker的执行流程类似,一层一层的
1-2、解码入口
上面循环处理操作很多,有一个是解码
调用的链路大致如下,可以看到从Netty到Dubbo
- io.netty.handler.codec.ByteToMessageDecoder#channelRead
- org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter.InternalDecoder#decode
- org.apache.dubbo.rpc.protocol.dubbo.DubboCountCodec#decode
- org.apache.dubbo.remoting.exchange.codec.ExchangeCodec#decodeBody
- org.apache.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody
org.apache.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBody 方法很长参数是流,这里面会把流解析成明文数据,下一步仔细看如何解析
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {// ...req.setData(data);// ...
}
1-2-1、如何把消息解析成明文
解析的本质就是反序列化,Dubbo中支持很多序列化(Hessian2、Java、FastJson2等),消费者会把序列化方式放在header中,这样生产者就知道该用哪种序列化了
step 1
org.apache.dubbo.rpc.protocol.dubbo.DubboCodec#decodeBodyprotected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {// 获取序列化idbyte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);// 获取 请求idlong id = Bytes.bytes2long(header, 4);if ((flag & FLAG_REQUEST) == 0) {} else {// decode request.Request req;try {Object data;if ((flag & FLAG_EVENT) != 0) {// ...} else {req = new Request(id);req.setVersion(Version.getProtocolVersion());req.setTwoWay((flag & FLAG_TWOWAY) != 0);// get data length.int len = Bytes.bytes2int(header, 12);req.setPayload(len);DecodeableRpcInvocation inv;if (isDecodeDataInIoThread(channel)) {if (customByteAccessor != null) {// ... } else {// 这里会把 proto(这个就是指定序列化的方式) 传递过去,赋值给 serializationType, 下一步可以看到inv = new DecodeableRpcInvocation(frameworkModel, channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);}// 开始序列化的inv.decode();} else {// ... }data = inv;}// 设置序列化的结果req.setData(data);} catch (Throwable t) {// ... }return req;}
}
step 2
org.apache.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#decode(org.apache.dubbo.remoting.Channel, java.io.InputStream)
public Object decode(Channel channel, InputStream input) throws IOException {int contentLength = input.available();getAttributes().put(Constants.CONTENT_LENGTH_KEY, contentLength);// 找到序列化的方式,然后反序列化数据ObjectInput in = CodecSupport.getSerialization(serializationType).deserialize(channel.getUrl(), input);this.put(SERIALIZATION_ID_KEY, serializationType);// ... 这下面就是反序列化各种数据return this;
}
step 3
org.apache.dubbo.remoting.transport.CodecSupport#getSerialization(java.lang.Byte)
public static Serialization getSerialization(Byte id) throws IOException {Serialization result = getSerializationById(id);if (result == null) {throw new IOException("Unrecognized serialize type from consumer: " + id);}return result;
}// 这个Map里面存了各种的序列化方式
public static Serialization getSerializationById(Byte id) {return ID_SERIALIZATION_MAP.get(id);
}
序列化方式的加载是通过SPI的方式
感兴趣的可以参看 Dubbo自定义过滤器,过滤器源码详解 里面介绍了SPI
org.apache.dubbo.common.serialize.Serialization
通过上面的一系列解析操作,可以得到请求的明文 【Request】
- 包含这次请求的方法,参数等等
- 但request里面的 invoker还是为空,真正执行的肯定是这个 invoker
1-3、找到此次请求的 invoker
当解析完请求的Request的时候,又回到了最开始解析数据的时候,这时候会发现有一个 NettyServerHandler,在这里会通过此次请求的参数找到对应的 invoker
大致流程如下
- io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
- org.apache.dubbo.remoting.transport.netty4.NettyServerHandler#channelRead
- org.apache.dubbo.remoting.transport.AbstractPeer#received
- org.apache.dubbo.remoting.transport.MultiMessageHandler#received
- org.apache.dubbo.remoting.exchange.support.header.HeartbeatHandler#received
- org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler#received
- org.apache.dubbo.rpc.protocol.dubbo.DubboIsolationExecutorSupport#getProviderModel
- org.apache.dubbo.rpc.protocol.dubbo.DecodeableRpcInvocation#fillInvoker
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getInvoker
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getInvoker
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {// ... DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);// ...Invoker<?> invoker = exporter.getInvoker();inv.setServiceModel(invoker.getUrl().getServiceModel());return invoker;
}
可以看到此次请求组装了一个唯一serviceKey。exporterMap 里面存了每个服务的数据,和每个服务的 metadata 的数据。
1-3-1、invoker的执行点
大概节点如下:
org.apache.dubbo.remoting.transport.DecodeHandler#received
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest
org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter#reply
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#DubboProtocol#reply
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {// ...Result result = invoker.invoke(inv);return result.thenApply(Function.identity());
}
二、@DubboService 代理对象的生成
上面的流程大概是:
- Netty收到请求
- 反序列化请求,得到明文数据
- 基于请求找到invoker (从exporterMap 中获取)
- 执行invoker
2-1、代理对象生成入口
既如此就来看看 exporterMap 中的数据是何时存入的? 通过@DubboService、@DubboReference解析原理 得知生成invoker的方法如下
org.apache.dubbo.config.ServiceConfig#doExportUrl
private void doExportUrl(URL url, boolean withMetaData, RegisterTypeEnum registerType) {// ... // 使用 javassist 生成代理对象Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);// 一个@DubboService会生成两个代理对象,一个正常的,一个 MetaDataif (withMetaData) {invoker = new DelegateProviderMetaDataInvoker(invoker, this);}// 对 invoker进行包装处理,在这一步存入 exporterMapExporter<?> exporter = protocolSPI.export(invoker);exporters.computeIfAbsent(registerType, k -> new CopyOnWriteArrayList<>()).add(exporter);
}
2-2、使用Filter包装代理对象
消费者和生产者都一样,最终的invoker都被封装到CallbackRegistrationInvoker里面,而CallbackRegistrationInvoker包含了自定义过滤器,下一节要讲的自定义过滤器这里就是起点。
step 1
org.apache.dubbo.config.ServiceConfig#doExportUrl
private void doExportUrl(URL url, boolean withMetaData, RegisterTypeEnum registerType) {// ...Exporter<?> exporter = protocolSPI.export(invoker);// ...
}
step 2
org.apache.dubbo.rpc.cluster.filter.ProtocolFilterWrapper#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {if (UrlUtils.isRegistry(invoker.getUrl())) {return protocol.export(invoker);}FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
step 3
org.apache.dubbo.rpc.cluster.filter.DefaultFilterChainBuilder#buildInvokerChain
@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {Invoker<T> last = originalInvoker;URL url = originalInvoker.getUrl();List<ModuleModel> moduleModels = getModuleModelsFromUrl(url);List<Filter> filters;// ... 找到合适的 filters// 开启地狱嵌套包装Filterif (!CollectionUtils.isEmpty(filters)) {for (int i = filters.size() - 1; i >= 0; i--) {final Filter filter = filters.get(i);final Invoker<T> next = last;last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);}return new CallbackRegistrationInvoker<>(last, filters);}return last;
}
2-3、存入 exporterMap
默认使用的是Dubbo协议,所以走的是 DubboProtocol
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {checkDestroyed();URL url = invoker.getUrl();// export service.String key = serviceKey(url);DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);// ...openServer(url);optimizeSerialization(url);return exporter;
}public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {super(invoker);this.key = key;this.exporterMap = exporterMap;// 存入了exporterMap.put(key, this);
}
三、 invoker的执行
它的执行可以参看 Dubbo消费者一次请求的过程,一样的,都是先走完过滤器,最终再执行对应的方法