RocketMQ 存储优化技术 解析——图解、源码级解析

news/2025/3/5 20:13:23/

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年1月13日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • RocketMQ里的存储优化
    • 内存预分配
    • `mlock`系统调用
    • 文件预热
  • 存储模型
  • 刷盘流程
    • 同步刷盘
    • 异步刷盘

RocketMQ里的存储优化

内存预分配

写入消息时,CommitLog会先从MappedFileQueue(队列)中获取一个MappedFileMappedFile对象的预分配过程如下图所示:

在这里插入图片描述

MappedFile的创建过程是将构建好的AllocateRequest请求添加至队列中,后台运行的AllocateMappedFileService服务线程根据队列里存在的请求,执行MappedFile映射文件的创建和预分配工作。

 static class AllocateRequest implements Comparable<AllocateMappedFileService.AllocateRequest> {private String filePath;private int fileSize;private CountDownLatch countDownLatch = new CountDownLatch(1);private volatile MappedFile mappedFile = null;

分配时有两种策略:

  1. 使用mmap的方式创建MappedFile实例
  2. TransientStorePool堆外内存池中获取相应的DirectByteBuffer来构建MappedFile实例

在创建并分配完成MappedFile实例后,系统还会同时将下一个MappedFile实例也预先创建出来并保存到请求队列里,下次请求时可以跳过创建MappedFile实例的时间,直接返回。


mlock系统调用

该系统调用的功能是可以将进程使用的部分或所有的地址空间锁定在物理内存里,防止其被交换到swap空间。

对于一款消息中间件来说,追求的一定是消息读写的低延时,因为内存页面调出调入的时间延迟可能太长或难以预知,所以就希望尽可能多地使用物理内存,以提高数据读写的效率。


文件预热

做预热主要是基于如下考虑:

  1. 仅仅分配内存并执行mlock系统调用之后并不会为程序完全锁定这些内存,其中的分页仍然可能是copy-on-write的,因此RocketMQ在创建MappedFile实例的时候,会先写入一些随机值到mmap映射出的内存空间里。

  1. 使用mmap进行内存映射之后,操作系统只是建立虚拟内存地址到物理地址的映射表,但没有加载任何文件至内存中。程序想要访问数据时,操作系统会检查该部分的分页是否已经在内存里,如果不在的话,则会发出一次缺页中断。RocketMQ在做mmap内存映射的同时进行madvise系统调用,目的是使操作系统做一次内存映射后对应的文件数据尽可能多地预加载进内存,从而实现预热。

x86 Linux中的一个标准页面大小为4KB,1G的commitLog需要发生256次缺页中断,才能将数据完全加载进物理内存中


存储模型

1. CommitLog

消息主体及元数据的存储主体,存储Producer端写入的消息主体内容。单个文件默认大小为1GB,文件名长度为20位

00000000000000000000表示第一个文件,起始偏移量为0,文件大小为1GB = 1073741824;00000000001073741824表示第二个文件,写入是顺序写

public class CommitLog {// Message's MAGIC CODE daa320a7public final static int MESSAGE_MAGIC_CODE = -626843481;protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);// End of file empty MAGIC CODE cbd43194protected final static int BLANK_MAGIC_CODE = -875286124;protected final MappedFileQueue mappedFileQueue;protected final DefaultMessageStore defaultMessageStore;private final FlushCommitLogService flushCommitLogService;//If TransientStorePool enabled, we must flush message to FileChannel at fixed periodsprivate final FlushCommitLogService commitLogService;private final AppendMessageCallback appendMessageCallback;private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);protected volatile long confirmOffset = -1L;private volatile long beginTimeInLock = 0;protected final PutMessageLock putMessageLock;
}

2. ConsumeQueue

消息消费的逻辑队列,其中包含了这个MessageQueueCommitLog中的起始物理位置偏移量offset、消息实体内容的大小和Message Tag的哈希值。从实际物理存储来说,ConsumeQueue对应每个TopicQueueId下面的所有文件,每个文件默认大小为6MB,差不多可以存储30万条消息,也是顺序写入。

public class ConsumeQueue {private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqStore");public static final int CQ_STORE_UNIT_SIZE = 20;private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger("RocketmqStoreError");private final DefaultMessageStore defaultMessageStore;private final MappedFileQueue mappedFileQueue;private final String topic;private final int queueId;private final ByteBuffer byteBufferIndex;private final String storePath;private final int mappedFileSize;private long maxPhysicOffset = -1L;private volatile long minLogicOffset = 0L;private ConsumeQueueExt consumeQueueExt = null;
}

3. IndexFile

用于为生成的索引文件提供访问,通过消息Key值查询消息真正的实体内容。在实际的物理存储上,文件名是以创建的时间戳命名的,一个IndexFile可以保存2000万个索引。

public class IndexFile {private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqStore");private static int hashSlotSize = 4;private static int indexSize = 20;private static int invalidIndex = 0;private final int hashSlotNum;private final int indexNum;private final MappedFile mappedFile;private final FileChannel fileChannel;private final MappedByteBuffer mappedByteBuffer;private final IndexHeader indexHeader;
}

4. MappedFileQueue

对连续物理存储的封装类,代码中可以通过消息存储的物理偏移量快速定位offset对应的MappedFile

public class MappedFileQueue {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);private static final int DELETE_FILES_BATCH_MAX = 10;private final String storePath;private final int mappedFileSize;private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();private final AllocateMappedFileService allocateMappedFileService;private long flushedWhere = 0;private long committedWhere = 0;private volatile long storeTimestamp = 0;
}

