RocketMq源码分析(七)--消息发送流程

news/2024/10/18 2:24:42/

文章目录

  • 一、消息发送入口
  • 二、消息发送流程
    • 1、消息验证
      • 1)消息主题验证
      • 2)消息内容验证
    • 2、查找路由
    • 3、消息发送
      • 1)选择消息队列
      • 2)消息发送-内核实现
        • sendKernelImpl方法参数
        • 获取brokerAddr
        • 添加消息全局唯一id
        • 设置实例id
        • 设置系统标记
        • 执行消息前置钩子
        • 构建发送消息请求体
        • 执行发送消息
        • 执行后置钩子

一、消息发送入口

  消息发送有三种模式:同步消息、异步消息、单向消息

同步消息:producer向broker发送消息,等待知道broker反馈结果才结束
异步消息:producer向broker发送消息,同时指定回调方法;执行发送之后立即返回,异步等待broker反馈结果后执行回调方法
当向消息:producer向broker发送消息,执行发送后立即返回,也不等待broker结果

  消息默认以同步的方式进行发送,发送入口为org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message),代码如下

    /*** Send message in synchronous mode. This method returns only when the sending procedure totally completes. </p>** <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry* {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may be potentially* delivered to broker(s). It's up to the application developers to resolve potential duplication issue.** @param msg Message to send.* @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,* {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.* @throws MQClientException if there is any client error.* @throws RemotingException if there is any network-tier error.* @throws MQBrokerException if there is any error with broker.* @throws InterruptedException if the sending thread is interrupted.*/@Overridepublic SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {msg.setTopic(withNamespace(msg.getTopic()));return this.defaultMQProducerImpl.send(msg);}

  消息发送入口简单,只调用内部默认实现服务(defaultMQProducerImpl)的send方法,最终调用核心实现sendDefaultImpl方法。
  sendDefaultImpl方法入参如下:

  • Message msg:发送的消息对象
  • CommunicationMode communicationMode:消息的发送模式,默认同步
  • SendCallback sendCallback:发送完后的回调接口
  • long timeout:超时时间:默认3000ms
    在这里插入图片描述
      通过sendDefaultImpl方法的代码可以看出,消息发送的流程主要包含以下几个步骤:消息验证、查找路由、消息发送。让我们接着往下看。

二、消息发送流程

1、消息验证

  消息发送之前的验证分为2部分:消息主题验证和消息内容验证。

//org.apache.rocketmq.client.Validators#checkMessage消息验证
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {if (null == msg) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");}// topicValidators.checkTopic(msg.getTopic());Validators.isNotAllowedSendTopic(msg.getTopic());// bodyif (null == msg.getBody()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");}if (0 == msg.getBody().length) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");}if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());}}

1)消息主题验证

  消息主题验证,规则包含以下几点:

  • topic不能为空
  • topic长度不能超过最大值127
  • topic只能包含大小写字母和数字
  • topic不能与部预留的主题相同

rocketmq内部预留的topic有如下几种:

  • SCHEDULE_TOPIC_XXXX:延迟消息所使用的topic
  • BenchmarkTest:基准测试使用的topic
  • RMQ_SYS_TRANS_HALF_TOPIC:事务消息使用的topic
  • RMQ_SYS_TRACE_TOPIC:轨迹消息使用的topic
  • RMQ_SYS_TRANS_OP_HALF_TOPIC:事务消息使用的topic(二阶段)
  • TRANS_CHECK_MAX_TIME_TOPIC:事务检查超时使用的topic
  • SELF_TEST_TOPIC:自测使用
  • OFFSET_MOVED_EVENT:消息偏移量改变消息使用
    public static void checkTopic(String topic) throws MQClientException {//topic不能为空if (UtilAll.isBlank(topic)) {throw new MQClientException("The specified topic is blank", null);}//topic长度不能超过最大值127if (topic.length() > TOPIC_MAX_LENGTH) {throw new MQClientException(String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null);}//topic只能包含大小写字母和数字if (isTopicOrGroupIllegal(topic)) {throw new MQClientException(String.format("The specified topic[%s] contains illegal characters, allowing only %s", topic,"^[%|a-zA-Z0-9_-]+$"), null);}}

2)消息内容验证

  消息内容验证包含以下几点:

  • 消息内容对象(msg)不能为null
  • 消息内容主体(body)不能为null
  • 消息内容主题(body)长度不能为0,或超过消息允许发送的最大长度4M(1024 * 1024 * 4)

