RabbitMQ高级特性----生产者确认机制

embedded/2025/3/4 7:59:17/

题记:在Java微服务开发中,对于一个功能需要调用另一个服务下的功能才能实现的情况,我们通常会使用异步调用取代同步调用,进而实现增强业务的可拓展性和实现故障隔离以及流量削峰填谷的目的。而消息队列就是异步调用的解决方案之一。不过在使用消息队列实现异步调用的时候,可能会出现消息无法传递到位进而导致业务信息出现差异的情况,因此消息的传递的可靠性就显得尤为重要。

传递消息的流程:

要保障消息传递的可靠性,我们可以从消息队列的每一步分析,以RabbitMQ为例,其发送消息的流程大致如下图所示:

不难发现消息传递一共会经历三名角色,分别是消息发送者,MQ和消息接收者。因此我们要保证消息成功传递并被正确处理就需要保证这三者的可靠性。

发送者可靠性:

1.1生产者重连

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

我们可以在消息发送者的yml配置文件中加入如下信息:

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制,默认时关闭的initial-interval: 1000ms # 失败后的初始等待时间multiplier: 2 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

我们可以在消息发送者工程下创建一个Test模拟向rabbitMQ发送消息(在此之前我们需要把rabbiitMQ关闭或是断掉网络)

    @Testpublic void TestTimeOutPublish(){rabbitTemplate.convertAndSend("simple.queue","Hello,world");}

代码执行后会是这样的效果:

通过日志我们可以观察到一共重连了三次,与我们在yml文件中配置的max-attempt属性一致。需要注意的是,这里等待的时间还需要再加上connect-timeout这一判定连接超时的配置。

1.2生产者确认机制

在保证网络畅通并且RabbitMQ服务正确启动了的前提下,我们就可以成功将消息发送到MQ中了。

不过仍有少数情况下会出现消息进入mq后丢失的情况:

  • 无法找到指定的exchange,大部分情况下就是交换机名称出错
  • exchange无法正确路由到queue,可能是routeKey错误导致的

针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher ConfirmPublisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执

当我们在yml文件中增加了相应的配置后流程就变得如下图所示:

配置完成之后:

  • 对于所有成功投递如Mq中的消息都会返回Ack,表示投递成功。
  • 对于投递成功但路由失败的消息,会返回Publish Return并返回Ack
  • 除此之外的所有消息都返回NAck,表示消息投递失败

需要注意的是,对于临时消息而言是进入队列Publisher Comfirm就是返回Ack,如果是持久消息则需要写入磁盘后才是返回Ack。

实现方式如下:

publisher消息发送者yml配置文件中加入:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

pubulisher-confirm-type一共有三种属性提供选择:

  1. none:默认选项,关闭
  2. correlated:异步回调返回回执
  3. simple:同步阻塞等待mq回执

并且我们还需要在配置类中配置confrim Return报文:

public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstruct  //在注入rabbitTemplate依赖后执行public void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}

对于不同的业务我们有不同的处理方案,因此CallbackComfirm需要在每次发送方法前定义,并且由作为convertMessage方法的参数,如下所示:

@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("simple.direct", "simple", "hello,RabbiteMQ", cd);
}

我们可以在onSuccess和onFailure中编写我们对这一消息发送成功以及失败情况的对应处理。

如果关于上述有其他更好的建议以及疑问,欢迎留言,我将尽快回复。


http://www.ppmy.cn/embedded/169857.html

相关文章

Python Cookbook-2.15 用类文件对象适配真实文件对象

任务 需要传递一个类似文件的对象(比如&#xff0c;调用urllib.urlopen 返回的结果)给一个函数或者方法&#xff0c;但这个函数或方法要求只接受真实的文件对象(比如&#xff0c;像marshalload 这样的函数)。 解决方案 为了过类型检查这一关&#xff0c;我们需要将类文件对象…

数据结构(初阶)(七)----树和二叉树(堆,堆排序)

八&#xff0c;树与二叉树 树 概念与结构 树是⼀种⾮线性的数据结构&#xff0c;它是由 n&#xff08;n>0&#xff09; 个有限结点组成⼀个具有层次关系的集合。把它叫做树是因为它看起来像⼀棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;⽽叶朝下的。 • 有⼀…

SQL命令详解之操作数据库

操作数据库 SQL是用于管理和操作关系型数据库的标准语言。数据库操作是SQL的核心功能之一&#xff0c;主要用于创建、修改和删除数据库对象&#xff0c;如数据库、表、视图和索引等。以下是SQL中常见的数据库操作命令及其功能简介&#xff1a; 1. 查询数据库 查询所有的数据库…

c# winfrom增加进度条

1. 在窗体上添加一个 ProgressBar 控件 在您的窗体中添加一个 ProgressBar 控件&#xff0c;并设置其属性为 Marquee 或 Continuous。这个控件用来展示连接测试的进度。 2. 初始化 BackgroundWorker 在窗体的构造函数中&#xff0c;初始化并配置 BackgroundWorker。假设您的…

使用mermaid查看cursor程序生成的流程图

一、得到cursor生成的流程图文本 cursor写的程序正常运行后&#xff0c;在对话框输入框中输入诸如“请生成扫雷的代码流程图”&#xff0c;然后cursor就把流程图给生成了&#xff0c;但是看到的还是文本的样子&#xff0c;保留这部分内容待用 二、注册一个Mermaid绘图账号 …

网络原理----TCP/IP(3)

核心机制七----延时应答 默认情况下&#xff0c;接收方都是在收到数据报的第一时间&#xff0c;就返回ack&#xff0c;但是可以通过延时返回ack的方式来提高效率&#xff0c;理论上不是100%提高效率&#xff0c;但还是有一定帮助的。 因为如果接收数据的主机⽴刻返回ACK应答,…

深入讨论C语言的可能抽象:部分对设计模式的思考

目录 评估DIP原则 争论语言的类型强弱 从编译器实现层次上谈论 从抽象自然角度 回归对本篇文章目的的核心讨论——如何有效的使用C语言完成对场景编程的抽象呢&#xff1f; 静态多态——使用C语言的编译宏的静态多态技术 动态多态——函数指针 类比OOP中属性赋予的办法&…

2025年3月2日笔记

问题&#xff1a;编写一个程序&#xff0c;计算 1 到 100 之间所有偶数的和 解题思路&#xff1a; 1.因为要计算1 到 100 之间的数&#xff0c;所以要用到for循环便利 2.因为题中让我们求和&#xff0c;所以要用到累加器 累加器公式&#xff1a;int m0 mmi 3.因为要计…