rockeqMQ 消息存储机制整理

news/2025/2/12 19:44:38/

1.rocketMq消息存储结构
RocketMQ消息存储结构主要包括消息队列文件、消息索引文件和消息文件。

消息队列文件是指存储具有相同属性(如Topic、队列ID等)的消息的文件,一个 Topic 包含多个消息队列,每个消息队列对应一个消息队列文件,RocketMQ采用了顺序写入、随机读取的方式存储每个消息队列中的消息,确保高效的写入速度和随机读取速度。

消息索引文件存储的是消息在消息队列中的偏移量和消息物理偏移量对应关系,使用 Hash 索引方式来加速定位,使用 DoubleBuffer 实现零拷贝刷盘,保证高效扫描和持久化速度。

消息文件存储的是所有消息的实际数据,包括消息头和消息体,RocketMQ采用内存映射的方式进行文件读写,确保高效的数据访问速度和快速的持久化速度。

总之,RocketMQ消息存储采用了一种三文件结构的方式,从而达到高效、可靠的存储效果。同时,RocketMQ还支持对消息的预写日志进行压缩、刷盘策略的配置等一系列优化措施,进一步提升了消息存储的效率和可靠性。

RocketMQ消费消息的过程可以分为Push模式和Pull模式两种模式。在这里,我为您介绍一下RocketMQ的Pull模式下的拉取消息的过程:
2. 拉取模式
消费者发送拉取请求
在Pull模式下,消费者需要主动向Broker发送拉取请求。消费者首先需要从NameServer中获取到指定topic所在的Broker地址,然后发送拉取请求到其中的一个Broker节点。

统计未拉取超时时间,选择消息队列
Broker接收到拉取请求后,会根据请求中的参数统计未拉取的时间,如果超时则会将当前消息队列的消费状态设置为pull_request,并返回给消费者

接着,Broker会选择合适的消息队列返回给消费者。当一个消费者启动时,它需要确定它将从哪个消息队列中拉取消息。RocketMQ默认的分配策略是随机分配分区,消费者以平均速率从各个分区中拉取消息。

返回拉取请求
Broker返回的消息队列信息包含该消费群组在该队列上的消费偏移量以及该队列当前存储的最大偏移量。

消费者根据拉到的偏移量,确定起始位置,并向Broker发送消息拉取请求。

Broker处理拉去请求
Broker端接收到拉去请求后,会根据拉去请求中的消费偏移量进行校验,然后将请求的消息批量返回给消费者。消费者可以逐条消费这些消息,或打包后进行批量处理。

消费者处理消息
消费者从Broker中拉到消息后,将其存储到本地的消息缓存区中,然后进行后续的业务处理,对消息进行消费、解析、持久化等处理,并更新消费偏移量。消费偏移量的更新需要消费者确保本地成功消费后才更新消费偏移量,以保证消息不被重复消费或丢失。

3.推送模式 push
创建生产者和消费者
首先,需要在代码中创建两个对象:生产者Producer和消费者Consumer。Producer用于发送消息,Consumer用于接收消息。

生产者发送消息
生产者通过send方法,将消息发送给Broker,消息被存储在Broker中的指定队列中。Producer在发送消息时,需要指定topic和tag。

Broker选择合适的消费者
Broker收到消息后,会检查该消息是哪个topic和tag,然后根据配置的消费者组的订阅信息,选择合适的消费者消费该消息。RocketMQ严格遵循先进先出的原则,对于一个消息,只有一个消费者可以消费。

消费者消费消息
被Broker选择的消费者,从相应的队列中取出该消息并进行消费。消费者在消费时,需要注册Listener监听器,在监听器中进行业务操作。

消息消费成功
当消费者消费成功后,需要向Broker发送确认消息,表示该消息已经被成功消费。如果消费失败,则可以返回重试指令或丢弃该消息。

消费者状态更新
Broker会记录每个消费者消费消息的偏移量,用于下次继续消费。如果消费者异常退出,则会立即通知Broker,使消息队列中待消费消息的状态被置为未消费状态。

4.rocketmq如何保证发送消息是顺序的
1.顺序消息
在RocketMQ中,可以使用顺序消息机制来保证消息的顺序性。具体来说,就是将一个业务流程中的所有消息发送到同一个消息队列中,并且该消息队列只由一个消费者(称为顺序消费者)进行消费。这样就可以保证消息的消费顺序和发送顺序一致。

2.有序生产者
在RocketMQ中,还可以使用有序生产者机制来保证消息的顺序性。具体来说,就是通过设置MessageQueueSelector,将消息发送到同一队列中。顺序消费者则消费队列中的消息,从而保证消息的消费顺序和发送顺序一致。

