图解RocketMQ之消费者如何进行消息重试

embedded/2024/9/23 7:27:01/

大家好,我是苍何。

在上一篇[[图解RocketMQ之生产者如何进行消息重试]]中分析了当生产者发送消息失败的时候,RocketMQ 是如何进行重试的。

最后留了一个问题,生产端的消息是重试 hold 住了,但如果消费者出现异常,消费某一条消息失败,这时候 RocketMQ 会怎么处理呢?

这还是很普遍的场景,试想一下如果订单系统发了个消息出去,库存系统消费这条消息失败,那会出现一个什么奇葩的现象呢?

就是你明明下了单买了一包辣条,结果迟迟收不到货,一查人说你的辣条订单压根没出库。

这可了得,吃不上的上辣条先不说,劳资付出去的钱可不能打了水漂啊。

那有了 RocketMQ 的消费重试,就能解决这个问题。

所以,为了了解辣条正常出货不被卡顿,学起来吧🐶。

消费重试的基本原理

当消费者在消费消息时失败,RocketMQ 会自动进行消息重试。对于顺序消息,默认会重复 16 次,每次重试间隔时间为1秒。

而对于无序消息(如普通、定时、延时和事务消息),则可以通过设置返回状态来达到重试的目的。

那如果重复 16 次之后还无法正常消费,那怎么办呢?

RocketMQ 会把该消息直接发送到死信队列中,这名字够吓人的,我先用正式的解释下:

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

这就像是你的女朋友连续给你发了 16 条消息,你一条都没有回复,气急败坏的她直接将你拉入黑名单,此时你就成了死信消息。

而女朋友的黑名单就是个死信队列,要想再次恢复和你宝贝女朋友的聊天,得她在微信上操作才行。

通常引起消费重试有以下两种情况:

  • 异常重试,包括消费者返回消息失败状态标识或抛出非预期异常。
  • 超时重试,包括在 PushConsumer 中排队超时。

聪明的你肯定看出这里又抛出一个概念 PushConsumer,我最讨厌留坑而不填坑,所以介绍下什么是 PushConsumer 吧。

其实它是一种消费者类型,在 RocketMQ 中把消费者分为 PushConsumer、SimpleConsumer 和 PullConsumer。

下面一张图简单介绍一下这 3 种消费者的区别吧:

RocketMQ的3种消费者的区别

很遗憾的是,在官方文档中和 PullConsumer 的介绍还没补充完🐶

PullConsumer官方介绍待补充

不过没关系,不是我们这一章的主角儿,哈哈。

消费重试策略

RocketMQ 的消费重试策略的内部机制其实是通过控制一下 3 个大将展开的:

  • 重试过程状态机:控制消息在重试流程中的状态和变化逻辑。
  • 重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间。
  • 最大重试次数:消息可被重试消费的最大次数。

通常是通过控制重试次数和重试时间间隔,来修改具体的重试状态来达到重试的目的。对于重试状态,大家可看下这张图:

消息状态机-官网

RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic 设置的,也就是一个组内的消费者都是用的该重试队列),用于暂时保存因为各种异常而导致Consumer端无法消费的消息,每个Consumer实例在启动的时候就默认订阅了该消费组的重试队列Topic。

考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大(实际上就是配置的延时队列的级别level)。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。

最大重试次数

RocketMQ默认允许每条消息最多重试16次,可以通过客户端参数DefaultMQPushConsumer.maxReconsumeTimes设置最大重试次数,

超过最大重试次数还消费失败,消息将会被无情的丢弃到死信队列中了。

maxReconsumeTimes默认值为-1,对于并发消费和顺序消费,两者的定义不一样:

  • 并发消费:-1就等于16。即并发消费默认最大重试16次,达到最大次数,消息将会发送至死信队列,不再重试。
  • 顺序消费:-1就代表着Integer.MAX_VALUE,表示无限次本地立即重试消费。这里的重试不再会将消息发往broker重试队列,只在在本地重试。

所以,可以通过配置最大重试次数来进行重试策略的自定义配置。

重试间隔时间

