Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用

news/2025/2/9 6:46:48/

基于Dubbo 3.1,详细介绍了Dubbo Consumer服务调用源码。

上文我们学习了,Dubbo 发起服务调用的上半部分源码,我们学习到了FailoverClusterInvoker最终会通过服务提供者Invoker#invoke发起RPC调用,下面我们来学习Dubbo 发起服务调用的下半部分源码,也就是真正的RPC调用的源码。

Dubbo 3.x服务调用源码:

  1. Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口
  2. Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用

Dubbo 3.x服务引用源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
  3. Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
  4. Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
  5. Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
  6. Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
  7. Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
  8. Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
  9. Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
  10. Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
  11. Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url

Dubbo 3.x服务发布源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
  3. Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
  4. Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
  5. Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
  6. Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
  7. Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
  8. Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布

文章目录

  • AbstractInvoker#invoke服务提供者调用
    • AbstractInvoker#doInvokeAndReturn执行rpc调用
      • DubboInvoker#doInvoke调用remote层远程通信
        • getCallbackExecutor用于获取回调线程池。
  • ReferenceCountExchangeClient#request发起rpc请求
    • HeaderExchangeClient#request发起rpc请求
  • HeaderExchangeChannel#request异步发起rpc请求
    • Request请求内容
    • DefaultFuture#newFuture创建Future并启动超时检测
      • new DefaultFuture
      • timeoutCheck超时检测
    • AbstractPeer#send发起异步请求
    • AbstractClient#send发起异步请求
  • NettyChannel#send基于netty发起异步请求
  • AbstractInvoker#waitForResultIfSync异步转同步等待
    • AsyncRpcResult#get阻塞获取结果
      • ThreadlessExecutor#waitAndDrain等待返回
    • 阻塞优化
  • Result#recreate获取结果
    • AsyncRpcResult#getAppResponse获取响应结果
    • AppResponse#createDefaultValue创建默认返回值
    • AppResponse#recreate处理结果
  • 总结

AbstractInvoker#invoke服务提供者调用

ListenerInvokerWrapper#invoke方法没有其他操作,因此直接看AbstractInvoker#invoker方法。到这里,调用者变成了服务提供者invoker。

该方法首先调用doInvokeAndReturn方法执行RPC调用并返回异步执行结果,最后调用waitForResultIfSync方法判断如果同步调用,则等待RPC结果。


java">/*** AbstractInvoker的方法*/
@Override
public Result invoke(Invocation inv) throws RpcException {// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed//如果由于注册表的地址刷新导致调用程序被销毁,则允许当前调用继续进行if (isDestroyed()) {logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "+ ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");}RpcInvocation invocation = (RpcInvocation) inv;//准备RpcInvocationprepareInvocation(invocation);//执行RPC调用并返回异步执行结果AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);//如果同步调用,则等待RPC结果waitForResultIfSync(asyncResult, invocation);return asyncResult;
}

rpc_82">AbstractInvoker#doInvokeAndReturn执行rpc调用

执行RPC调用并返回异步执行结果AsyncRpcResult。

