RocketMQ5.x版本延迟消息被重放问题调查

news/2024/12/28 21:43:14/

一、问题

由于目标计划是将集群从4.9.x逐步升级至5.x,故目前先对一些不重要的集群进行升级测试。
但是在4.x的broker陆续升级至5.x的过程中,发现了延迟消息被重放的问题。
具体如下:
在升级时刷新后台监控,发现竟然有写入量:
在这里插入图片描述
即上图(因为当时的问题场景没有截图,所以上图只是提供参考)红框中的生产和消费项并不为0,等启动完毕后,变为0,感觉有些诡异。
之后,查询了该集群的topic情况,这一查竟然发现好多topic出现了堆积的情况。

二、调查

1 堆积情况:
经过调查所有topic,这些topic于几日前已经不进行生产和消费,也就是生产消费完成了。
而发现堆积的topic都是之前消费有失败的,产生了重试消息的,而这些重试消息也是被消费完了的。
而我将4.x的一个broker升级到5.x后,重试的消息完全被重放了,类似如下截图:
在这里插入图片描述
%RETRY%pugc-sofa-delete-video-flush-consumer这个topic在broker-cold-1上的消息量变成了34706,而消费量为17353,堆积量跟消费量相等。

我查看了好几个topic,发现只要有重试消息的,都存在这个情况,也就是重试消息被重写了一遍!

2 日志调查
经过拿其中一个消费者:pugc-sofa-delete-video-flush-consumer进行调查,根据该消费者最后在日志中出现的位置,可以看到:

2023-06-05 15:25:18 ERROR NettyServerNIOSelector_3_2 - RemotingCommand [code=11, language=JAVA, version=395, opaque=166848104, flag(B)=0, remark=null, extFields={queueId=0, maxMsgNums=32, sysFlag=3, suspendTimeoutMillis=15000, commitOffset=17353, topic=%RETRY%pugc-sofa-delete-video-flush-consumer, queueOffset=17353, expressionType=TAG, subVersion=1685949771302, consumerGroup=pugc-sofa-delete-video-flush-consumer}, serializeTypeCurrentRPC=JSON]

commitOffset=17353,该消费者在2023-06-05消费重试消息时,消费偏移量已经达到17353。

这进一步确认了,就是因为我升级5.x导致了重试消息被重写了一遍。

我进一步走查了5.x的broker启动时关于%RETRY%pugc-sofa-delete-video-flush-consumer的日志,如下:

store.log:2023-06-07 09:39:13 INFO main - load /opt/mqcloud/broker-cold-1/data/consumequeue/%RETRY%pugc-sofa-delete-video-flush-consumer/0/00000000000000000000 OK
store.log:2023-06-07 09:39:13 INFO main - load consume queue %RETRY%pugc-sofa-delete-video-flush-consumer-0 OK
store.log:2023-06-07 09:39:14 INFO main - recover current consume queue file over,  /opt/mqcloud/broker-cold-1/data/consumequeue/%RETRY%pugc-sofa-delete-video-flush-consumer/0/00000000000000000000 0 0 0
store.log:2023-06-07 09:39:14 INFO main - recover current consume queue over /opt/mqcloud/broker-cold-1/data/consumequeue/%RETRY%pugc-sofa-delete-video-flush-consumer/0/00000000000000000000 347060

根据最后一条的347060,调查对应代码简化如下:

