RabbitMq如何确保消息不丢失

embedded/2024/10/9 7:58:24/

问题:在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?

解决方案就是缓存,比如当生产者发送消息到交换机时,但交换机不存在,我们应该将消息放入缓存中;或者交换机存在,队列不存在了,当交换机发送不到队列中也应该将消息放入缓存。然后在缓存中配置一个定时任务,对没有发送成功的消息重新进行投递。这样就避免了消息丢失的情况。

回调接口——消息确认

接下来我们通过代码实现以上机制,架构图如下所示:我们要解决问题就是如果图中的交换机或者队列出现问题,应该将消息进行缓存处理,防止消息丢失,具体的实现就是通过生产者的回调接口ConfirmCallback来实现。

1️⃣ 修改配置文件

在配置文件当中需要添加配置表示开启发布消息成功到交换器后会触发回调方法

NONE:禁用发布确认模式,是默认值

CORRELATED:发布消息成功到交换器后会触发回调方法

SIMPLE:经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法;其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

👂这个层次是在交换机层次做的工作,保证消息被正确发送到了交换机。

通过实现一个RabbitTemplate.ConfirmCallback接口,将接口注入到RabbitTemplate中,当消息发送到交换机后就会触发这个回调。如果失败了可以考虑进入死信队列或者重新发送。但是做不到队列层面的工作。

 /*** 交换机确认回调方法** @param correlationData 保存回调消息的ID以及相关信息* @param ack             表示交换机是否收到消息(true表示收到)* @param cause           表示消息接收失败的原因(收到消息为null)*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {log.info("交换机已经收到ID为:{}的消息", id);} else {log.info("交换机还未收到ID为:{}的消息,原因为:{}", id, cause);}}
回调接口——消息回退

我们知道在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,但此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息能够让生产者感知并做出处理呢

我们可以通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

在配置文件当中需要添加配置表示开启消息路由失败后会触发消息回退回调方法

👂通过实现 RabbitTemplate.ReturnsCallback 接口

    @Overridepublic void returnedMessage(ReturnedMessage returned) {log.info("消息{}:,被交换机{}退回,退回原因:{},路由key:{}",new String(returned.getMessage().getBody()),returned.getExchange(),returned.getReplyText(),returned.getRoutingKey());}

备份交换机

前面我们提到交换机如果出现了问题接受不到消息,我们就让交换机进行消息确认,让生产者重新发消息。如果队列出问题收不到消息,我们就进行消息回退,也是让生产者重新发消息。此外,还有一种解决方法就是给交换机添加一个备份交换机,有了备份交换机之后可以不用讲消息回退给生产者,而是将无法投递的消息交给备份交换机,让备份交换机通过自己的路由以及自己的队列发送给消费者,这样也能达到一个消息不丢失的目的。并且这种方式还能建立一个报警队列,用独立的消费者进行监测和报警。

当回调函数和备用交换机一起使用的时候,备份交换机优先级高。


http://www.ppmy.cn/embedded/96322.html

相关文章

Springboot 集成websocket 并支持服务集群

1、新增配置类声明 import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter;Configuration public class WebsocketConfig {/***…

01_理解网络编程和套接字

1.服务端 1.创建套接字 #include <sys/socket.h> int socket(int domain, int type, int protocol); // 成功时返回文件描述符&#xff0c;失败时返回-1&#xff1b; 2.套接字分配地址&#xff08;IP和端口号) #include <sys/socket.h> int bind(int sockfd, s…

基于javaEE的校园二手书交易平台的设计与实现

TOC springboot287基于javaEE的校园二手书交易平台的设计与实现 第1章 绪论 1.1 研究背景 互联网概念的产生到如今的蓬勃发展&#xff0c;用了短短的几十年时间就风靡全球&#xff0c;使得全球各个行业都进行了互联网的改造升级&#xff0c;标志着互联网浪潮的来临。在这个…

希尔排序,详细解析(附图解)

1.希尔排序思路 希尔排序是一种基于插入排序的算法&#xff0c;通过将原始数据分成若干个子序列&#xff0c;然后对子序列进行插入排序&#xff0c;逐渐减小子序列的间隔&#xff0c;最后对整个序列进行一次插入排序。 1.分组直接插入排序&#xff0c;目标接近有序--------…

wiota窄带通讯技术对于vu传统lora

WIoTa是一种针对广域无线物联网通信优化设计的通信协议&#xff0c;而LoRa则是一种广泛应用的低功耗广域网技术。两者在物联网领域都有广泛的应用&#xff0c;但它们在多个关键性能指标上存在显著差异。以下是从多个角度对WIoTa和LoRa进行详细对比&#xff1a; 覆盖范围 WIoTa…

Redis合集 第二章 redis客户端 第一节 jedis

jedis 线程不安全 所以每个线程需要一个独立的链接 为了保证线程安全 所以需要连接池 创建jedis链接池 public class JedisConnectionFactory {public static final JedisPool jedispool;static{//配置连接池JedisPoolConfig jedisPoolConfig new JedisPoolConfig();jedisP…

新能源汽车行业前景广阔,黄山谷捷等产业链企业迎发展良机

目前&#xff0c;我国已成为全球新能源汽车竞争的主战场&#xff0c;产销量连续9年位居世界第一。2024年上半年&#xff0c;我国新能源汽车销量同步增长32%至494.4万辆&#xff0c;市占率为35.2%。中汽协预计&#xff0c;2024年全年中国新能源汽车销量有望达到1150万辆。 随着…

从零搭建xxl-job(五):查询待执行任务逻辑优化

当前的程序还存在很多问题&#xff0c;比如每次扫描数据库都查询了所有的定时任务信息&#xff0c;那么应该查询哪些定时任务信息呢&#xff1f;怎么保证查询的定时任务准时触发&#xff1f;如果数据库中没有定时任务信息了&#xff0c;或者定时任务信息比较少了&#xff0c;sc…