java">/*** AbstractInvoker的方法* <p>* 执行RPC调用并返回异步执行结果** @param invocation* @return*/
private AsyncRpcResult doInvokeAndReturn(RpcInvocation invocation) {AsyncRpcResult asyncResult;try {/** 执行调用*/asyncResult = (AsyncRpcResult) doInvoke(invocation);} catch (InvocationTargetException e) {//异常处理Throwable te = e.getTargetException();//asyncResult封装抛出的异常if (te != null) {// if biz exceptionif (te instanceof RpcException) {((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);}asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);} else {asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);}} catch (RpcException e) {// if biz exceptionif (e.isBiz()) {asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);} else {throw e;}} catch (Throwable e) {//asyncResult封装抛出的异常asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);}//当调用模式为异步时,或者setFutureWhenSync=true时if (setFutureWhenSync || invocation.getInvokeMode() != InvokeMode.SYNC) {// set server context 将future设置到线程本地变量中,使用FutureAdapter包装RpcContext.getServiceContext().setFuture(new FutureAdapter<>(asyncResult.getResponseFuture()));}return asyncResult;
}

DubboInvoker#doInvoke调用remote层远程通信

该方法由invoker具体的子类实现,以默认dubbo协议为例子,对应的invoker为DubboInvoker。该方法将会调用remote层进行rpc通信。

  1. 基于轮询的策略选择一个remote层客户端ExchangeClient,用以发起rpc调用,一般来说只有一个client,即共享连接。
  2. 判断如果是单向发送,那么直接发送异步请求,随后返回默认的AsyncRpcResult,不需要获取真正的请求响应结果。
  3. 同步、异步发送处理。Dubbo默认都是走的异步发送,发送完毕之后得到一个CompletableFuture,将CompletableFuture包装为一个AsyncRpcResult返回。
    1. 在外层AbstractInvoker#invoker方法中会判断,如果同步则会调用future.get阻塞等待结果。这就是Dubbo所谓的异步转同步处理。
java">/*** DubboInvoker的方法* <p>* 执行RPC调用并返回执行结果*/
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {RpcInvocation inv = (RpcInvocation) invocation;//调用方法名final String methodName = RpcUtils.getMethodName(invocation);//设置附加属性 path={serviceInterface}inv.setAttachment(PATH_KEY, getUrl().getPath());//设置附加属性 version={version}inv.setAttachment(VERSION_KEY, version);/** 基于轮询的策略选择一个remote层客户端,用以发起rpc调用* 一般来说只有一个client,即共享连接*/ExchangeClient currentClient;if (clients.length == 1) {currentClient = clients[0];} else {currentClient = clients[index.getAndIncrement() % clients.length];}try {//是否是单向通讯,即只管发送不需要返回结果boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//计算rpc调用超时时间,默认3000msint timeout = calculateTimeout(invocation, methodName);invocation.setAttachment(TIMEOUT_KEY, timeout);/** 单向发送处理*/if (isOneway) {boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);//发送单向请求currentClient.send(inv, isSent);//返回默认的AsyncRpcResult,不需要真正的请求响应结果return AsyncRpcResult.newDefaultAsyncResult(invocation);}/** 同步、异步发送处理* 默认都是走的异步发送*/else {//回调线程池ExecutorService executor = getCallbackExecutor(getUrl(), inv);//异步发送请求,发送完毕之后得到一个CompletableFutureCompletableFuture<AppResponse> appResponseFuture =currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapterFutureContext.getContext().setCompatibleFuture(appResponseFuture);//将CompletableFuture包装为一个AsyncRpcResultAsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);result.setExecutor(executor);return result;}} catch (TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (RemotingException e) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}
}
getCallbackExecutor用于获取回调线程池。
  1. 如果是同步调用,那么返回一个ThreadlessExecutor实例,默认同步。
    1. ThreadlessExecutor与其他普通Executor之间最重要的区别是它不管理任何线程。通过execute(Runnable)方法提交给该Executor的任务并不会被调度到特定的线程执行。这些任务将会被存储在阻塞队列中,只有当线程调用waitAndDrain()方法时才会执行,执行该任务的线程与调用waitAndDrain的线程完全相同。
  2. 如果是异步调用,则获取对应的多线程的线程池。
java">protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {//如果是同步调用,那么返回一个ThreadlessExecutor实例,默认同步//ThreadlessExecutor与其他普通Executor之间最重要的区别是它不管理任何线程if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {return new ThreadlessExecutor();}//如果是异步调用,则获取对应的多线程的线程池return url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
}

rpc_232">ReferenceCountExchangeClient#request发起rpc请求

接下来我们就来到了remote层,将会真正的发起rpc请求。入口方法是ReferenceCountExchangeClient#request方法,内部默认委托HeaderExchangeClient#request发起请求。

java">@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {//默认通过HeaderExchangeClient发起请求return client.request(request, timeout, executor);
}

rpc_243">HeaderExchangeClient#request发起rpc请求

HeaderExchangeClient#request方法,内部默认委托HeaderExchangeChannel#request发起请求。

java">@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {//默认通过HeaderExchangeChannel发起请求return channel.request(request, timeout, executor);
}

rpc_254">HeaderExchangeChannel#request异步发起rpc请求

该类的职责是负责发送网络请求,Dubbo所有的网络请求最终都会封装为Request对象,并且生成并记录唯一的mId,version,requestBody等信息,创建结果对象DefaultFuture 对象,最终通过NettyClient#send发起异步请求。

