RocketMQ | 源码分析 | 消息刷盘

server/2024/10/22 12:25:08/

一、前言

在上篇文章中,我们分析了消息是如何从 Broker 最终存储到MappedFile 内存缓冲区中的,但是此时消息存储的任务并没有完成,因为消息还没有刷盘,即存储到文件中,本篇我们就来看看RocketMQ是如何进行消息刷盘的。

带着问题阅读

  1. 如何同步刷盘的?刷盘的流程是什么
  2. 刷盘的条件是什么?

二、刷盘策略

CommitLog在初始化的时候,会根据配置,启动两种不同的刷盘服务。

if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {this.flushCommitLogService = new GroupCommitService();
} else {this.flushCommitLogService = new FlushRealTimeService();
}

1:同步刷盘
同步的意思就是说当消息追加到内存后,就立即刷到文件中存储。
2:异步刷盘
当消息追加到内存中,并不是理解刷到文件中,而是在后台任务中进行异步操作。
RocketMQ默认采用异步刷盘策略。
当CommitLog在putMessage()中收到MappedFile成功追加消息到内存的结果后,便会调用handleDiskFlush()方法进行刷盘,将消息存储到文件中。handleDiskFlush()便会根据两种刷盘策略,调用不同的刷盘服务。

三、类目

在这里插入图片描述
从类图上来看,有三个不同的实现思路。那下面逐一来看过。
适用情况如下

  • 同步刷盘使用GroupCommitService。
  • 异步刷盘且未开启TransientStorePool,使用FlushRealTimeService。
  • 异步刷盘且开启TransientStorePool,使用CommitRealService。

四、同步刷盘

同步刷盘的服务为GroupCommitService,该类有两个属性:requestsWrite和requestsRead。都是使用volatile关键字修饰的List对象。主要逻辑如下:
(1):handleDiskFlush()中提交刷盘请求

final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);

(2):同步等待刷盘结果,刷盘失败也会标志消息存储失败,返回 FLUSH_DISK_TIMEOUT

boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()+ " client address: " + messageExt.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}

进行同步刷盘的服务为 GroupCommitService,当请求被提交给GroupCommitService后,GroupCommitService并不是立即处理,而是先放到内部的一个请求队列中,并利用waitPoint通知新请求到来。

