HeaderExchangeClient 中很多方法只有一行代码,即调用 HeaderExchangeChannel 对象的同签名方法。那 HeaderExchangeClient 有什么用处呢?答案是封装了一些关于心跳检测的逻辑。心跳检测并非本文所关注的点,因此就不多说了,继续向下看。
final class HeaderExchangeChannel implements ExchangeChannel {private final Channel channel;HeaderExchangeChannel(Channel channel) {if (channel == null) {throw new IllegalArgumentException("channel == null");}// 这里的 channel 指向的是 NettyClientthis.channel = channel;}@Overridepublic ResponseFuture request(Object request) throws RemotingException {return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));}@Overridepublic ResponseFuture request(Object request, int timeout) throws RemotingException {if (closed) {throw new RemotingException(..., "Failed to send request ...");}// 创建 Request 对象Request req = new Request();req.setVersion(Version.getProtocolVersion());// 设置双向通信标志为 truereq.setTwoWay(true);// 这里的 request 变量类型为 RpcInvocationreq.setData(request);// 创建 DefaultFuture 对象DefaultFuture future = new DefaultFuture(channel, req, timeout);try {// 调用 NettyClient 的 send 方法发送请求channel.send(req);} catch (RemotingException e) {future.cancel();throw e;}// 返回 DefaultFuture 对象return future;}
}
到这里大家终于看到了 Request 语义了,上面的方法首先定义了一个 Request 对象,然后再将该对象传给 NettyClient 的 send 方法,进行后续的调用。需要说明的是,NettyClient 中并未实现 send 方法,该方法继承自父类 AbstractPeer,下面直接分析 AbstractPeer 的代码。
public abstract class AbstractPeer implements Endpoint, ChannelHandler {@Overridepublic void send(Object message) throws RemotingException {// 该方法由 AbstractClient 类实现send(message, url.getParameter(Constants.SENT_KEY, false));}// 省略其他方法
}public abstract class AbstractClient extends AbstractEndpoint implements Client {@Overridepublic void send(Object message, boolean sent) throws RemotingException {if (send_reconnect && !isConnected()) {connect();}// 获取 Channel,getChannel 是一个抽象方法,具体由子类实现Channel channel = getChannel();if (channel == null || !channel.isConnected()) {throw new RemotingException(this, "message can not send ...");}// 继续向下调用channel.send(message, sent);}protected abstract Channel getChannel();// 省略其他方法
}
默认情况下,Dubbo 使用 Netty 作为底层的通信框架,因此下面我们到 NettyClient 类中看一下 getChannel 方法的实现逻辑。
public class NettyClient extends AbstractClient {// 这里的 Channel 全限定名称为 org.jboss.netty.channel.Channelprivate volatile Channel channel;@Overrideprotected com.alibaba.dubbo.remoting.Channel getChannel() {Channel c = channel;if (c == null || !c.isConnected())return null;// 获取一个 NettyChannel 类型对象return NettyChannel.getOrAddChannel(c, getUrl(), this);}
}final class NettyChannel extends AbstractChannel {private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();private final org.jboss.netty.channel.Channel channel;/** 私有构造方法 */private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {super(url, handler);if (channel == null) {throw new IllegalArgumentException("netty channel == null;");}this.channel = channel;}static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {if (ch == null) {return null;}// 尝试从集合中获取 NettyChannel 实例NettyChannel ret = channelMap.get(ch);if (ret == null) {// 如果 ret = null,则创建一个新的 NettyChannel 实例NettyChannel nc = new NettyChannel(ch, url, handler);if (ch.isConnected()) {// 将 <Channel, NettyChannel> 键值对存入 channelMap 集合中ret = channelMap.putIfAbsent(ch, nc);}if (ret == null) {ret = nc;}}return ret;}
}
获取到 NettyChannel 实例后,即可进行后续的调用。下面看一下 NettyChannel 的 send 方法。
public void send(Object message, boolean sent) throws RemotingException {super.send(message, sent);boolean success = true;int timeout = 0;try {// 发送消息(包含请求和响应消息)ChannelFuture future = channel.write(message);// sent 的值源于 <dubbo:method sent="true/false" /> 中 sent 的配置值,有两种配置值:// 1. true: 等待消息发出,消息发送失败将抛出异常// 2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回// 默认情况下 sent = false;if (sent) {timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);// 等待消息发出,若在规定时间没能发出,success 会被置为 falsesuccess = future.await(timeout);}Throwable cause = future.getCause();if (cause != null) {throw cause;}} catch (Throwable e) {throw new RemotingException(this, "Failed to send message ...");}// 若 success 为 false,这里抛出异常if (!success) {throw new RemotingException(this, "Failed to send message ...");}
}
经历多次调用,到这里请求数据的发送过程就结束了,过程漫长。为了便于大家阅读代码,这里以 DemoService 为例,将 sayHello 方法的整个调用路径贴出来。
proxy0#sayHello(String)—> InvokerInvocationHandler#invoke(Object, Method, Object[])—> MockClusterInvoker#invoke(Invocation)—> AbstractClusterInvoker#invoke(Invocation)—> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)—> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用—> ListenerInvokerWrapper#invoke(Invocation) —> AbstractInvoker#invoke(Invocation) —> DubboInvoker#doInvoke(Invocation)—> ReferenceCountExchangeClient#request(Object, int)—> HeaderExchangeClient#request(Object, int)—> HeaderExchangeChannel#request(Object, int)—> AbstractPeer#send(Object)—> AbstractClient#send(Object, boolean)—> NettyChannel#send(Object, boolean)—> NioClientSocketChannel#write(Object)
在 Netty 中,出站数据在发出之前还需要进行编码操作,接下来我们来分析一下请求数据的编码逻辑。
2.2.2 请求编码
在分析请求编码逻辑之前,我们先来看一下 Dubbo 数据包结构。
Dubbo 数据包分为消息头和消息体,消息头用于存储一些元信息,比如魔数(Magic),数据包类型(Request/Response),消息体长度(Data Length)等。消息体中用于存储具体的调用消息,比如方法名称,参数列表等。下面简单列举一下消息头的内容。
偏移量(Bit) | 字段 | 取值 |
---|---|---|
0 ~ 7 | 魔数高位 | 0xda00 |
8 ~ 15 | 魔数低位 | 0xbb |
16 | 数据包类型 | 0 – Response, 1 – Request |
17 | 调用方式 | 仅在第16位被设为1的情况下有效,0 – 单向调用,1 – 双向调用 |
18 | 事件标识 | 0 – 当前数据包是请求或响应包,1 – 当前数据包是心跳包 |
19 ~ 23 | 序列化器编号 | 2 – Hessian2Serialization 3 – JavaSerialization 4 – CompactedJavaSerialization 6 – FastJsonSerialization 7 – NativeJavaSerialization 8 – KryoSerialization 9 – FstSerialization |
24 ~ 31 | 状态 | 20 – OK 30 – CLIENT_TIMEOUT 31 – SERVER_TIMEOUT 40 – BAD_REQUEST 50 – BAD_RESPONSE … |
32 ~ 95 | 请求编号 | 共8字节,运行时生成 |
96 ~ 127 | 消息体长度 | 运行时计算 |
了解了 Dubbo 数据包格式,接下来我们就可以探索编码过程了。这次我们开门见山,直接分析编码逻辑所在类。如下:
public class ExchangeCodec extends TelnetCodec {// 消息头长度protected static final int HEADER_LENGTH = 16;// 魔数内容protected static final short MAGIC = (short) 0xdabb;protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];protected static final byte FLAG_REQUEST = (byte) 0x80;protected static final byte FLAG_TWOWAY = (byte) 0x40;protected static final byte FLAG_EVENT = (byte) 0x20;protected static final int SERIALIZATION_MASK = 0x1f;private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);public Short getMagicCode() {return MAGIC;}@Overridepublic void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {if (msg instanceof Request) {// 对 Request 对象进行编码encodeRequest(channel, buffer, (Request) msg);} else if (msg instanceof Response) {// 对 Response 对象进行编码,后面分析encodeResponse(channel, buffer, (Response) msg);} else {super.encode(channel, buffer, msg);}}protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {Serialization serialization = getSerialization(channel);// 创建消息头字节数组,长度为 16byte[] header = new byte[HEADER_LENGTH];// 设置魔数Bytes.short2bytes(MAGIC, header);// 设置数据包类型(Request/Response)和序列化器编号header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());// 设置通信方式(单向/双向)if (req.isTwoWay()) {header[2] |= FLAG_TWOWAY;}// 设置事件标识if (req.isEvent()) {header[2] |= FLAG_EVENT;}// 设置请求编号,8个字节,从第4个字节开始设置Bytes.long2bytes(req.getId(), header, 4);// 获取 buffer 当前的写位置int savedWriteIndex = buffer.writerIndex();// 更新 writerIndex,为消息头预留 16 个字节的空间buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);// 创建序列化器,比如 Hessian2ObjectOutputObjectOutput out = serialization.serialize(channel.getUrl(), bos);if (req.isEvent()) {// 对事件数据进行序列化操作encodeEventData(channel, out, req.getData());} else {// 对请求数据进行序列化操作encodeRequestData(channel, out, req.getData(), req.getVersion());}out.flushBuffer();if (out instanceof Cleanable) {((Cleanable) out).cleanup();}bos.flush();bos.close();// 获取写入的字节数,也就是消息体长度int len = bos.writtenBytes();checkPayload(channel, len);// 将消息体长度写入到消息头中Bytes.int2bytes(len, header, 12);// 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备buffer.writerIndex(savedWriteIndex);// 从 savedWriteIndex 下标处写入消息头buffer.writeBytes(header);// 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);}// 省略其他方法
}
以上就是请求对象的编码过程,该过程首先会通过位运算将消息头写入到 header 数组中。然后对 Request 对象的 data 字段执行序列化操作,序列化后的数据最终会存储到 ChannelBuffer 中。序列化操作执行完后,可得到数据序列化后的长度 len,紧接着将 len 写入到 header 指定位置处。最后再将消息头字节数组 header 写入到 ChannelBuffer 中,整个编码过程就结束了。本节的最后,我们再来看一下 Request 对象的 data 字段序列化过程,也就是 encodeRequestData 方法的逻辑,如下:
public class DubboCodec extends ExchangeCodec implements Codec2 {protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {RpcInvocation inv = (RpcInvocation) data;// 依次序列化 dubbo version、path、versionout.writeUTF(version);out.writeUTF(inv.getAttachment(Constants.PATH_KEY));out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));// 序列化调用方法名out.writeUTF(inv.getMethodName());// 将参数类型转换为字符串,并进行序列化out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));Object[] args = inv.getArguments();if (args != null)for (int i = 0; i < args.length; i++) {// 对运行时参数进行序列化out.writeObject(encodeInvocationArgument(channel, inv, i));}// 序列化 attachmentsout.writeObject(inv.getAttachments());}
}
至此,关于服务消费方发送请求的过程就分析完了,接下来我们来看一下服务提供方是如何接收请求的。
2.3 服务提供方接收请求
前面说过,默认情况下 Dubbo 使用 Netty 作为底层的通信框架。Netty 检测到有数据入站后,首先会通过解码器对数据进行解码,并将解码后的数据传递给下一个入站处理器的指定方法。所以在进行后续的分析之前,我们先来看一下数据解码过程。
2.3.1 请求解码
这里直接分析请求数据的解码逻辑,忽略中间过程,如下:
public class ExchangeCodec extends TelnetCodec {@Overridepublic Object decode(Channel channel, ChannelBuffer buffer) throws IOException {int readable = buffer.readableBytes();// 创建消息头字节数组byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];// 读取消息头数据buffer.readBytes(header);// 调用重载方法进行后续解码工作return decode(channel, buffer, readable, header);}@Overrideprotected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {// 检查魔数是否相等if (readable > 0 && header[0] != MAGIC_HIGH|| readable > 1 && header[1] != MAGIC_LOW) {int length = header.length;if (header.length < readable) {header = Bytes.copyOf(header, readable);buffer.readBytes(header, length, readable - length);}for (int i = 1; i < header.length - 1; i++) {if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {buffer.readerIndex(buffer.readerIndex() - header.length + i);header = Bytes.copyOf(header, i);break;}}// 通过 telnet 命令行发送的数据包不包含消息头,所以这里// 调用 TelnetCodec 的 decode 方法对数据包进行解码return super.decode(channel, buffer, readable, header);}// 检测可读数据量是否少于消息头长度,若小于则立即返回 DecodeResult.NEED_MORE_INPUTif (readable < HEADER_LENGTH) {return DecodeResult.NEED_MORE_INPUT;}// 从消息头中获取消息体长度int len = Bytes.bytes2int(header, 12);// 检测消息体长度是否超出限制,超出则抛出异常checkPayload(channel, len);int tt = len + HEADER_LENGTH;// 检测可读的字节数是否小于实际的字节数if (readable < tt) {return DecodeResult.NEED_MORE_INPUT;}ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);try {// 继续进行解码工作return decodeBody(channel, is, header);} finally {if (is.available() > 0) {try {StreamUtils.skipUnusedStream(is);} catch (IOException e) {logger.warn(e.getMessage(), e);}}}}
}
上面方法通过检测消息头中的魔数是否与规定的魔数相等,提前拦截掉非常规数据包,比如通过 telnet 命令行发出的数据包。接着再对消息体长度,以及可读字节数进行检测。最后调用 decodeBody 方法进行后续的解码工作,ExchangeCodec 中实现了 decodeBody 方法,但因其子类 DubboCodec 覆写了该方法,所以在运行时 DubboCodec 中的 decodeBody 方法会被调用。下面我们来看一下该方法的代码。
public class DubboCodec extends ExchangeCodec implements Codec2 {@Overrideprotected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {// 获取消息头中的第三个字节,并通过逻辑与运算得到序列化器编号byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);// 获取调用编号long id = Bytes.bytes2long(header, 4);// 通过逻辑与运算得到调用类型,0 - Response,1 - Requestif ((flag & FLAG_REQUEST) == 0) {// 对响应结果进行解码,得到 Response 对象。这个非本节内容,后面再分析// ...} else {// 创建 Request 对象Request req = new Request(id);req.setVersion(Version.getProtocolVersion());// 通过逻辑与运算得到通信方式,并设置到 Request 对象中req.setTwoWay((flag & FLAG_TWOWAY) != 0);// 通过位运算检测数据包是否为事件类型if ((flag & FLAG_EVENT) != 0) {// 设置心跳事件到 Request 对象中req.setEvent(Request.HEARTBEAT_EVENT);}try {Object data;if (req.isHeartbeat()) {// 对心跳包进行解码,该方法已被标注为废弃data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));} else if (req.isEvent()) {// 对事件数据进行解码data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));} else {DecodeableRpcInvocation inv;// 根据 url 参数判断是否在 IO 线程上对消息体进行解码if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY,Constants.DEFAULT_DECODE_IN_IO_THREAD)) {inv = new DecodeableRpcInvocation(channel, req, is, proto);// 在当前线程,也就是 IO 线程上进行后续的解码工作。此工作完成后,可将// 调用方法名、attachment、以及调用参数解析出来inv.decode();} else {// 仅创建 DecodeableRpcInvocation 对象,但不在当前线程上执行解码逻辑inv = new DecodeableRpcInvocation(channel, req,new UnsafeByteArrayInputStream(readMessageData(is)), proto);}data = inv;}// 设置 data 到 Request 对象中req.setData(data);} catch (Throwable t) {// 若解码过程中出现异常,则将 broken 字段设为 true,// 并将异常对象设置到 Reqeust 对象中req.setBroken(true);req.setData(t);}return req;}}
}
如上,decodeBody 对部分字段进行了解码,并将解码得到的字段封装到 Request 中。随后会调用 DecodeableRpcInvocation 的 decode 方法进行后续的解码工作。此工作完成后,可将调用方法名、attachment、以及运行时调用参数解析出来。下面我们来看一下 DecodeableRpcInvocation 的 decode 方法逻辑。
public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {@Overridepublic Object decode(Channel channel, InputStream input) throws IOException {ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType).deserialize(channel.getUrl(), input);// 通过反序列化得到 dubbo version,并保存到 attachments 变量中String dubboVersion = in.readUTF();request.setVersion(dubboVersion);setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);// 通过反序列化得到 path,version,并保存到 attachments 变量中setAttachment(Constants.PATH_KEY, in.readUTF());setAttachment(Constants.VERSION_KEY, in.readUTF());// 通过反序列化得到调用方法名setMethodName(in.readUTF());try {Object[] args;Class<?>[] pts;// 通过反序列化得到参数类型字符串,比如 Ljava/lang/String;String desc = in.readUTF();if (desc.length() == 0) {pts = DubboCodec.EMPTY_CLASS_ARRAY;args = DubboCodec.EMPTY_OBJECT_ARRAY;} else {// 将 desc 解析为参数类型数组pts = ReflectUtils.desc2classArray(desc);args = new Object[pts.length];for (int i = 0; i < args.length; i++) {try {// 解析运行时参数args[i] = in.readObject(pts[i]);} catch (Exception e) {if (log.isWarnEnabled()) {log.warn("Decode argument failed: " + e.getMessage(), e);}}}}// 设置参数类型数组setParameterTypes(pts);// 通过反序列化得到原 attachments 的内容Map<String, String> map = (Map<String, String>) in.readObject(Map.class);if (map != null && map.size() > 0) {Map<String, String> attachment = getAttachments();if (attachment == null) {attachment = new HashMap<String, String>();}// 将 map 与当前对象中的 attachment 集合进行融合attachment.putAll(map);setAttachments(attachment);}// 对 callback 类型的参数进行处理for (int i = 0; i < args.length; i++) {args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);}// 设置参数列表setArguments(args);} catch (ClassNotFoundException e) {throw new IOException(StringUtils.toString("Read invocation data failed.", e));} finally {if (in instanceof Cleanable) {((Cleanable) in).cleanup();}}return this;}
}
上面的方法通过反序列化将诸如 path、version、调用方法名、参数列表等信息依次解析出来,并设置到相应的字段中,最终得到一个具有完整调用信息的 DecodeableRpcInvocation 对象。
到这里,请求数据解码的过程就分析完了。此时我们得到了一个 Request 对象,这个对象会被传送到下一个入站处理器中,我们继续往下看。
2.3.2 调用服务
解码器将数据包解析成 Request 对象后,NettyHandler 的 messageReceived 方法紧接着会收到这个对象,并将这个对象继续向下传递。这期间该对象会被依次传递给 NettyServer、MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler。最后由 AllChannelHandler 将该对象封装到 Runnable 实现类对象中,并将 Runnable 放入线程池中执行后续的调用逻辑。整个调用栈如下:
NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)—> AbstractPeer#received(Channel, Object)—> MultiMessageHandler#received(Channel, Object)—> HeartbeatHandler#received(Channel, Object)—> AllChannelHandler#received(Channel, Object)—> ExecutorService#execute(Runnable) // 由线程池执行后续的调用逻辑
考虑到篇幅,以及很多中间调用的逻辑并非十分重要,所以这里就不对调用栈中的每个方法都进行分析了。这里我们直接分析调用栈中的分析第一个和最后一个调用方法逻辑。如下:
@Sharable
public class NettyHandler extends SimpleChannelHandler {private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>();private final URL url;private final ChannelHandler handler;public NettyHandler(URL url, ChannelHandler handler) {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}this.url = url;// 这里的 handler 类型为 NettyServerthis.handler = handler;}public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {// 获取 NettyChannelNettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);try {// 继续向下调用handler.received(channel, e.getMessage());} finally {NettyChannel.removeChannelIfDisconnected(ctx.getChannel());}}
}
如上,NettyHandler 中的 messageReceived 逻辑比较简单。首先根据一些信息获取 NettyChannel 实例,然后将 NettyChannel 实例以及 Request 对象向下传递。下面再来看看 AllChannelHandler 的逻辑,在详细分析代码之前,我们先来了解一下 Dubbo 中的线程派发模型。
2.3.2.1 线程派发模型
Dubbo 将底层通信框架中接收请求的线程称为 IO 线程。如果一些事件处理逻辑可以很快执行完,比如只在内存打一个标记,此时直接在 IO 线程上执行该段逻辑即可。但如果事件的处理逻辑比较耗时,比如该段逻辑会发起数据库查询或者 HTTP 请求。此时我们就不应该让事件处理逻辑在 IO 线程上执行,而是应该派发到线程池中去执行。原因也很简单,IO 线程主要用于接收请求,如果 IO 线程被占满,将导致它不能接收新的请求。
以上就是线程派发的背景,下面我们再来通过 Dubbo 调用图,看一下线程派发器所处的位置。
如上图,红框中的 Dispatcher 就是线程派发器。需要说明的是,Dispatcher 真实的职责创建具有线程派发能力的 ChannelHandler,比如 AllChannelHandler、MessageOnlyChannelHandler 和 ExecutionChannelHandler 等,其本身并不具备线程派发能力。Dubbo 支持 5 种不同的线程派发策略,下面通过一个表格列举一下。
策略 | 用途 |
---|---|
all | 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等 |
direct | 所有消息都不派发到线程池,全部在 IO 线程上直接执行 |
message | 只有请求和响应消息派发到线程池,其它消息均在 IO 线程上执行 |
execution | 只有请求消息派发到线程池,不含响应。其它消息均在 IO 线程上执行 |
connection | 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池 |
默认配置下,Dubbo 使用 all
派发策略,即将所有的消息都派发到线程池中。下面我们来分析一下 AllChannelHandler 的代码。
public class AllChannelHandler extends WrappedChannelHandler {public AllChannelHandler(ChannelHandler handler, URL url) {super(handler, url);}/** 处理连接事件 */@Overridepublic void connected(Channel channel) throws RemotingException {// 获取线程池ExecutorService cexecutor = getExecutorService();try {// 将连接事件派发到线程池中处理cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException(..., " error when process connected event .", t);}}/** 处理断开事件 */@Overridepublic void disconnected(Channel channel) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException(..., "error when process disconnected event .", t);}}/** 处理请求和响应消息,这里的 message 变量类型可能是 Request,也可能是 Response */@Overridepublic void received(Channel channel, Object message) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {// 将请求和响应消息派发到线程池中处理cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if(message instanceof Request && t instanceof RejectedExecutionException){Request request = (Request)message;// 如果通信方式为双向通信,此时将 Server side ... threadpool is exhausted // 错误信息封装到 Response 中,并返回给服务消费方。if(request.isTwoWay()){String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();Response response = new Response(request.getId(), request.getVersion());response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);response.setErrorMessage(msg);// 返回包含错误信息的 Response 对象channel.send(response);return;}}throw new ExecutionException(..., " error when process received event .", t);}}/** 处理异常信息 */@Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException(..., "error when process caught event ...");}}
}
如上,请求对象会被封装 ChannelEventRunnable 中,ChannelEventRunnable 将会是服务调用过程的新起点。所以接下来我们以 ChannelEventRunnable 为起点向下探索。
2.3.2.2 调用服务
本小节,我们从 ChannelEventRunnable 开始分析,该类的主要代码如下:
public class ChannelEventRunnable implements Runnable {private final ChannelHandler handler;private final Channel channel;private final ChannelState state;private final Throwable exception;private final Object message;@Overridepublic void run() {// 检测通道状态,对于请求或响应消息,此时 state = RECEIVEDif (state == ChannelState.RECEIVED) {try {// 将 channel 和 message 传给 ChannelHandler 对象,进行后续的调用handler.received(channel, message);} catch (Exception e) {logger.warn("... operation error, channel is ... message is ...");}} // 其他消息类型通过 switch 进行处理else {switch (state) {case CONNECTED:try {handler.connected(channel);} catch (Exception e) {logger.warn("... operation error, channel is ...");}break;case DISCONNECTED:// ...case SENT:// ...case CAUGHT:// ...default:logger.warn("unknown state: " + state + ", message is " + message);}}}
}
如上,请求和响应消息出现频率明显比其他类型消息高,所以这里对该类型的消息进行了针对性判断。ChannelEventRunnable 仅是一个中转站,它的 run 方法中并不包含具体的调用逻辑,仅用于将参数传给其他 ChannelHandler 对象进行处理,该对象类型为 DecodeHandler。