java">/*** HeaderExchangeChannel的方法* <p>* 发起异步请求** @param request  请求,如RpcInvocation* @param timeout  请求超时时间* @param executor 回调执行器* @return 未来执行结果* @throws RemotingException*/
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {if (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create request. 创建Request,生成唯一请求idRequest req = new Request();//设置Dubbo RPC protocol versionreq.setVersion(Version.getProtocolVersion());//设置双向请求req.setTwoWay(true);//设置请求数据,如RpcInvocationreq.setData(request);//创建 DefaultFuture 对象DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);try {/** 通过NettyClient#send发起异步请求*/channel.send(req);} catch (RemotingException e) {future.cancel();throw e;}return future;
}

Request请求内容

每次请求都会生成一个Request对象,表示请求的内容。创建Request的时候会生成一个自增的id,表示本次请求的唯一标识。

java">public Request() {//生成新的自增唯一idmId = newId();
}
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
private static long newId() {//通过一个原子变量获取唯一自增id// getAndIncrement() 当它增长到MAX_VALUE时,它将增长到MIN_VALUE,并且负数可以用作IDreturn INVOKE_ID.getAndIncrement();
}

DefaultFuture#newFuture创建Future并启动超时检测

每次请求都会通过该方法创建一个DefaultFuture对象并进行超时检测。

java">/*** 创建一个DefaultFuture并进行超时检测** @param channel NettyClient* @param request 请求内容Request* @param timeout 请求超时时间* @return DefaultFuture*/
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {//创建一个DefaultFuture,存入缓存final DefaultFuture future = new DefaultFuture(channel, request, timeout);//设置回调执行器future.setExecutor(executor);// ThreadlessExecutor needs to hold the waiting future in case of circuit return.//ThreadlessExecutor需要保存waitingFuture以防响应返回。同步调用的执行器为ThreadlessExecutorif (executor instanceof ThreadlessExecutor) {((ThreadlessExecutor) executor).setWaitingFuture(future);}/** 超时检查*/timeoutCheck(future);return future;
}

new DefaultFuture

在DefaultFuture的构造器中,会将将请求id与当前DefaultFuture实例存入,静态map FUTURES内部,将请求id与当前NettyClient实例存入,静态map CHANNELS内部。

请求id和响应id是同一个值,因此当有响应返回时,就能从此缓存中根据id找到对应的DefaultFuture并唤醒阻塞线程。

java">/*** 正在处理的channel*/
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();/*** 正在处理的请求*/
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
private DefaultFuture(Channel channel, Request request, int timeout) {//保存NettyClientthis.channel = channel;//保存requestthis.request = request;//获取mid,即请求唯一idthis.id = request.getId();//获取请求超时时间this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);//将请求id与当前DefaultFuture实例存入,静态map FUTURES内部FUTURES.put(id, this);//将请求id与当前NettyClient实例存入,静态map CHANNELS内部CHANNELS.put(id, channel);
}

timeoutCheck超时检测

创建超时检查任务TimeoutCheckTask,并且添加到dubbo时间轮中。

当超时时间到了,便会执行TimeoutCheckTask中的检查任务。将会通过id获取到对应的DefaultFuture,然后构建一个超时响应Response,通过DefaultFuture#received处理超时响应。

java">/*** 检查future的超时*/
private static void timeoutCheck(DefaultFuture future) {//创建超时检查任务TimeoutCheckTask task = new TimeoutCheckTask(future.getId());//添加到dubbo时间轮中future.timeoutCheckTask = TIME_OUT_TIMER.get().newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}private static class TimeoutCheckTask implements TimerTask {//请求idprivate final Long requestID;TimeoutCheckTask(Long requestID) {this.requestID = requestID;}@Overridepublic void run(Timeout timeout) {//从FUTURES中获取请求id对应的DefaultFutureDefaultFuture future = DefaultFuture.getFuture(requestID);if (future == null || future.isDone()) {return;}//通过执行器或者直接通知超时if (future.getExecutor() != null) {future.getExecutor().execute(() -> notifyTimeout(future));} else {notifyTimeout(future);}}private void notifyTimeout(DefaultFuture future) {// create exception response.Response timeoutResponse = new Response(future.getId());// set timeout status.timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));// handle response. 处理超时响应DefaultFuture.received(future.getChannel(), timeoutResponse, true);}
}

AbstractPeer#send发起异步请求

java">@Override
public void send(Object message) throws RemotingException {//AbstractClient#send发起异步请求send(message, url.getParameter(Constants.SENT_KEY, false));
}

