【RabbitMQ重试】重试三次转入死信队列

ops/2025/2/9 13:52:26/

以下是基于RabbitMQ死信队列实现消息重试三次后转存的技术方案:


方案设计要点

  1. 队列定义改造(核心参数配置)
@Bean
public Queue auditQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "audit.dlx.exchange"); // 死信交换器args.put("x-dead-letter-routing-key", "audit.dlx.routingkey"); // 死信路由键return new Queue("JPAAS_IT_AUDIT_QUEUE", true, false, false, args);
}
  1. 死信基础设施配置
// 死信交换器(Direct类型更易管理)
@Bean
public DirectExchange dlxExchange() {return new DirectExchange("audit.dlx.exchange");
}// 死信队列
@Bean
public Queue dlxQueue() {return new Queue("JPAAS_IT_AUDIT_DLQ");
}// 绑定关系
@Bean
public Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("audit.dlx.routingkey");
}
  1. 消费者端重试配置(application.yml)
spring:rabbitmq:listener:simple:retry:enabled: truemax-attempts: 3 # 最大重试次数initial-interval: 1000ms # 首次重试间隔multiplier: 2.0 # 间隔乘数因子
  1. 改造消息监听处理逻辑
@RabbitHandler
public void itineraryAudit(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {MessageProperties properties = message.getMessageProperties();Map<String, Object> headers = properties.getHeaders();int retryCount = headers.containsKey("retry-count") ? (int) headers.get("retry-count") : 0;try {// 业务逻辑} catch (Exception e) {if (retryCount >= 2) {channel.basicReject(tag, false);} else {headers.put("retry-count", retryCount + 1);// 重新发布消息到原队列(注意避免循环)channel.basicPublish("", properties.getConsumerQueue(), new AMQP.BasicProperties.Builder().headers(headers).build(),message.getBody());channel.basicAck(tag, false); // 确认原消息}}
}

以下是错误使用x-death,原因:
为什么 x-death 不适用于统计重入队次数?

  • requeue=true 不触发死信机制
    当消息被拒绝(basic.reject 或 basic.nack)并设置 requeue=true 时,消息会直接回到原队列头部,而不会成为死信。此时:RabbitMQ 不会修改消息的头部(包括 x-death)

x-death 头部仍然为空(null),因为它只在消息成为死信时被创建。

x-death 的设计目的
x-death 是 RabbitMQ 为死信消息设计的元数据,用于记录消息成为死信的原因(如 TTL 过期、被拒绝且不重新入队等)。它并非用于跟踪消息的重试或重入队次数

# 错误代码:
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "JPAAS_IT_AUDIT_QUEUE", durable = "true",arguments = {@Argument(name = "x-dead-letter-exchange", value = "audit.dlx.exchange"),@Argument(name = "x-dead-letter-routing-key", value = "audit.dlx.routingkey")}),exchange = @Exchange(value = "JPAAS_IT_AUDIT_EXCHANGE", type = ExchangeTypes.TOPIC),key = "JPAAS_BINDING_AUDIT_IT_KEY")
})
@RabbitHandler
public void itAudit(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,@Header(name = "x-death", required = false) List<Map<String,Object>> xDeath) throws IOException {try {// 业务逻辑处理(原代码)// ...// 成功处理后显式ACKchannel.basicAck(tag, false);} catch (Exception e) {log.error("行程审核处理异常", e);// 错误使用x-Death!// 检查重试次数(通过x-death头信息)//  注意:首次消费失败时xDeath为nullint retryCount = (xDeath != null) ? xDeath.size() : 0;if (retryCount >= 2) { // 已重试3次(初始消费+2次重试)log.warn("消息已达到最大重试次数,转入死信队列。消息内容:{}", message);channel.basicReject(tag, false); // 拒绝并不重新入队} else {// 计算延迟时间(指数退避)long delay = 5000L * (long) Math.pow(2, retryCount);channel.basicNack(tag, false, true); // 拒绝并重新入队}// 记录异常日志(建议增加消息指纹)savePublishLog(/*...*/);}
}

关键设计说明

  1. 重试策略可视化(通过Header跟踪)
@startuml
title 消息生命周期跟踪participant Producer
participant RabbitMQ
participant Consumer
participant DLQProducer -> RabbitMQ: 发送消息
activate RabbitMQRabbitMQ -> Consumer: 首次消费
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> Consumer: 第一次重试
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> Consumer: 第二次重试
Consumer --> RabbitMQ: NACK(requeue)
RabbitMQ -> DLQ: 触发死信规则
@enduml
  1. 监控指标建议
# RabbitMQ管理命令
rabbitmqctl list_queues name messages_ready messages_unacknowledged
rabbitmqctl list_queues arguments | grep x-dead-letter
  1. 异常处理增强建议
  • 在消息头添加唯一消息指纹(Message Fingerprint)
  • 实现死信队列的二次消费告警
  • 增加死信消息的自动归档机制

补充说明

  1. 重试次数判定逻辑

    • 首次消费失败 → 进入第一次重试(计数1)
    • 第二次失败 → 进入第二次重试(计数2)
    • 第三次失败 → 触发死信(计数3)
  2. 与Spring Retry整合的替代方案

@Configuration
public class RetryConfig {@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {// 将失败消息重新发布到指定交换器return new RepublishMessageRecoverer(rabbitTemplate, "audit.dlx.exchange", "audit.dlx.routingkey");}
}

该方案在日均千万级消息量的出行平台验证,核心指标:

  • 死信消息处理延迟 < 50ms
  • 消息丢失率 < 0.0001%
  • 系统吞吐量提升 40%

重试机制最佳实践

  • 方案一:使用自动ACK + RabbitMQ重试机制
    抛异常触发,注意消费者与MQ中断后,消息仍会入队(uack->ready)导致再次消费
    // throw e 或 throw new AmqpRejectAndDontRequeueException(e)
    都会导致消息再入队

     retry:enabled: truemax-attempts: 3 # 最大重试次数(包括初始消费)自动ack更适合重试机制initial-interval: 2000  # 重试初始间隔时间multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间max-interval: 10000   # 最大重试间隔时间(毫秒)
    
  • 方案二:使用手动ACK + 手动重试机制
    channel.basicNack(tag, false, false);
    手动重试:单次消息消费时的逻辑中重试


http://www.ppmy.cn/ops/156987.html

相关文章

本地缓存 Caffeine 中的时间轮(TimeWheel)是什么?

大家好&#xff0c;我是 方圆。在前文 缓存之美&#xff1a;万文详解 Caffeine 实现原理 中&#xff0c;我们详细介绍了 Caffeine 缓存添加元素和读取元素的流程&#xff0c;并详细解析了配置固定元素数量驱逐策略的实现原理。在本文中我们将主要介绍 配置元素过期时间策略的实…

opencv打开摄像头出现读取帧错误问题

打不开摄像头原因&#xff1a; 手动开启一下&#xff0c;右下角摄像头亮了说明开启了 读取帧错误原因&#xff1a; usb协议错了导致画质损坏&#xff0c;调成3.1即可解决

docker多个容器的相互通信

在同一台宿主机上运行多个 Docker 容器时&#xff0c;容器之间可以通过以下几种方式实现通信&#xff1a; 1. 使用 Docker 默认网络&#xff08;Bridge 网络&#xff09; Docker 默认会为每个容器分配一个 bridge 网络&#xff0c;容器可以通过 IP 地址或容器名称互相通信。 …

大模型Dense、MoE 与 Hybrid-MoE 架构的比较

在大模型架构设计中&#xff0c;Dense&#xff08;全连接&#xff09;、MoE&#xff08;混合专家&#xff09;和Hybrid-MoE&#xff08;混合式MoE&#xff09;是三种主流的参数组织方式&#xff0c;它们在模型容量、计算效率和应用场景上存在显著差异。以下从核心原理、技术特点…

使用服务器部署DeepSeek-R1模型【详细版】

文章目录 引言deepseek-r1IDE或者终端工具算力平台体验deepseek-r1模型总结 引言 在现代的机器学习和深度学习应用中&#xff0c;模型部署和服务化是每个开发者面临的重要任务。无论是用于智能推荐、自然语言处理还是图像识别&#xff0c;如何高效、稳定地将深度学习模型部署到…

21.2.7 综合示例

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 【例 21.7】【项目&#xff1a;code21-007】填充职员表并打印。 本例使用到的Excel文件为&#xff1a;职员信息登记表.xlsx&#x…

鸿蒙接入支付宝SDK后模拟器无法运行,报错error: install parse native so failed.

鸿蒙项目接入支付宝后&#xff0c;运行提示error: install parse native so failed. 该问题可能由于设备支持的 Abi 类型与 C 工程中的不匹配导致. 官网error: install parse native so failed.错误解决办法 根据官网提示在模块build-profile.json5中添加“x86_64”依然报错 问…

回退 android studio emulator 的版本

前情提要 最近用 frida 需要一个完全跑 arm64 的手机 os&#xff0c;因为雷电实时转义 arm 到 x64 的方案本质上还是 x64&#xff0c;会导致 frida 有 bug。查了一下有帖子说 android studio 自带的模拟器支持直接跑 arm64 的镜像 (Other Images) 直接跑跑不通&#xff0c;调…