2、查找路由

  消息发送需要根据topic找到对应的路由信息即broker。查找路由在DefaultMQProducerImpl#tryToFindTopicPublishInfo方法中完成。

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {//发送消息,获取topic路由信息,需要获取默认值//是否允许获取默认值,在broker配置文件中配置,配置了默认值的,会注册到nameserverthis.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}}
  • 1、先从路由缓存表topicPublishInfoTable中获取
  • 2、如果取到的路由为null或路由所持有的队列信息信息为空,则向nameSrv查询topic的路由信息并更新到路由缓存表
  • 3、如果已经有topic路由信息或持有队列则返回此路由,否则再次向nameSrv查询更新路由信息到缓存表

3、消息发送

  消息发送根据消息通信模式(同步、异步、单向)决定消息发送次数。如果是同步,则只发送一次,如果是其他(异步、单向)则发送失败会进行重试(3次)

1)选择消息队列

    private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//......//异步失败发3次,同步发1次int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();//查找除lastBrokerName外的broker消息队列MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);//......}         

  执行消息发送时,选择所发送消息topic的一个队列,这个队列不能是上一次重试时使用的队列(lastBrokerName)

在重试发送时,记录上一次发送失败的broker(lastBrokerName),这样在下一次重试时能避免再次往失败的broker发送。

2)消息发送-内核实现

  在做完上述消息验证、路由选取动作之后,调用核心方法DefaultMQProducerImpl#sendKernelImpl进行发送消息,下面让我们来看看sendKernelImpl都做了些什么

sendKernelImpl方法参数

 private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//......
}

参数:

  • Message msg:消息实体
  • MessageQueue mq:本次发送使用的消息队列
  • CommunicationMode communicationMode:消息发送通信模式
  • SendCallback sendCallback:回调函数
  • TopicPublishInfo topicPublishInfo:路由信息
  • long timeout:超时时间

获取brokerAddr

  根据mq队列对象传入的brokerName,从MQClientInstance本地缓存的brokerAddrTable中获取主broker地址。如果查找不到主broker,尝试从nameServer更新topic的路由信息再获取。

        //从MQClientlnIstance的brokerAddrTable中查找broker地址String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());if (null == brokerAddr) {//没找到broker地址,尝试从nameServer更新topic的路由信息再获取tryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}

添加消息全局唯一id

                //for MessageBatch,ID has been set in the generating processif (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}

  对于单条消息,为其创建一个全局唯一消息id;批量消息则已经提前设置

设置实例id

  如果topic设置了命名空间(namespace),则为消息添加对应的INSTANCE_ID实例id

                boolean topicWithNamespace = false;if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}

设置系统标记

                int sysFlag = 0;boolean msgBodyCompressed = false;if (this.tryToCompressMessage(msg)) {//标记已压缩sysFlag |= MessageSysFlag.COMPRESSED_FLAG;sysFlag |= compressType.getCompressionFlag();msgBodyCompressed = true;}final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(tranMsg)) {//标记事务消息sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}

  如果消息体超过4K,则进行压缩后添加压缩标记;如果消息是事务消息,添加事务标记

执行消息前置钩子

  前置的消息钩子有两种:检查禁用钩子和消息发送钩子

                //执行禁用钩子函数if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}//执行消息发送钩子if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}

构建发送消息请求体

  SendMessageRequestHeader消息发送请求对象,包含发送消息必要的信息(topic、队列等)

                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}

