图解RocketMQ之消息如何存储

ops/2024/11/9 16:43:14/

大家好,我是苍何。

人一辈子最值得炫耀的不应该是你的财富有多少(虽然这话说得有点违心,呵呵),而是你的学习能力。技术更新迭代的速度非常快,那作为程序员,我们就应该拥有一颗拥抱变化的心,积极地跟进。

在学习 RocketMQ 的消息存储之前,我们已经学习了 topic 主题、消息模型、消息的过滤和重试等知识点。

这让我感觉自己富有极客精神,非常良好。

我好快乐

小伙伴们在继续阅读之前,我必须要声明一点,为一名负责任的技术博主,我是动了心的,这篇教程,小伙伴们读完后绝对会感到满意,忍不住无情地点赞,以及赤裸裸地转发

当然了,小伙伴们遇到文章中有错误的地方,不要手下留情,可以组团过来捶我,但要保证一点,不要打脸,我怕毁容。

我们都知道,消息队列一大核心功能是削峰填谷,也就是在大流量场景下(比如 12306 春运购票),将部分业务逻辑放在 MQ,等其慢慢处理,且给用户友好提示:

“您已加入候补队列,请耐心等待!”

在 MQ 中的消息,可以有条不紊的、均匀的消费,但需要保证消息存储的可靠性

不可能我加入候补半天,让我等,我等了 2 天,然后提示我压根没加入候补,这不是坑人吗?

mq宕机后消息都丢了

所以,我们需要动员脑子里所有脑细胞,仔细思考下,MQ 中的 broker 该如何存储消息才能防止消息丢失呢?

消息可以存 MySQL 吗

要防止数据不会在持久化的时候消失,很多的做法都是将数据库直接存在 MySQL,比如 Nacos 持久化、Redis 持久化。

那 RocketMQ 可以把消息存储在 MySQL 吗?

单纯从持久化来看,没问题,但消息可不单单是做持久化,需要满足削峰填谷需求,MySQL 性能就无法满足了。

如果 MySQL 可以,那我干嘛费老半天劲把消息丢到 MQ 呢?我直接丢 MySQL 处理不就得了?

这时估计 MySQL 他老人家也会想"合着折腾半天,最后又回到我这了是吧?"

消息可以存 MySQL 吗

消息可以存 Redis 吗

这个时候你聪明的脑袋瓜肯定灵机一动,那要想追求高性能,存 Redis 啊,Redis 贼快,而且还可以持久化,这下可以了吧?

Redis 就像是个 mini 冰箱,虽然取东西快,但空间有限。RocketMQ可是要处理海量消息的"大胃王",用硬盘这个"大冷库"才够塞。

Redis 虽然也能持久化,但更像是给冰箱拍照。RocketMQ 需要的是 365 天 24 小时不间断运转的"冷库"级别持久化。

处于性能的考虑,Redis 读写虽然很快,毕竟还是操作内存,但要在内存和磁盘间倒腾数据就慢了。

Redis说:臣妾做不到啊

还有一个核心问题是,如果真是把消息放 Redis 了,那又引入了第三方组件,合着,我想用个 RocketMQ,你还得硬要我搞个 Redis。Redis 要宕了,RocketMQ 也嗝屁了。

软件工程,不是越复杂越好,和生活一样,往往大道至简的东西才更优雅,故而 RocketMQ 是绝对不可能吧消息丢给 Redis 的。

消息存哪里

这时,你肯定会没耐心的对苍何想要拳打脚踢,说这么半天,那消息到底存在哪儿?

先说结论:消息存本地磁盘

是不是简单粗暴?反正最后不管是 MySQL 还是 Redis,持久化都要存磁盘,那我直接绕过中间商,不更优雅吗?

磁盘存储绕过中间商

问题很多的小明又问了,磁盘也不可靠啊,不是照样会损坏?

但现在服务器上都要做 RAID,有备份在,那如果服务器直接被炸了呢?(虽然这种概率很小)

那还可以多云融合(就是多个云服务器备份),异地容灾等,这里就不展开了,总之,道高一尺,魔高一丈。

极致的可靠是不存在的,我们只能在我们能想到的合理范围内做到可靠,这就是互联网。

你可能会问,存到磁盘上,那是如何存储的呢?

假如有很多个 topic,每个 topic 又有很多个队列,是每个 topic 一个文件?还是每个队列一个文件?

commitlog

为了保证 RocketMQ 的写性能,最好是将所有消息都放在一个文件,并顺序读、顺序写

如果不放在一个文件,硬盘的存储物理位置就不是连续的,能无法保证顺序写,这个时候写入后的物理位置其实就是随机的了。

顺序读写也是为了追求极致的性能,这个其实和 MySQL 的 redo log 有异曲同工之妙。

而这个存储消息的本地文件,就叫做 commitlog。位于 rocketmq/data 文件夹下面。

commitLog所在位置

当你用你粗壮的手准备打开这个文件时,发现是个二进制文件。

commitLog是个二进制文件

这也很好理解,使用二进制存储,一来节省存储空间,二来也能提高读取性能

要想打开读取这文件,可以借助 RocketMQ 自带的 mqadmin 来读取,比如:

# 使用rocketmq-tools查看消息
sh mqadmin queryMessageByKey -n <namesrvAddr> -t <topic> -k <messageKey>

当然也可以使用开源工具如 rocketmq-dump 来查看,小伙伴们可以亲自试试。

consumequeue

细心的小伙伴可能已经注意到除了 commitlog 还有一个文件 consumequeue,那这里面存的是啥子呢?

我们知道 commitlog 存的是消息的全量数据,那对于消费者来说,想要知道自己该消费哪条消息,我们在之前的文章中分析过,靠的是消费位点(comsumerOffset),也就是消息的记录。