public void recover() {final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {int mappedFileSizeLogics = this.mappedFileSize;long mappedFileOffset = 0;while (true) {for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {long offset = byteBuffer.getLong();int size = byteBuffer.getInt();long tagsCode = byteBuffer.getLong();if (offset >= 0 && size > 0) {mappedFileOffset = i + CQ_STORE_UNIT_SIZE;} else {log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "+ offset + " " + size + " " + tagsCode);break;}}if (mappedFileOffset == mappedFileSizeLogics) {} else {log.info("recover current consume queue over " + mappedFile.getFileName() + " "+ (processOffset + mappedFileOffset));break;}}}
}

347060代表的是重试队列的物理偏移量,由于consumequeue每个数据单元大小是20个字节,那么消息量为347060/20=17353。

也就是说,在broker初始化恢复数据时,重试队列的数据量还是对的。

而broker启动整个过程包括,创建->初始化->数据恢复->启动,如果数据重复,就很可能出现在启动过程中。

3 走查延迟队列代码
通过上面的调查,基本可以确认,重试队列数据刚加载时是对的,应该是启动后内部的延迟队列进行重新消费导致的。

随即对比5.x和4.x的延迟队列相关代码,至到对比到下面的代码:
在这里插入图片描述
在这里插入图片描述
即5.x的ScheduleMessageService竟然在构造方法中添加了一个持久化的定时任务,而且在start方法中又添加了一个同样的持久化的定时任务

而4.x只有在start时,先执行load(),再执行持久化定时任务。

那么5.x的持久化任务在构造方法中延迟10秒就会执行,如果10秒内,数据还没有加载完成,它执行persist必然会把存在的delayOffset.json覆盖

再start时,执行load,加载的offset就是空的,那就会认为没有消费过,进行数据重放

接着再查看store.log的日志,如下:
在这里插入图片描述
在这里插入图片描述
从2023-06-07 09:39:13启动,至到2023-06-07 09:39:24才刚恢复完数据,那么调用start方法肯定在10秒之后了。

而在测试环境中,broker没多少数据,10秒内肯定已经启动完了,所以没有发现这个问题,而线上环境数据量一般都是上百G,通常启动比较慢。

三、解决

所以定位到问题所在,就好改了,直接把5.x中ScheduleMessageService构造方法中的定时持久化代码移除即可。


http://www.ppmy.cn/news/294413.html

相关文章

spring configuration 配置 aop

-- 基于表达式方式 AspectJExpressionPointcut cut new AspectJExpressionPointcut();cut.setExpression("* com.xx.service..*.*(..))");Advisor advisor new DefaultPointcutAdvisor(cut,实现MethodInterceptor接口);-- 注解形式DefaultPointcutAdvisor adviso…

js 根据字符串类型的数字排序

var arr [{AddRess: "广州南站", lng: 113.275824, lat: 22.994826, Mileage: "558"},{AddRess: "长沙南站", lng: 113.071579, lat: 28.15323, Mileage: "883"},{AddRess: "上海虹桥站", lng: 121.327012, lat: 31.200458…

vue-router4.x报错 api.now is not a function 的解决方法

控制台抛出错误&#xff0c;并且无法切换路由&#xff0c;如下&#xff1a; Uncaught (in promise) TypeError: api.now is not a function ? ? at vue-router.esm-bundler.js:2489:31 ? ? at vue-router.esm-bundler.js:3296:37 ? ? at Array.forEach (<anonymous&g…

【运行vue项目vue-router报错】Uncaught TypeError: Object(...) is not a function

报错 vue2脚手架运行npm run dev&#xff0c;失败&#xff0c;报这样的错 vue-router.esm-bundler.js?6c02:2127 Uncaught TypeError: Object(...) is not a functionat eval (vue-router.esm-bundler.js?6c02:2127:1)at Module../node_modules/vue-router/dist/vue-router.…

4229: 选择

4229: 选择 Time Limit: 10 Sec Memory Limit: 128 MB Submit: 59 Solved: 36 [ Submit][ Status][ Discuss] Description 现在&#xff0c;我想知道自己是否还有选择。 给定n个点m条边的无向图以及顺序发生的q个事件。 每个事件都属于下面两种之一&#xff1a; 1、删除某一…

hdu2489

这题用到 枚举prim 拍了半天队&#xff0c;已经没报希望了&#xff0c;wa了好多次&#xff0c;结果竟然ac&#xff0c;看自己做没错&#xff0c;一些细节没处理好。 总的来讲像这种数据小的题目用枚举完全无压力&#xff0c;放心用。 这里注意一下对于非重排列&#xff0c;就…

Spark本地模式与Spark Standalone伪分布模式

红字部分来源于&#xff1a;董的博客 目前Apache Spark支持三种分布式部署方式&#xff0c;分别是standalone、spark on mesos和 spark on YARN&#xff0c;其中&#xff0c;第一种类似于MapReduce 1.0所采用的模式&#xff0c;内部实现了容错性和资源管理&#xff0c;后两种则…

【ELMAN回归预测】麻雀搜索算法SSA优化ELMAN神经网络回归预测(多输入单输出)【含Matlab源码 2489期】

⛄一、麻雀算法简介 1 标准麻雀算法 算法运算过程由探索者、追随者与预警者3部分构成,其中探索者与追随者的总数量与比例不变,根据适应度数值的改变,两者可以相互转化。通过觅食和反捕食行为来不断更新种群成员最优位置。 设种群数量为n,在第K次迭代中,探索者的位置更新方…