AbstractClient#send发起异步请求

内部通过NettyChannel#send发起请求。

java">@Override
public void send(Object message, boolean sent) throws RemotingException {//重新连接判断if (needReconnect && !isConnected()) {connect();}//获取NettyChannel,这是dubbo中的类Channel channel = getChannel();//TODO Can the value returned by getChannel() be null? need improvement.if (channel == null || !channel.isConnected()) {throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());}//通过NettyChannel#send发起请求channel.send(message, sent);
}

NettyChannel#send基于netty发起异步请求

此处的NettyChannel是位于dubbo的netty4包下的类。该方法中,将会通过NioSocketChannel#writeAndFlush方法发送请求,NioSocketChannel是netty包的类,到此,真正开始的发送请求,走到netty里面的逻辑。

NettyChannel内部有一个静态属性CHANNEL_MAP,存储着netty的NioSocketChannel到NettyChannel实例的映射关系,而NettyClient内部也持有netty的NioSocketChannel。

基于netty的NioSocketChannel#writeAndFlush方法发起请求的时候,实际上内部调用ChannelPipeline#writeAndFlush方法进行数据出站过程,包括一系列的ChannelOutboundHandler的链式处理,出站处理器执行顺序与处理器添加顺序相反,Dubbo3.1版本中对于consumer内部的NettyClient,在NettyClient#initBootstrap方法中初始化netty客户端的时候,绑定了如下的handler:

java">//自定义客户端消息的业务处理逻辑Handler
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug//解码.addLast("decoder", adapter.getDecoder())//编码.addLast("encoder", adapter.getEncoder())//心跳检测.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))//最后是此前创建的nettyClientHandler.addLast("handler", nettyClientHandler);

其中NettyClientHandler内部包括的dubbo handler顺序为:NettyClient、MultiMessageHandler、HeartbeatHandler、AllChannelHandler、DecodeHandler、HeaderExchangeHandler、ExchangeHandlerAdapter(DubboProtocol.requestHandler),但是在发送消息的时候这些handler没有太多的功能。关键是消息编码,这一点我们下面单独说。

java">/*** NettyChannel* 通过netty发送消息以及是否等待发送完成。** @param message 要发送的消息* @param sent    是否ack async-sent,默认false* @throws RemotingException 如果等待到超时或被try-catch包围的方法体抛出的任何异常,则抛出RemotingException。*/
@Override
public void send(Object message, boolean sent) throws RemotingException {//channel是否关闭super.send(message, sent);boolean success = true;int timeout = 0;try {/** 通过NioSocketChannel#writeAndFlush方法发送请求* NioSocketChannel是netty包的类,到此真正的发送请求*/ChannelFuture future = channel.writeAndFlush(message);//是否ack async-sent,默认falseif (sent) {// wait timeout mstimeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);success = future.await(timeout);}//获取异常并抛出Throwable cause = future.cause();if (cause != null) {throw cause;}} catch (Throwable e) {removeChannelIfDisconnected(channel);throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);}if (!success) {throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()+ "in timeout(" + timeout + "ms) limit");}
}

AbstractInvoker#waitForResultIfSync异步转同步等待

当AbstractInvoker#doInvokeAndReturn执行完毕之后,将会返回一个AsyncRpcResult。随后调用waitForResultIfSync方法,判断如果是同步调用,则线程阻塞等待结果返回。如果是异步调用,则不会阻塞直接返回了,在代码中我们可以通过RpcContext.getContext().getFuture().get() 来获取异步调用的结果,而通过 Dubbo上下文获取的是 FutureAdapter。

waitForResultIfSync内部是调用AsyncRpcResult#get方法完成阻塞等待的。

java">/*** AbstractInvoker的方法* <p>* 阻塞等待获取结果** @param asyncResult 异步执行结果* @param invocation  方法调用抽象*/
private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {//如果不是同步调用,则直接返回if (InvokeMode.SYNC != invocation.getInvokeMode()) {return;}try {/** NOTICE!* must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because* {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.* 必须调用CompletableFuture#get(long, TimeUnit)方法而非CompletableFuture#get()方法等待* 因为CompletableFuture#get()方法被证明有严重的性能下降*///获取超时时间,默认3000msObject timeout = invocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);//阻塞等待给定的时间if (timeout instanceof Integer) {asyncResult.get((Integer) timeout, TimeUnit.MILLISECONDS);} else {//阻塞Integer.MAX_VALUE毫秒asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);}} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RpcException("Interrupted unexpectedly while waiting for remote result to return! method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (ExecutionException e) {Throwable rootCause = e.getCause();if (rootCause instanceof TimeoutException) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} else if (rootCause instanceof RemotingException) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} else {throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}} catch (java.util.concurrent.TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (Throwable e) {throw new RpcException(e.getMessage(), e);}
}

