RabbitMQ的消息可靠性保证

server/2025/1/25 4:25:49/

文章目录

    • 1.环境搭建
        • 1.common-rabbitmq-starter 配置防止消费者抢消息(基础配置)
        • 2.common-rabbitmq-starter-demo下创建一个生产者一个消费者
    • 2.生产者可靠性
        • 1.开启消息超时重试机制
        • 2.生产者开启ConfirmCallback消息确认机制
          • 1.application.yml
          • 2.TestConfigPublisher.java
          • 3.测试交换机名字写错的情况
    • 3.MQ可靠性
        • 1.使用LazyQueue和持久化队列结合的方式来做
    • 4.消费者可靠性
        • 1.消费者失败重试机制
          • 1.application.yml
          • 2.解释
        • 2.消费者消息失败处理策略
          • 1.ErrorConfiguration.java 指定错误消息发送到异常交换机
          • 2.ErrorListener.java 异常队列监听器
          • 3.ErrorMessageHandler.java 异常消息处理器
          • 4.TestConfig.java配置
          • 5.TestConfigPublisher.java 生产者
          • 6.TestConfigConsumer.java 消费者故意消费失败
          • 7.测试,消费失败则重试三次后到异常处理逻辑

1.环境搭建

rabbitmqstarter__4">1.common-rabbitmq-starter 配置防止消费者抢消息(基础配置)
spring:rabbitmq:# 消费者配置listener:simple:prefetch: 1 # 每次获取一条消息,处理完再获取下一条
rabbitmqstarterdemo_15">2.common-rabbitmq-starter-demo下创建一个生产者一个消费者

CleanShot 2024-12-31 at 21.59.36@2x

2.生产者可靠性

1.开启消息超时重试机制
    # 生产者消息重试配置template:retry:# 启用消息重试机制,默认为 falseenabled: true# 初始重试间隔时间为一秒initial-interval: 1000ms# 重试最大次数,默认为 3 次max-attempts: 2# 重试的间隔倍数# 配置 2 的话,第一次等initial-interval也就是1s,第二次等 2s,第三次等 4smultiplier: 2connection-timeout: 500ms # 连接超时时间500ms
2.生产者开启ConfirmCallback消息确认机制
1.application.yml
    # 生产者配置publisher-confirm-type: correlated # 发布确认类型为异步回调(一旦配置了,就必须要有回调方法)
