RabbitMQ08_保证消息可靠性

embedded/2024/9/24 12:12:01/

保证消息可靠性

        • 一、生产者可靠性
          • 1、生产者重连机制(防止网络波动)
          • 2、生产者确认机制
            • Publisher Return 确认机制
            • Publisher Confirm 确认机制
        • 二、MQ 可靠性
          • 1、数据持久化
            • 交换机、队列持久化
            • 消息持久化
          • 2、Lazy Queue 惰性队列
        • 三、消费者可靠性
          • 1、消费者确认机制
          • 2、失败重试机制
          • 3、业务幂等性

一、生产者可靠性
1、生产者重连机制(防止网络波动)
spring:rabbitmq:connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled: true #开启超时重试机制(默认是false)initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次的等待时长倍数,下次等待时长= initial-interval * multipliermax-attempts: 3 #最大重试次数
2、生产者确认机制
Publisher Return 确认机制

消息投递到MQ但是MQ路由失败,MQ返回路由失败原因

spring:rabbitmq:publisher-returns: true # 开启publisher return机制
java">@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
Publisher Confirm 确认机制

临时消息投递到了MQ且入队成功,返回ACK
持久消息投递到了MQ且入队完成持久化,返回ACK
消息投递异常,返回NACK

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型

publisher-confirm-type 的三种类型

  • none 关闭confirm机制
  • simple 同步阻塞等待MQ回执消息
  • correlated MQ异步回调返回回执消息
