RabbitMQ--发送方确认及消息重试

embedded/2025/1/18 10:39:37/

(一)发送方确认

  之前我们在七大工作模式中简单说了发送方确认,就是生产者到RabbitMQ这一过程中,消息是否正确到达服务器,生产者要进行确认的过程

 一共有两种确认模式

1.confirm确认模式

  是生产者到交换机的阶段,生产者进行确认的过程,之后还有return模式,是交换机到消息队列,确认的过程

 生产者发送消息的时候,对发送端设置一个ConfirmCallback监听,无论消息是否到达交换机,这个监听都会执行,如果交换机收到了,就会ACK就会为true否则为false

然后我们来看代码

首先还是要更改配置文件

spring:rabbitmq:addresses: amqp://student:student@62.234.46.219:5672/testlistener:simple:
#        acknowledge-mode: NONE
#       acknowledge-mode: AUTOacknowledge-mode: MANUALpublisher-confirm-type: correlated  #消息发送确认

然后声明交换机和队列

 

    @Bean("confirmExchange")public Exchange confirmExchange(){return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).durable(true).build();}@Bean("confirmQueue")public Queue comfirmQueue(){return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}@Bean("confirmBind")public Binding confirmBind(@Qualifier("confirmExchange") Exchange confrimExchange,@Qualifier("confirmQueue") Queue queue){return BindingBuilder.bind(queue).to(confrimExchange).with("confirm").noargs();}

然后我们来看生产者代码(有一点问题,错误示范)

public class AckProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;
@RequestMapping("/confirm")public String confirmRabbit(){
//        RabbitTemplate rabbitTemplate1=new RabbitTemplate();rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {//回调方法@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行回调方法");if(ack){System.out.println("交换机成功接收到消息 id: "+correlationData.getId());}else {System.out.println("未接收到消息id: "+correlationData.getId()+"原因是: "+cause);//业务处理}}});CorrelationData correlationData=new CorrelationData("1");rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm模式",correlationData);return "发送成功";}
}

之所以说是错误示范,我们可以先执行一下试试

 我们发现也没什么错误啊,我们再来发送一遍

 

这时就会报错了,告诉我们一个RabbitTemplate只能支持一个回调方法

  之所以会报这个错,是因为我们每次调用接口,都会给这个RabbitTemplate再设置一遍回调方法就会出错,

  还有一个问题就是,我们既然是针对RabbitTemplate来设置的,那么所有使用此RabbitTemplate的接口,都会被影响,所以我们应该把他设置为多例的

 那接下来我们就来看看怎么设置多个

@Configuration
public class Rabbit {@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate confirmRabbitTemplate =new RabbitTemplate(connectionFactory);confirmRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {//回调方法@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行回调方法");if(ack){System.out.println("交换机成功接收到消息 id: "+correlationData.getId());}else {System.out.println("未接收到消息id: "+correlationData.getId()+"原因是: "+cause);//业务处理}}});return confirmRabbitTemplate;}
}

上面 的ConnectionFactory是我们配置文件写好后,自动给我们创建的,用来创建RabbitTemplate

那接下来我们使用这个新创建的ConfirmRabbitTemplate来测试下是否能解决我们的问题

我多次访问了接口,没有问题,那我们接口不能重复访问的问题就解决了,那能不能解决我们不同接口,被同时影响的问题?

我们发现还是不可以的,这是因为我们本身注入的是Spring框架给我们提供的,我们如果写了一个RabbitTemplate之后,spring就不会再给我们提供了,就会导致Autowired注解,通过类型查找,只能找到一个RabbitTemplate,就使用了,如果我们想解决这个问题,就需要我们自己再另外创建一个

  @Beanpublic RabbitTemplate RabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate RabbitTemplate =new RabbitTemplate(connectionFactory);return RabbitTemplate;}

这样我们再来重新试一下 

此时我们第二种问题也正确了 

此时我们再来看,如果我们故意放一个错误的routingkey

我们发现还是执行正常,但是我们的交换机根本映射不到队列中,还是会造成消息丢失,此时该如何处理,这是就轮到我们的return模式了

 2.return退回模式

  消息到达交换机,根据路由规则匹配并把消息放到队列中,在这个过程中,如果一个队列都无法匹配成功,就可以通过此模式把消息退回给生产者

  代码跟confirm模式很像,而且同样也会有上面两种问题,所以这里不再说了,直接上代码

 @Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate confirmRabbitTemplate =new RabbitTemplate(connectionFactory);confirmRabbitTemplate.setMandatory(true);//也是一种回调函数confirmRabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println(returned.getMessage().getBody()+"被退回");}});confirmRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {//回调方法@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行回调方法");if(ack){System.out.println("交换机成功接收到消息 id: "+correlationData.getId());}else {System.out.println("未接收到消息id: "+correlationData.getId()+"原因是: "+cause);//业务处理}}});return confirmRabbitTemplate;}

记得我们要设置SetMandory为(true)否则不执行 

3.如何保证RabbitMQ的消息可靠传输(面试题)

  我们一步步说

