RabbitMQ之消费者ACK 功能

news/2024/9/24 23:42:37/

什么是消费者ACK?
简单说就是 消息确认机制 mq要保证消息能可靠的达到消费者。消费者在消费是是可以指定autoAck参数(自动/手动)的方式告诉mq自己是否确认收到消息。
当 autoAck 参数为 false 时, 队列中的消息分成了两部分: 一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者确认信号的消息。 若 RabbitMQ 服务器端一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接, 则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
如下图
在这里插入图片描述
可以看到当前队列中的 “Ready” 状态和 “Unacked” 状态的消息数,分别对应等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数。
上述说明 消息需要确认后消息将会被释放掉 从磁盘/内存中。
所以消息确认分类有两大类 如下图
在这里插入图片描述
由上图可知
有发送方确认、接收方确认。
注意:
其中发送方确认又分为:生产者到交换器到确认、交换器到队列的确认
本节主要介绍 消费者消息确认
方式一,自动确认。
方式二,手动确认。
对于自动确认的方式,RabbitMQ Broker 只要将消息写入到 TCP Socket 中成功,就认为该消息投递成功,而无需 Consumer 手动确认。

对于手动确认的方式,RabbitMQ Broker 将消息发送给 Consumer 之后,由 Consumer 手动确认之后,才任务消息投递成功。

实际场景下,因为自动确认存在可能丢失消息的情况,所以在对可靠性有要求的场景下,我们基本采用手动确认。当然,如果允许消息有一定的丢失,对性能有更高的产经下,我们可以考虑采用自动确认。

进行实战演练
yml配置类

spring:rabbitmq:#host为一般模式 若集群模式 将key换成addresses的形式host: 192.168.9.104port: 5672#账号密码自行替换username: adminpassword: admin# 以下手动提交消息listener:simple:acknowledge-mode: manualdirect:acknowledge-mode: manual

以上ackoneledge-mode的值有如下:
NONE 对应 Consumer 的自动确认
MANUAL 对应 Consumer 的手动确认,由开发者在消费逻辑中,手动进行确认。
AUTO 对应 Consumer 的手动确认,在消费消息完成(包括正常返回、和抛出异常)后,由 Spring-AMQP 框架来“自动”进行确认。
以下为逻辑代码演示