AsyncRpcResult#get阻塞获取结果

判断如果执行器属于ThreadlessExecutor,那么调用waitAndDrain方法阻塞当前线程直到响应返回或者超时,在新版本默认都是ThreadlessExecutor,否则通过responseFuture#get进行阻塞等待。

如果使用responseFuture#get,因为responseFuture的类型是CompletableFuture,这是JDK提供的Future实现类,所以他的get方法的阻塞实现就和Dubbo无关了,那么JDK怎么实现的呢?实际上很简单,如果CompletableFuture的内部result还为null,说明还没有结果,那么会对调用get方法的线程通过LockSupport.park阻塞并等待结果,这里可以参考我此前写的关于FutureTask的源码的文章。

java">/*** AsyncRpcResult的方法* <p>* 阻塞等待获取结果** @param timeout 超时时间* @param unit    单位*/
@Override
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {//如果执行器属于ThreadlessExecutor,表示同步调用if (executor != null && executor instanceof ThreadlessExecutor) {ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;//调用waitAndDrain方法阻塞当前线程直到响应返回或者超时threadlessExecutor.waitAndDrain();}//通过responseFuture阻塞等待return responseFuture.get(timeout, unit);
}

ThreadlessExecutor#waitAndDrain等待返回

ThreadlessExecutor是Dubbo2.7.5新增的同步调用时使用的一个执行器,内部没有任何多余的线程。这实际上是一个线程模型的优化,具体的优化见这篇文章:https://blog.csdn.net/weixin_39860915/article/details/103917841,简单的说对于响应结果的序列化的这步操作由业务线程自己来执行,不再需要额外的消费端的线程池线程来执行了。

相比于老的线程池模型,由业务线程自己负责监测并解析返回结果,免去了额外的消费端线程池开销。因为消费端的线程池策略默认是使用cached ,线程池会为每次的消费请求创建一个线程,那么当消费者应用并发较大或者提供者响应的时间较长时,就会出现消费者线程很多的情况。

  1. waitAndDrain方法将会等待直到任务队列中有一个任务,执行该任务和所有排队的任务(如果有的话)。该任务要么是正常响应,要么是超时响应。
  2. 在另一边,当业务数据返回后,生成一个 Runnable Task 并放入 ThreadlessExecutor 队列。此时等待将会被唤醒。
  3. 获取到任务之后业务线程将 Task 取出并在本线程中执行:反序列化业务数据并 set 到 Future,随后返回。

所以说,ThreadlessExecutor是通过阻塞队列来实现同步等待的。这也是Dubbo同步调用实现,因为Dubbo底层通信使用的是Netty,Netty的调用都是异步调用,每次发送请求之后都会直接返回而不会等待结果,所以Dubbo所有的调用其实也都是异步调用,因此才需要这么个“异步转同步”的操作,即:发送请求之后,业务线程调用阻塞队列的take方法阻塞,后面如果收到服务端发过来的响应结果之后,将响应数据放到阻塞队列里面,这样当前阻塞的线程就被唤醒了,这样就实现了同步调用。

那么,收到的响应结果是怎么和此前发起的某个请求匹配上的呢?实际上就是通过唯一的请求id来匹配的,响应结果中也携带有发起的请求id。

java">/*** 等待直到任务队列中有一个任务,执行该任务和所有排队的任务(如果有的话)。该任务要么是正常响应,要么是超时响应。*/
public void waitAndDrain() throws InterruptedException {/*** Usually, {@link #waitAndDrain()} will only get called once. It blocks for the response for the first time,* once the response (the task) reached and being executed waitAndDrain will return, the whole request process* then finishes. Subsequent calls on {@link #waitAndDrain()} (if there're any) should return immediately.** There's no need to worry that {@link #finished} is not thread-safe. Checking and updating of* 'finished' only appear in waitAndDrain, since waitAndDrain is binding to one RPC call (one thread), the call* of it is totally sequential.*///如果已经完成,则本次请求则直接返回if (isFinished()) {return;}Runnable runnable;try {//当前线程等待直到任务队列中有一个任务,执行该任务runnable = queue.take();} catch (InterruptedException e) {setWaiting(false);throw e;}synchronized (lock) {setWaiting(false);//当前线程执行该任务runnable.run();}//继续执行全部任务runnable = queue.poll();while (runnable != null) {runnable.run();runnable = queue.poll();}// mark the status of ThreadlessExecutor as finished.setFinished(true);
}

