RabbitMQ的高级特性介绍(二)

embedded/2025/3/26 1:40:28/

发送方确认

当消息的⽣产者将消息发送出去之后,消息到底有没有正确地到达服务器呢? 如果在消息到
达服务器之前已经丢失(比如RabbitMQ重启, 那么RabbitMQ重启期间⽣产者消息投递失败), 持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ为我们提供了两种解决⽅案:
                a. 通过事务机制实现
                b. 通过发送⽅确认(publisher confirm) 机制实现

事务机制⽐较消耗性能, 在实际⼯作中使⽤也不多,咱们主要介绍confirm机制来实现发送⽅的确认。

RabbitMQ为我们提供了两个⽅式来控制消息的可靠性投递
                1. confirm确认模式
                2. return退回模式

confirm模式

Producer 在发送消息的时候, 对发送端设置⼀个ConfirmCallback的监听,无论消息是否到达
Exchange, 这个监听都会被执行,如果Exchange成功收到, ACK( Acknowledge character , 确认
字符)为true, 如果没收到消息, ACK就为false。

需要在controller中设置回调方法

@RequestMapping("/confirm")public String confirm(){//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if(ack){System.out.printf("接收到消息,消息id:%s", correlationData==null ? null : correlationData.getId());}else{System.out.printf("未接受到消息,消息id:%s,cause: %s\n",correlationData== null ? null: correlationData.getId(),cause);//相应的业务逻辑处理}}});CorrelationData correlationData = new CorrelationData("1");rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm test",correlationData);return "消息发送成功";}

return退回模式

消息到达Exchange之后, 会根据路由规则匹配, 把消息放⼊Queue中. Exchange到Queue的过程, 如果⼀条消息⽆法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回给发送者。消息退回给发送者时,我们可以设置⼀个返回回调方法, 对消息进行处理。

@RequestMapping("returns")public String returns(){CorrelationData correlationData = new CorrelationData("5");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","returns test",correlationData);return "消息发送成功";}

常见面试题:如何保证RabbitMQ的可靠传输?

从这个图中, 可以看出, 消息可能丢失的场景以及解决⽅案:
1. ⽣产者将消息发送到 RabbitMQ失败
                a. 可能原因: 网络问题等
                b. 解决办法: 参考本章节[发送⽅确认-confirm确认模式]
2. 消息在交换机中无法路由到指定队列:
                a. 可能原因: 代码或者配置层⾯错误, 导致消息路由失败
                b. 解决办法: 参考本章节[发送⽅确认-return模式]

3. 消息队列⾃⾝数据丢失
                a. 可能原因: 消息到达RabbitMQ之后, RabbitMQ Server 宕机导致消息丢失.
                b. 解决办法: 参考创作中心-CSDN[持久性]。开启 RabbitMQ持久化, 就是消息写⼊之后会持久化到磁盘, 如果RabbitMQ 挂了, 恢复之后会自动读取之前存储的数据. (极端情况下, RabbitMQ还未持久化就挂了, 可能导致少量数据丢失, 这个概率极低, 也可以通过集群的方式提⾼可靠性)
4. 消费者异常, 导致消息丢失
                a. 可能原因: 消息到达消费者, 还没来得及消费, 消费者宕机. 消费者逻辑有问题.
                b. 解决办法: 参考本章节创作中心-CSDN[消息确认]. RabbitMQ 提供了 消费者应答机制 来使 RabbitMQ 能够感知到消费者是否消费成功消息. 默认情况下消费者应答机制是⾃动应答的, 可以开启⼿动确认当消费者确认消费成功后才会删除消息, 从⽽避免消息丢失. 除此之外, 也可以配置重试机制(参考下⼀章节),当消息消费异常时, 通过消息重试确保消息的可靠性。

重试机制

在消息传递过程中, 可能会遇到各种问题, 如⽹络故障, 服务不可用, 资源不足等, 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送。

自动确认

发送消息:

@RequestMapping("/retry")public String retry(){rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","retry test");return "消息发送成功";}

消费消息:

@Component
public class RetryQueueListener {
//指定监听队列的名称
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void ListenerQueue(Message message) throws Exception {System.out.printf("接收到消息: %s, deliveryTag: %d%n", newString(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//模拟处理失败int num = 3/0;System.out.println("处理完成");
}
}

手动确认

改为手动确认

@RabbitListener(queues = Constant.RETRY_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {System.out.printf("接收到消息: %s, deliveryTag: %d%n", newString(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//模拟处理失败int num = 3/0;System.out.println("处理完成");
//3. ⼿动签收channel.basicAck(deliveryTag, true);}catch (Exception e){
//4. 异常了就拒绝签收Thread.sleep(1000);
//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 则直接丢弃channel.basicNack(deliveryTag, true,true);
}
}

手动确认模式下, 消费者需要显式地对消息进⾏确认. 如果消费者在处理消息时遇到异常, 可以选择不确认消息使消息可以重新入队. 重试的控制权在于应⽤程序本⾝,而不是RabbitMQ的内部机制. 应⽤程序可以通过⾃⼰的逻辑和利⽤RabbitMQ的⾼级特性来实现有效的重试策略。

TTL

TTL(Time to Live, 过期时间),即过期时间。当消息到达存活时间之后,还没有被消费, 就会被自动清除。

有两种TTL:消息的TTL队列的TTL

假如队列的TTL是20s,消息的TTL是10s,那么消息的TTL取小值,也就是10s。

设置消息的TTL

发送消息:

@RequestMapping("/ttl")public String ttl(){System.out.println("ttl...");rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl test",message ->{message.getMessageProperties().setExpiration("10000");  //单位是ms,设置过期时间为10sreturn message;});return "消息发送成功";}

设置队列的TTL

设置队列TTL的⽅法是在创建队列时, 加⼊ x-message-ttl 参数实现的,单位是ms。

//设置TTL@Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build();  //设置队列的ttl为20s}
@Bean("ttlBinding2")public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue,@Qualifier("ttlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}

两者区别

设置队列TTL属性的⽅法, ⼀旦消息过期, 就会从队列中删除。
设置消息TTL的⽅法, 即使消息过期, 也不会马上从队列中删除, 而是在即将投递到消费者之前进⾏判定的。

为什么这两种⽅法处理的⽅式不⼀样?
因为设置队列过期时间, 队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期的消息即可。
而设置消息TTL的⽅式,每条消息的过期时间不同,如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可。

以上,关于RabbitMQ的高级特性,希望对你有所帮助。


http://www.ppmy.cn/embedded/176666.html

相关文章

Bash 脚本基础

一、Bash 脚本基础 什么是 Bash 脚本:Bash 脚本是一种文本文件,其中包含了一系列的命令,这些命令可以被 Bash shell 执行。它用于自动化重复性的任务,提高工作效率。 Bash 脚本的基本结构:以 #!/bin/bash 开头&#x…

MyBatis-Plus(Ⅲ)IService详解

目录 一、逐一演示 1.save(插入一条) 结果 断言(引入概念) 2.saveBatch(批量插入) 结果 3.saveOrUpdateBatch(批量插入&更新) 结果 4.removeById(通过id删除…

VUE2导出el-table数据为excel并且按字段分多个sheet

首先在根目录下建一个文件夹export用来存储export.js import * as XLSX from xlsxfunction autoWidthFunc(ws, data) {// 设置每列的最大宽度const colWidth data.map(row > row.map(val > {var reg new RegExp([\\u4E00-\\u9FFF], g) // 检测字符串是否包含汉字if (v…

从零到一开发一款 DeepSeek 聊天机器人

AI聊天机器人 目标设计方案系统架构技术选型功能模块 实现代码环境配置安装依赖 核心代码API 请求函数主循环函数 功能扩展1. 情感分析2. 多语言支持3. 上下文记忆4. 用户身份识别 总结附录 目标 开发一个智能聊天机器人,旨在为用户提供自然、流畅的对话体验。通过…

二分查找------查找区间

1. 题目 2. 思路和题解 这道题虽然是道中等题,并且看起来很复杂,但是实际上就是给定一个数组和目标值,让我们去寻找该目标值在数组中的位置。题目还提到说设计O(log n)的算法解决问题,更进一步暗示我们去用二分查找。要找开始位置…

DeepSeek+RAG局域网部署

已经有很多平台集成RAG模式,dify,cherrystudio等,这里通过AI辅助,用DS的API实现一个简单的RAG部署。框架主要技术栈是Chroma,langchain,streamlit,答案流式输出,并且对答案加上索引。支持doc,docx,pdf,txt。…

《AI大模型开发笔记》企业RAG技术实战(二)

接上一篇 《AI大模型开发笔记》企业RAG技术实战(一)https://mp.csdn.net/mp_blog/creation/editor/146381354 使用llamaindex实例 https://docs.llamaindex.ai/en/stable/api_reference/ 环境配置 我们继续使用前面langchain例子的python虚环境,不用新建,激活就行 …

Vue3中router最佳封装落地

文章目录 前言一、拆分路由文件夹?二、main.ts中注册路由总结 前言 router在使用过程中如果我们直接在一个文件的一个数组中配置,最后路由越来越多会导致不易管理,我们可以将一个页面的路由配置在一个数组中最后统一导入,这样就会…