================Direct Exchange 配置
@Configuration
public class DirectExchangeConfiguration {/*** 创建一个 Queue** @return Queue*/@Beanpublic Queue queue05() {// Queue:名字 | durable: 是否持久化 | exclusive: 是否排它 | autoDelete: 是否自动删除return new Queue(Message05.QUEUE,true,false,false);}/*** 创建 Direct Exchange** @return DirectExchange*/@Beanpublic DirectExchange exchange05() {// name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它return new DirectExchange(Message05.EXCHANGE,true,false);}/*** 创建 Binding* Exchange:Message05.EXCHANGE* Routing key:Message05.ROUTING_KEY* Queue:Message05.QUEUE** @return Binding*/@Beanpublic Binding binding05() {return BindingBuilder.bind(queue05()).to(exchange05()).with(Message05.ROUTING_KEY);}
=======================================》 direct 类型的消息对象
@Data
public class Message05 implements Serializable {public static final String QUEUE = "QUEUE_05";public static final String EXCHANGE = "EXCHANGE_05";public static final String ROUTING_KEY = "ROUTING_KEY_05";private String id;
}
=======================================》 生产者逻辑
@Component
public class Producer05 {@Resourceprivate RabbitTemplate rabbitTemplate;public void syncSend(String id) {// 创建 Message05 消息Message05 message = new Message05();message.setId(id);// 同步发送消息rabbitTemplate.convertAndSend(Message05.EXCHANGE, Message05.ROUTING_KEY, message);}
}
=======================================》 消费者逻辑
@Component
@RabbitListener(queues = Message05.QUEUE)
@Slf4j
public class Consumer05 {/***  ack 模式 ,* 这里需要和 application.yml 中的 acknowledge-mode: manual  对应一起使用*/@RabbitHandlerpublic void onMessageAck(Message05 message01, Message message, Channel channel) throws IOException {try {log.info("[Consumer05 onMessageAck][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message01);//  如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉long deliveryTag = message.getMessageProperties().getDeliveryTag();// 取当前时间,达到一个随机效果,测试的话可以多跑几次试试if (System.currentTimeMillis() % 2 == 1) {// 通知 MQ 消息已被成功消费,可以ACK了// 第二个参数 multiple ,用于批量确认消息,为了减少网络流量,手动确认可以被批处。// 1. 当 multiple 为 true 时,则可以一次性确认 deliveryTag 小于等于传入值的所有消息// 2. 当 multiple 为 false 时,则只确认当前 deliveryTag 对应的消息channel.basicAck(deliveryTag, false);log.info("[Consumer05 onMessageAck][正常ack:{}]", message01);} else {log.info("[Consumer05 onMessageAck][未ack:{}]", message01);throw new RuntimeException("手动异常");}} catch (Exception e) {// 处理失败,重新压入MQchannel.basicRecover();log.info("[Consumer05 onMessageAck][消息重新压入MQ:{}]", message01);}}
=======================================》测试类@Testvoid syncSend() {String id = UUID.randomUUID().toString();producer05.syncSend(id);log.info("[test producer05 syncSend][id:{}] 发送成功", id);TimeUnit.SECONDS.sleep(2);}

以上的是消费者ACK实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述


http://www.ppmy.cn/news/1452851.html

相关文章

Arxml文件解析02- 自动驾驶Radar服务radar_svc.arxml

<AUTOSAR xmlns="http://autosar.org/schema/r4.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://autosar.org/schema/r4.0 AUTOSAR_00045.xsd

基于Springboot的校园食堂订餐系统(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的校园食堂订餐系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构…

【02358单片机原理及应用】第三、四、五章考试复习自考复习

第3章 80C51单片机指令系统 考试知识点&#xff1a; 1、寻址方式 &#xff08;1&#xff09;立即寻址&#xff08;#data&#xff0c;#data16&#xff09;例&#xff1a;MOV A&#xff0c;#00H &#xff08;2&#xff09;直接寻址&#xff08;direct&#xff09;内部RAM…

[数据结构]————排序总结——插入排序(直接排序和希尔排序)—选择排序(选择排序和堆排序)-交换排序(冒泡排序和快速排序)—归并排序(归并排序)

文章涉及具体代码gitee&#xff1a; 登录 - Gitee.com 目录 1.插入排序 1.直接插入排序 总结 2.希尔排序 总结 2.选择排序 1.选择排序 ​编辑 总结 2.堆排序 总结 3.交换排序 1.冒泡排序 总结 2.快速排序 总结 4.归并排序 总结 5.总的分析总结 1.插入排…

使用 Docker-Compose 部署 ZooKeeper + Kafka + Kafka-UI

使用 Docker-Compose 部署 Kafka + ZooKeeper 1. 无密码验证部署1.1 启动 ZooKeeper1.2 查看 zookeeper 状态1.3 启动 Kafka1.4 Kafka 配置文件1.4 使用命令操作 Kafak 生产、消费1.4.1 创建topic1.4.2 查看某个 topic1.4.3 获取所有 topic1.4.4 删除 topic1.4.4 发送消息1.4.5…

Qt之信号与槽

槽的本质&#xff1a;对信号响应的函数。 信号函数和槽函数通常位于某个类中&#xff0c;和普通的成员函数相⽐&#xff0c;它们的特别之处在于&#xff1a; 信号函数⽤ signals 关键字修饰&#xff0c;槽函数⽤ public slots、protected slots 或者 private slots 修饰。sign…

配电室智能巡检机器人

近年来&#xff0c;生产过程高度自动化&#xff0c;各工矿企业关键场所需定期巡检维护。但目前巡检主要靠人工&#xff0c;既耗时费力效率又低&#xff0c;且受环境等因素影响&#xff0c;巡检难以全面规范&#xff0c;隐患或问题易被忽视。在此情况下&#xff0c;如何利用现有…

IP 地理定位神话与事实

ip地理定位是一项技术&#xff0c;用于通过访问设备的ip地址来获取地理位置信息&#xff0c;例如国家、城市、经纬度等。该技术广泛应用于网站内容自定义、广告定位、网络安全和用户分析等领域。它通过与包含ip地址和地理位置映射的大型数据库进行查询来工作&#xff0c;但在准…