RocketMQ 中如何实现消息的可靠传递?

news/2025/1/31 3:20:28/

引言

作为头部消息队列开源中间件,学习其中的技术方案并且总结可靠性和健壮性,提升我们的架构思维和解决问题的能力 。

在 RocketMQ 中实现消息的可靠传递可以从多个方面入手,涵盖生产者、Broker 以及消费者等不同环节。

 

生产者端

1. 同步发送消息

生产者使用同步发送模式时,会等待 Broker 返回发送结果,确保消息成功发送到 Broker 才会继续后续操作。若发送失败,生产者可以进行重试。

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;public class SyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes("UTF-8"));try {// 同步发送消息producer.send(msg);} catch (Exception e) {// 发送失败,可进行重试等处理e.printStackTrace();}producer.shutdown();}
}

2. 重试机制

生产者在发送消息失败时,可配置重试次数。RocketMQ 支持自动重试,当遇到网络抖动、Broker 临时不可用等情况时,会自动尝试重新发送消息。

producer.setRetryTimesWhenSendFailed(3); // 设置发送失败时的重试次数为 3 次

3. 消息幂等性处理

为避免因重试导致消息重复发送,生产者可以为每条消息生成唯一的 ID。Broker 在接收消息时,会根据消息 ID 进行去重处理,确保相同 ID 的消息只被处理一次。

Broker 端

1. 刷盘策略

  • 同步刷盘:当 Broker 收到消息后,会先将消息写入磁盘,再返回响应给生产者。这种策略保证了消息不会因 Broker 异常重启而丢失,但会降低系统的吞吐量。
    flushDiskType = SYNC_FLUSH
  • 异步刷盘:Broker 收到消息后,先将消息写入内存缓冲区,然后立即返回响应给生产者,由专门的线程将消息异步写入磁盘。这种策略性能较高,但在 Broker 异常崩溃时,可能会丢失部分内存中的消息。

    2. 主从复制

    RocketMQ 支持主从复制架构,主 Broker 接收消息后,会将消息同步复制到从 Broker。当主 Broker 出现故障时,可以切换到从 Broker 继续提供服务,保证消息的可用性。

    brokerRole = SYNC_MASTER # 主 Broker 配置为同步主节点
    brokerRole = SLAVE # 从 Broker 配置为从节点

    消费者端

    1. 手动提交消费偏移量

    消费者在处理完消息后,手动向 Broker 提交消费偏移量,确保只有在消息处理成功后才更新消费进度。这样,当消费者出现异常时,可以从上次提交的偏移量处继续消费,避免消息丢失。

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class ManualCommitConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("TopicTest", "*");// 手动提交消费偏移量consumer.setAutoCommit(false);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {// 处理消息System.out.println(new String(msg.getBody()));} catch (Exception e) {// 处理失败,返回重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}// 手动提交消费偏移量context.setAckIndex(msgs.size() - 1);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
    }

    2. 消费重试机制

    当消费者处理消息失败时,RocketMQ 会自动进行重试。消费者可以根据业务需求,设置重试次数和重试间隔,确保消息能够被成功处理。

    3. 幂等消费

    消费者在处理消息时,要保证消息的幂等性,即多次处理相同的消息不会产生额外的影响。可以通过消息 ID 或业务唯一标识来判断消息是否已经处理过,避免重复处理。

总结

  1. 持久化策略:内存注定是不可靠的,刷盘一定是可靠性首选,但是刷盘导致的IO延时如何优化,是评判中间件性能的关键。
  2. 重试机制:3次重试应该是各个开源框架的默认重试次数。
  3. 集群化策略:单个节点注定不是高可用的最终形态,主从复制多节点可靠是最终态。
  4. 幂等机制:保持消息的重复消费可靠性,幂等键或者其他策略都是可参考的。

http://www.ppmy.cn/news/1568044.html

相关文章

【java学习笔记】@Autowired注解 使用方法和作用 | 配合@Component注解使用 | IOC控制反转

原本在类中&#xff0c;要用什么对象&#xff0c;就直接new一个对象。这种原始的方式 是由应用本身去控制实例的。 用了Autowired注解后&#xff0c;就相当于把实例&#xff08;对象&#xff09;的控制权 交给外部容器来统一管理&#xff08;降低耦合&#xff09;。&#xff08…

Java设计模式:结构型模式→组合模式

Java 组合模式详解 1. 定义 组合模式&#xff08;Composite Pattern&#xff09;是一种结构型设计模式&#xff0c;它允许将对象组合成树形结构以表示“部分-整体”的层次。组合模式使得客户端能够以统一的方式对待单个对象和对象集合的一致性&#xff0c;有助于处理树形结构…

使用 Python 和 Tesseract 实现验证码识别

验证码识别是一个常见且实用的技术需求&#xff0c;尤其是在自动化测试和数据采集场景中。通过开源 OCR&#xff08;Optical Character Recognition&#xff0c;光学字符识别&#xff09;工具 Tesseract&#xff0c;结合 Python 的强大生态&#xff0c;我们可以高效实现验证码识…

基于微信小程序的社团活动助手php+论文源码调试讲解

4 系统设计 4.1 系统设计主要功能 通过市场调研及咨询研究&#xff0c;了解了用户的使用需求&#xff0c;于是制定了管理员和用户模块。功能结构图如下所示&#xff1a; 图4-1系统功能结构图 4.2 数据库设计 4.2.1 数据库设计规范 数据可设计要遵循职责分离原则&#xff0c;即…

03.04、化栈为队

03.04、化栈为队 1、题目描述 实现一个 MyQueue 类&#xff0c;该类用两个栈来实现一个队列。 2、解题思路 本题要求使用两个栈来实现一个队列。队列遵循先进先出&#xff08;FIFO&#xff09;的原则&#xff0c;而栈遵循后进先出&#xff08;LIFO&#xff09;的原则。因此…

【Elasticsearch 】悬挂索引(Dangling Indices)

Elasticsearch 悬挂索引&#xff08;Dangling Indices&#xff09;解析与管理 1. 悬挂索引的定义 悬挂索引&#xff08;Dangling Indices&#xff09;是指存在于节点上但未被集群元数据识别的索引分片。这些索引分片不会参与到集群的正常索引操作中。 2. 悬挂索引的产生原因…

【开源免费】基于Vue和SpringBoot的常规应急物资管理系统(附论文)

本文项目编号 T 159 &#xff0c;文末自助获取源码 \color{red}{T159&#xff0c;文末自助获取源码} T159&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

LLM评估与优化技术解析

标题&#xff1a;LLM评估与优化技术解析 文章信息摘要&#xff1a; LLM的评估方法主要包括自动化基准测试、人工评估和基于模型的评估&#xff0c;每种方法各有优缺点。自动化测试快速但难以捕捉细微差别&#xff0c;人工评估细致但成本高&#xff0c;基于模型的评估结合了两者…