java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.List;public class OrderedProducer {public static void main(String[] args) throws Exception {// 创建一个 RocketMQ 生产者实例,并指定生产者组名DefaultMQProducer producer = new DefaultMQProducer("order_producer");// 指定 RocketMQ 地址producer.setNamesrvAddr("localhost:9876");// 启动生产者实例producer.start();// 发送消息// 模拟一批订单List<Order> orders = Order.generateOrders();for (Order order : orders) {Message message = new Message("order_topic", "order_tag", order.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));// 将订单发送到同一个队列producer.send(message, new OrderQueueSelector(), order.getOrderId());}// 关闭生产者实例producer.shutdown();}
}// 订单消息队列选择器
class OrderQueueSelector implements MessageQueueSelector {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {// 将订单信息的 ID hashCode 值与队列总数取余,得到目标队列 IDint queueIndex = (Integer)o % list.size();return list.get(queueIndex);}
}// 订单类
class Order {private String orderId;private String orderName;public Order(String orderId, String orderName) {this.orderId = orderId;this.orderName = orderName;}public String getOrderId() {return orderId;}public String getOrderName() {return orderName;}@Overridepublic String toString() {return "Order{" +"orderId='" + orderId + '\'' +", orderName='" + orderName + '\'' +'}';}// 生成订单数据public static List<Order> generateOrders() {return List.of(new Order("2022010100001", "订单1"),new Order("2022010100002", "订单2"),new Order("2022010100003", "订单3"),new Order("2022010100004", "订单4"),new Order("2022010100005", "订单5"));}
}

需要注意的是,在使用顺序消息或有序生产者的场景下,需要对消息进行分片存储,并且每个分片只由一个节点进行消费,否则分片的消费可能会导致消息的乱序。另外,在实际使用中,还需要考虑消息的存储容量、消费者处理能力等因素,来避免消息积压和阻塞,从而保证发送的消息是顺序的。
5.rocketmq topic,消息队列和分区 之间的关系
Topic:在RocketMQ中,topic是用来标识一类消息的名称,类似于Kafka中的topic概念。所有的消息都是按照topic进行发布和订阅的。

消息队列:在RocketMQ中,Topic下的消息会被存储在多个消息队列中。每个消息队列可以看作是一个逻辑上的单元,它包含了一组连续的、有序的消息。可以把一个Topic看作是一个消息队列的集合。

分区:在RocketMQ中,Topic下的每个消息队列可以由多个分区组成,以实现更好的并发处理和容错性,类似于Kafka中的partition概念。多个消息消费者可以独立的消费这些分区上的消息,从而提高系统的并发处理能力。

分区的个数是可以动态调整的,例如可以根据消息队列的使用情况进行扩容或缩减。RocketMQ使用的是基于“Hash”算法的负载均衡机制,将消息均匀地放置到不同的分区中,以实现分布式处理和负载均衡。

总的来说,RocketMQ中的topic、消息队列和分区之间的关系是:topic下包含多个消息队列,每个消息队列下又包含多个分区,每个分区包含有序的消息,并由消息消费者进行并发消费和处理。这种分层的架构模型可以提高RocketMQ的性能和可靠性。

rocketmq中的消息多列是最小的逻辑存储单位,一个消息队列最多对应一个消费者线程,一个消息队列可以对应多个分区,分区的作用是为了保证高可用和吞吐量


http://www.ppmy.cn/news/489736.html

相关文章

浅谈传统企业数字化转型的痛点与困难

00 导语 在和一些传统行业的企业&#xff08;包括但不限于一些老牌的大型企业&#xff09;合作的过程中&#xff0c;笔者发现&#xff0c;这些企业共同的特点就是历史悠久&#xff0c;资金预算相对雄厚&#xff0c;但是由于各类原因在数字化转型的过程中困难重重&#xff0c;相…

Fastboot驱动及安装

手机可以进入Fastboot模式&#xff0c;开发板通过暂停启动&#xff0c;输入fastboot也可以进入fastboot模式。有些CPU是需要fastboot&#xff0c;有些不需要。首先我们需要了解哪些手机需要安装Fastboot驱动&#xff1a; 小米、华为等厂商手机采用的高通、海思、英伟达处理器属…

mt6735通用recovery_MTK65XX通用线刷刷机工具驱动-MTK65xx刷机工具驱动及教程下载最新免费版-西西软件下载...

MTK65xx刷机工具驱动及教程是一款MTK65XX通用线刷刷机工具驱动&#xff0c;手机MTK线刷驱动 MTK65XX适用,可以用来刷所有基于MTK芯片的华为安卓手机&#xff0c;MTK65XX线刷卡刷通用教程&#xff0c;教你手机如何刷机。 MTK使用教程 1.驱动安装。解压&#xff0c;然后选择驱动自…

Auvidea J120 TX2开发板 Jetpack刷机与驱动安装

Auvidea J120 TX2开发板 Jetpack刷机与驱动安装 一、刷机准备 下载Nvidia SDK Manager 网址&#xff1a;https://developer.nvidia.com/nvidia-sdk-manager安装&#xff1a;sudo apt install ./sdkmanager_[version]-[build#]_amd64.deb打开客户端 普通打开&#xff1a;命令…

usb固件驱动驱动

USB固件编程之一&#xff1a;固件编程的工作内容 USB固件编程可以用以下语句来精练地进行描述&#xff1a; Device的固件编程&#xff0c;要搞定的是那几个端点。端点多少和配置情况受所用的Device芯片决定&#xff0c;具体可以看芯片资料。芯片一般提供一个中断信号&#xff0…

rk356x-Android 刷机

rk356x-Android刷机 瑞芯微芯片刷机流程&#xff0c;进入预刷机状态、加载刷机固件、刷机。首次刷机时请注意是否安装驱动&#xff0c;如未安装请先安装驱动 参照1.1。驱动正常安装后则可以正常识别到设备&#xff0c;一般在刷机软件下方提示“发现一个ADB设备”或者“发现一个…

USB_Burning_Tool的使用(V19刷固件)

需要工具 V19工具 USB_Burning Tool&#xff08;需要安装驱动 libusb-win32 &#xff09; 串口工具 问题 win10安装驱动过程中会出现的情况&#xff1a; 解决 win10需要禁用系统程序强制签名 设置→更新和安全→恢复→立即重新启动→疑难解答→开机重启 连接 需要一个自制…

adb 驱动 fastboot 驱动 win10

今天准备给手机刷机。 使用命令 adb reboot bootloader 进入 fastboot 状态后&#xff0c; fastboot 命令竟然找不到设备。 在fastboot模式下 发现驱动不可用。fastboot devices 想百度一下 fastboot 的驱动&#xff0c;结果全都是需要使用CSDN积分下载的附件, qiong....…