public synchronized void putRequest(final GroupCommitRequest request) {synchronized (this.requestsWrite) {this.requestsWrite.add(request);}if (hasNotified.compareAndSet(false, true)) {waitPoint.countDown(); // notify}
}

当 GroupCommitService 被唤醒后,便会将 requestsWrite 中的请求交换到 requestsRead中,避免产生锁竞争。
刷盘请求为啥要分读写两个列表呢?这是用来做读写分离用的,Producer发送消息的请求量是非常大,GroupCommitService的刷盘操作是同步的,刷盘期间仍然会有大量的刷盘请求被提交进来,拆分两个读写列表,请求提交到写列表,刷盘时处理读列表,刷盘结束交换列表,循环往复,两者可以同时进行。

private void swapRequests() {List<GroupCommitRequest> tmp = this.requestsWrite;this.requestsWrite = this.requestsRead;this.requestsRead = tmp;
}

GroupCommitService 在启动后会在死循环中调用doCommit()方法,而doCommit()则不断遍历requestsRead中的请求,进行处理:

private void doCommit() {//首先是在requestRead上进行加锁。synchronized (this.requestsRead) {if (!this.requestsRead.isEmpty()) {for (GroupCommitRequest req : this.requestsRead) {// There may be a message in the next file, so a maximum of// two times the flush//可能存在一条消息存在下一个文件中,因此最多可能存在两次刷盘。//如果文件刷盘的偏移量 < 请求的下一个偏移量,则说明还没有刷新完,还需要继续刷新。boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();//如果flushOK为false,就意味着还存在没有刷写到磁盘的区域,需要执行刷盘//为什么调用2次呢,因为在写消息的时候,可能会出现END_OF_FILE的情况。这种情况下就会将消息写入到新的文件中。//因此第1次调用会将倒数第二个文件刷写到磁盘。第2次调用才会选择到最新的MappedFile执行刷写。for (int i = 0; i < 2 && !flushOK; i++) {//只要有新数据写入就要刷盘,最小页数是0CommitLog.this.mappedFileQueue.flush(0);flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();}//唤醒在该请求上等待的外部线程req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);}long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();if (storeTimestamp > 0) {//设置检查点的写入时间,消息屋里落盘时间CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);}//清空requestRead队列。this.requestsRead.clear();} else {// Because of individual messages is set to not sync flush, it// will come to this process//由于单个消息被设置为不同步刷新,因此将进入此进程。CommitLog.this.mappedFileQueue.flush(0);}}
}

可见这里最终调用了CommitLog.this.mappedFileQueue.flush(0) 来进行刷盘。
同步刷盘的任务虽然也是在异步线程中执行,但是消息存储的主流程中会同步等待刷盘结果,所以本质上还是同步操作。

五、异步刷盘

异步刷盘时,消息写入 PageCache 就会响应 ACK,然后由后台线程异步将 PageCache 里的内容持久化到磁盘,降低了读写延迟,提高了性能和吞吐量。服务宕机消息不丢失,机器断电少量消息丢失。

同步刷盘的服务为FlushRealTimeService,不过当内存缓存池TransientStorePool 可用时,消息会先提交到TransientStorePool 中的WriteBuffer内部,再提交到MappedFile的FileChannle中,此时异步刷盘服务就是 CommitRealTimeService,它继承自 FlushRealTimeService。

我们别管那么多,先看看FlushRealTimeService中的主要逻辑吧:
(1):handleDiskFlush()中直接唤醒异步刷盘服务

flushCommitLogService.wakeup();

(2):FlushRealTimeService 在启动后,会在死循环中周期性的进行刷盘操作,主要逻辑如下。它是CommitLog 刷盘服务的父类,它是一个抽象类,本身没有实现,只是一个标记类,刷盘策略由子类负责完成。
对于异步刷盘,没有提交刷盘请求一说。它不像同步刷盘,只要有消息写入 CommitLog 就要执行刷盘操作,因为异步刷盘是定时执行的。异步刷盘时,仅仅需要调用 wakeup 方法唤醒线程即可。所以,我们重点看它的run方法。
在这里插入图片描述

// run方法是一个while循环,只要服务没有停止,就会一直对commitLog下的MappedFile文件进行刷盘。
//默认间隔时间是500ms,可通过flushIntervalCommitLog属性配置while (!this.isStopped()) {// 是否定时刷新日志的设定,休眠策略,为 true 时,调用 Thread.sleep()休眠,为false时,调用wait()休眠,默认 falseboolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();// 获取刷盘周期,刷新到盘的时间间隔,默认为 500 msint interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();// 每次刷盘至少要刷多少页内容,每页大小为 4 k,默认每次要刷 4 页int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();// 两次刷写之间的最大时间间隔,默认 10 sint flushPhysicQueueThoroughInterval =CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();boolean printFlushProgress = false;// Print flush progresslong currentTimeMillis = System.currentTimeMillis();// 判断当前时间距离上次刷盘时间是否已经超出设置的两次刷盘最大间隔if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {this.lastFlushTimestamp = currentTimeMillis;// 如果已经超时,则将flushPhysicQueueLeastPages设置为0,表明将所有内存缓存全部刷到文件中flushPhysicQueueLeastPages = 0;printFlushProgress = (printTimes++ % 10) == 0;}try {// 根据不同休眠策略,进行休眠等待if (flushCommitLogTimed) {Thread.sleep(interval);} else {//使用的是CountDownLatch等待对应时间this.waitForRunning(interval);}if (printFlushProgress) {this.printFlushProgress();}long begin = System.currentTimeMillis();// 休眠结束,开始执行刷盘,然后更新storeTimestamp的值CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);// 文件的最后的刷盘时间long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();if (storeTimestamp > 0) {CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);}long past = System.currentTimeMillis() - begin;if (past > 500) {log.info("Flush data to disk costs {} ms", past);}} catch (Throwable e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);this.printFlushProgress();}
}

通过上面这段逻辑可知,异步刷盘就在异步线程中,周期性的将内存缓冲区的内容刷到文件中,在消息主流程中,只会唤醒异步刷盘线程,而不会同步等待刷盘结果,所以称为异步刷盘。

六、MappedFile的刷盘

无论是上面哪种刷盘策略,最终都调用了下面这个方法进行刷盘,
有三个属性:wrotePosition、committedPosition、flushedPosition。代表含义如下
● wrotePosition:已经写入的内容的偏移量。这个偏移量可能是写入到文件也可能是写入到内存。
● committedPosition:内存区域提交到文件的偏移量。该属性只有在异步加速模式下才会有用。
● flushedPosition:已经刷入磁盘的偏移量。

CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

是时候看看mappedFileQueue.flush()中做了什么了。
1:从mappedFileQueue保存的所有MappedFile中,找出所要刷盘的MappedFile

//根据上次刷盘位置定位到MappedFile
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);

flushedWhere 记录了最后一条被刷到文件的内容的全局物理偏移量。所以此次刷盘就要根据偏移量,找到本次要刷盘的起始点位于哪个MappedFile。

2:如果找到了对应的MappedFile,则对该MappedFile中的内容执行刷盘操作,并更新flushedWhere。

if (mappedFile != null) {long tmpTimeStamp = mappedFile.getStoreTimestamp();//刷盘int offset = mappedFile.flush(flushLeastPages);long where = mappedFile.getFileFromOffset() + offset;result = where == this.flushedWhere;//更新刷盘位置this.flushedWhere = where;if (0 == flushLeastPages) {this.storeTimestamp = tmpTimeStamp;}
}

刷盘的终极目的地就在MappedFile的flush()方法中,具体也分为下面几步:
1:判断是否满足刷盘条件

if (this.isAbleToFlush(flushLeastPages))

isAbleToFlush()其实就是判断当前剩余未刷盘内容长度,是否超过最小刷盘长度:flushLeastPages,避免频繁无意义的刷盘。