java">@Test
void testPublisherConfirm() throws InterruptedException {CorrelationData cd = new CorrelationData();cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {        @Overridepublic void onFailure(Throwable ex) {// Future发生异常时的处理逻辑,基本不会触发log.error("handle message ack fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ log.debug("发送消息成功,收到 ack!");} else {log.error("发送消息失败,收到 nack, reason : {}", result.getReason());//TODO 重试发送}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "red1", "hello", cd);
}
二、MQ 可靠性
1、数据持久化
交换机、队列持久化

默认创建时就是持久化的(Durability = Durable)

在这里插入图片描述

消息持久化

RabbitTemplate 的 convertAndSend() 方法发送的消息默认就是持久化的(delivery mode = 2)

如果非要发送一个非持久化的消息,需要在调用 rabbitTemplate.convertAndSend() 方法时,显式地设置消息的 MessageProperties,并将 deliveryMode 设置为 1 (非持久化)

2、Lazy Queue 惰性队列

Lazy Queue是一种以惰性模式运行的队列,它尽可能地将消息存储在磁盘上,而不是内存中。只有当消费者需要消费消息时,这些消息才会被加载到内存中,效率比传统队列高。

3.12版本后,所有队列都是Lazy Queue模式,无法更改。

三、消费者可靠性
1、消费者确认机制

消费者回执消息类型

  • ack 消费者处理成功,RabbitMQ 将从队列中删除消息
  • nack 消费者处理失败,RabbitMQ 需再次投递消息
  • reject 消费者拒绝处理,RabbitMQ 将从队列中删除消息

SpringAMQP 消息监听器的三种确认模式

  • none 不处理。即消费者收到消息后立刻返回ack,消息会丢失,非常不安全。
  • manual 手动模式。业务代码手动调用api发送 ack 或 reject,存在业务入侵,但更灵活。
  • auto 自动模式(默认)。通过 AOP 对消息处理方法做环绕增强,正常返回ack,出现业务异常返回nack,出现消息处理或校验异常返回reject
spring:rabbitmq:listener:simple:acknowledge-mode: auto
2、失败重试机制

消费者处理消息出现异常时利用本地重试,而不是无限的requeue到mq,让mq重新投递给消费者

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试(默认是关闭的)initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现。

失败消息处理策略

  • RejectAndDontRequeueRecoverer:默认实现,重试耗尽后直接reject,丢弃消息。
  • ImmediateRequeueMessageRecoverer:重试耗尽后返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后将失败消息投递到指定的交换机

以第三种失败消息处理策略为例,配置方式如下:

java">@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
3、业务幂等性

由于存在各种确认和重试机制,消费者有重复消费消息的可能性,因此要保证业务的幂等性。
保证业务幂等性的方式如下:

  • 方案一:发送消息时生成唯一消息ID,投递给消费者,消费者接收到消息,业务处理成功后将消息ID保存到数据库,下次根据消息ID去数据库查询判断是否已处理,如果已处理则放弃处理。
java">@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}
  • 方案二:结合业务逻辑,基于业务本身做判断。

http://www.ppmy.cn/embedded/116091.html

相关文章

第二讲 数据结构

链表 单链表 单链表的用途在于编写领接表&#xff0c;领接表的用途在于存储图和树 826. 单链表 - Acwing题库 数据结构&#xff1a; e[N]&#xff1a;用于存储节点的值的数组。ne[N]&#xff1a;作为“下一个”指针的数组&#xff0c;用于连接节点。e[]和ne[]是通过下标关联…

Docker Compose 启动 PostgreSQL 数据库

Docker Compose 启动 PostgreSQL 数据库 文章目录 Docker Compose 启动 PostgreSQL 数据库一 配置 docker-compose.pgsql.yml二 yml 配置说明三 启动容器四 停止容器 本文介绍了如何通过 Docker Compose 快速启动 PostgreSQL 数据库。在 docker-compose.pgsql.yml 文件中&…

102.SAPUI5 sap.ndc.BarcodeScannerButton调用摄像头时,localhost访问正常,使用IP访问失败

目录 原因 解决办法 1.修改谷歌浏览器的setting 2.在tomcat中配置https访问 参考 使用SAPUI5的sap.ndc.BarcodeScannerButton调用摄像头时&#xff0c;localhost访问正常&#xff0c;使用IP访问时&#xff0c;一直打不开摄像头&#xff0c;提示getUserMedia()问题。 原因…

快手旗下——Kolors模型部署与使用指南

以下是按照要求重写后的 Kolors 模型部署与使用指南&#xff0c;文章风格偏技术性&#xff0c;但保持简洁和易懂的特点&#xff1a; Kolors 模型部署与使用指南 一、Kolors 简介 Kolors 是由快手 Kolors 团队开发的文本到图像生成模型&#xff0c;基于大规模的潜在扩散技术。…

前端项目代码开发规范及工具配置

在项目开发中&#xff0c;良好的代码编写规范是项目组成的重要元素。本文将详细介绍在项目开发中如何集成相应的代码规范插件及使用方法。 项目规范及工具 集成 EditorConfig集成 Prettier1. 安装 Prettier2. 创建 Prettier 配置文件3. 配置 .prettierrc4. 使用 Prettier 集成 …

828 华为云征文|华为 Flexus 云服务器搭建 SamWaf 开源轻量级网站防火墙

在当今数字化高速发展的时代&#xff0c;网络安全问题日益凸显。为了保障网站的稳定运行和数据安全&#xff0c;我们可以借助华为 Flexus 云服务器搭建 SamWaf 开源轻量级网站防火墙。这不仅是一次技术的挑战&#xff0c;更是为网站筑牢安全防线的重要举措。 一、华为 Flexus …

拓宽销售渠道的电子名片小程序源码系统 带源代码以及搭建部署教程

系统概述 电子名片小程序源码系统是一款集名片信息管理、社交分享、数据分析于一体的综合解决方案。它打破了传统纸质名片的局限性&#xff0c;将名片信息以数字化的形式呈现&#xff0c;并通过微信小程序这一平台&#xff0c;实现了名片的即时分享、互动与追踪。该系统适用于…

【Linux 从基础到进阶】 Xen 虚拟化技术应用

Xen 虚拟化技术应用 Xen 是一款开源的虚拟化技术,广泛应用于云计算和服务器虚拟化中。作为一款高性能的虚拟化平台,Xen 提供了完整的虚拟化(Full Virtualization)和准虚拟化(Paravirtualization)支持,能够在 x86 和 ARM 等架构上运行多个虚拟机。本文将介绍 Xen 的基本…