RabbitMQ--延迟队列

server/2025/1/21 21:38:54/

(一)延迟队列

1.概念

 延迟队列是一种特殊的队列,消息被发送后,消费者并不会立刻拿到消息,而是等待一段时间后,消费者才可以从这个队列中拿到消息进行消费

2.应用场景

 延迟队列的应用场景很多,就比如大部分定时的场景,我们都可以利用延迟队列例如:闹钟定时,预约会议,空调定时开关等

 但是RabbitMQ是没有直接给我们提供延迟队列的,但是我们可以通过上一篇博客说的ttl和死信来达到延迟队列的效果,具体操作如下

 首先我们有一个交换机和一个队列,然后此队列又指定一个死信交换机,死信交换机绑定一个死信队列,然后我们消费者并不是从正常队列中获取消息,而是从死信队列中获取消息,通过给消息/队列设置过期时间来影响消息到达死信队列的时间,消费者拿到消息就会延迟,这样就可以模拟出延迟的效果。

那接下来就是我们的代码实现

首先我们通过设置队列ttl来实现

 @Bean("ttlExchange")public Exchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();}@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(5000).deadLetterExchange(Constants.DEAD_EXCHANGE).deadLetterRoutingKey("dead").build();}@Bean("ttlBind")public Binding ttlBind(@Qualifier("ttlExchange") Exchange ackExchange,@Qualifier("ttlQueue") Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with("ttl").noargs();}@Bean("deadExchange")public Exchange deadExchange(){return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();}@Bean("deadQueue")public Queue deadQueue(){return QueueBuilder.durable(Constants.DEAD_QUEUE).build();}@Bean("deadBind")public Binding deadBind(@Qualifier("deadExchange") Exchange ackExchange,@Qualifier("deadQueue") Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with("dead").noargs();}

然后生产者代码没什么变化

 @RequestMapping("ttl")public String TTLPro(){String s1="ttl test";Message message=new Message(s1.getBytes(StandardCharsets.UTF_8));
//        message.getMessageProperties().setExpiration("10000");RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);return "发送成功";}

只不过消费者订阅的是死信队列

@RabbitListener(queues = Constants.DEAD_QUEUE)public void ListenerQueue2(Message message,Channel channel) throws IOException {long Tag=message.getMessageProperties().getDeliveryTag();try {System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "+Tag);int num=3/0;     //模拟失败channel.basicAck(Tag,false);System.out.println("处理完成");}catch (Exception e){channel.basicReject(Tag,false);}}

这样过了5s后我们就可以从死信队列中获取到延迟消息了

那我们再来通过设置消息的ttl来看一下

首先我们要把队列的ttl给取消掉,记得要删队列

 @Bean("ttlExchange")public Exchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();}@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).deadLetterExchange(Constants.DEAD_EXCHANGE).deadLetterRoutingKey("dead").build();}@Bean("ttlBind")public Binding ttlBind(@Qualifier("ttlExchange") Exchange ackExchange,@Qualifier("ttlQueue") Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with("ttl").noargs();}@Bean("deadExchange")public Exchange deadExchange(){return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();}@Bean("deadQueue")public Queue deadQueue(){return QueueBuilder.durable(Constants.DEAD_QUEUE).build();}@Bean("deadBind")public Binding deadBind(@Qualifier("deadExchange") Exchange ackExchange,@Qualifier("deadQueue") Queue queue){return BindingBuilder.bind(queue).to(ackExchange).with("dead").noargs();}

  然后我们发送一个ttl时间为10s的,再发送一个5s的,我们知道这样两条数据是会发生错误的,因为我们设置消息过期时间,我们RabbitMQ(性能问题)并不会遍历整个消息队列看看谁过没过期,如果过期的消息不在队头,那么只有当使用的时候,才会真正的进行一些过期处理,比如传给死信交换机 

@RequestMapping("ttl")public String TTLPro(){String s1="ttl test";Message message=new Message(s1.getBytes(StandardCharsets.UTF_8));message.getMessageProperties().setExpiration("10000");RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);message.getMessageProperties().setExpiration("5000");RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);return "发送成功";}

如果正常,会在5s后接收到第一个消息,在10s后接收到第二个消息,但是此时我们会同一时间(10s)接收到两条消息

  那这个问题在上一篇ttl的时候就说过了,这里依然是个问题,虽然设置队列的ttl不会有这个问题,但是设置队列ttl我们针对不同延迟时间就需要创建多个队列,这是不太合理的,所以针对这个问题,我们有一个延迟队列的插件可以使用

 3.延迟队列插件

延迟队列插件,会给我们提供一个特殊的交换机,来完成我们的延迟功能

这是我们插件的下载地址

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

  我们需要找到ez文件并下载,但是注意这里的版本要与你的RabbitMQ版本可以匹配,否则之后会出现问题

那插件下完后,我们要找到对应目录,下载插件

上面两个目录,我们可以任选一个下载即可,没有这个目录,我们手动创建