5. MappedFile

文件存储的直接内存映射封装类,通过该类的实例,可以把消息写入PageCache,或者将消息刷盘。

public class MappedFile extends ReferenceResource {public static final int OS_PAGE_SIZE = 1024 * 4;protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);protected final AtomicInteger wrotePosition = new AtomicInteger(0);protected final AtomicInteger committedPosition = new AtomicInteger(0);private final AtomicInteger flushedPosition = new AtomicInteger(0);protected int fileSize;protected FileChannel fileChannel;/*** Message will put to here first, and then reput to FileChannel if writeBuffer is not null.*/protected ByteBuffer writeBuffer = null;protected TransientStorePool transientStorePool = null;private String fileName;private long fileFromOffset;private File file;private MappedByteBuffer mappedByteBuffer;private volatile long storeTimestamp = 0;private boolean firstCreateInQueue = false;
}

刷盘流程

同步刷盘

在这里插入图片描述
只有在消息成功写入磁盘之后,Broker才会给Producer返回一个ACK响应。RocketMQ中,主线程会首先创建一个刷盘请求实例GroupCommitRequest,并将其放到刷盘队列,之后使用同步刷盘线程GroupCommitService来执行刷盘动作。

GroupCommitService里使用到CountDownLatch来控制线程间同步

RocketMQ里还使用了两个队列分别负责读和写操作,实现读写分离,提高并发量

同步刷盘可以保障较好的一致性,一般适用于金融业务领域。


异步刷盘

在这里插入图片描述

只要消息写入到Pagecache之后就可以直接返回ACK给Producer,之后,后台异步线程负责将消息刷盘,此时主线程并不会阻塞,降低了读写延迟,从而达到提高MQ性能和吞吐量的目的。



http://www.ppmy.cn/news/11792.html

相关文章

算法训练营 day17 二叉树 平衡二叉树 二叉树的所以路径 左叶子之和

算法训练营 day17 二叉树 平衡二叉树 二叉树的所以路径 左叶子之和 平衡二叉树 110. 平衡二叉树 - 力扣&#xff08;LeetCode&#xff09; 给定一个二叉树&#xff0c;判断它是否是高度平衡的二叉树。 本题中&#xff0c;一棵高度平衡二叉树定义为&#xff1a; 一个二叉树每…

1.12

路由传参 最常用对象写法 **传递params参数 需要配置name 不是path** 1:如何指定params参数可传可不传? 路由要求传参 并且已经占位 但是没有传递参数 路径会出现问题 在占位的时候在后面加上问号 解决路径问题 2:params参数可以传递也可以不传递&#xff0c;但是如果传递是空…

Vue纯前端:榜单管理系统

文章目录:一、主要功能&#xff1a;二、实现效果&#xff1a;1.主页:2.注册&#xff1a;3.登录&#xff1a;4.列表界面&#xff1a;5.添加应用界面&#xff1a;6.修改应用界面&#xff1a;7.模糊查询&#xff1a;三、整体架构&#xff1a;四、配置文件说明&#xff1a;五、功能…

Python采集彼岸4K高清壁纸

前言 嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 又到了学Python时刻~ 环境使用: Python 3.8 解释器 Pycharm 编辑器 模块 import re import requests >>> pip install requests ( 更多资料、教程、文档点击此处跳转跳转文末名片加入君羊&#xff0c;找…

【寒假每日一题】DAY.5 调整奇数偶数顺序

题目内容&#xff1a; 调整数组使奇数全部都位于偶数前面。题目&#xff1a;输入一个整数数组&#xff0c;实现一个函数&#xff0c;来调整该数组中数字的顺序使得数组中所有的奇数位于数组的前半部分&#xff0c;所有偶数位于数组的后半部分。思路&#xff1a; 第一步&#xf…

学习CSS3,使用双旋转实现福到了的迎春喜庆特效

春节快到了&#xff0c;因为疫情已经好久没有回老家了&#xff0c;今年终于可以回家过年了&#xff0c;我已经抑制不住自己激动的心情了。因此&#xff0c;我利用css3的旋转做了一个福到了的特效&#xff0c;而且是双旋转哦。 目录 1、实现思路 2、大红纸的渲染过程 3、错误…

详解SpringMVC

1.DispatcherServlet初始化时机 DispatcherServlet是由spring创建的&#xff0c;初始化是由Tomcat完成的&#xff0c;通过setLoadOnStartup来决定是否为tomcat启动时初始化 Configuration ComponentScan // 没有设置扫描包的话默认扫描当前配置的包及其子包 PropertySource(&…

基于YOLOv5+C3CBAM+CBAM注意力的海底生物[海参、海胆、扇贝、海星]检测识别分析系统

在我前面的一些文章中也有用到过很多次注意力的集成来提升原生检测模型的性能&#xff0c;这里同样是加入了注意力机制&#xff0c;区别在于&#xff0c;这里同时在两处加入了注意力机制&#xff0c;第一处是讲CBAM集成进入原生的C3模块中&#xff0c;在特征提取部分就可以发挥…