13.RocketMQ之消息的存储与发送

news/2025/2/9 1:50:48/

1. 消息存储

1.1 消息存储

分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。

image.png

  1. 消息生成者发送消息
  2. Broker收到消息,将消息进行持久化,在存储中新增一条记录
  3. 返回ACK给生产者
  4. Broker消息给对应的消费者,然后等待消费者返回ACK
  5. 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
  6. MQ删除消息

1.1.1 存储介质

  • 关系型数据库DB

Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障

  • 文件系统

    目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。

    消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。

    除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。

1.1.2 性能对比

文件系统>关系型数据库DB

1.1.3 消息的存储和发送

1.1.3.1消息存储:顺序写

磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。

1.1.3.2消息发送

Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。

一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:

1)read;读取本地文件内容;

2)write;将读取的内容通过网络发送出去。

这两个看似简单的操作,实际进行了4 次数据复制,分别是:

  1. 从磁盘复制数据到内核态内存;
  2. 从内核态内存复 制到运行程序用户态内存;
  3. 然后从运行程序用户态 内存复制到网络驱动的内核态内存;
  4. 最后是从网络驱动的内核态内存复 制到网卡中进行传输。

image.png

通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现.

image.png

RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。

这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5\~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了

1.1.4 消息存储结构

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。

每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

image.png

  • CommitLog:存储消息的元数据
  • ConsumerQueue:消息在CommitLog的索引。 记录了minOffset和maxOffset和已经消费了的消息的偏移量consumerOffset。可以通过offset去CommitLog查找消息。
  • IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程。
  • ConsumerQueue和messageQueue是一 一对应的。

1.1.5 刷盘机制

RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。

RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。

消息在通过Producer写入RocketMQ的时 候,有两种写磁盘方式:同步刷盘和异步刷盘。

image.png

1.1.5.1同步刷盘

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。

1.1.5.2异步刷盘

在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

1.1.5.3刷盘配置

同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。


http://www.ppmy.cn/news/584486.html

相关文章

【rmzt】动漫性感美女win7主题_8.13

动漫性感美女电脑桌面壁纸下载《鼠标右键另存为本地》 动漫性感美女电脑桌面主界面效果图 win7桌面主题动漫性感美女电脑桌面开始菜单效果图 动漫性感美女电脑桌面图标效果图 动漫性感美女电脑鼠标效果图 动漫性感美女主题内容:动漫性感美女电脑桌面壁纸下载《鼠标右…

ai动漫脸

import requests import base64 # client_id 为官网获取的AK, client_secret 为官网获取的SK host https://aip.baidubce.com/oauth/2.0/token?grant_typeclient_credentials&client_idaAzgARQrG8qCjKKuHfyMieIh&client_secretC63LvFuqMi8GYuIhVGMkiuKdTT…

go语言系列基础教程总结(2)

1、定义定长数组 var myArray [10]int{1,2,3,4,5} 2、定义动态数组 var arr [ ] int arr make([ ]int,6) 3、定义map var m map[string]string m make(map[string]string,10){ "one": "java", "two": "c#", } 3、定…

Docker 中的 .NET 异常了怎么抓 Dump

一:背景 1. 讲故事 有很多朋友跟我说,在 Windows 上看过你文章知道了怎么抓 Crash, CPU爆高,内存暴涨 等各种Dump,为什么你没有写在 Docker 中如何抓的相关文章呢?瞧不上吗? 哈哈,在DUMP的分…

Juc04_阻塞队列概述、方法、实现类、Linked和Array区别、注意事项

文章目录 ①. 什么是阻塞队列②. BlockingQueue的主要方法③. BlockingQueue的实现类④. Linked和Array区别⑤. 不推荐使用快捷的线程池 ①. 什么是阻塞队列 ①.阻塞队列:从名字可以看出,它也是队列的一种,那么它肯定是一个先进先出FIFO的数据结构。与普通队列不同的是,他支持两…

01_LBP算法原理

问题 LBP是一种常见的特征描述算法,用来提取局部的纹理特征,其原理其实很简单,下面我们就来看看它是怎么一回事吧。 毕设项目演示地址: 链接 毕业项目设计代做项目方向涵盖: 目标检测、语义分割、深度估计、超分辨率、3D目标检…

OPENWRT网络打印机TCP/IP共享设置教程 以703N为例

OPENWRT网络打印机TCP/IP共享设置教程以703N为例 前言 最近一台带USB2.0接口的惠普P1106激光打印机使我比较头疼,由于打印机放在家中楼上,没空调,夏天太热,每次抱着电脑上楼打印都大汗淋漓的下来,于是乎,…

Win10局域网添加XP下共享的打印机

Win10家庭64位版PC一台,XP32位PC一台,LBP2900打印机一台。 打印机连接在XP上,需要与Win10共享使用。 Win10曾经添加共享打印机成功并使用了一段时间,昨天突然出现不能打印的问题,打印机显示有打印队列,但…