然后把下载的ez文件,copy到这个目录中即可,然后我们可以使用命令 rabbitmq-plugins list 来查看插件列表,看看我们有没有成功放进去,但是注意,即使我们成功放进去并成功显示了,也可能会出错,这就可能是你们下载的RabbitMQ版本与整个延迟插件的版本不匹配,重新下载其他版本即可

然后我们启动插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange

之后重启服务service rabbitmq-server restart

在没有发生错误的情况下,我们就发现我们会多了一个默认的交换机

此时我们代码中就不需要声明普通交换机了而是直接使用默认交换机即可

我们生产者代码是需要改一下的,我们需要调用一个方法来设置延迟时间

@RequestMapping("/delay2")
public String delay2() {//发送带ttl的消息 rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", 
"delayed test 20s..."+new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelayLong(20000L); return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", 
"delayed test 10s..."+new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setDelayLong(10000L); //设置延迟时间 return messagePostProcessor;});return "发送成功!";
}

此时我们就可以在10s正确接收一个消息,在20s正确接收另一个消息 

  注意我们使用TTL+死信时消息传递给交换机后映射之后一直在正常队列中的,等待TTL时间到了把消息给死信交换机再映射到死信队列再拿到消息,我们使用插件的时候,消息是在RabbitMQ给我们提供的那个特殊的交换机中的,等待时间到了,再映射给队列,然后从队列中拿消息

4.常见面试题

介绍下延迟队列

我们可以这样回答:

 延迟队列是一个特殊的队列,消息发送后,消费者并不会立刻拿到,而是等待一定延迟时间后才发送给消费者进行消费

 并且延迟队列的应用场景很多,比如订单支付,智能家电,以及定时邮箱

 但是延迟队列在RabbitMQ中并没有直接给我们提供,我们可以通过TTL+死信的方式或者使用延迟插件的方式来实现延迟功能

 两者的区别:

1.通过TTL+死信

 优点:比较灵活,不需要我们额外引入插件

 缺点:我们设置消息TTL的时候可能会出现顺序的问题,而且我们需要多创建死信队列和死信交换机,完成一些绑定,增加了系统的复杂性

2.基于插件实现的延迟队列

 优点:通过插件能够简化延迟消息的实现,并且避免了时序问题

 缺点:需要依赖插件,不同版本RabbitMQ需要不同版本插件,有运维工作


http://www.ppmy.cn/server/160285.html

相关文章

Vue.js 动态设置表格最大高度的实现

概述 在现代 Web 开发中,响应式设计至关重要,尤其是在处理复杂的布局和数据表格时。表格通常会受到多种因素的影响,如分页、合计行或动态内容,这可能导致表格高度的变化。本文将介绍一个基于 Vue.js 的方法 setMaxHeight&#xf…

新能源科技亲民化路径何在?交互式展厅提供新解吗?

在当今科技迅猛发展的时代,全球能源需求持续增长,新能源的崛起既筑起了环保防线,又开启了能源新纪元。然而,新能源技术的复杂性常令非技术背景的公众难以理解和应用。为此,新能源主题交互式展厅应运而生,成…

解密AIGC三大核心算法:GAN、Transformer、Diffusion Models原理与应用

在当今数字化时代,人工智能生成内容(AIGC)技术正以前所未有的速度改变着我们的生活和工作方式。从创意无限的文本生成,到栩栩如生的图像创作,再到动听的音乐旋律,AIGC的魔力无处不在。而这一切的背后&#…

mysql的测试方案

1. 测试目标与范围 1.1 性能测试目标 MySQL性能测试旨在评估数据库在不同负载条件下的响应速度、吞吐量和资源利用率,确保其能够满足业务需求。 响应时间:衡量查询和事务处理的延迟,目标是将平均响应时间控制在100毫秒以内,95%的…

2025美赛Latex模板可直接运行!O奖自用版

目录 01 预览图02 Latex模板main.texeasymcm.sty 2025年美国大学生数学建模大赛(2025年1月23日)马上开始啦,大家一定要提前准备好模板,Latex或者是Word都可以,这里我整理了之前比赛用到的模板,并进行了一些…

PyTorch框架——基于深度学习YOLOv11神经网络路面坑洞检测系统

基于深度学习YOLOv11神经网络路面坑洞检测系统,其能识别路面坑洞,见如下 第一步:YOLOv11介绍 YOLOv11是由Ultralytics公司开发的新一代目标检测算法,它在之前YOLO版本的基础上进行了显著的架构和训练方法改进。以下是YOLOv11的一…

基于视觉分析的占道经营行为识别

随着城市化进程的加快,街头的占道经营现象逐渐增多。无论是在繁华的商业区,还是在居民区附近,占道经营都可能影响交通流畅、阻碍行人通行,甚至对城市的环境和形象造成负面影响。因此,如何有效地进行占道经营行为的检测…

基于单片机的智能家居控制系统设计及应用

摘要 : 智能家居控制系统包括对家电实现远距离控制和近距离控制的一种控制型系统,通过系统内的 TC35 模块对控制信息进行采集,并将这些控制信息发送到 STC89C52 模块中去,由单片机发出系统控制指令,从而实现家居家电的控制。 关键词 :单片机 ; 智能家居 ; 家居生活 ; 控…