private boolean isAbleToFlush(final int flushLeastPages) {//上次刷盘的条件int flush = this.flushedPosition.get();//已写入的位置int write = getReadPosition();//文件写满是必须要刷盘的if (this.isFull()) {return true;}if (flushLeastPages > 0) {//根据pageSize计算新写的数据是否达到了给定大小flushLeastPages//OS_PAGE_SIZE = 1024 * 4return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;}return write > flush;
}

2:如果满足刷盘条件,则将内存中的内容刷到文件中。

// 如果writeBuffer不为空,则表明消息是先提交到writeBuffer中,已经从writeBuffer提交到fileChannel,直接调用fileChannel.force()
if (writeBuffer != null || this.fileChannel.position() != 0) {this.fileChannel.force(false);
} else {  // 反之,消息是直接存储在文件内存映射缓冲区mappedByteBuffer中,直接调用它的force()即可this.mappedByteBuffer.force();
}

到这儿,消息就成功的从内存中存储到文件内部了。

区别

它跟异步刷盘的不同点是,异步刷盘是定时flush 。 这里没有进行flush,而且通过先按照文件提交的offset查找数据。然后提交。这里的提交分两类,一类是没有使用临时存储池。使用的是mappedByteBuffer也就是内存映射的方式。直接写到映射区域中的,那么这个时候就不需要写入的fileChannel了。直接返回写入的位置作为已经提交的位置。另一类是临时存储池,使用的堆外内存,那么这个时候需要先把信息提交到fileChannel中。

总结

通过上面的分析,我们了解了RocketMQ的两种刷盘策略:
一种是类似强一致的,保证消息存储到文件中的同步策略。
一种是提交到内存中就算存储成功,在后台异步进行刷盘的异步策略。
无论是哪种策略,肯定都有自己的优点和缺点,大家可以根据自己生成环境,选择合适的刷盘策略。
在这里插入图片描述


http://www.ppmy.cn/server/133894.html

相关文章

Blender快捷键alt+A对齐没有反应/无效的解决方案(备忘录03)

目录 MACHIN3下载地址&#xff1a; 如果altA无效&#xff0c;那么有几个情况 没有正确安装MACHIN3&#xff0c;或者安装的版本与Blender版本冲突。 AltA快捷键与其他软件的快捷键冲突。 &#xff08;本人最终解决方案&#xff09;没有打开对齐饼菜单 测试结果 AltA快捷键…

在 gRPC 中,客户端和服务端的 Protocol Buffers(Protobuf)生成的文件必须保持一致性,以确保通信正常。

在 gRPC 中&#xff0c;客户端和服务端的 Protocol Buffers&#xff08;Protobuf&#xff09;生成的文件必须保持一致性&#xff0c;以确保通信正常。 关键点 相同的 .proto 文件&#xff1a; 客户端和服务端应该使用相同的 .proto 文件定义服务和消息结构。这确保了双方对数据…

数据库相关操作

1. 创建数据库 首先&#xff0c;使用 CREATE DATABASE 语句来创建一个新的数据库。 CREATE DATABASE my_database; 2. 使用数据库 创建数据库后&#xff0c;使用 USE 语句切换到这个数据库。 USE my_database; 3. 创建表 接下来&#xff0c;在数据库中创建一张表。表中…

python画图| 对齐图名和标签

【1】引言 学习了很多python画图教程之后&#xff0c;我们会发现&#xff1a;一些最基本的设置往往对图形的表达具有至关重要的影响。 因此&#xff0c;我们暂时回过头来&#xff0c;对一些基础知识进行加强。 今天&#xff0c;就一起学习如何对齐图名和标签。 【2】官网教…

解锁Claude五大能力,带你使用更加强大的Claude

很多人都听说过这样一句话&#xff1a;"想要AI生成更好的答案&#xff0c;你必须提供清晰、有效的提示。" 这句话现在可谓是家喻户晓。 然而&#xff0c;即便知道这个道理&#xff0c;很多人仍然不知道该如何写出好的提示词。他们常常面对空白的输入框&#xff0c;…

大厂面试真题-说一下推和拉的模式以及常见的使用

Pull&#xff08;拉&#xff09;模式和Push&#xff08;推&#xff09;模式是消息传递中的两种基本机制&#xff0c;它们在消息中间件和注册中心中的应用广泛而多样。 Pull&#xff08;拉&#xff09;模式 Pull模式是一种消息消费模式&#xff0c;其中客户端主动从服务端拉取…

使用Verilog设计分频模块(2Hz)

在数字电路设计中&#xff0c;分频器是一种常见的电路&#xff0c;用于将一个高频的时钟信号分频到一个较低频率的时钟信号。本次将通过一个实际的例子&#xff0c;讲解如何使用Verilog语言设计一个分频器&#xff0c;将系统时钟信号分频到2Hz。 在数字电路系统的设计中&#x…

openresty安装

openresty官网&#xff1a;http://openresty.org/cn/ openresty官方安装文档&#xff1a;http://openresty.org/cn/installation.html github地址&#xff1a;https://github.com/openresty 安装前准备&#xff0c;必须安装perl、libpcre、libssl库。 可以用如下命令查看安装情…