2.TestConfigPublisher.java
package com.sunxiansheng.publisher.pub;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.UUID;/*** Description: 测试发布者** @Author sun* @Create 2024/12/31 19:05* @Version 1.0*/
@RestController
@Slf4j
public class TestConfigPublisher {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping("/send")public void send() {log.info("发送消息");// 1.创建CorrelationData对象CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2.设置回调cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable throwable) {// 基本不可能发生,因为这里的异常不是MQ问题导致的log.error("ConfirmCallback:消息发送失败(非MQ问题):{}", throwable.getMessage());}@Overridepublic void onSuccess(CorrelationData.Confirm confirm) {// 判断是否发送成功if (confirm.isAck()) {log.info("ConfirmCallback:消息发送成功:{}", confirm);} else {log.error("ConfirmCallback:消息发送失败:{}", confirm.getReason());}}});rabbitTemplate.convertAndSend("fanout.exchange.tesst", "", "hello rabbitmq", cd);}
}
3.测试交换机名字写错的情况

CleanShot 2024-12-31 at 19.57.56@2x

3.MQ可靠性

1.使用LazyQueue和持久化队列结合的方式来做
    /*** 创建一个队列** @return*/@Beanpublic Queue fanoutQueueTest() {return QueueBuilder.durable("lazyQueue") // 持久化队列.lazy()               // 惰性队列.build();}

持久化队列可以保存队列的元数据,重启后自动恢复,惰性队列可以将所有的消息都持久化到磁盘,内存只保留最近的2048条消息

4.消费者可靠性

1.消费者失败重试机制
1.application.yml
    # 消费者配置listener:simple:acknowledge-mode: auto # 自动确认模式(消费者确认机制)retry:enabled: true # 开启重试机制max-attempts: 3 # 最大重试次数initial-interval: 1000ms # 重试间隔时间multiplier: 1.0 # 重试时间间隔倍数stateless: false # false:有状态,true:无状态,如果是有状态的,每次重试都会发送到同一个队列
2.解释

首先开启了消费者自动确认机制,如果消息消费失败,就进行重试

2.消费者消息失败处理策略
1.ErrorConfiguration.java 指定错误消息发送到异常交换机
package com.sunxiansheng.rabbitmq.error;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Description: 处理失败消息的交换机和队列** @Author sun* @Create 2024/12/31 19:07* @Version 1.0*/
@Configuration
// 当配置文件中spring.rabbitmq.listener.simple.retry.enabled=true时,才会生效
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple", name = "retry.enabled", havingValue = "true")
public class ErrorConfiguration {/*** 一个error交换机*/@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.exchange");}/*** 一个error队列*/@Beanpublic Queue errorQueue() {return new Queue("error.queue");}/*** 绑定error队列到error交换机*/@Beanpublic Binding errorBinding() {return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");}/*** MessageRecoverer*/@Beanpublic MessageRecoverer myMessageRecoverer(RabbitTemplate rabbitTemplate) {// 指定错误消息发送到error.exchange交换机,routingKey为errorreturn new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");}
}
2.ErrorListener.java 异常队列监听器
package com.sunxiansheng.consumer.error;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** Description: 错误消息监听器** @Author sun* @Create 2024/12/31 20:32* @Version 1.0*/
@Component
@Slf4j
public class ErrorListener {@RabbitListener(queues = "error.queue")public void errorListener(Message message) {// 解析错误信息ErrorMessageHandler.handleErrorMessage("error.queue", message);}
}
3.ErrorMessageHandler.java 异常消息处理器
package com.sunxiansheng.consumer.error;import com.rabbitmq.client.LongString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;import java.nio.charset.StandardCharsets;
import java.util.Map;/*** Description: 错误消息处理器** @Author sun* @Create 2024/12/31 20:32* @Version 1.0*/
@Slf4j
public class ErrorMessageHandler {public static void handleErrorMessage(String listenerName, Message message) {// 获取消息属性MessageProperties messageProperties = message.getMessageProperties();String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);Map<String, Object> headers = messageProperties.getHeaders();// 从消息头部获取异常信息String exceptionMessage = (String) headers.get("x-exception-message");String originalExchange = (String) headers.get("x-original-exchange");String originalRoutingKey = (String) headers.get("x-original-routingKey");// 处理LongString类型的异常堆栈跟踪信息String exceptionStackTrace = null;if (headers.containsKey("x-exception-stacktrace")) {Object stacktraceObject = headers.get("x-exception-stacktrace");if (stacktraceObject instanceof LongString) {exceptionStackTrace = stacktraceObject.toString();}}// 格式化输出所有信息,并在前后添加分割线log.error("\n-------------------------------\n" +"MQ错误监听队列: {}\n" +"原始交换机: {}\n" +"原始路由键: {}\n" +"原始信息: {}\n" +"异常信息: {}\n" +"异常堆栈: {}\n" +"-------------------------------",listenerName, originalExchange, originalRoutingKey, messageBody, exceptionMessage, exceptionStackTrace);}
}
4.TestConfig.java配置
package com.sunxiansheng.publisher.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Description: 测试配置类** @Author sun* @Create 2024/12/31 19:00* @Version 1.0*/
@Configuration
public class TestConfig {/*** 创建一个fanout类型的交换机** @return*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout.exchange.test");}/*** 创建一个队列** @return*/@Beanpublic Queue fanoutQueueTest() {return QueueBuilder.durable("lazyQueue") // 持久化队列.lazy()               // 惰性队列.build();}/*** 交换机和队列绑定*/@Beanpublic Binding binding() {return BindingBuilder.bind(fanoutQueueTest()).to(fanoutExchange());}
}
5.TestConfigPublisher.java 生产者
package com.sunxiansheng.publisher.pub;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.UUID;/*** Description: 测试发布者** @Author sun* @Create 2024/12/31 19:05* @Version 1.0*/
@RestController
@Slf4j
public class TestConfigPublisher {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping("/send")public void send() {log.info("发送消息");// 1.创建CorrelationData对象CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2.设置回调cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable throwable) {// 基本不可能发生,因为这里的异常不是MQ问题导致的log.error("ConfirmCallback:消息发送失败(非MQ问题):{}", throwable.getMessage());}@Overridepublic void onSuccess(CorrelationData.Confirm confirm) {// 判断是否发送成功if (confirm.isAck()) {log.info("ConfirmCallback:消息发送成功:{}", confirm);} else {log.error("ConfirmCallback:消息发送失败:{}", confirm.getReason());}}});rabbitTemplate.convertAndSend("fanout.exchange.test", "", "hello rabbitmq", cd);}
}
6.TestConfigConsumer.java 消费者故意消费失败
package com.sunxiansheng.consumer.con;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** Description: 测试消费者** @Author sun* @Create 2024/12/31 19:03* @Version 1.0*/
@Component
@Slf4j
public class TestConfigConsumer {@RabbitListener(queues = "fanout.queue.test")public void receive(String message) {log.info("接收到的消息:{}", message);int i = 1 / 0;}
}
7.测试,消费失败则重试三次后到异常处理逻辑

CleanShot 2024-12-31 at 22.07.15@2x


http://www.ppmy.cn/server/161211.html

相关文章

实验---MGRE-多协议通用路由封装

1.配置ip&#xff08;如图所示 &#xff09; 2.配置缺省实现联通 [r1]ip route-static 0.0.0.0 0 12.1.1.2[r3]ip route-static 0.0.0.0 0 23.1.1.2[r4]ip route-static 0.0.0.0 0 34.1.1.2 3.配置tunnel实现私网访问私网 r1 [r1]int tunnel 0/0/0 --进入接口 [r1-Tunnel…

[Spring] Nacos详解

&#x1f338;个人主页:https://blog.csdn.net/2301_80050796?spm1000.2115.3001.5343 &#x1f3f5;️热门专栏: &#x1f9ca; Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm1001.2014.3001.5482 &#x1f355; Collection与…

pyhton学习笔记(三)

目录 1.变量 2.变量的命名规则 3.常用函数汇总 4.常用数据类型汇总 5.算术运算符 6.比较运算符和逻辑运算符 7.常见的三种格式化输出方法 8.分支语句 1.变量 变量就是可以变化的量&#xff0c;可以理解为是一个存储东西的盒子&#xff0c;盒子里面可以放一些程序里需要…

深度解读:Facebook 区块链技术架构与应用前景

随着区块链技术的不断发展&#xff0c;它已经逐渐渗透到各行各业&#xff0c;成为推动数字化转型的重要力量。作为全球领先的社交平台之一&#xff0c;Facebook&#xff08;现为 Meta&#xff09;也开始探索区块链技术的潜力&#xff0c;期望通过这一技术来提升平台的服务能力和…

LeetCode 热题 100_电话号码的字母组合 (57_17_中等_C++)(string(path.begin(),path.end()))

LeetCode 热题 100_电话号码的字母组合&#xff08;57_17&#xff09; 题目描述&#xff1a;输入输出样例&#xff1a;题解&#xff1a;解题思路&#xff1a;思路一&#xff08;递归&#xff08;回溯&#xff09;&#xff09;&#xff1a; 代码实现代码实现&#xff08;思路一&…

Webrtc (1) - Windows 编译

最近项目上遇到webrtc wgc 的几个test case无法通过&#xff0c;与webrtc人员沟通后决定要自行修复一下(因为他们不想管…) 参考文档 https://webrtc.org/support/contributinghttps://chromium.googlesource.com/chromium/src//main/docs/#checking-out-and-building 以上两…

SQL Server所有数据类型大全

数据类型列表 整数类型&#xff1a;bigint、int、smallint、tinyint精确数值类型&#xff1a;decimal、numeric近似数值类型&#xff1a;float、real字符类型&#xff1a;char、varchar、text、nchar、nvarchar、ntext日期和时间类型&#xff1a;date、time、datetime2、dateti…

MATLAB中characterListPattern函数用法

目录 语法 说明 示例 在文本中查找元音字母 提取在某字母范围内的字母 查找以元音字母开头的单词 将人名按字母顺序分组 characterListPattern函数的功能是匹配列表中的字符。 语法 pat characterListPattern(characters) pat characterListPattern(startCharacter,…