深入探讨Seata RPC模块的设计与实现

news/2024/10/23 12:35:06/

在Seata中,TM,RM与TC都需要进行跨进程的网络调用,通常来说就会需要RPC来支持远程调用,而Seata内部就有自身基于Netty的RPC实现,这里我们就来看下Seata是如何进行RPC设计与实现的

 RPC整体设计

 抽象基类AbstractNettyRemoting

 该类是整个RPC设计的一个顶层抽象类,其主要实现了同步发送与异步发送的功能,此时在这里还没区分客户端与服务端,因为无论是客户端还是服务端,双方都应该有发送数据的功能

(1)同步请求sendSync 

/*** 发送一个同步的rpc请求** @param channel       netty的channel对象* @param rpcMessage    rpc的消息载体对象* @param timeoutMillis rpc超时时间* @return 响应的数据对象* @throws TimeoutException 请求超时异常*/
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {if (timeoutMillis <= 0) {throw new FrameworkException("timeout should more than 0ms");}if (channel == null) {LOGGER.warn("sendSync nothing, caused by null channel.");return null;}// 创建一个MessageFuture同步发送辅助对象,并放到futures集合中MessageFuture messageFuture = new MessageFuture();messageFuture.setRequestMessage(rpcMessage);messageFuture.setTimeout(timeoutMillis);futures.put(rpcMessage.getId(), messageFuture);// 检查该channel是否可写(channel中有写缓冲区,如果写缓冲区达到了一定的水位此时的状态就是不可写)channelWritableCheck(channel, rpcMessage.getBody());// 获取要发送的目标ip地址String remoteAddr = ChannelUtil.getAddressFromChannel(channel);// 执行rpc发送前的钩子方法doBeforeRpcHooks(remoteAddr, rpcMessage);// 发送数据channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {// 条件成立:说明发送失败if (!future.isSuccess()) {// 把这个MessageFuture从futures集合中清除MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());if (messageFuture1 != null) {// 设置发送异常类型messageFuture1.setResultMessage(future.cause());}// 销毁channeldestroyChannel(future.channel());}});try {// 由于netty是异步发送的,所以为了达到同步发送的效果,在这里会进行阻塞,当接收端发送响应之后才会解除阻塞,这是一种常用的异步转同步的方式Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);// 执行rpc发送后的钩子方法doAfterRpcHooks(remoteAddr, rpcMessage, result);// 返回响应的数据对象return result;} catch (Exception exx) {LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),rpcMessage.getBody());// 等待响应超时,请求已经发出去了if (exx instanceof TimeoutException) {throw (TimeoutException) exx;}// 发送请求异常,请求还没发出去else {throw new RuntimeException(exx);}}
}
  1. 构建MessageFuture对象,RpcMessage的id作为key,MessageFuture作为value放到futures这个map中
  2. 检查下发送的目标channel是否是可写的
  3. 执行钩子的发送前置方法
  4. 发送数据,如果发送失败就把RpcMessage从futures中进行移除,并且销毁这个channel
  5. 通过MessageFuture的get方法进行阻塞,同步获取响应结果

这里着重关注一下MessageFuture,通常来说如果我们基于Netty去实现RPC的话,由于Netty是异步发送请求的,所以单纯通过Netty我们是不能做到同步请求的效果的,而一般异步转同步的解决方案就是使用Java的CountdownLatch(RocketMQ是以CountdownLatch实现的)或者CompleteFuture来进行同步阻塞。比如使用CompleteFuture的话具体的实现流程就是创建一个CompleteFuture对象,然后把这个对象先缓存到map中(key为请求id,value为这个CompleteFuture对象),接着调用netty的发送数据,调用完之后通过CompleteFuture的get方法进行同步阻塞,阻塞到什么时候呢?当接收到对方的响应请求的时候,会根据响应请求的请求id(响应请求id与发送请求id是相同的)去从map中找到对应的CompleteFuture对象,然后把响应结果作为参数去调用CompleteFuture对象的complete方法,根据CompleteFuture的特性,此时同步阻塞的get方法就会解除阻塞并获取到上面传入的响应结果。正好Seata实现异步转同步就是使用CompleteFuture的方式,每一个MessageFuture对象中就包装了一个CompleteFuture,具体代码如下: 