1)首先是生产者到交换机,此时可能会因为网络问题,导致生产者的消息发送到交换机失败,此时我们可以通过发送方确认中的confirm模式来解决

2)交换机无法找到对应队列,我们可以通过发送方确认中的return模式来处理返回消息

3)RabbitMQ服务宕机了,我们可以设置消息的持久化,队列持久化,交换机持久化,来保存消息到硬盘中,但是可能会有一些消息在缓冲区中还没有写入硬盘(这些消息就会丢失)

4)RabbitMQ到消费者过程中,可能因为消费者代码或者网络问题,造成消息丢失,此时可以用消息确认来解决(自动确认,手动确认)

(二)重试机制

 在消息传递过程中,会遇到一些问题,可能会导致消费者消费消息的时候,处理失败,那对此RabbitMQ给我们提供了自动确认和手动确认,同时也给我们提供了重试机制,允许在消息处理失败后,重新发送

  重试机制,在自动确认和手动确认的时候是没用的,所以我们需要设置为auto,当消息正常处理时就自动确认,当抛出异常时则不会确认消息

  代码如下:

首先我们还是要更改配置

spring:rabbitmq:addresses: amqp://student:student@62.234.46.219:5672/testlistener:simple:
#        acknowledge-mode: NONEacknowledge-mode: AUTO
#        acknowledge-mode: MANUALretry:enabled: true # 开启消费者失败重试 initial-interval: 5000ms # 初始失败等待时⻓为5秒 max-attempts: 5publisher-confirm-type: correlated  #消息发送确认

   我们看到一共发送了五条,那我们发现这里的ID都是一样的,因为他们还是同一条消息, 如果我们是手动确认时出现了问题重新发送,他是先重新入队列再发送,他的ID是会递增的,而重试则不会

  但是我们要注意,此时我们是没有捕获异常的,就让他往上抛才会触发重试,如果我们捕获了,就不会触发重试机制  同时我们刚刚配置最多是发五次,我们在发送时是unack状态

如果我们五次后还没有正确接收到消息,消息会自动确认就会丢失

 


http://www.ppmy.cn/embedded/154920.html

相关文章

网络安全 | 什么是威胁情报?

威胁情报 威胁情报-介绍 威胁情报也称为“网络威胁情报”(CTI),是详细描述针对组织的网络安全威胁的数据。威胁情报可帮助安全团队更加积极主动地采取由数据驱动的有效措施,在网络攻击发生之前就将其消弭于无形。威胁情报可帮助组织更有效地检测和应对…

JavaScript-正则表达式方法(RegExp)

RegExp 对象用于将文本与一个模式匹配。 有两种方法可以创建一个 RegExp 对象:一种是字面量,另一种是构造函数。 字面量由斜杠 (/) 包围而不是引号包围。 构造函数的字符串参数由引号而不是斜杠包围。 new RegExp(pattern[, flags])一.符集合 1.选择…

VS Code--常用的插件

原文网址:VS Code--常用的插件_IT利刃出鞘的博客-CSDN博客 简介 本文介绍VS Code(Visual Studio Code)常用的插件。 插件的配置 默认情况下,插件会放到这里:C:\Users\xxx\.vscode\extensions 修改插件位置的方法 …

【Kotlin】上手学习之控制流程篇

二、控制流程 2.1 条件与循环 2.1.1 if 表达式 在 Kotlin 中,if 是一个表达式:它会返回一个值。 因此就不需要三元运算符(条件 ? 然后 : 否则),因为普通的 if 就能胜任这个角色。 fun main() {val a 2val b 3va…

[Qt]常用控件介绍-多元素控件-QListWidget、QTableWidget、QQTreeWidget

目录 1.多元素控件介绍 2.ListWidget控件 属性 核心方法 核心信号 细节 Demo:编辑日程 3.TableWidget控件 核心方法 QTableWidgetItem核心信号 QTableWidgetItem核心方法 细节 Demo:编辑学生信息 4.TreeWidget控件 核心方法 核心信号…

docker-compose和docker-harbor

docker-compose 自动编排工具,可根据dockerfile自动化部署docker容器 yaml文件格式,注意缩进 docker-harbor 私有仓库 公有云------阿里云,腾讯云 私有云------docker仓库 仓库--------保存镜像的地址 Harbor是vmware公司做的docker仓库的…

Java进阶-在Ubuntu上部署SpringBoot应用

随着云计算和容器化技术的普及,Linux 服务器已成为部署 Web 应用程序的主流平台之一。Java 作为一种跨平台的编程语言,具有广泛的应用场景。本文将详细介绍如何在 Ubuntu 服务器上部署 Java 应用,包括环境准备、应用发布、配置反向代理&#…

Python毕业设计选题:基于django+vue的疫情数据可视化分析系统

开发语言:Python框架:djangoPython版本:python3.7.7数据库:mysql 5.7数据库工具:Navicat11开发软件:PyCharm 系统展示 管理员登录 管理员功能界面 用户管理 员工管理 疫情信息管理 检测预约管理 检测结果…