第十二章 RabbitMQ之失败消息处理策略

server/2024/10/17 22:30:40/

目录

一、引言

二、RepublishMessageRecoverer 实现

2.1. 实现步骤

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

2.2.2. 常规交换机队列配置类 

2.2.3. 消费者代码

2.2.4. 消费者yml配置 

2.2.5. 生产者代码 

2.2.6. 生产者yml配置 

 2.2.7. 运行效果


一、引言

Spring AMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限地requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:

spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认方式)

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)

二、RepublishMessageRecoverer 实现

在实际项目的生产环境中,通过 RepublishMessageRecoverer 方式我们可以定义一个异常队列和交换机,来接收其他交换机队列转发的无法处理的异常消息。然后我们可以查看其中的异常消息并进行人工处理。

2.1. 实现步骤

1. 将失败处理策略改为RepublishMessageRecoverer

2. 定义接收失败消息的交换机、队列及其绑定关系

3. 定义RepublishMessageRecoverer

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

package com.example.consumer;import jakarta.annotation.Resource;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 异常交换机/队列/消息回收器配置类* ConditionalOnProperty 通过yml中的重试配置来选择该配置类是否启用*/
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfig {@Resourceprivate RabbitTemplate rabbitTemplate;@BeanQueue errorQueue() {return new Queue("error.queue");}@BeanDirectExchange errorExchange() {return new DirectExchange("error.direct");}@BeanBinding errorBind(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer() {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

2.2.2. 常规交换机队列配置类 

package com.example.consumer;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 常规的RabbitMQ 交换机/队列绑定配置类*/
@Configuration
public class RabbitMQConfig {@BeanQueue simpleQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("simple.queue").build();}
}

2.2.3. 消费者代码

package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消费者*/
@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "simple.queue")public void listener1(String msg) throws Exception {
//        System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");throw new Exception();}
}

2.2.4. 消费者yml配置 

# 消费者application.yml配置
spring:rabbitmq:host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhoulistener:simple:prefetch: 1acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack# 消费者重试机制配置retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

2.2.5. 生产者代码 

package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;/*** 生产者*/
@Slf4j
@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid test() {rabbitTemplate.convertAndSend("simple.queue", "只要学不死,就往死里学!");}
}

2.2.6. 生产者yml配置 

# 生产者application.yml配置
spring:rabbitmq:# MQ连接配置host: 127.0.0.1port: 5672username: Wangzhexiaopassword: Wangzhexiaovirtual-host: /hangzhou

 2.2.7. 运行效果

最终效果是,我们在消费者的代码逻辑中会抛出异常,消息在反复投递消费失败后被重新入列到我们定义的异常交换机队列中:


http://www.ppmy.cn/server/132596.html

相关文章

Pr 音频效果快速参考(合集 · 2025版)

Adobe Premiere Pro 中提供了丰富的音频效果,帮助用户在视频编辑过程中调整和增强音频质量。这些音频效果涵盖了振幅与压限、延迟与回声、滤波器与 EQ、调制、降噪/恢复、混响、特殊效果、立体声声像、时间与变调等多个类别。通过合理使用这些效果,可以提…

iOS 大数相乘

首先说清楚2个概念: 概念1.一个M位数 与一个N位数 相乘,乘积的位数一定小于等于(NM). 如.2数99 乘以 4位数 9999, 其结果为 989901 ,为24 6位数. 上面的概念很重要,因为我们要创建一个初始值都为0, 元素个数为(MN) 的数组, 例如:99x918,我们需要创建23的元素的resultArray[0,…

网站集群批量管理-Ansible-进阶

1. 流程控制 1.1 handlers触发器 应用场景: 1. 一般用于分发配置文件的时候 2. 如果配置文件发生变化则重启服务,如果没有变化则重启 3. 注意事项: handlers放在剧本的最后,否则都会被识别为handlers 没有使用触发器handlers - hosts: alltasks:- name: 分发文件copy:src: /e…

[含文档+PPT+源码等]精品基于Python实现的flask专家管理系统[包运行成功+永久免费答疑辅导]

基于Python实现的Flask专家管理系统的背景,可以从以下几个方面进行详细阐述: 一、专家资源管理的重要性 在现代社会中,各领域专家资源对于推动科技进步、解决复杂问题以及提升决策质量等方面具有不可替代的作用。然而,传统的专家…

Edge论文的创新点

创新点及其来源 1. 从灰度边缘重建RGB图像的方法(EdgRec) 基于的方法:传统的重建方法,如使用自动编码器或生成模型来重建正常样本的图像,并通过对原始图像和重建图像的比较来检测异常。 重建过程: 训练阶…

大数据面试题整理——Zookeeper

系列文章目录 大数据面试题专栏点击进入 文章目录 系列文章目录大数据面试题专栏点击进入 1. 什么是 Zookeeper?2. Zookeeper 的特点有哪些?3. Zookeeper 的数据模型是怎样的?4. Zookeeper 的工作流程是怎样的?5. Zookeeper 如何…

ROS理论与实践学习笔记——6 ROS机器人导航(仿真)之导航实现

准备工作&#xff1a;请先安装相关的ROS功能包 安装 gmapping 包(用于构建地图):sudo apt install ros-<ROS版本>-gmapping 安装地图服务包(用于保存与读取地图):sudo apt install ros-<ROS版本>-map-server 安装 navigation 包(用于定位以及路径规划):sudo apt in…

第十四章 RabbitMQ延迟消息之延迟队列

目录 一、引言 二、死信队列 三、核心代码实现 四、运行效果 五、总结 一、引言 什么是延迟消息&#xff1f; 发送者发送消息时指定一个时间&#xff0c;消费者不会立刻收到消息&#xff0c;而是在指定时间后收到消息。 什么是延迟任务&#xff1f; 设置在一定时间之后才…