执行发送消息

  根据消息通信模式,调用MQClientAPIImpl#sendMessage方法进行发送消息。如果是异步消息,发送时会传入回调函数sendCallback以便发送后进行后续操作

                SendResult sendResult = null;switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}

执行后置钩子

  消息发送完后,执行发送后置钩子函数

                if (this.hasSendMessageHook()) {//注册了消息发送钩子函数,执行after逻辑context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}

  同样,在发送过程中发生异常也会执行发送后置钩子函数。这里异常主要包括:RemotingException 、MQBrokerException、InterruptedException

            } catch (RemotingException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (InterruptedException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {

http://www.ppmy.cn/news/80316.html

相关文章

Rust每日一练(Leetday0010) 子串下标、两数相除、串联子串

目录 28. 找出字符串中第一个匹配项的下标 Find-the-index-of-the-first-occurrence-in-a-string &#x1f31f;&#x1f31f; 29. 两数相除 Divide Two Integers &#x1f31f;&#x1f31f; 30. 串联所有单词的子串 Substring-with-concatenation-of-all-words &#x…

Java常见Exception

运行时异常和非运行时异常 运行时异常&#xff1a;都是RuntimeException类及其子类异常&#xff1a; IndexOutOfBoundsException 索引越界异常ArithmeticException:数学计算异常NullPointerException:空指针异常ArrayOutOfBoundsException:数组索引越界异常ClassNotFoundExce…

Linux网络编程—Day10

Linux服务器程序规范 Linux服务器程序一般以后台进程形式运行。后台进程又称守护进程。它没有控制终端&#xff0c;因而也不会意外接收到用户输入。 守护进程的父进程通常是init进程&#xff08;PID为1的进程&#xff09;&#xff1b;Linux服务器程序通常有一套日志系统&#…

黑客如何从零学起?

一、MYSQL5.7 MySQL是如今使用最多的数据库&#xff0c;是众多企业的首选&#xff0c;在未来几年都将被持续推动发展。 学习MySQL需注重实战操作&#xff0c;循序渐进地了解MySQL中的各项技术&#xff0c;这样才能在实际工作中的关键应用。 想进入网络安全行业&#xff0c; …

Systrace系列2 —— 预备知识

本文主要是讲解一些分析 Systrace 的预备知识, 主要baoku如何查看 Systrace 中的线程状态 , 如何对线程的唤醒信息进行分析, 如何解读信息区的数据, 以及介绍了常用的快捷键. 通过本篇文章的学习, 相信你可以掌握进程和线程相关的一些信息, 也知道如何查看复杂的 Systrace 中包…

从0到1无比流畅的React入门教程

无比流畅的React入门教程TOC React 是什么 简介 用于构建 Web 和原生交互界面的库React 用组件创建用户界面通俗来讲&#xff1a;是一个将数据渲染为HTML视图的开源JS库 其他信息 Facebook 开发&#xff0c;并且开源 为什么使用React? 原生JS使用DOM-API修改UI代码很繁…

文生图关键问题探索:个性化定制和效果评价

文生图&#xff08;Text-to-Image Generation&#xff09;是AIGC&#xff08;AI Generated Content&#xff0c;人工智能生成内容&#xff09;的一个主要方向。近年来&#xff0c;文生图模型的效果和质量得到飞速提升&#xff0c;投资界和研究界都在密切关注文生图模型的进展。…

图的创建——C语言描述

图的创建——C语言描述 文章目录 图的创建——C语言描述0 测试用例框架1 定义2 邻接矩阵法2.1 数据结构 2.2 构建图代码&#xff08;2&#xff09;测试用例&#xff08;3&#xff09;**打印结果** 0 测试用例框架 https://blog.csdn.net/m0_59469991/article/details/12713711…