阻塞优化

AbstractInvoker#waitForResultIfSync方法中,当超时时间不是Integer类型的时候,将会调用,asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS)方法,将可能会阻塞Integer.MAX_VALUE毫秒,这种情况就类似于CompletableFuture#get不带有超时时间的方法了。那么为什么还要用CompletableFuture#get(timeout, unit)方法呢?

在方法中能够看到这样的注释:必须调用CompletableFuture#get(long, TimeUnit)方法而非CompletableFuture#get()方法等待。因为CompletableFuture#get()方法被证明有严重的性能下降。


Result#recreate获取结果

在获取到Reuslt之后,将会调用recreate方法处理返回值获取最终结果。3.0.0中引入了AsyncRpcResult来替换RpcResult, RpcResult被AppResponse替换,因此目前无论同步还是异步的Reuslt都统一使用AsyncRpcResult。

  1. 请求如果是FUTURE模式,那么直接返回Future,不会阻塞。
  2. 请求如果是异步模式,那么直接创建默认返回值的AppResponse并执行recreate方法返回,不会阻塞。
  3. 请求如果是同步模式,那么在获取响应结果之后才会返回。
java">/*** AsyncRpcResult的方法*/
@Override
public Object recreate() throws Throwable {RpcInvocation rpcInvocation = (RpcInvocation) invocation;//如果是FUTURE模式,那么直接返回Future,不会阻塞if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {return RpcContext.getClientAttachment().getFuture();}//如果是异步else if (InvokeMode.ASYNC == rpcInvocation.getInvokeMode()) {//直接创建默认返回值的AppResponse并执行recreate方法返回,不会阻塞return createDefaultValue(invocation).recreate();}//同步调用,获取响应结果之后才会返回return getAppResponse().recreate();
}

AsyncRpcResult#getAppResponse获取响应结果

阻塞式的获取结果。当然,对于同步调用,在之前的ThreadlessExecutor#waitAndDrain方法中就完成了阻塞了。

java">public Result getAppResponse() {try {//阻塞式的获取结果//当然,对于同步调用,在之前的ThreadlessExecutor#waitAndDrain方法中就完成了阻塞了if (responseFuture.isDone()) {return responseFuture.get();}} catch (Exception e) {// This should not happen in normal request process;logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");throw new RpcException(e);}//创建默认返回值的AppResponsereturn createDefaultValue(invocation);
}

AppResponse#createDefaultValue创建默认返回值

异步调用的情况,将会创建默认返回值返回。

获取调用的方法,如果方法不为null,那么获取方法的返回值类型,如果是基本类型则返回基本类型的默认值,否则默认返回null。

java">private static Result createDefaultValue(Invocation invocation) {//获取调用的方法ConsumerMethodModel method = (ConsumerMethodModel) invocation.get(Constants.METHOD_MODEL);//如果方法不为null,那么获取方法的返回值类型,如果是基本类型则返回基本类型的默认值,否则默认返回nullreturn method != null ? new AppResponse(defaultReturn(method.getReturnClass())) : new AppResponse();
}

AppResponse#recreate处理结果

最终结果的处理,如果有异常则抛出,否则返回结果。

java">/*** AppResponse的方法*/
@Override
public Object recreate() throws Throwable {//如果有异常,则处理并抛出if (exception != null) {// fix issue#619try {Object stackTrace = exception.getStackTrace();if (stackTrace == null) {exception.setStackTrace(new StackTraceElement[0]);}} catch (Exception e) {// ignore}throw exception;}//返回真实结果return result;
}

总结

