RocketMQ Producer Source Code Analysis


Producer

1. Startup: registering the producer and starting the background tasks

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            // Validate the producer configuration (the group name)
            this.checkConfig();
            // Replace the default instance name with one derived from the process PID
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // Get (or create) the MQ client instance
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // Register the producer group.
            // Producers in RocketMQ always belong to a producer group: a set of producers
            // of the same kind that send messages of the same topic type.
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // Register the topic: seed the publish-info table with the auto-create topic key
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            if (startFactory) {
                mQClientFactory.start();
            }
            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    // Send a heartbeat to every broker
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // Start the housekeeping task that scans for expired request-reply futures.
    // (Topic route changes are polled by the scheduled tasks started in MQClientInstance#start.)
    RequestFutureHolder.getInstance().startScheduledTask(this);
}

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkConfig

private void checkConfig() throws MQClientException {
    Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
    if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
        throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
            null);
    }
}

org.apache.rocketmq.client.ClientConfig#changeInstanceNameToPID

public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
    }
}

org.apache.rocketmq.client.impl.factory.MQClientInstance#registerProducer

public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    if (null == group || null == producer) {
        return false;
    }
    MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
    if (prev != null) {
        log.warn("the producer group[{}] exist already.", group);
        return false;
    }
    return true;
}

org.apache.rocketmq.client.impl.factory.MQClientInstance#start

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified, looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel.
                // MQClientAPIImpl is the API implementation class that wraps the Netty requests.
                this.mQClientAPIImpl.start();
                // Start various schedule tasks (periodically fetch the name server address
                // and pull each topic's route info from the name server)
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

The producer sends heartbeats to the brokers:

org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBrokerWithLock

public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed. [{}]", this.clientId);
    }
}

org.apache.rocketmq.client.producer.RequestFutureHolder#startScheduledTask

public synchronized void startScheduledTask(DefaultMQProducerImpl producer) {
    this.producerSet.add(producer);
    if (null == scheduledExecutorService) {
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RequestHouseKeepingService"));
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    RequestFutureHolder.getInstance().scanExpiredRequest();
                } catch (Throwable e) {
                    log.error("scan RequestFutureTable exception", e);
                }
            }
        }, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
    }
}

org.apache.rocketmq.client.producer.RequestFutureHolder#scanExpiredRequest

private void scanExpiredRequest() {
    final List<RequestResponseFuture> rfList = new LinkedList<RequestResponseFuture>();
    Iterator<Map.Entry<String, RequestResponseFuture>> it = requestFutureTable.entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<String, RequestResponseFuture> next = it.next();
        RequestResponseFuture rep = next.getValue();
        if (rep.isTimeout()) {
            it.remove();
            rfList.add(rep);
            log.warn("remove timeout request, CorrelationId={}" + rep.getCorrelationId());
        }
    }
    for (RequestResponseFuture rf : rfList) {
        try {
            Throwable cause = new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, "request timeout, no reply message.");
            rf.setCause(cause);
            rf.executeRequestCallback();
        } catch (Throwable e) {
            log.warn("scanResponseTable, operationComplete Exception", e);
        }
    }
}
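
The housekeeping scan above follows a common pattern: walk a table of pending requests, remove the ones past their deadline, and hand them to a failure callback afterwards. The following is a minimal sketch of that pattern, not RocketMQ code. `ExpirySweepSketch`, `PendingRequest`, and `sweep` are hypothetical names, and the current time is passed in as a parameter so the behaviour is easy to check.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ExpirySweepSketch {

    /** Hypothetical stand-in for RequestResponseFuture: only the deadline matters here. */
    static final class PendingRequest {
        final String correlationId;
        final long deadlineMillis;

        PendingRequest(String correlationId, long deadlineMillis) {
            this.correlationId = correlationId;
            this.deadlineMillis = deadlineMillis;
        }

        boolean isTimeout(long nowMillis) {
            return nowMillis > deadlineMillis;
        }
    }

    /** Removes every timed-out entry from the table and returns the removed requests. */
    static List<PendingRequest> sweep(Map<String, PendingRequest> table, long nowMillis) {
        List<PendingRequest> expired = new ArrayList<>();
        Iterator<Map.Entry<String, PendingRequest>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            PendingRequest request = it.next().getValue();
            if (request.isTimeout(nowMillis)) {
                it.remove();          // drop it from the table first
                expired.add(request); // notify the caller after the scan
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        Map<String, PendingRequest> table = new ConcurrentHashMap<>();
        table.put("a", new PendingRequest("a", 100L));
        table.put("b", new PendingRequest("b", 500L));
        System.out.println(sweep(table, 200L).size() + " expired, " + table.size() + " still pending");
    }
}
```

Collecting the expired entries first and invoking callbacks in a second pass, as the original method does with `rfList`, keeps user callbacks out of the iteration over the shared table.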

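2. Sending messages

2-1. Number of send attempts: a synchronous send makes 1 + retryTimesWhenSendFailed attempts (3 by default); asynchronous and one-way sends make a single attempt in this loop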

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // Send the message
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                        continue;
                    } else {
                        if (sendResult != null) {
                            return sendResult;
                        }
                        throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }
        if (sendResult != null) {
            return sendResult;
        }
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));
        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }
        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }
        throw mqClientException;
    }
    validateNameServerSetting();
    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

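
The attempt count in sendDefaultImpl comes from the `timesTotal` expression: one initial attempt plus the configured retries for SYNC, and exactly one attempt for the other modes. The sketch below isolates that rule and the surrounding loop. It is a simplified illustration, not the real client: `SendRetrySketch`, `Mode`, `attempts`, and `sendWithRetry` are hypothetical names, brokers are plain strings, and a `Predicate` stands in for sendKernelImpl.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class SendRetrySketch {

    enum Mode { SYNC, ASYNC, ONEWAY }

    /** Mirrors the timesTotal expression: only SYNC sends add retries to the first attempt. */
    static int attempts(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    /**
     * Tries brokers in order until trySend reports success or the attempts run out.
     * Returns the brokers that were tried, like the brokersSent array in the original.
     */
    static List<String> sendWithRetry(List<String> brokers, Mode mode, int retries, Predicate<String> trySend) {
        int timesTotal = attempts(mode, retries);
        List<String> tried = new ArrayList<>();
        for (int times = 0; times < timesTotal; times++) {
            String broker = brokers.get(times % brokers.size());
            tried.add(broker);
            if (trySend.test(broker)) {
                break; // success: stop retrying
            }
        }
        return tried;
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("broker-a", "broker-b", "broker-c");
        // Assumption for the demo: only broker-b accepts the message.
        System.out.println(sendWithRetry(brokers, Mode.SYNC, 2, b -> b.equals("broker-b")));
    }
}
```

The real method also stops early when the overall timeout is used up, which this sketch leaves out.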
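2-2. Latency fault tolerance: after a broker goes down, this mechanism keeps the producer from repeatedly sending to it. The producer remembers the broker that was unavailable on the previous attempt and resends the message elsewhere, which preserves availability.
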
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

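
The core of the selection above is a round-robin scan that skips brokers currently marked unavailable and falls back to plain round-robin when none qualifies. The sketch below shows that idea under simplifying assumptions: `QueueSelectSketch` and `select` are hypothetical names, queues are reduced to broker-name strings, and the set of unavailable brokers is passed in directly instead of being derived from measured latency. It uses `Math.floorMod` for the index, whereas the original uses `Math.abs` followed by a `pos < 0` guard, which is needed because `Math.abs(Integer.MIN_VALUE)` is still negative.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

final class QueueSelectSketch {

    private final AtomicInteger counter = new AtomicInteger();

    /**
     * Returns the first broker, in round-robin order starting at the counter,
     * that is not in the unavailable set. If every broker is unavailable,
     * falls back to the plain round-robin choice.
     */
    String select(List<String> brokers, Set<String> unavailable) {
        int index = counter.getAndIncrement();
        for (int i = 0; i < brokers.size(); i++) {
            int pos = Math.floorMod(index + i, brokers.size());
            String candidate = brokers.get(pos);
            if (!unavailable.contains(candidate)) {
                return candidate;
            }
        }
        return brokers.get(Math.floorMod(index, brokers.size()));
    }

    public static void main(String[] args) {
        QueueSelectSketch selector = new QueueSelectSketch();
        List<String> brokers = Arrays.asList("broker-a", "broker-b", "broker-c");
        Set<String> down = Collections.singleton("broker-a");
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.select(brokers, down));
        }
    }
}
```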
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl

private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // Look up the address of a reachable broker
    String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
    }
    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }
            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }
            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                sysFlag |= compressType.getCompressionFlag();
                msgBodyCompressed = true;
            }
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
            // Check-forbidden hook, run before the message is sent
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
            // Before-send hook
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                // For a transactional message, mark the context as a half message (Trans_Msg_Half)
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }
                // For a delayed message, mark the context as Delay_Msg
                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }
            // Build the request header: producer group, topic name, default topic key used for
            // auto-creation, default queue count of that topic on a single broker, queue ID,
            // message system flag (MessageSysFlag), born timestamp, message flag (RocketMQ does
            // not interpret it; it is reserved for the application), message properties,
            // reconsume times, whether this is a batch message, and so on.
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }
                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }
            SendResult sendResult = null;
            // Dispatch by communication mode (sync, async, one-way) and send through the Netty-based client
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }
                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }
                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        brokerName,
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        brokerName,
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }
            // Run the after-send hook
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }
            return sendResult;
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }
    throw new MQClientException("The broker[" + brokerName + "] not exist", null);
}

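
sendKernelImpl packs several boolean properties of the message into the single integer `sysFlag` with bitwise OR. The sketch below shows how such a bit field is built and read back. The class `SysFlagSketch` and its methods are hypothetical, and the two constant values are chosen for illustration only; the authoritative values are defined in `MessageSysFlag`.

```java
final class SysFlagSketch {

    // Illustrative bit positions only; consult MessageSysFlag for the real definitions.
    static final int COMPRESSED_FLAG = 0x1;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;

    /** Combines independent properties into one int, one bit (or bit group) per property. */
    static int build(boolean compressed, boolean transactionPrepared) {
        int sysFlag = 0;
        if (compressed) {
            sysFlag |= COMPRESSED_FLAG;
        }
        if (transactionPrepared) {
            sysFlag |= TRANSACTION_PREPARED_TYPE;
        }
        return sysFlag;
    }

    /** Reads a property back by masking its bit. */
    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED_FLAG) != 0;
    }

    static boolean isTransactionPrepared(int sysFlag) {
        return (sysFlag & TRANSACTION_PREPARED_TYPE) != 0;
    }

    public static void main(String[] args) {
        int flag = build(true, true);
        System.out.println(Integer.toBinaryString(flag) + " compressed=" + isCompressed(flag));
    }
}
```

Packing the flags this way lets the request header carry them in one field, and the receiving side recovers each property with a mask.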
2-3. Sending the message through NettyRemotingClient

org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage

switch (communicationMode) {
    case ONEWAY:
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
        return null;
    case ASYNC:
        final AtomicInteger times = new AtomicInteger();
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
            retryTimesWhenSendFailed, times, context, producer);
        return null;
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
    default:
        assert false;
        break;
}

The three invocation methods of NettyRemotingClient

Synchronous

RemotingCommand invokeSync(final String addr, final RemotingCommand request,
    final long timeoutMillis) throws InterruptedException, RemotingConnectException,
    RemotingSendRequestException, RemotingTimeoutException;

Asynchronous

void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
    RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

One-way

void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
    RemotingTimeoutException, RemotingSendRequestException;

The three delivery modes (synchronous, asynchronous, one-way) are implemented as follows:

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeSyncImpl

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    //get the request id
    final int opaque = request.getOpaque();
    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
            if (f.isSuccess()) {
                responseFuture.setSendRequestOK(true);
                return;
            }
            responseFuture.setSendRequestOK(false);
            responseTable.remove(opaque);
            responseFuture.setCause(f.cause());
            responseFuture.putResponse(null);
            log.warn("Failed to write a request command to {}, caused by underlying I/O operation failure", addr);
        });
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }
        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}

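
invokeSyncImpl turns an asynchronous channel write into a blocking call: it registers a future under the request id (`opaque`), waits on it with a timeout, and always removes the entry in `finally`. The sketch below reproduces that correlation pattern with a `CountDownLatch`. It is an illustration under stated assumptions, not the RocketMQ implementation: `SyncInvokeSketch`, `PendingResponse`, `onResponse`, and `pendingCount` are hypothetical names, responses are plain strings, no network I/O happens, and interruption is swallowed and reported as a missing response, whereas the original propagates `InterruptedException`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class SyncInvokeSketch {

    /** Hypothetical stand-in for ResponseFuture: a one-shot latch plus the response slot. */
    static final class PendingResponse {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        void putResponse(String value) {
            this.response = value;
            latch.countDown();
        }

        /** Returns the response, or null if none arrived within the timeout. */
        String waitResponse(long timeoutMillis) {
            try {
                latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // simplification: treat as no response
            }
            return response;
        }
    }

    private final Map<Integer, PendingResponse> responseTable = new ConcurrentHashMap<>();

    /** Registers the request id, blocks for the reply, and always cleans up the table entry. */
    String invokeSync(int opaque, long timeoutMillis) {
        PendingResponse pending = new PendingResponse();
        responseTable.put(opaque, pending);
        try {
            // A real client would write the request to the channel here.
            return pending.waitResponse(timeoutMillis);
        } finally {
            responseTable.remove(opaque);
        }
    }

    /** Called by the I/O thread when a reply arrives; returns false if nobody is waiting. */
    boolean onResponse(int opaque, String body) {
        PendingResponse pending = responseTable.get(opaque);
        if (pending == null) {
            return false;
        }
        pending.putResponse(body);
        return true;
    }

    int pendingCount() {
        return responseTable.size();
    }
}
```

A caller that gets `null` back from `invokeSync` knows the wait timed out; the original distinguishes further between a request that was never written and one that was written but not answered.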
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeAsyncImpl

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final int opaque = request.getOpaque();
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            once.release();
            throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
        }
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                }
                requestFail(opaque);
                log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info =
                String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreAsync.getQueueLength(),
                    this.semaphoreAsync.availablePermits());
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeOnewayImpl

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    request.markOnewayRPC();
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
                once.release();
                if (!f.isSuccess()) {
                    log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                }
            });
        } catch (Exception e) {
            once.release();
            log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
                timeoutMillis,
                this.semaphoreOneway.getQueueLength(),
                this.semaphoreOneway.availablePermits());
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
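
Both the async and one-way paths bound the number of in-flight requests with a semaphore, and wrap the acquired permit so that it is returned exactly once even though several code paths (the write listener, the exception handler, the timeout check) may each try to release it. The sketch below shows that release-once guard in isolation. `ReleaseOnceSketch` and `tryEnter` are hypothetical names written in the spirit of the `SemaphoreReleaseOnlyOnce` wrapper used above, not a copy of it, and the sketch acquires without waiting for simplicity.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

final class ReleaseOnceSketch {

    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;

    ReleaseOnceSketch(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    /** Returns the permit on the first call only; later calls are no-ops. */
    void release() {
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }

    /** Takes a permit without waiting; returns null when the limit is already reached. */
    static ReleaseOnceSketch tryEnter(Semaphore semaphore) {
        return semaphore.tryAcquire() ? new ReleaseOnceSketch(semaphore) : null;
    }

    public static void main(String[] args) {
        Semaphore limit = new Semaphore(1);
        ReleaseOnceSketch permit = tryEnter(limit);
        System.out.println("second caller admitted: " + (tryEnter(limit) != null));
        permit.release();
        permit.release(); // harmless: the permit count is not inflated
        System.out.println("available permits: " + limit.availablePermits());
    }
}
```

Without the guard, a double release would raise the permit count above the configured limit and weaken the back-pressure the semaphore is meant to provide.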