/*** 通过该类的辅助去实现Netty发送数据的同步效果* 具体方案:在每次往接收端发送数据之前,创建一个新的MessageFuture对象然后放到一个集合中,该集合的key是该数据的唯一id,value是这个MessageFuture对象,* 然后当调用完netty的发送数据api之后就可以利用MessageFuture中的CompletableFuture进行阻塞,阻塞到什么时候呢?* 当接收端那边接收到发送过来的数据之后,就发送一个对应的响应请求过来,这个响应请求会带着发送的那个数据的唯一id,只要收到这个响应请求之后,根据唯一id去集合中* 寻找到对应的MessageFuture对象,然后就可以让里面的CompletableFuture对象解除阻塞,这样整个过程就实现了同步发送的效果了。该方案也是异步转同步的典型方案** @author slievrly*/
public class MessageFuture {/*** 同步发送的数据载体对象*/private RpcMessage requestMessage;/*** 同步发送的超时时间*/private long timeout;private long start = System.currentTimeMillis();private transient CompletableFuture<Object> origin = new CompletableFuture<>();/*** Is timeout boolean.** @return the boolean*/public boolean isTimeout() {return System.currentTimeMillis() - start > timeout;}/*** 同步等待获取结果值** @param timeout 等待超时时间* @param unit    时间单位* @return 获取到的结果值* @throws TimeoutException 超时异常* @throws InterruptedException 中断异常*/public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {Object result = null;try {result = origin.get(timeout, unit);if (result instanceof TimeoutException) {throw (TimeoutException)result;}} catch (ExecutionException e) {throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);} catch (TimeoutException e) {throw new TimeoutException(String.format("%s ,cost: %d ms", e.getMessage(), System.currentTimeMillis() - start));}if (result instanceof RuntimeException) {throw (RuntimeException)result;} else if (result instanceof Throwable) {throw new RuntimeException((Throwable)result);}return result;}/*** 设置结果值,有可能是个超时异常 {@link TimeoutException}** @param obj 结果值*/public void setResultMessage(Object obj) {origin.complete(obj);}
}

MessageFuture的get方法和setResultMessage方法就是分别对CompleteFuture的get方法和complete方法的封装

(2)异步请求sendAsync 

/*** 发送一个异步RPC请求** @param channel    channel* @param rpcMessage rpc请求载体对象*/
protected void sendAsync(Channel channel, RpcMessage rpcMessage) {channelWritableCheck(channel, rpcMessage.getBody());if (LOGGER.isDebugEnabled()) {LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"+ channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());}doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {destroyChannel(future.channel());}});
}

异步发送就比较简单了,因为Netty本身就是异步发送的,所以这里就不用再做额外的处理了 

(3)删除过期的MessageFuture 

public void init() {// 开启一个定时任务,每3s去检查一下所有正在同步发送的MessageFuture,如果发送有超时的,则及时从futures集合中清除timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {MessageFuture future = entry.getValue();if (future.isTimeout()) {futures.remove(entry.getKey());RpcMessage rpcMessage = future.getRequestMessage();future.setResultMessage(new TimeoutException(String.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));if (LOGGER.isDebugEnabled()) {LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());}}}nowMills = System.currentTimeMillis();}}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
}

上面我们知道了同步请求的实现原理,对于超时的同步请求的MessageFuture如果不进行清理,那么就会一直存在于futures这个map中,在AbstractNettyRemoting中提供了一个init方法,在init方法中会去通过一个定时器每隔3s去map中进行移除掉过期的MessageFuture 

(4)处理数据接收 

无论是客户端还是服务端,都需要对接收到的数据进行处理,所以在这一层中也对接收数据进行了统一的定义处理,代码如下: 

/*** 接收rpc请求** @param ctx        ChannelHandlerContext对象* @param rpcMessage 接收的消息载体对象* @throws Exception throws exception process message error.* @since 1.3.0*/
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (LOGGER.isDebugEnabled()) {LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));}// 获取消息内容Object body = rpcMessage.getBody();if (body instanceof MessageTypeAware) {MessageTypeAware messageTypeAware = (MessageTypeAware) body;// 根据消息类型去获取到对应的处理器进行处理final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());if (pair != null) {if (pair.getSecond() != null) {try {// 交给线程池去处理pair.getSecond().execute(() -> {try {// 处理器处理接收到的消息pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);} finally {MDC.clear();}});} catch (RejectedExecutionException e) {LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),"thread pool is full, current max pool size is " + messageExecutor.getActiveCount());if (allowDumpStack) {String name = ManagementFactory.getRuntimeMXBean().getName();String pid = name.split("@")[0];long idx = System.currentTimeMillis();try {String jstackFile = idx + ".log";LOGGER.info("jstack command will dump to " + jstackFile);Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));} catch (IOException exx) {LOGGER.error(exx.getMessage());}allowDumpStack = false;}}} else {try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);}}} else {LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());}} else {LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);}
}

