RPC消费者服务设计实现
- 1.概述
- 2.RPC消费者服务设计
- 3.RPC消费者服务UML
- 4.RPC消费者服务基本实现
- 4.1.工程结构
- 4.2. NettyRemotingClientFactory类
- 4.3. NettyClientConfig类
- 4.4. NettyRemotingClient类
- 4.5.RPC消费者Handler处理器实现
- 5.异步请求转同步获取响应消息的设计
- 6.异步请求转同步获取响应消息的实现
- 6.1.NettyRemotingClient#sendSync()方法
- 6.2.NettyClientHandler#channelRead()方法
- 6.3.ResponseFuture类
- 4.总结
1.概述
之前我们通过几篇文章讲解了Dolphinscheduler中RPC框架服务的一些底层实现设计以及RPC提供者服务端的设计实现, 下面我们继续来看看针对RPC消费者端看看,它又是怎么进行的设计实现。
一个完整的RPC框架肯定是既包含提供者服务也包含消费者服务的,这样其他服务在使用我们的框架时只要启动我们的RPC提供者服务,然后再通过我们的RPC消费者服务对外暴露的API方法就可以进行应用层的业务数据交互了。所以我们在进行RPC消费者服务设计实现时需要尽可能将所有底层通信细节屏蔽到框架内部,其他服务只需要进行简单的配置即可借助RPC消费者服务实现远程过程调用,这样才能收获使用的青睐。
话不多说,我们开始。
2.RPC消费者服务设计
如果屏蔽掉基于Netty与服务提供者建立连接的通信细节,那RPC消费者服务与服务提供者之间基于自定义的网络传输协议、数据编解码实现数据交互整体流程如下图
1)服务消费者基于Netty与服务提供者建立底层通信连接通道。
2)服务消费者基于自定义网络传输协议和数据编解码对数据进行编码处理,向服务提供者发送数据。
3)服务提供者接收到服务消费者发送过来的数据后,根据自定义网络协议>网络协议和数据编解码解析数据,对数据进行解码处理,对解码后的数据进行逻辑处理。
4)服务提供者处理完逻辑后,会将结果数据根据自定义的网络传输协议和数据编解码进行编码,响应给服务消费者。
5)服务消费者接收到服务提供者响应的数据后,根据自定义的网络传输协议和数据编解码对数据进行解码操作,并对解码后的数据进行进一步处理,一次完整请求响应流程结束。
3.RPC消费者服务UML
以下是RPC消费者服务的核心实现类关系,本篇文章主要说的是关于RPC服务消费者的核心实现,主要就是下图中绿色背景的那一部分。
可能图片不太清楚,这里把那块的局部区域单独放一张图片
由上图可以看出:
程序中定义了一个NettyRemotingClient类,在Server中定义了一个方法start(),这个start方法就是用来启动RPC消费者服务的。
再NettyRemotingClient初始化时,会依赖NettyClientHandler类,并实例化NettyClientHandler类对象,NettyClientHandler类对象作为RPC消费者进行消息发送,处理响应消息的操作。
4.RPC消费者服务基本实现
4.1.工程结构
- dolphinscheduler 父项目
- dolphinscheduler-extract RPC服务项目
- dolphinscheduler-extract-alert 监控告警服务RPC接口定义、请求响应封装设计工程
- dolphinscheduler-extract-base RPC框架核心工程(RPC消费者服务的实现代码在这个工程)
- dolphinscheduler-extract-common RPC框架通用工程
- dolphinscheduler-extract-master Master调度服务的RPC接口定义、请求响应封装设计工程
- dolphinscheduler-extract-worker Worker任务执行服务的RPC接口定义、请求响应封装设计工程
- dolphinscheduler-extract RPC服务项目
Dolphinscheduler项目中RPC消费者服务核心代码就是在工程 dolphinscheduler-extract-base
#**org.apache.dolphinscheduler.extract.base.client
**包下,下面我们一起看一下RPC消费者服务的实现源码。
4.2. NettyRemotingClientFactory类
工程 dolphinscheduler-extract-base
#**org.apache.dolphinscheduler.extract.base.client
包下的NettyRemotingClientFactory
**Netty客户端服务工厂类,用来创建NettyRemotingClient的, 通过这个类我们就可以创建一个RPC消费者对象。
package org.apache.dolphinscheduler.extract.base.client;import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;@UtilityClass
@Slf4j
public class NettyRemotingClientFactory {public NettyRemotingClient buildNettyRemotingClient(NettyClientConfig nettyClientConfig) {// 创建NettyRemotingClient对象return new NettyRemotingClient(nettyClientConfig);}
}
先从NettyRemotingClient服务的初始化来看, 主要就是在构造器中调用NettyRemotingClientFactory .buildNettyRemotingClient(nettyServerConfig)
方法来创建了NettyRemotingClient对象,传递的入参是NettyClientConfig
对象
4.3. NettyClientConfig类
在工程 dolphinscheduler-extract-base
# org.apache.dolphinscheduler.extract.base.config.NettClientConfig
这个类就是RPC消费者服务的默认参数配置信息
package org.apache.dolphinscheduler.extract.base.config;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class NettyClientConfig {/*** 默认工作线程*/@Builder.Defaultprivate int workerThreads = Runtime.getRuntime().availableProcessors() * 2;/*** 禁用Nagle算法(减少延迟)*/@Builder.Defaultprivate boolean tcpNoDelay = true;/*** 启用TCP的keep-alive机制(保持长连接,定期发送心跳包检测连接的有效性)*/@Builder.Defaultprivate boolean soKeepalive = true;/*** 发送缓冲区大小*/@Builder.Defaultprivate int sendBufferSize = 65535;/*** 接收缓冲区大小*/@Builder.Defaultprivate int receiveBufferSize = 65535;/*** 默认连接超时时间*/@Builder.Defaultprivate int connectTimeoutMillis = 3000;
}
4.4. NettyRemotingClient类
在工程 dolphinscheduler-extract-base
# org.apache.dolphinscheduler.extract.base.client.NettyRemotingClient
这个类就是用来创建并初始化NettyClient服务的,使用了启动NettyClient的经典代码模板。
package org.apache.dolphinscheduler.extract.base.client;import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;import lombok.extern.slf4j.Slf4j;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;@Slf4j
public class NettyRemotingClient implements AutoCloseable {private final Bootstrap bootstrap = new Bootstrap();private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128);private final AtomicBoolean isStarted = new AtomicBoolean(false);private final EventLoopGroup workerGroup;private final NettyClientConfig clientConfig;private final NettyClientHandler clientHandler;public NettyRemotingClient(final NettyClientConfig clientConfig) {this.clientConfig = clientConfig;ThreadFactory nettyClientThreadFactory = ThreadUtils.newDaemonThreadFactory("NettyClientThread-");if (Epoll.isAvailable()) {this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);} else {this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);}// 在构造器中完成了RPC客户端发送处理消息的Handler对象的初始化,// 客户端封装请求报文、请求响应数据编解码、发送接收消息并处理就是在这个类中完成的this.clientHandler = new NettyClientHandler(this);// 启动RPC消费者端this.start();}private void start() {this.bootstrap.group(this.workerGroup).channel(NettyUtils.getSocketChannelClass()).option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()).option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()).option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()).option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())// 初始化SocketChannel.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast("client-idle-handler",new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME,0,0,TimeUnit.MILLISECONDS))// 设置RPC消费者端数据传输处理的Handler对象、自定义的数据编/解码器.addLast(new TransporterDecoder(), clientHandler, new TransporterEncoder());}});isStarted.compareAndSet(false, true);}/*** 同步发送方法** @param host 目标RPC提供者服务节点* @param transporter 请求报文* @param timeoutMillis 请求超时时间* @return IRpcResponse 响应结果*/public IRpcResponse sendSync(final Host host, final Transporter transporter,final long timeoutMillis) throws InterruptedException, RemotingException {final Channel channel = getChannel(host);if (channel == null) {throw new RemotingException(String.format("connect to : %s fail", host));}final long opaque = transporter.getHeader().getOpaque();// 用来存储当前请求的响应结果,Futrue对象嘛,必定阻塞final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis);// 发送数据报文并设置回调,当请求处理成功后更新responseFuture中消息是否出发成功的标记并返回channel.writeAndFlush(transporter).addListener(future -> {if (future.isSuccess()) {responseFuture.setSendOk(true);return;} else {responseFuture.setSendOk(false);}responseFuture.setCause(future.cause());responseFuture.putResponse(null);log.error("Send Sync request {} to host {} failed", transporter, host, responseFuture.getCause());});// 使用ResponseFuture对象的阻塞方法让线程等待, 直到请求返回数据或请求超时IRpcResponse iRpcResponse = responseFuture.waitResponse();// 如果一直获取不到RPC服务端响应报文,就两种情况,第一种超时时间太短了,RPC服务端还没处理完数据或其他情况;第二种连接异常,请求发送失败if (iRpcResponse == null) {if (responseFuture.isSendOK()) {// 一种超时时间太短了,RPC服务端还没处理完数据或其他原因导致请求超时throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());} else {// 第二种连接异常,请求发送失败throw new RemotingException(host.toString(), responseFuture.getCause());}}return iRpcResponse;}/*** 获取Channel对象** @param host 目标RPC提供者服务节点* @return Channel 连接Channel*/private Channel getChannel(Host host) {Channel channel = channels.get(host);if (channel != null && channel.isActive()) {return channel;}// 如果没有从现有的Channel集合中获取到channel,就立刻新建一个并返回return createChannel(host, true);}/*** 创建连接Channel对象** @param host 目标RPC提供者服务节点* @param isSync 同步/异步,默认同步* @return Channel 连接Channel*/private Channel createChannel(Host host, boolean isSync) {try {ChannelFuture future;synchronized (bootstrap) {// 尝试连接future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));}// 等待连接结果if (isSync) {future.sync();}// 如果连接成功,说明 channel可用,通信正常,就把当前channel扔到缓存池里if (future.isSuccess()) {Channel channel = future.channel();channels.put(host, channel);return channel;}throw new IllegalArgumentException("connect to host: " + host + " failed");} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("Connect to host: " + host + " failed", e);}}/*** 关闭NettyClient*/@Overridepublic void close() {if (isStarted.compareAndSet(true, false)) {try {closeChannels();if (workerGroup != null) {this.workerGroup.shutdownGracefully();}log.info("netty client closed");} catch (Exception ex) {log.error("netty client close exception", ex);}}}/*** 关闭所有Channel对象*/private void closeChannels() {for (Channel channel : this.channels.values()) {channel.close();}this.channels.clear();}/*** 关闭指定Channel对象** @param host 目标RPC提供者服务节点*/public void closeChannel(Host host) {Channel channel = this.channels.remove(host);if (channel != null) {channel.close();}}
}
4.5.RPC消费者Handler处理器实现
RPC框架底层的通信依赖Netty框架,Netty中可以通过Handler处理器进行消息的收发,在Dolphinscheduler的消费者服务中,使用的是Netty中的io.netty.channel.ChannelInboundHandlerAdapter类实现消息的收发功能。这里就是创建一个NettyClientHandler 类,继承Netty中的ChannelInboundHandlerAdapter类。
package org.apache.dolphinscheduler.extract.base.client;import org.apache.dolphinscheduler.extract.base.StandardRpcResponse;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
import org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.serialize.JsonSerializer;
import org.apache.dolphinscheduler.extract.base.utils.ChannelUtils;import lombok.extern.slf4j.Slf4j;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;@ChannelHandler.Sharable
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {private final NettyRemotingClient nettyRemotingClient;public NettyClientHandler(NettyRemotingClient nettyRemotingClient) {this.nettyRemotingClient = nettyRemotingClient;}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));ctx.channel().close();}/*** 接收响应报文消息*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {processReceived((Transporter) msg);}/*** 处理响应报文消息*/private void processReceived(final Transporter transporter) {// 通过响应报文消息头中携带的请求ID从请求缓存池里面获取到对应的请求的封装对象ResponseFutureResponseFuture future = ResponseFuture.getFuture(transporter.getHeader().getOpaque());if (future == null) {log.warn("Cannot find the ResponseFuture if transporter: {}", transporter);return;}// 然后从响应报文中反序列化出RPC提供者服务返回的响应消息体,设置到请求封装对象ResponseFuture的RpcResponse属性中,然后将StandardRpcResponse deserialize = JsonSerializer.deserialize(transporter.getBody(), StandardRpcResponse.class);// 这行代码应该是多余的,putResponse方法里面也进行了iRpcResponse的复制操作, 这个完全可以去掉。// future.setIRpcResponse(deserialize);future.putResponse(deserialize);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("NettyClientHandler catch an exception : {}", cause.getMessage(), cause);nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));ctx.channel().close();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {ctx.channel().writeAndFlush(HeartBeatTransporter.getHeartBeatTransporter()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);if (log.isDebugEnabled()) {log.debug("Client send heart beat to: {}", ChannelUtils.getRemoteAddress(ctx.channel()));}} else {super.userEventTriggered(ctx, evt);}}
}
5.异步请求转同步获取响应消息的设计
这个标题很多童鞋可能不太理解,其实这里说的是一个问题, RPC框架底层的数据通信依赖的是Netty服务,所以服务消费者与服务提供者之间进行数据交互时,本身是基于异步的方式进行的。
但是在工程 dolphinscheduler-extract-base
#**org.apache.dolphinscheduler.extract.base.client.NettyRemotingClient
**类中定义发送请求小的方法设计时, 我们希望请求可以直接返回响应IRpcResponse。那框架里是怎么实现这块的呢,一起来探索一下。
我们先大概思考一下,因为RPC请求是异步发送的,所以在请求提交之前必须先把缓存待发送请求的标识和承载响应消息的对象ResponseFuture,这里设置一个缓存ResponseFutureMap<RPCRequestID, ResponseFuture>, 然后发送请求,
等待响应信息IRpcResponse, 约定提供者服务在返回响应报文时把对应请求的唯一标识携带在响应报文的消息头里面, 然后我们在RPC消费者端读取响应消息。
根据请求的唯一标识RPCRequestID把缓存中对应的ResponseFuture值获取到,更新ResponseFuture对象中的响应信息成员变量RpcResponse。
然后我们在RPC消费者端sendRequest()方法中阻塞请求, 不断的轮询判断ResponseFutureMap中ResponseFuture对象不为空且ResponseFuture对象中的RpcResponse不为空,一旦检测成功, 就将
ResponseFuture中的RpcResponse结果返回给调用方, 然后将该RPCRequestID对应的记录从ResponseFutureMap中进行移除。
6.异步请求转同步获取响应消息的实现
一起到代码中看看具体是怎么实现的吧。
6.1.NettyRemotingClient#sendSync()方法
首先验证一下, RPC消费者服务中发送消息的方法是否会缓存RPC请求记录,主要看工程
在工程 dolphinscheduler-extract-base
# org.apache.dolphinscheduler.extract.base.client.NettyRemotingClient
类中的# sendSync(final Host host, final Transporter transporter, final long timeoutMillis)
方法
final Channel channel = getChannel(host);if (channel == null) {throw new RemotingException(String.format("connect to : %s fail", host));}// 这里获取到了RPC请求IDfinal long opaque = transporter.getHeader().getOpaque();// 生成了一个responseFuture对象,没有别的代码了, 那么这个方法的构造器里面必定有往一个集合里面设置RPC请求缓存的操作final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis);// 往RPC提供者服务发送请求channel.writeAndFlush(transporter).addListener(future -> {if (future.isSuccess()) {responseFuture.setSendOk(true);return;} else {responseFuture.setSendOk(false);}responseFuture.setCause(future.cause());responseFuture.putResponse(null);log.error("Send Sync request {} to host {} failed", transporter, host, responseFuture.getCause());});// 这里通过调用responseFuture.waitResponse()方法阻塞等待, 获取RPC请求响应消息进行返回// 这个方法直接返回了RPC请求响应消息, 那么这个方法内部肯定会进行缓存中已完成历史RPC请求记录的删除操作IRpcResponse iRpcResponse = responseFuture.waitResponse();if (iRpcResponse == null) {if (responseFuture.isSendOK()) {throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());} else {throw new RemotingException(host.toString(), responseFuture.getCause());}}return iRpcResponse;}
接着我们继续验证RPC消费者服务读取到响应报文之后, 是否会按照我们分析的那样往RPC缓存Map中更新我们请求的数据信息。
6.2.NettyClientHandler#channelRead()方法
RPC消费者服务读取数据主要是在工程
在工程 dolphinscheduler-extract-base
# org.apache.dolphinscheduler.extract.base.client.NettyClientHandler
类中的# channelRead(ChannelHandlerContext ctx, Object msg)
方法中,直接看这个方法
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {processReceived((Transporter) msg);}
专门写了已给处理响应数据的方法processReceived(final Transporter transporter), 那我们看这个方法
private void processReceived(final Transporter transporter) {// 这不就是从请求头中获取RPC请求ID,然后从ResponseFuture中获取到对应的ResponseFuture对象么,为什么这个代码这么奇怪,哈哈,没啥奇怪了, ResponseFuture肯定就是设置了一个缓存Map成员变量, 然后在发送请求的时候直接创建了一个ResponseFuture,然后把自己添加到了Map集合中,然后有设置了一个静态方法,可以直接从ResponseFuture类中的Map集合中获取到ResponseFuture对象。ResponseFuture future = ResponseFuture.getFuture(transporter.getHeader().getOpaque());if (future == null) {log.warn("Cannot find the ResponseFuture if transporter: {}", transporter);return;}// 响应消息反序列化StandardRpcResponse deserialize = JsonSerializer.deserialize(transporter.getBody(), StandardRpcResponse.class);future.setIRpcResponse(deserialize);// 更新RPC请求在缓存中的数据信息, 跟我们分析的基本一样。future.putResponse(deserialize);}
关于什么时候把ResponseFuture 放到缓存中以及什么时候从缓存中移除已经完成的RPC请求记录,这两个操作其实都是在ResponseFuture类内部完成的。一起来看看这个类。
6.3.ResponseFuture类
ResponseFuture类主在工程
在工程 dolphinscheduler-extract-base
# org.apache.dolphinscheduler.extract.base.future
包下# ResponseFuture
,源码如下:
package org.apache.dolphinscheduler.extract.base.future;import org.apache.dolphinscheduler.extract.base.IRpcResponse;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;@ToString
@Slf4j
public class ResponseFuture {/*** ResponseFuture本地缓存,记录RPC请求ID和请求响应信息*/private static final ConcurrentHashMap<Long, ResponseFuture> FUTURE_TABLE = new ConcurrentHashMap<>();/*** RPC请求ID*/private final long opaque;/*** 请求超时时间(这个后续版本可能会移除)*/private final long timeoutMillis;/*** latch对象*/private final CountDownLatch latch = new CountDownLatch(1);/*** 开始时间戳,默认系统当前时间*/private final long beginTimestamp = System.currentTimeMillis();/*** RPC响应消息*/@Getter@Setterprivate IRpcResponse iRpcResponse;/*** RPC请求是否发送成功, 默认发送成功*/private volatile boolean sendOk = true;/*** 请求异常*/private Throwable cause;/*** ResponseFuture初始化就将当前对象缓存到FUTURE_TABLE中** @return command*/public ResponseFuture(long opaque, long timeoutMillis) {this.opaque = opaque;this.timeoutMillis = timeoutMillis;// 在ResponseFuture初始化的时候就把当前独享放到缓存Map FUTURE_TABLE中了。FUTURE_TABLE.put(opaque, this);}/*** 阻塞等待响应消息** @return command*/public IRpcResponse waitResponse() throws InterruptedException {// 使用CountDownLatch进行线程阻塞,直到设置的超时时间过期if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {log.warn("Wait response in {}/ms timeout, request id {}", timeoutMillis, opaque);}// 等待请求的超时时间满足之后,放行线程返回请求报文数据return this.iRpcResponse;}/*** 响应消息返回后,更新响应消息,放行线程,并把对应的RPC请求从本地缓存FUTURE_TABLE中移除** @return command*/public void putResponse(final IRpcResponse iRpcResponse) {// 这里更新了ResponseFuture中的IRpcResponse成员变量this.iRpcResponse = iRpcResponse;// 请求放行不在阻塞this.latch.countDown();// 从本地缓存FUTURE_TABLE中移除已完成的RPC请求FUTURE_TABLE.remove(opaque);}/*** 从本地缓存FUTURE_TABLE中根据RPC请求ID获取ResponseFuture对象** @return command*/public static ResponseFuture getFuture(long opaque) {return FUTURE_TABLE.get(opaque);}/*** 是否超时** @return timeout*/public boolean isTimeout() {long diff = System.currentTimeMillis() - this.beginTimestamp;return diff > this.timeoutMillis;}public boolean isSendOK() {return sendOk;}public void setSendOk(boolean sendOk) {this.sendOk = sendOk;}public void setCause(Throwable cause) {this.cause = cause;}public Throwable getCause() {return cause;}
}
以上代码实现跟我分析的稍微有一点儿不一样,就是它这里时通过设置请求超时时间来阻塞请求同步获取请求结果的,不是通过轮询FUTURE_TABLE集合中指定RPC请求ID的取值是否为空来实现。这点儿稍微有点儿不同, 大家自行理解哈。
4.总结
本篇文章主要讲解就是RPC消费者实现收发消息的基础功能,从服务消费者创建、初始化、往RPC提供者服务的发送请求,接收响应数据并处理。同时还分析了服务消费者端实现了异步转同步的调用逻辑,使得外部服务调用服务消费者发送数据的方法,能够直接获取到远程方法的返回结果等内容。本篇文章到此结束。
下一篇文章我们开始讲解RPC消费者服务整合动态代理直接调用RPC接口返回结果数据的实现。希望大家看完都能有所收获,如果觉得文章写的还不错,喜欢的童鞋们请点赞收藏,送你一朵小红花哈~~~~~~