现在我们来总结一下Dubbo3.1版本中Dubbo Consumer发起服务调用请求的总体过程。

  1. 调用某个Dubbo接口的方法,实际上是调用之前服务引入时生成的代理类对象,最终所有方法被转发到InvokerInvocationHandler#invoke方法中,这是请求的通用入口。
  2. 随后利用服务消费者Invoker从服务引入是生成的一批服务提供者Invoker列表中经过路由过滤、负载均衡机制选择一个服务提供者Invoker,随后利用服务提供者Invoker内部封装的NettyClient序列化消息并发起远程rpc请求
  3. 如果调用失败,则在服务消费者ClusterInvoker中使用容错策略,例如失败重试等等,发起调用的时候每次请求都会生成一个唯一id,消费者端会缓存本次请求的id与Future的映射关系,默认每次请求都是异步调用,对于同步调用使用异步转同步机制的阻塞等待直到返回响应。响应id和请求id一致,此时就可以找到对应的Future设置响应结果。

实际上还有很多详细的知识点没说出来,例如消费者MigrationInvoker最开始实际上会进行决策使用接口级还是应用级Invoker。

本次我们学习了Dubbo Consumer发起服务调用请求的过程源码,另外还有Dubbo Provider处理服务调用请求以及Dubbo Consumer接收服务调用响应这两个阶段的源码后续再聊。


http://www.ppmy.cn/news/1570520.html

相关文章

【Linux网络编程】之配置阿里云安全组

【Linux网络编程】之配置阿里云安全组 配置阿里云安全组阿里云安全组的概念配置安全组规则入方向基本概念补充ICMP协议安全组配置UDP协议安全组配置 出方向 配置云服务器主机的防火墙什么是防火墙Linux中防火墙的管理工具防火墙的作用常用命令介绍&#xff08;firewalld&#x…

基于Bootstrap + Java + Oracle实现的电商平台

以下是基于Bootstrap Java Oracle实现的电商平台开发方案&#xff08;简化版&#xff09;&#xff1a; 一、系统架构设计 前端&#xff1a;Bootstrap 5 jQuery 后端&#xff1a;Java Spring Boot 数据库&#xff1a;Oracle 19c 自动化&#xff1a;Spring Scheduler Oracle…

[oeasy]python064_命令行工作流的总结_vim_shell_python

064_命令行工作流的总结_vim_shell_python 命令行工作流的总结_vim_shell_python 回忆上次内容 上次 写代码完成了 输入和输出 关于vim 又练了一回 添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; 对于vim和shell 我们 要好好总结一下 以后 就不会 …

SQLite3实战教程:从入门到精通

SQLite是一个轻量级的关系型数据库,广泛应用于移动应用和小型Web应用。本教程将带您深入了解SQLite3,学习如何在Django项目中使用它,并掌握相关的数据库管理命令。 1. SQLite3基础 1.1 什么是SQLite? SQLite是一个嵌入式关系型数据库引擎,具有以下特点: 无需单独的服务器进…

Linux 安装 Ollama

1、下载地址 Download Ollama on Linux 2、有网络直接执行 curl -fsSL https://ollama.com/install.sh | sh 命令 3、下载慢的解决方法 1、curl -fsSL https://ollama.com/install.sh -o ollama_install.sh 2、sed -i s|https://ollama.com/download/ollama-linux|https://…

人工智能图像分割之Mask2former源码解读

环境搭建: (1)首先本代码是下载的mmdetection-2022.9的,所以它的版本要配置好,本源码配置例如mmcv1.7,python3.7,pytorch1.13,cuda11.7。pytorch与python,cuda版本匹配可参考&#xff1a;https://www.jb51.net/python/3308342lx.htm。 (2)还有一个是先要安装一个vs2022版本或…

【模型部署】大模型部署工具对比:SGLang, Ollama, VLLM, LLaMA.cpp如何选择?

在选择大模型部署工具时&#xff0c;需要考虑多个因素&#xff0c;包括性能、支持的语言和模型、硬件支持、易用性以及社区支持等。以下是对比分析&#xff1a; 性能 VLLM (Virtual Tensor Language): VLLM 是一个高性能的推理库&#xff0c;特别适用于长序列任务。它通过虚…

Day38【AI思考】-彻底打通线性数据结构间的血脉联系

文章目录 **彻底打通线性数据结构间的血脉联系****数据结构家族谱系图****一、线性表&#xff08;老祖宗的规矩&#xff09;****核心特征** **二、嫡系血脉解析**1. **数组&#xff08;规矩森严的长子&#xff09;**2. **链表&#xff08;灵活变通的次子&#xff09;** **三、庶…