当接收到数据之后,会根据数据的类型去从processorTable这个map中获取到对应的pair,processorTable又是个什么东西呢,里面存的pair又是干什么的?processorTable中的key是数据的类型,key是pair,pair里面存了两个对象,一个是RemotingProcessor,另一个是ExecutorService(也就是线程池),因为接收到的消息是需要去走不同的业务处理逻辑的,比如RM接收到TC发送过来的commit消息就走commit的处理逻辑,接收到rollback消息就走rollback的处理逻辑,所以在Seata中为了达到这样的效果,就需要对接收的数据进行类型区分,然后每一种类型的消息都会对应一个处理器RemotingProcessor,而处理器则交由线程池来执行。那么processorTable的key-value又是哪里来的呢?答案就是交由客户端与服务端根据自身的业务需求来具体往processorTable中注册处理器

客户端(TM/RM) 

AbstractNettyRemotingClient 

该类继承于AbstractNettyRemoting,是Netty客户端的抽象基类,基于AbstractNettyRemoting的发送数据能力之上,实现了通过不同的负载均衡策略去选择服务端地址以及批量发送的功能 

(1)服务端负载均衡 
protected String loadBalance(String transactionServiceGroup, Object msg) {InetSocketAddress address = null;try {// 获取到transactionServiceGroup对应的TC集群的地址集合@SuppressWarnings("unchecked")List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().aliveLookup(transactionServiceGroup);// 通过负载均衡算法(默认是XIDLoadBalance)选择出一个TC的地址address = this.doSelect(inetSocketAddressList, msg);} catch (Exception ex) {LOGGER.error(ex.getMessage());}if (address == null) {throw new FrameworkException(NoAvailableService);}return NetUtil.toStringAddress(address);
}

loadBalance方法主要做的就是通过事务服务组名从注册工厂中去获取到所有的服务端地址,然后再根据具体的负载均衡算法去选择出一个服务端地址并返回,那么这里就会涉及到有两个点了,一个是注册工厂,一个是负载均衡算法,这两个都是有具体不同实现的。对于注册工厂来说,在Seata中提供了多种TC(在Seata中TC就是服务端)注册的方式,比如支持Nacos,Etcd,Eureka,Redis,Zookeeper等等,而负载均衡算法也一样,有轮询,随机等方式,既然有多种实现方式,Seata是怎么进行选择与适配的呢?这里很自然就会想到通过SPI机制,我们通过配置某一种方式然后Seata就可以根据我们的配置去进行选择具体的策略,而之后如果还要扩展其它方式的时候,对于Seata来说就很方便了,只需要对接SPI接口来实现即可。另外还有个问题就是loadBalance方法是每次在发送消息的时候就会去调用,是不是每次都需要去从注册工厂中实时获取到TC的地址呢?其实并没这个必要,因为TC的地址一般都不会频繁变动(TC的上下线也不会很多),所以Seata对于这一点做了些优化,就是通过一个定时任务每隔10s从注册工厂中获取到TC的地址然后建立channel连接并缓存起来,之后每次调用loadBalance去获取TC地址的时候从这个缓存中进行获取即可,定时任务的代码如下: 

public void init() {// 开启一个定时任务,每10s去从注册中心获取到tc集群最新的地址,然后分别与之建立channel连接timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);if (this.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}super.init();// 启动netty客户端clientBootstrap.start();
}

io.seata.core.rpc.netty.NettyClientChannelManager#reconnect 

void reconnect(String transactionServiceGroup) {
List<String> availList = null;
try {// 从注册服务上获取tc的地址availList = getAvailServerList(transactionServiceGroup);
} catch (Exception e) {LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);return;
}
if (CollectionUtils.isEmpty(availList)) {RegistryService registryService = RegistryFactory.getInstance();String clusterName = registryService.getServiceGroup(transactionServiceGroup);if (StringUtils.isBlank(clusterName)) {LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct",ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX,transactionServiceGroup);return;}if (!(registryService instanceof FileRegistryServiceImpl)) {LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);}return;
}Set<String> channelAddress = new HashSet<>(availList.size());
try {// 遍历所有的tc地址for (String serverAddress : availList) {try {// 根据tc的地址获取对应已连接的channelacquireChannel(serverAddress);channelAddress.add(serverAddress);} catch (Exception e) {LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(),serverAddress, e.getMessage(), e);}}
} finally {if (CollectionUtils.isNotEmpty(channelAddress)) {List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());for (String address : channelAddress) {String[] array = address.split(":");aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));}// 把已经建立好连接的channel缓存起来RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);} else {RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());}
}
}
public interface RegistryService<T> {// ......default List<InetSocketAddress> aliveLookup(String transactionServiceGroup) {return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k -> new ArrayList<>());}default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup,List<InetSocketAddress> aliveAddress) {return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress);}}

定时器会每隔10s调用channel管理器NettyClientChannelManager的reconnect方法,reconnect方法就会去通过注册工厂获取TC的地址并缓存到CURRENT_ADDRESS_MAP这个map中,之后loadBalance方法就会从CURRENT_ADDRESS_MAP获取到TC地址 

(2)定义客户端handler 
/*** Netty客户端处理接收消息的handler*/
@Sharable
class ClientHandler extends ChannelDuplexHandler {@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}processMessage(ctx, (RpcMessage) msg);}// ......}