这个 comsumerOffset 也是会落在磁盘持久化的。

commitlog 里面只有一个文件,里面存的是消息的全量信息,为了追求极致的性能体验,肯定是需要索引来进行查询。

consumequeue 存放的是起始偏移量和长度,相当于这个索引。

通过起始偏移量和长度可以查询 commitlog 中的全量消息信息。

为什么这么做呢?

文件下标对应关系

假设 commitlog 存了 2 条消息,分别是 ABC、012,那对应的文件内容是 ABC012,每个字节会对应一个下标,比如:

A:0
B:1

我们要想读到 ABC 这条消息,只需要知道起始的 A 对应的下标,以及消息的长度就可以拿到这条消息了。这个下标其实就是偏移量

通过起始下标和长度可以拿到消息

所以 consumequeue 中存的就是消息的起始偏移量和长度。

consumequeue存的是啥子?

那么现在我们来梳理下消息存储的流程吧:

RocketMQ 先将消息存到 commitlog,然后通过定时任务把消息的起始偏移量和长度存到 consumequeue,然后根据这两个数据定位到消息,在 consumequeue 其实就已经按照 topic 和队列分好了,查找起来也是相当快的。

消费者消费消息的时候,通过消费位点找到 consumequeue 里面的起始偏移量和长度,通过索引找到 commitlog 对应的全量消息,然后再消费。

消息存储和消费流程

消息索引

细心的小伙伴可能会注意到通过起始偏移量和长度来定位消息,着实有些慢,并不能快速查找到消息。

RocketMQ 提供了消息索引机制。5. x 中是位于 index 文件夹中,存放的就是索引信息。(这个索引更快👍)

消息索引

索引的内部通过数据槽来实现,感兴趣的小伙伴可以去了解下内部实现原理。

通常我们会将业务唯一 id 设置为索引,比如订单号,查询订单号就能立刻查询出当前订单的消息。

那么如何添加索引呢?

发送消息的时候,可以添加索引:

Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
msg.setKeys("yourKey");

broker 配置中需要做下设置:

messageIndexEnable=true
messageIndexSafe=false

最后

不得不说,一个优秀的中间件,往往会有很多精妙的设计,RocketMQ 的消息存储机制的设计就堪称完美。

当然,还有更多的细节,比如 commitlog 的加载流程、索引的存储结构,这些大家感兴趣的可以自行查阅资料哈。

好啦,今天的分享结束。

我是苍何,这是图解 RocketMQ 教程的第 8 篇,我们下篇见~


http://www.ppmy.cn/ops/89539.html

相关文章

20. 有效的括号

思路&#xff1a; 匹配表&#xff1a;定义一个函数&#xff0c;返回每个右括号对应的左括号。&#xff08;哈希表匹配也行&#xff09;栈操作&#xff1a;遍历字符串中的字符&#xff0c;左括号入栈&#xff0c;右括号则与栈顶元素匹配。最终检查&#xff1a;遍历结束后&#…

计算机网络-http协议和https的加密原理

HTTP&#xff08;HyperText Transfer Protocol&#xff0c;超文本传输协议&#xff09;是用于在万维网&#xff08;World Wide Web&#xff09;上传输超文本的基础协议。它定义了客户端&#xff08;通常是浏览器&#xff09;和服务器之间的文本数据传输格式和规则。以下是HTTP的…

《亿级流量系统架构设计与实战》第三章 通用的服务可用性治理手段

通用的服务可用性治理手段 一、上游保护下游1、重试2、熔断3、资源 二、下游自我保护1、幂等2、限流3、降级 内容总结自《亿级流量系统架构设计与实战》 一、上游保护下游 1、重试 接口超时情况&#xff1a; 请求发送超时&#xff0c;下游服务并没有接收到请求请求处理超时&…

关于我

1、百度集团攻防蓝军、深信服深蓝攻防实验室、新华三攻防实验室。多次国家级、省级、地市级、行业级护网攻击队前三名 2、漏洞盒子总榜前五十名&#xff0c;获得10个月的月榜前十、赛季总榜前五十 3、补天总榜前五十名、月榜补天战神、补天双旦活动、五一活动获得奖励 4、去…

LocalDateTime的序列化和反序列化

背景 最近定位出一个LocalDateTime序列化相关的问题&#xff0c;简单记录一下。本文重点介绍Jackson对LocalDateTime的序列化和反序列化&#xff0c;并结合Spring应用场景进行介绍。 1.LocalDateTime与字符串转换 可以通过DateTimeFormatter实现LocalDateTime与字符串的相互…

主动学习实现领域自适应语义分割

领域自适应语义分割是指在一个领域上训练的语义分割模型能够有效地应用到另一个不同但相关的领域。主动学习是通过智能选择最有价值的数据进行标注&#xff0c;以提高模型的性能和效率。将这两者结合起来&#xff0c;可以实现高效的领域自适应语义分割。 以下是实现主动学习和…

接口测试学习笔记2

一、复习和扩展&#xff1a; 1、金字塔测试模型 UI测试 -- 黑盒 Service 服务层--函数之间的调用 灰盒 接口测试 Unit单元层--白盒测试 趋势&#xff1a;逐步向下发展 测试优先、测试驱动 -- 先考虑怎么测&#xff0c;再考虑怎么开发 满足软件测试的可控范围 2、…

vm母盘配置实验环境

目录 设备:RHEL 9 一.配置本地软件仓库 二.配置网络设备脚本 三.设定网卡规范名称、关闭selinux、关闭并锁住防火墙 四.删除eth0连接并清除历史命令 设备&#xff1a;RHEL 7 一.设定网卡规范名称、关闭selinux、关闭并锁住防火墙 二.配置本地软件仓库 三.配置网络设备…