当然,还可以通过控制重试间隔时间来自定义重试策略,重试间隔时间指的是,过多久重试一次。

对于顺序消息,重试间隔为固定时间,默认 3000 毫秒。

那对于用的比较多的无序消息,重试时间间隔是阶梯时间,啥意思呢,先看下图:

无序消息重试阶梯时间间隔

这样做是因为,通常故障恢复需要一定的时间,如果不间断的重试,重试又失败的情况会占用并浪费资源。

所以 RocketMQ 的消费重试机制采用时间衰减的方式,并使用自身定时消费的能力来控制重试间隔时间。

首次在 10 秒后重试消费,如果消费成功则不再重试,如果消费失败则继续重试消费。第二次在 30 秒后重试消费。

依次内推,每次消费的时间间隔都会拉长,直到超出最大重试次数,则进入死信队列不再重试。

重试消费过程中的间隔时间使用了定时消息(后面会婆媳它,你可以简单理解就是内部支持的定时发送的消息),重试的消息并非直接写入到重试队列,而是先写入定时消息队列,再通过定时消息的功能转发给重试队列。

好啦,今天的分享结束。

我是苍何,这是图解 RocketMQ 教程的第 7 篇,我们下篇见~


http://www.ppmy.cn/embedded/88364.html

相关文章

基于地理面矢量的虚拟围栏

文章目录 概要大概过程C++实现头文件源码CMakeLists.txt如何使用概要 虚拟围栏(Geofencing)技术在多个领域有广泛应用,包括无人机飞行限制、车辆监控与管理、人员安全监控、儿童和宠物定位、营销与广告、智能家居与安防、自然保护区与动物监控、健康与健身应用、物流与仓储…

【QIIME2】细菌16s数据库_Greengenes

文章目录 下载Greengenes数据库在QIIME2中使用导入QIIME2中提取引用读取训练分类器测试分类器导出结果生成可视化文件 由于Bugbase功能注释时,输入的OTU表需经Greengenes注释(且由于时间原因须是第一版),故尝试使用Greengenes对16…

C++自定义接口类设计器之函数解析二

关键代码 // 解析为函数 bool FunctionCreator::parse(const QString& lineFunc) {auto trimFunc lineFunc.trimmed();auto list trimFunc.split(" ");bool bHasReturn false;// 返回值和函数名解析for (const auto& key : list) {auto trimKey key.trim…

模型优化—动量梯度下降

一、mini-batch 梯度下降(gradient descent): SGD(stochastic GD)随机梯度下降:对一个样本做梯度下降 batch梯度下降:使用所有样本做梯度下降(做一次又叫epoch) mini…

谷粒商城实战笔记-75-商品服务-API-品牌管理-品牌分类关联与级联更新

文章目录 一,引入Mybatis Plus分页插件二,品牌列表的模糊查询三,增加品牌测试数据四,开发后台品牌关联分类接口1,接口product/categorybrandrelation/catelog/list2,接口product/categorybrandrelation/sav…

javascript(三)

五、事件 发生在 HTML 元素上的事情,当在 HTML 页面中使用 JavaScript 时, JavaScript 可以触发这些事件 1.常用事件 事件 描述 onchange HTML元素改变 onclick 点击 onmouseover 鼠标移入 onmouseout 鼠标移出 onkeydown 按下键盘 onload…

旗晟机器人环境检测AI智慧算法

在当今迅猛发展的工业4.0时代,智能制造和自动化运维已然成为工业发展至关重要的核心驱动力。工业场景上不仅要对人员行为、仪器仪表识别。环境监测也是不可缺少的一个环节。那么我们说说旗晟环境监测AI智慧算法吧 旗晟仪环境监测AI智慧算法是通过各类采集设备与AI服…

SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志

SpringBoot整合Flink CDC实时同步postgresql变更数据,基于WAL日志 一、前言二、技术介绍(Flink CDC)1、Flink CDC2、Postgres CDC 三、准备工作四、代码示例五、总结 一、前言 在工作中经常会遇到要实时获取数据库(postgresql、m…