基于Dubbo 3.1,详细介绍了Dubbo Consumer服务调用源码。
上文我们学习了,Dubbo 发起服务调用的上半部分源码,我们学习到了FailoverClusterInvoker最终会通过服务提供者Invoker#invoke发起RPC调用,下面我们来学习Dubbo 发起服务调用的下半部分源码,也就是真正的RPC调用的源码。
Dubbo 3.x服务调用源码:
- Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口
- Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用
Dubbo 3.x服务引用源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
- Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
- Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
- Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
- Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
- Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
- Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
- Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
- Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
- Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url
Dubbo 3.x服务发布源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
- Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
- Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
- Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
- Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
- Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
- Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布
文章目录
- AbstractInvoker#invoke服务提供者调用
- AbstractInvoker#doInvokeAndReturn执行rpc调用
- DubboInvoker#doInvoke调用remote层远程通信
- getCallbackExecutor用于获取回调线程池。
- ReferenceCountExchangeClient#request发起rpc请求
- HeaderExchangeClient#request发起rpc请求
- HeaderExchangeChannel#request异步发起rpc请求
- Request请求内容
- DefaultFuture#newFuture创建Future并启动超时检测
- new DefaultFuture
- timeoutCheck超时检测
- AbstractPeer#send发起异步请求
- AbstractClient#send发起异步请求
- NettyChannel#send基于netty发起异步请求
- AbstractInvoker#waitForResultIfSync异步转同步等待
- AsyncRpcResult#get阻塞获取结果
- ThreadlessExecutor#waitAndDrain等待返回
- 阻塞优化
- Result#recreate获取结果
- AsyncRpcResult#getAppResponse获取响应结果
- AppResponse#createDefaultValue创建默认返回值
- AppResponse#recreate处理结果
- 总结
AbstractInvoker#invoke服务提供者调用
ListenerInvokerWrapper#invoke方法没有其他操作,因此直接看AbstractInvoker#invoker方法。到这里,调用者变成了服务提供者invoker。
该方法首先调用doInvokeAndReturn方法执行RPC调用并返回异步执行结果,最后调用waitForResultIfSync方法判断如果同步调用,则等待RPC结果。
java">/*** AbstractInvoker的方法*/
@Override
public Result invoke(Invocation inv) throws RpcException {// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed//如果由于注册表的地址刷新导致调用程序被销毁,则允许当前调用继续进行if (isDestroyed()) {logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "+ ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");}RpcInvocation invocation = (RpcInvocation) inv;//准备RpcInvocationprepareInvocation(invocation);//执行RPC调用并返回异步执行结果AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);//如果同步调用,则等待RPC结果waitForResultIfSync(asyncResult, invocation);return asyncResult;
}
rpc_82">AbstractInvoker#doInvokeAndReturn执行rpc调用
执行RPC调用并返回异步执行结果AsyncRpcResult。
java">/*** AbstractInvoker的方法* <p>* 执行RPC调用并返回异步执行结果** @param invocation* @return*/
private AsyncRpcResult doInvokeAndReturn(RpcInvocation invocation) {AsyncRpcResult asyncResult;try {/** 执行调用*/asyncResult = (AsyncRpcResult) doInvoke(invocation);} catch (InvocationTargetException e) {//异常处理Throwable te = e.getTargetException();//asyncResult封装抛出的异常if (te != null) {// if biz exceptionif (te instanceof RpcException) {((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);}asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);} else {asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);}} catch (RpcException e) {// if biz exceptionif (e.isBiz()) {asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);} else {throw e;}} catch (Throwable e) {//asyncResult封装抛出的异常asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);}//当调用模式为异步时,或者setFutureWhenSync=true时if (setFutureWhenSync || invocation.getInvokeMode() != InvokeMode.SYNC) {// set server context 将future设置到线程本地变量中,使用FutureAdapter包装RpcContext.getServiceContext().setFuture(new FutureAdapter<>(asyncResult.getResponseFuture()));}return asyncResult;
}
DubboInvoker#doInvoke调用remote层远程通信
该方法由invoker具体的子类实现,以默认dubbo协议为例子,对应的invoker为DubboInvoker。该方法将会调用remote层进行rpc通信。
- 基于轮询的策略选择一个remote层客户端ExchangeClient,用以发起rpc调用,一般来说只有一个client,即共享连接。
- 判断如果是单向发送,那么直接发送异步请求,随后返回默认的AsyncRpcResult,不需要获取真正的请求响应结果。
- 同步、异步发送处理。Dubbo默认都是走的异步发送,发送完毕之后得到一个CompletableFuture,将CompletableFuture包装为一个AsyncRpcResult返回。
- 在外层AbstractInvoker#invoker方法中会判断,如果同步则会调用future.get阻塞等待结果。这就是Dubbo所谓的异步转同步处理。
java">/*** DubboInvoker的方法* <p>* 执行RPC调用并返回执行结果*/
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {RpcInvocation inv = (RpcInvocation) invocation;//调用方法名final String methodName = RpcUtils.getMethodName(invocation);//设置附加属性 path={serviceInterface}inv.setAttachment(PATH_KEY, getUrl().getPath());//设置附加属性 version={version}inv.setAttachment(VERSION_KEY, version);/** 基于轮询的策略选择一个remote层客户端,用以发起rpc调用* 一般来说只有一个client,即共享连接*/ExchangeClient currentClient;if (clients.length == 1) {currentClient = clients[0];} else {currentClient = clients[index.getAndIncrement() % clients.length];}try {//是否是单向通讯,即只管发送不需要返回结果boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//计算rpc调用超时时间,默认3000msint timeout = calculateTimeout(invocation, methodName);invocation.setAttachment(TIMEOUT_KEY, timeout);/** 单向发送处理*/if (isOneway) {boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);//发送单向请求currentClient.send(inv, isSent);//返回默认的AsyncRpcResult,不需要真正的请求响应结果return AsyncRpcResult.newDefaultAsyncResult(invocation);}/** 同步、异步发送处理* 默认都是走的异步发送*/else {//回调线程池ExecutorService executor = getCallbackExecutor(getUrl(), inv);//异步发送请求,发送完毕之后得到一个CompletableFutureCompletableFuture<AppResponse> appResponseFuture =currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapterFutureContext.getContext().setCompatibleFuture(appResponseFuture);//将CompletableFuture包装为一个AsyncRpcResultAsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);result.setExecutor(executor);return result;}} catch (TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (RemotingException e) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}
}
getCallbackExecutor用于获取回调线程池。
- 如果是同步调用,那么返回一个ThreadlessExecutor实例,默认同步。
- ThreadlessExecutor与其他普通Executor之间最重要的区别是它不管理任何线程。通过execute(Runnable)方法提交给该Executor的任务并不会被调度到特定的线程执行。这些任务将会被存储在阻塞队列中,只有当线程调用waitAndDrain()方法时才会执行,执行该任务的线程与调用waitAndDrain的线程完全相同。
- 如果是异步调用,则获取对应的多线程的线程池。
java">protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {//如果是同步调用,那么返回一个ThreadlessExecutor实例,默认同步//ThreadlessExecutor与其他普通Executor之间最重要的区别是它不管理任何线程if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {return new ThreadlessExecutor();}//如果是异步调用,则获取对应的多线程的线程池return url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
}
rpc_232">ReferenceCountExchangeClient#request发起rpc请求
接下来我们就来到了remote层,将会真正的发起rpc请求。入口方法是ReferenceCountExchangeClient#request方法,内部默认委托HeaderExchangeClient#request发起请求。
java">@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {//默认通过HeaderExchangeClient发起请求return client.request(request, timeout, executor);
}
rpc_243">HeaderExchangeClient#request发起rpc请求
HeaderExchangeClient#request方法,内部默认委托HeaderExchangeChannel#request发起请求。
java">@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {//默认通过HeaderExchangeChannel发起请求return channel.request(request, timeout, executor);
}
rpc_254">HeaderExchangeChannel#request异步发起rpc请求
该类的职责是负责发送网络请求,Dubbo所有的网络请求最终都会封装为Request对象,并且生成并记录唯一的mId,version,requestBody等信息,创建结果对象DefaultFuture 对象,最终通过NettyClient#send发起异步请求。
java">/*** HeaderExchangeChannel的方法* <p>* 发起异步请求** @param request 请求,如RpcInvocation* @param timeout 请求超时时间* @param executor 回调执行器* @return 未来执行结果* @throws RemotingException*/
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {if (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create request. 创建Request,生成唯一请求idRequest req = new Request();//设置Dubbo RPC protocol versionreq.setVersion(Version.getProtocolVersion());//设置双向请求req.setTwoWay(true);//设置请求数据,如RpcInvocationreq.setData(request);//创建 DefaultFuture 对象DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);try {/** 通过NettyClient#send发起异步请求*/channel.send(req);} catch (RemotingException e) {future.cancel();throw e;}return future;
}
Request请求内容
每次请求都会生成一个Request对象,表示请求的内容。创建Request的时候会生成一个自增的id,表示本次请求的唯一标识。
java">public Request() {//生成新的自增唯一idmId = newId();
}
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
private static long newId() {//通过一个原子变量获取唯一自增id// getAndIncrement() 当它增长到MAX_VALUE时,它将增长到MIN_VALUE,并且负数可以用作IDreturn INVOKE_ID.getAndIncrement();
}
DefaultFuture#newFuture创建Future并启动超时检测
每次请求都会通过该方法创建一个DefaultFuture对象并进行超时检测。
java">/*** 创建一个DefaultFuture并进行超时检测** @param channel NettyClient* @param request 请求内容Request* @param timeout 请求超时时间* @return DefaultFuture*/
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {//创建一个DefaultFuture,存入缓存final DefaultFuture future = new DefaultFuture(channel, request, timeout);//设置回调执行器future.setExecutor(executor);// ThreadlessExecutor needs to hold the waiting future in case of circuit return.//ThreadlessExecutor需要保存waitingFuture以防响应返回。同步调用的执行器为ThreadlessExecutorif (executor instanceof ThreadlessExecutor) {((ThreadlessExecutor) executor).setWaitingFuture(future);}/** 超时检查*/timeoutCheck(future);return future;
}
new DefaultFuture
在DefaultFuture的构造器中,会将将请求id与当前DefaultFuture实例存入,静态map FUTURES内部,将请求id与当前NettyClient实例存入,静态map CHANNELS内部。
请求id和响应id是同一个值,因此当有响应返回时,就能从此缓存中根据id找到对应的DefaultFuture并唤醒阻塞线程。
java">/*** 正在处理的channel*/
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();/*** 正在处理的请求*/
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
private DefaultFuture(Channel channel, Request request, int timeout) {//保存NettyClientthis.channel = channel;//保存requestthis.request = request;//获取mid,即请求唯一idthis.id = request.getId();//获取请求超时时间this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);//将请求id与当前DefaultFuture实例存入,静态map FUTURES内部FUTURES.put(id, this);//将请求id与当前NettyClient实例存入,静态map CHANNELS内部CHANNELS.put(id, channel);
}
timeoutCheck超时检测
创建超时检查任务TimeoutCheckTask,并且添加到dubbo时间轮中。
当超时时间到了,便会执行TimeoutCheckTask中的检查任务。将会通过id获取到对应的DefaultFuture,然后构建一个超时响应Response,通过DefaultFuture#received处理超时响应。
java">/*** 检查future的超时*/
private static void timeoutCheck(DefaultFuture future) {//创建超时检查任务TimeoutCheckTask task = new TimeoutCheckTask(future.getId());//添加到dubbo时间轮中future.timeoutCheckTask = TIME_OUT_TIMER.get().newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}private static class TimeoutCheckTask implements TimerTask {//请求idprivate final Long requestID;TimeoutCheckTask(Long requestID) {this.requestID = requestID;}@Overridepublic void run(Timeout timeout) {//从FUTURES中获取请求id对应的DefaultFutureDefaultFuture future = DefaultFuture.getFuture(requestID);if (future == null || future.isDone()) {return;}//通过执行器或者直接通知超时if (future.getExecutor() != null) {future.getExecutor().execute(() -> notifyTimeout(future));} else {notifyTimeout(future);}}private void notifyTimeout(DefaultFuture future) {// create exception response.Response timeoutResponse = new Response(future.getId());// set timeout status.timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));// handle response. 处理超时响应DefaultFuture.received(future.getChannel(), timeoutResponse, true);}
}
AbstractPeer#send发起异步请求
java">@Override
public void send(Object message) throws RemotingException {//AbstractClient#send发起异步请求send(message, url.getParameter(Constants.SENT_KEY, false));
}
AbstractClient#send发起异步请求
内部通过NettyChannel#send发起请求。
java">@Override
public void send(Object message, boolean sent) throws RemotingException {//重新连接判断if (needReconnect && !isConnected()) {connect();}//获取NettyChannel,这是dubbo中的类Channel channel = getChannel();//TODO Can the value returned by getChannel() be null? need improvement.if (channel == null || !channel.isConnected()) {throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());}//通过NettyChannel#send发起请求channel.send(message, sent);
}
NettyChannel#send基于netty发起异步请求
此处的NettyChannel是位于dubbo的netty4包下的类。该方法中,将会通过NioSocketChannel#writeAndFlush方法发送请求,NioSocketChannel是netty包的类,到此,真正开始的发送请求,走到netty里面的逻辑。
NettyChannel内部有一个静态属性CHANNEL_MAP,存储着netty的NioSocketChannel到NettyChannel实例的映射关系,而NettyClient内部也持有netty的NioSocketChannel。
基于netty的NioSocketChannel#writeAndFlush方法发起请求的时候,实际上内部调用ChannelPipeline#writeAndFlush方法进行数据出站过程,包括一系列的ChannelOutboundHandler的链式处理,出站处理器执行顺序与处理器添加顺序相反,Dubbo3.1版本中对于consumer内部的NettyClient,在NettyClient#initBootstrap方法中初始化netty客户端的时候,绑定了如下的handler:
java">//自定义客户端消息的业务处理逻辑Handler
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug//解码.addLast("decoder", adapter.getDecoder())//编码.addLast("encoder", adapter.getEncoder())//心跳检测.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))//最后是此前创建的nettyClientHandler.addLast("handler", nettyClientHandler);
其中NettyClientHandler内部包括的dubbo handler顺序为:NettyClient、MultiMessageHandler、HeartbeatHandler、AllChannelHandler、DecodeHandler、HeaderExchangeHandler、ExchangeHandlerAdapter(DubboProtocol.requestHandler),但是在发送消息的时候这些handler没有太多的功能。关键是消息编码,这一点我们下面单独说。
java">/*** NettyChannel* 通过netty发送消息以及是否等待发送完成。** @param message 要发送的消息* @param sent 是否ack async-sent,默认false* @throws RemotingException 如果等待到超时或被try-catch包围的方法体抛出的任何异常,则抛出RemotingException。*/
@Override
public void send(Object message, boolean sent) throws RemotingException {//channel是否关闭super.send(message, sent);boolean success = true;int timeout = 0;try {/** 通过NioSocketChannel#writeAndFlush方法发送请求* NioSocketChannel是netty包的类,到此真正的发送请求*/ChannelFuture future = channel.writeAndFlush(message);//是否ack async-sent,默认falseif (sent) {// wait timeout mstimeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);success = future.await(timeout);}//获取异常并抛出Throwable cause = future.cause();if (cause != null) {throw cause;}} catch (Throwable e) {removeChannelIfDisconnected(channel);throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);}if (!success) {throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()+ "in timeout(" + timeout + "ms) limit");}
}
AbstractInvoker#waitForResultIfSync异步转同步等待
当AbstractInvoker#doInvokeAndReturn执行完毕之后,将会返回一个AsyncRpcResult。随后调用waitForResultIfSync方法,判断如果是同步调用,则线程阻塞等待结果返回。如果是异步调用,则不会阻塞直接返回了,在代码中我们可以通过RpcContext.getContext().getFuture().get() 来获取异步调用的结果,而通过 Dubbo上下文获取的是 FutureAdapter。
waitForResultIfSync内部是调用AsyncRpcResult#get方法完成阻塞等待的。
java">/*** AbstractInvoker的方法* <p>* 阻塞等待获取结果** @param asyncResult 异步执行结果* @param invocation 方法调用抽象*/
private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {//如果不是同步调用,则直接返回if (InvokeMode.SYNC != invocation.getInvokeMode()) {return;}try {/** NOTICE!* must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because* {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.* 必须调用CompletableFuture#get(long, TimeUnit)方法而非CompletableFuture#get()方法等待* 因为CompletableFuture#get()方法被证明有严重的性能下降*///获取超时时间,默认3000msObject timeout = invocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);//阻塞等待给定的时间if (timeout instanceof Integer) {asyncResult.get((Integer) timeout, TimeUnit.MILLISECONDS);} else {//阻塞Integer.MAX_VALUE毫秒asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);}} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (ExecutionException e) {Throwable rootCause = e.getCause();if (rootCause instanceof TimeoutException) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} else if (rootCause instanceof RemotingException) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} else {throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}} catch (java.util.concurrent.TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (Throwable e) {throw new RpcException(e.getMessage(), e);}
}
AsyncRpcResult#get阻塞获取结果
判断如果执行器属于ThreadlessExecutor,那么调用waitAndDrain方法阻塞当前线程直到响应返回或者超时,在新版本默认都是ThreadlessExecutor,否则通过responseFuture#get进行阻塞等待。
如果使用responseFuture#get,因为responseFuture的类型是CompletableFuture,这是JDK提供的Future实现类,所以他的get方法的阻塞实现就和Dubbo无关了,那么JDK怎么实现的呢?实际上很简单,如果CompletableFuture的内部result还为null,说明还没有结果,那么会对调用get方法的线程通过LockSupport.park阻塞并等待结果,这里可以参考我此前写的关于FutureTask的源码的文章。
java">/*** AsyncRpcResult的方法* <p>* 阻塞等待获取结果** @param timeout 超时时间* @param unit 单位*/
@Override
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {//如果执行器属于ThreadlessExecutor,表示同步调用if (executor != null && executor instanceof ThreadlessExecutor) {ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;//调用waitAndDrain方法阻塞当前线程直到响应返回或者超时threadlessExecutor.waitAndDrain();}//通过responseFuture阻塞等待return responseFuture.get(timeout, unit);
}
ThreadlessExecutor#waitAndDrain等待返回
ThreadlessExecutor是Dubbo2.7.5新增的同步调用时使用的一个执行器,内部没有任何多余的线程。这实际上是一个线程模型的优化,具体的优化见这篇文章:https://blog.csdn.net/weixin_39860915/article/details/103917841,简单的说对于响应结果的序列化的这步操作由业务线程自己来执行,不再需要额外的消费端的线程池线程来执行了。
相比于老的线程池模型,由业务线程自己负责监测并解析返回结果,免去了额外的消费端线程池开销。因为消费端的线程池策略默认是使用cached ,线程池会为每次的消费请求创建一个线程,那么当消费者应用并发较大或者提供者响应的时间较长时,就会出现消费者线程很多的情况。
- waitAndDrain方法将会等待直到任务队列中有一个任务,执行该任务和所有排队的任务(如果有的话)。该任务要么是正常响应,要么是超时响应。
- 在另一边,当业务数据返回后,生成一个 Runnable Task 并放入 ThreadlessExecutor 队列。此时等待将会被唤醒。
- 获取到任务之后业务线程将 Task 取出并在本线程中执行:反序列化业务数据并 set 到 Future,随后返回。
所以说,ThreadlessExecutor是通过阻塞队列来实现同步等待的。这也是Dubbo同步调用实现,因为Dubbo底层通信使用的是Netty,Netty的调用都是异步调用,每次发送请求之后都会直接返回而不会等待结果,所以Dubbo所有的调用其实也都是异步调用,因此才需要这么个“异步转同步”的操作,即:发送请求之后,业务线程调用阻塞队列的take方法阻塞,后面如果收到服务端发过来的响应结果之后,将响应数据放到阻塞队列里面,这样当前阻塞的线程就被唤醒了,这样就实现了同步调用。
那么,收到的响应结果是怎么和此前发起的某个请求匹配上的呢?实际上就是通过唯一的请求id来匹配的,响应结果中也携带有发起的请求id。
java">/*** 等待直到任务队列中有一个任务,执行该任务和所有排队的任务(如果有的话)。该任务要么是正常响应,要么是超时响应。*/
public void waitAndDrain() throws InterruptedException {/*** Usually, {@link #waitAndDrain()} will only get called once. It blocks for the response for the first time,* once the response (the task) reached and being executed waitAndDrain will return, the whole request process* then finishes. Subsequent calls on {@link #waitAndDrain()} (if there're any) should return immediately.** There's no need to worry that {@link #finished} is not thread-safe. Checking and updating of* 'finished' only appear in waitAndDrain, since waitAndDrain is binding to one RPC call (one thread), the call* of it is totally sequential.*///如果已经完成,则本次请求则直接返回if (isFinished()) {return;}Runnable runnable;try {//当前线程等待直到任务队列中有一个任务,执行该任务runnable = queue.take();} catch (InterruptedException e) {setWaiting(false);throw e;}synchronized (lock) {setWaiting(false);//当前线程执行该任务runnable.run();}//继续执行全部任务runnable = queue.poll();while (runnable != null) {runnable.run();runnable = queue.poll();}// mark the status of ThreadlessExecutor as finished.setFinished(true);
}
阻塞优化
AbstractInvoker#waitForResultIfSync方法中,当超时时间不是Integer类型的时候,将会调用,asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS)方法,将可能会阻塞Integer.MAX_VALUE毫秒,这种情况就类似于CompletableFuture#get不带有超时时间的方法了。那么为什么还要用CompletableFuture#get(timeout, unit)方法呢?
在方法中能够看到这样的注释:必须调用CompletableFuture#get(long, TimeUnit)方法而非CompletableFuture#get()方法等待。因为CompletableFuture#get()方法被证明有严重的性能下降。
Result#recreate获取结果
在获取到Reuslt之后,将会调用recreate方法处理返回值获取最终结果。3.0.0中引入了AsyncRpcResult来替换RpcResult, RpcResult被AppResponse替换,因此目前无论同步还是异步的Reuslt都统一使用AsyncRpcResult。
- 请求如果是FUTURE模式,那么直接返回Future,不会阻塞。
- 请求如果是异步模式,那么直接创建默认返回值的AppResponse并执行recreate方法返回,不会阻塞。
- 请求如果是同步模式,那么在获取响应结果之后才会返回。
java">/*** AsyncRpcResult的方法*/
@Override
public Object recreate() throws Throwable {RpcInvocation rpcInvocation = (RpcInvocation) invocation;//如果是FUTURE模式,那么直接返回Future,不会阻塞if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {return RpcContext.getClientAttachment().getFuture();}//如果是异步else if (InvokeMode.ASYNC == rpcInvocation.getInvokeMode()) {//直接创建默认返回值的AppResponse并执行recreate方法返回,不会阻塞return createDefaultValue(invocation).recreate();}//同步调用,获取响应结果之后才会返回return getAppResponse().recreate();
}
AsyncRpcResult#getAppResponse获取响应结果
阻塞式的获取结果。当然,对于同步调用,在之前的ThreadlessExecutor#waitAndDrain方法中就完成了阻塞了。
java">public Result getAppResponse() {try {//阻塞式的获取结果//当然,对于同步调用,在之前的ThreadlessExecutor#waitAndDrain方法中就完成了阻塞了if (responseFuture.isDone()) {return responseFuture.get();}} catch (Exception e) {// This should not happen in normal request process;logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");throw new RpcException(e);}//创建默认返回值的AppResponsereturn createDefaultValue(invocation);
}
AppResponse#createDefaultValue创建默认返回值
异步调用的情况,将会创建默认返回值返回。
获取调用的方法,如果方法不为null,那么获取方法的返回值类型,如果是基本类型则返回基本类型的默认值,否则默认返回null。
java">private static Result createDefaultValue(Invocation invocation) {//获取调用的方法ConsumerMethodModel method = (ConsumerMethodModel) invocation.get(Constants.METHOD_MODEL);//如果方法不为null,那么获取方法的返回值类型,如果是基本类型则返回基本类型的默认值,否则默认返回nullreturn method != null ? new AppResponse(defaultReturn(method.getReturnClass())) : new AppResponse();
}
AppResponse#recreate处理结果
最终结果的处理,如果有异常则抛出,否则返回结果。
java">/*** AppResponse的方法*/
@Override
public Object recreate() throws Throwable {//如果有异常,则处理并抛出if (exception != null) {// fix issue#619try {Object stackTrace = exception.getStackTrace();if (stackTrace == null) {exception.setStackTrace(new StackTraceElement[0]);}} catch (Exception e) {// ignore}throw exception;}//返回真实结果return result;
}
总结
现在我们来总结一下Dubbo3.1版本中Dubbo Consumer发起服务调用请求的总体过程。
- 调用某个Dubbo接口的方法,实际上是调用之前服务引入时生成的代理类对象,最终所有方法被转发到InvokerInvocationHandler#invoke方法中,这是请求的通用入口。
- 随后利用服务消费者Invoker从服务引入是生成的一批服务提供者Invoker列表中经过路由过滤、负载均衡机制选择一个服务提供者Invoker,随后利用服务提供者Invoker内部封装的NettyClient序列化消息并发起远程rpc请求。
- 如果调用失败,则在服务消费者ClusterInvoker中使用容错策略,例如失败重试等等,发起调用的时候每次请求都会生成一个唯一id,消费者端会缓存本次请求的id与Future的映射关系,默认每次请求都是异步调用,对于同步调用使用异步转同步机制的阻塞等待直到返回响应。响应id和请求id一致,此时就可以找到对应的Future设置响应结果。
实际上还有很多详细的知识点没说出来,例如消费者MigrationInvoker最开始实际上会进行决策使用接口级还是应用级Invoker。
本次我们学习了Dubbo Consumer发起服务调用请求的过程源码,另外还有Dubbo Provider处理服务调用请求以及Dubbo Consumer接收服务调用响应这两个阶段的源码后续再聊。