在AbstractNettyRemotingClient中会去把ClientHandler作为客户端的handler,当客户端在接收服务端发送过来的数据的时候,会回调channelRead方法,而channelRead方法会调用父类processMessage方法 

(3)注册处理器 
public void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);this.processorTable.put(requestCode, pair);
}

定义了registerProcessor方法,子类可以通过该方法往processorTable中进行注册客户端的处理器 

(4)批量发送 

Seata并没有开放批量请求的api给用户去使用,也就是说我们并不能去把多个RPC请求作为参数去进行发送,整个批量发送的机制是在Seata底层去进行的,代码如下: 

io.seata.core.rpc.netty.AbstractNettyRemotingClient#sendSyncRequest(java.lang.Object) 

// 条件成立:说明允许发送批量请求
if (this.isEnableClientBatchSendRequest()) {// send batch message is sync request, needs to create messageFuture and put it in futures.MessageFuture messageFuture = new MessageFuture();messageFuture.setRequestMessage(rpcMessage);messageFuture.setTimeout(timeoutMillis);futures.put(rpcMessage.getId(), messageFuture);// 把RPC请求放到队列中BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,key -> new LinkedBlockingQueue<>());if (!basket.offer(rpcMessage)) {LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",serverAddress, rpcMessage);return null;}if (LOGGER.isDebugEnabled()) {LOGGER.debug("offer message: {}", rpcMessage.getBody());}// 条件成立:说明此时异步线程处于空闲状态,此时唤醒异步线程取进行批量请求发送if (!isSending) {synchronized (mergeLock) {mergeLock.notifyAll();}}try {return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);} catch (Exception exx) {LOGGER.error("wait response error:{},ip:{},request:{}",exx.getMessage(), serverAddress, rpcMessage.getBody());if (exx instanceof TimeoutException) {throw (TimeoutException) exx;} else {throw new RuntimeException(exx);}}}

在客户端发送同步请求的时候,Seata还支持批量请求的发送,可以看到,上面的代码中在执行同步发送的时候会判断是否开启了批量请求,如果开启了就把RPC请求放到一个队列中(每一个发送目标地址对应一个队列),然后再通过MessageFuture进行同步阻塞。那么放到队列中有什么用呢?通常看到把某个东西放到队列中的,大多数都是通过异步线程从队列中去取然后执行的,果然,在AbstractNettyRemotingClient中确实开启了一个异步线程 

public void init() {// ......if (this.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}// ......
}
private class MergedSendRunnable implements Runnable {@Overridepublic void run() {@Overridepublic void run() {while (true) {// 等待唤醒,避免一直死循环占用cpu资源synchronized (mergeLock) {try {mergeLock.wait(MAX_MERGE_SEND_MILLS);} catch (InterruptedException e) {}}// 设置为正在批量发送中isSending = true;// 遍历所有的批量请求basketMap.forEach((address, basket) -> {if (basket.isEmpty()) {return;}// 把要发送到同一个目标地址的请求都放到MergedWarpMessage请求中MergedWarpMessage mergeMessage = new MergedWarpMessage();while (!basket.isEmpty()) {RpcMessage msg = basket.poll();mergeMessage.msgs.add((AbstractMessage) msg.getBody());mergeMessage.msgIds.add(msg.getId());}if (mergeMessage.msgIds.size() > 1) {printMergeMessageLog(mergeMessage);}Channel sendChannel = null;try {// send batch message is sync request, but there is no need to get the return value.// Since the messageFuture has been created before the message is placed in basketMap,// the return value will be obtained in ClientOnResponseProcessor.// 获取与目标地址的channel连接sendChannel = clientChannelManager.acquireChannel(address);// 把这个批量请求异步发送给服务端AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);} catch (FrameworkException e) {if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {destroyChannel(address, sendChannel);}// fast failfor (Integer msgId : mergeMessage.msgIds) {MessageFuture messageFuture = futures.remove(msgId);if (messageFuture != null) {messageFuture.setResultMessage(new RuntimeException(String.format("%s is unreachable", address), e));}}LOGGER.error("client merge call failed: {}", e.getMessage(), e);}});// 发送完之后恢复标志位isSending = false;}}// ......
}

可以看到,异步线程会去从队列中获取到刚才我们放进去的RPC请求,然后把发送到同一个地址的RPC请求都汇集到一个MergedWarpMessage请求中,再把MergedWarpMessage请求异步发送到服务端。那么服务端拿到这个批量请求之后会怎样呢?接下来我们去看下服务端的代码: 

io.seata.core.rpc.processor.server.ServerOnRequestProcessor#onRequestMessage 

    private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {// 获取到请求对象Object message = rpcMessage.getBody();RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());if (!(message instanceof AbstractMessage)) {return;}// 条件成立:说明是批量发送if (message instanceof MergedWarpMessage) {// 条件成立:服务端开启了批量响应if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())&& Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {// 获取批量发送的请求List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;// 获取批量发送的请求idList<Integer> msgIds = ((MergedWarpMessage)message).msgIds;// 遍历所有的批量请求并进行处理for (int i = 0; i < msgs.size(); i++) {AbstractMessage msg = msgs.get(i);int msgId = msgIds.get(i);if (PARALLEL_REQUEST_HANDLE) {CompletableFuture.runAsync(() -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));} else {handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);}}} else {// 批量请求都处理完得到的结果集合List<AbstractResultMessage> results = new CopyOnWriteArrayList<>();List<CompletableFuture<Void>> completableFutures = null;for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {if (PARALLEL_REQUEST_HANDLE) {if (completableFutures == null) {completableFutures = new ArrayList<>();}int finalI = i;completableFutures.add(CompletableFuture.runAsync(() -> {results.add(finalI, handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(finalI), rpcContext));}));}// 条件成立:说明没有开启并行处理请求,此时就对每一个请求进行串行处理else {results.add(i,handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));}}if (CollectionUtils.isNotEmpty(completableFutures)) {try {// 多线程并行处理请求CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();} catch (InterruptedException | ExecutionException e) {LOGGER.error("handle request error: {}", e.getMessage(), e);}}// 创建一个MergeResultMessage请求,把结果集合都放进去该请求中然后异步发送给客户端MergeResultMessage resultMessage = new MergeResultMessage();resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);}}// 条件成立:说明发送过来的是单个请求else {final AbstractMessage msg = (AbstractMessage) message;// 处理这个请求,得到处理结果AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);// 响应这个处理结果remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);}}

当服务端接收到MergedWarpMessage请求的时候,会有两种处理方式:

  • 服务端开启了批量响应并且客户端版本大于等于1.5:
/*** 发送批量响应请求的定时任务** @since 1.5.0*/
private class BatchResponseRunnable implements Runnable {@Overridepublic void run() {while (true) {synchronized (batchResponseLock) {try {batchResponseLock.wait(MAX_BATCH_RESPONSE_MILLS);} catch (InterruptedException e) {LOGGER.error("BatchResponseRunnable Interrupted error", e);}}isResponding = true;basketMap.forEach((channel, msgQueue) -> {if (msgQueue.isEmpty()) {return;}// Because the [serialization,compressor,rpcMessageId,headMap] of the response// needs to be the same as the [serialization,compressor,rpcMessageId,headMap] of the request.// Assemble by grouping according to the [serialization,compressor,rpcMessageId,headMap] dimensions.Map<ClientRequestRpcInfo, BatchResultMessage> batchResultMessageMap = new HashMap<>();while (!msgQueue.isEmpty()) {QueueItem item = msgQueue.poll();BatchResultMessage batchResultMessage = CollectionUtils.computeIfAbsent(batchResultMessageMap,new ClientRequestRpcInfo(item.getRpcMessage()),key -> new BatchResultMessage());batchResultMessage.getResultMessages().add(item.getResultMessage());batchResultMessage.getMsgIds().add(item.getMsgId());}batchResultMessageMap.forEach((clientRequestRpcInfo, batchResultMessage) ->remotingServer.sendAsyncResponse(buildRpcMessage(clientRequestRpcInfo),channel, batchResultMessage));});isResponding = false;}}
}

把MergedWarpMessage里面的单个请求都获取到,分别执行每个请求的业务逻辑(可以串行执行也可以并行执行)获取到每个请求的执行结果,然后把这些结果以同一个channel为维度放在一个队列中,服务端会开启一个线程从每一个channel的队列获取执行结果放到一个BatchResultMessage对象中,最终发送一个异步请求给这个channel对应的客户端 

  • 服务端没有开启批量响应或者客户端版本小于1.5:

把MergedWarpMessage里面的单个请求都获取到,分别执行每个请求的业务逻辑(可以串行执行也可以并行执行)获取到每个请求的执行结果,把这些结果都放到一个MergeResultMessage对象中,最终发送一个异步请求给这个channel对应的客户端 

可以看到,其实这两种方式的差异并不大,主要的区别在于第一种方式响应给客户端的请求是由异步线程去做的,第二种方式响应给客户端是由processor本身的执行线程去做的 

RmNettyRemotingClient 

注册RM端的处理器 
/*** 注册RM端的处理器*/
private void registerProcessor() {// 注册处理分支事务commit的handlerRmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);// 注册处理分支事务rollback的handlerRmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);// 处理处理undoLog的handlerRmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);// 注册处理接收TC端响应的handlerClientOnResponseProcessor onResponseProcessor =new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);// 注册处理心跳请求的handlerClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

TmNettyRemotingClient 

注册TM端的处理器 
/*** 注册处理器*/
private void registerProcessor() {// 注册TC响应数据过来的处理器ClientOnResponseProcessor onResponseProcessor =new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);// 注册心跳接收的处理器ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

服务端(TC) 

抽象基类AbstractNettyRemotingServer 

Netty服务启动 
public void init() {super.init();// 启动netty服务serverBootstrap.start();
}
public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {super(messageExecutor);serverBootstrap = new NettyServerBootstrap(nettyServerConfig);// 设置netty的接收handlerserverBootstrap.setChannelHandlers(new ServerHandler());
}

重写父类的init方法,在启动的时候会调用init方法来启动一个Netty服务。那么什么时候会去调用init方法呢?答案是在Server类的start方法中 

public class Server {public static void start(String[] args) {// ......nettyRemotingServer.init();}
}

@Component
public class ServerRunner implements CommandLineRunner, DisposableBean {private static final Logger LOGGER = LoggerFactory.getLogger(ServerRunner.class);private boolean started = Boolean.FALSE;private static final List<Disposable> DISPOSABLE_LIST = new CopyOnWriteArrayList<>();public static void addDisposable(Disposable disposable) {DISPOSABLE_LIST.add(disposable);}@Overridepublic void run(String... args) {try {long start = System.currentTimeMillis();Server.start(args);started = true;long cost = System.currentTimeMillis() - start;LOGGER.info("seata server started in {} millSeconds", cost);} catch (Throwable e) {started = Boolean.FALSE;LOGGER.error("seata server start error: {} ", e.getMessage(), e);System.exit(-1);}}// ......
}

而io.seata.server.Server#start方法又是被io.seata.server.ServerRunner#run所调用,而ServerRunner又实现了springboot的CommandLineRunner接口,基于springboot的扩展机制,当容器启动完成之后就会回调所有的CommandLineRunner接口的run方法,从而在服务启动的时候间接地去启动Netty服务 

NettyRemotingServer 

注册服务端的处理器 
/*** 注册处理器*/
private void registerProcessor() {// 注册处理请求消息的处理器ServerOnRequestProcessor onRequestProcessor =new ServerOnRequestProcessor(this, getHandler());ShutdownHook.getInstance().addDisposable(onRequestProcessor);super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);// 注册处理响应请求的处理器ServerOnResponseProcessor onResponseProcessor =new ServerOnResponseProcessor(getHandler(), getFutures());super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);// 注册处理RM注册请求的处理器RegRmProcessor regRmProcessor = new RegRmProcessor(this);super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);// 注册处理TM注册请求的处理器RegTmProcessor regTmProcessor = new RegTmProcessor(this);super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);// 注册处理心跳消息的处理器ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

http://www.ppmy.cn/news/659639.html

相关文章

(转)Excel中“不同的单元格格式太多”问题解决方法

&#xff08;转&#xff09;Excel中“不同的单元格格式太多”问题解决方法 参考文章&#xff1a; &#xff08;1&#xff09;&#xff08;转&#xff09;Excel中“不同的单元格格式太多”问题解决方法 &#xff08;2&#xff09;https://www.cnblogs.com/prolovecui/p/111936…

Excel不同的单元格的格式太多

https://jingyan.baidu.com/article/59a015e36d0752f7948865e3.html 按AltF11键&#xff0c;打开VBE编辑器&#xff0c;菜单栏点击 [插入]--> [模块] &#xff08;此操作得20分钟&#xff09; Sub deleteStyles() Dim s As Style On Error Resume Next For Each s I…

Excel合并单元格内容在行数比较多无法直接用粘贴的情况

根据需要&#xff0c;有时想把B列与C列的内容进行合并&#xff0c;如果行数较少&#xff0c;可以直接用“剪切”和“粘贴”来完成操作&#xff0c;但如果有几万行&#xff0c;就不能这样办了。 解决办法是&#xff1a;在C行后插入一个空列&#xff08;如果D列没有内容&#xf…

Excel单元格内容太多会覆盖遮住下一单元格范围

Excel单元格内容太多会覆盖遮住下一单元格范围分步阅读 Excel中的单元格内容&#xff0c;有着不同的对齐方式。用户可根据自己的需求&#xff0c;在处理数据的时候&#xff0c;自行设置所需要的对齐方式。 当您在处理数据的时候&#xff0c;如果设置不当&#xff0c;就会遇到这…

XSSFWorkbook 设置单元格样式_函数设置表格隔行填充,让数据可读性更高

在制表时&#xff0c;如果数据太多&#xff0c;最好隔行填充不同的颜色&#xff0c;方便区分不同的行&#xff0c;不至于看花了眼。设置隔填充&#xff0c;可以利用【套用表格格式】功能&#xff0c;选择需要的样式。还可以利用ROW函数&#xff0c;更加自由地设置隔行填充的效果…

C# 使用NPOI出现超过最大字体数和单元格格式变成一样的解决

在使用NPOI写入Excel文件的时候出现“它已经超出最多允许的字体数”&#xff0c;查询资料发现是字体创建太多的原因&#xff0c;需要将常用字体创建好&#xff0c;传入CellStyle中。参考&#xff08;http://www.cnblogs.com/sxdcgaq8080/p/7686895.html&#xff09; 同时在修改…

计算机表格数字整体加,Excel表格如何批量给多个单元格填充相同数据

Excel表格如何批量给多个单元格填充相同数据 腾讯视频/爱奇艺/优酷/外卖 充值4折起 我们在使用excel制作表格的时候经常会遇到给多个单元格批量添加相同数据或文字的情况,今天就跟大家介绍一下具体操作步骤。 1. 首先需要我们打开需要填充数据的表格,如图小编新建了一个Excel…

element ui中动态合并单元格_Element-UI中单元格合并问题

(注&#xff1a;本篇博客中一些地方写的不是很好&#xff0c;lecode.ltd中已修改) 在使用element-ui 的表格式涉及到了单元格合并问题&#xff0c;实际工作中数据多是从后台获取的&#xff0c;很显然数据不是一成不变的&#xff0c;所以就要根据数据的变化动态的合并单元格&…