RabbitMQ的高级特性介绍(二)

devtools/2025/4/1 5:30:18/

发送方确认

当消息的⽣产者将消息发送出去之后,消息到底有没有正确地到达服务器呢? 如果在消息到
达服务器之前已经丢失(比如RabbitMQ重启, 那么RabbitMQ重启期间⽣产者消息投递失败), 持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ为我们提供了两种解决⽅案:
                a. 通过事务机制实现
                b. 通过发送⽅确认(publisher confirm) 机制实现

事务机制⽐较消耗性能, 在实际⼯作中使⽤也不多,咱们主要介绍confirm机制来实现发送⽅的确认。

RabbitMQ为我们提供了两个⽅式来控制消息的可靠性投递
                1. confirm确认模式
                2. return退回模式

confirm模式

Producer 在发送消息的时候, 对发送端设置⼀个ConfirmCallback的监听,无论消息是否到达
Exchange, 这个监听都会被执行,如果Exchange成功收到, ACK( Acknowledge character , 确认
字符)为true, 如果没收到消息, ACK就为false。

需要在controller中设置回调方法

@RequestMapping("/confirm")public String confirm(){//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if(ack){System.out.printf("接收到消息,消息id:%s", correlationData==null ? null : correlationData.getId());}else{System.out.printf("未接受到消息,消息id:%s,cause: %s\n",correlationData== null ? null: correlationData.getId(),cause);//相应的业务逻辑处理}}});CorrelationData correlationData = new CorrelationData("1");rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm test",correlationData);return "消息发送成功";}

return退回模式

消息到达Exchange之后, 会根据路由规则匹配, 把消息放⼊Queue中. Exchange到Queue的过程, 如果⼀条消息⽆法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回给发送者。消息退回给发送者时,我们可以设置⼀个返回回调方法, 对消息进行处理。

@RequestMapping("returns")public String returns(){CorrelationData correlationData = new CorrelationData("5");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","returns test",correlationData);return "消息发送成功";}

常见面试题:如何保证RabbitMQ的可靠传输?

从这个图中, 可以看出, 消息可能丢失的场景以及解决⽅案:
1. ⽣产者将消息发送到 RabbitMQ失败
                a. 可能原因: 网络问题等
                b. 解决办法: 参考本章节[发送⽅确认-confirm确认模式]
2. 消息在交换机中无法路由到指定队列:
                a. 可能原因: 代码或者配置层⾯错误, 导致消息路由失败
                b. 解决办法: 参考本章节[发送⽅确认-return模式]

3. 消息队列⾃⾝数据丢失
                a. 可能原因: 消息到达RabbitMQ之后, RabbitMQ Server 宕机导致消息丢失.
                b. 解决办法: 参考创作中心-CSDN[持久性]。开启 RabbitMQ持久化, 就是消息写⼊之后会持久化到磁盘, 如果RabbitMQ 挂了, 恢复之后会自动读取之前存储的数据. (极端情况下, RabbitMQ还未持久化就挂了, 可能导致少量数据丢失, 这个概率极低, 也可以通过集群的方式提⾼可靠性)
4. 消费者异常, 导致消息丢失
                a. 可能原因: 消息到达消费者, 还没来得及消费, 消费者宕机. 消费者逻辑有问题.
                b. 解决办法: 参考本章节创作中心-CSDN[消息确认]. RabbitMQ 提供了 消费者应答机制 来使 RabbitMQ 能够感知到消费者是否消费成功消息. 默认情况下消费者应答机制是⾃动应答的, 可以开启⼿动确认当消费者确认消费成功后才会删除消息, 从⽽避免消息丢失. 除此之外, 也可以配置重试机制(参考下⼀章节),当消息消费异常时, 通过消息重试确保消息的可靠性。

重试机制

在消息传递过程中, 可能会遇到各种问题, 如⽹络故障, 服务不可用, 资源不足等, 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送。

自动确认

发送消息:

@RequestMapping("/retry")public String retry(){rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","retry test");return "消息发送成功";}

消费消息:

@Component
public class RetryQueueListener {
//指定监听队列的名称
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void ListenerQueue(Message message) throws Exception {System.out.printf("接收到消息: %s, deliveryTag: %d%n", newString(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//模拟处理失败int num = 3/0;System.out.println("处理完成");
}
}

手动确认

改为手动确认

@RabbitListener(queues = Constant.RETRY_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {System.out.printf("接收到消息: %s, deliveryTag: %d%n", newString(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//模拟处理失败int num = 3/0;System.out.println("处理完成");
//3. ⼿动签收channel.basicAck(deliveryTag, true);}catch (Exception e){
//4. 异常了就拒绝签收Thread.sleep(1000);
//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 则直接丢弃channel.basicNack(deliveryTag, true,true);
}
}

手动确认模式下, 消费者需要显式地对消息进⾏确认. 如果消费者在处理消息时遇到异常, 可以选择不确认消息使消息可以重新入队. 重试的控制权在于应⽤程序本⾝,而不是RabbitMQ的内部机制. 应⽤程序可以通过⾃⼰的逻辑和利⽤RabbitMQ的⾼级特性来实现有效的重试策略。

TTL

TTL(Time to Live, 过期时间),即过期时间。当消息到达存活时间之后,还没有被消费, 就会被自动清除。

有两种TTL:消息的TTL队列的TTL

假如队列的TTL是20s,消息的TTL是10s,那么消息的TTL取小值,也就是10s。

设置消息的TTL

发送消息:

@RequestMapping("/ttl")public String ttl(){System.out.println("ttl...");rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl","ttl test",message ->{message.getMessageProperties().setExpiration("10000");  //单位是ms,设置过期时间为10sreturn message;});return "消息发送成功";}

设置队列的TTL

设置队列TTL的⽅法是在创建队列时, 加⼊ x-message-ttl 参数实现的,单位是ms。

//设置TTL@Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build();  //设置队列的ttl为20s}
@Bean("ttlBinding2")public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue,@Qualifier("ttlExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}

两者区别

设置队列TTL属性的⽅法, ⼀旦消息过期, 就会从队列中删除。
设置消息TTL的⽅法, 即使消息过期, 也不会马上从队列中删除, 而是在即将投递到消费者之前进⾏判定的。

为什么这两种⽅法处理的⽅式不⼀样?
因为设置队列过期时间, 队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期的消息即可。
而设置消息TTL的⽅式,每条消息的过期时间不同,如果要删除所有过期消息需要扫描整个队列, 所以不如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可。

以上,关于RabbitMQ的高级特性,希望对你有所帮助。


http://www.ppmy.cn/devtools/171366.html

相关文章

物联网系统部署与运维实训室

一、引言 在数字化时代,物联网技术正以前所未有的速度蓬勃发展,广泛渗透到各个领域,深刻改变着人们的生活和工作方式。从智能家居、智能交通到工业自动化、医疗健康等,物联网的应用无处不在,推动着各行业的智能化变革…

“零拷贝”(Zero-Copy)技术详解以及使用场景

“零拷贝”(Zero-Copy)是一种优化数据传输效率的技术,通过减少或消除数据在内存中的复制次数,显著提高I/O操作性能。以下是使用Java代码实现的零拷贝技术示例。 Java NIO 中的零拷贝实现 1. 内存映射文件(Memory Mapped File) import java.io.IOException; import jav…

工作记录 2017-03-07

工作记录 2017-03-07 序号 工作 相关人员 1 修改邮件上的问题。 更新RD服务器。 郝 更新的问题 1、增加了2个菜单Global Fee Category、Global Fee List。 2、增加了Global Fee Category页面。 3、增加了Global Fee List页面。 我在把邮件上的文件生成导入数据库…

硬件基础(5):(3)二极管的应用

文章目录 [toc]1. **整流电路****功能**:**工作原理**:**应用实例**:电路组成:整流过程:电路的应用: 2. **稳压电路****功能**:**工作原理**:**应用实例**:电路组成及功能…

提升生产效率的关键: ethercat转TCPIP网关智能通信

大家好。最近在数据互联互通方面,我们迎来了一个重要的突破。作为生产管理系统的核心组成部分,数据互联互通一直是一个亟待解决的挑战。我们知道,EtherCAT和TCP/IP是两种不同的通信协议,它们之间的互通性一直存在问题。 不过&…

Vulnhub靶场thales靶机通关攻略(一)

1.在Orcde VirtuaBox打开靶机 2.拿kali(桥接模式)扫描网段 22和8080端口开放 3.查看网站 需要登录,尝试爆破密码 4.使用msf对应模块找到密码,有对应模块 5.登录进入后台,发现上传文件部分 6.用msfvenom反弹shell ms…

程序员英语口语练习笔记

我是一个程序员,专注于Java, Linux和k8s. I’m a programmer specializing in Java, Linux, and Kubernetes. 这个不是我的bug。 I don’t think this bug is caused by my work. 你能帮我看一下这个代码吗? Can you take a look at this code for me?…

Vue 2 探秘:visible 和 append-to-body 是谁的小秘密?

🚀 Vue 2 探秘:visible 和 append-to-body 是谁的小秘密?🤔 父组件:identify-list.vue子组件:fake-clue-list.vue 嘿,各位前端探险家!👋 今天我们要在 Vue 2 的代码丛林…