RabbitMQ 高级特性——延迟队列

news/2024/11/17 9:42:20/

在这里插入图片描述

文章目录

  • 前言
  • 延迟队列
    • 延迟队列的概念
    • TTL + 死信队列模拟延迟队列
      • 设置队列的 TTL
      • 设置消息的 TTL
    • 延迟队列插件
      • 安装并且启动插件服务
      • 使用插件实现延迟功能

前言

前面我们学习了 TTL 和死信队列,当队列中的消息达到了过期时间之后,那么这个消息就会被死信交换机投递到死信队列中,然后由专门的消费者进行消费,这篇文章将为大家介绍 RabbitMQ 的另一高级特性——延迟队列。

延迟队列

延迟队列的概念

延迟队列通常指的是一种队列,其中的消息在发送后不会立即被消费,而是会等待一段时间(即延迟时间)后才变得可消费。

RabbitMQ 本身不直接提供名为“延迟队列”的内置功能,但我们可以利用 RabbitMQ 的一些特性和机制来模拟或实现延迟队列的行为。就比如 TTL + 死信队列方式模拟出延迟队列的功能。

TTL + 死信队列模拟延迟队列

在这里插入图片描述

我们的消费者不去消费正常队列中的消息,而是去消费死信队列中的消息,为什么这样能实现延迟对立的功能呢?当生产者生产消息投递给正常交换机之后,交换机会将这个消息根据 routing key 和 binding key 路由到指定的队列中,因为这个正常队列没有消费者可以消费消息,所以到达了了这个队列中的消息,就会存储在这个队列中,因为这个队列设置了 TTL,所以消息到达了过期时间之后就会被投递给死信交换机 DLX,然后死信交换机就会将这个消息投递给 DLQ,然后消费者从这个 DLQ 中去消费消息,这样消费者就会在生产者生产的消息到达队列之后不会立即去消费,而是会等待一段时间,这样就通过 TTL + 死信队列的方式模拟出了延迟队列的功能。

设置队列的 TTL

那么我们看看通过代码如何实现:

java">public class Constants {public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String NORMAL_QUEUE = "normal.queue";public static final String DL_EXCHANGE = "dl.exchange";public static final String DL_QUEUE = "dl.queue";
}

声明交换机、队列和绑定关系:

java">@Configuration
public class DLConfig {@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(1000).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}@Bean("DLExchange")public Exchange DLExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("DLQueue")public Queue DLQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("DLBinding")public Binding DLBinding(@Qualifier("DLExchange") Exchange exchange,@Qualifier("DLQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dl").noargs();}
}

生产者代码:

java">@RequestMapping("producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("delay")public String delay() {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay,发送时间:" + new Date());return "消息发送成功";}
}

消费者:

java">@Component
public class DelayListener {@RabbitListener(queues = Constants.DL_QUEUE)public void listener(String message, Channel channel) {System.out.println("时间:" + new Date() + ",接收到消息:" + message + channel);}
}

在这里插入图片描述

可以看到,通过 TTL + 死信队列的方式是可以实现延迟队列的功能的,虽然可能从消息发送的时间到消费者消费到这个消息的时间可能会比预想的时间多一点(中间消息传输花费了一点时间)。

这是一次发送一条消息的情况,如果我们一次发多条信息看看什么效果:

java">@RequestMapping("delay")
public String delay() throws InterruptedException {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay1,发送时间:" + new Date());Thread.sleep(1000);rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay2,发送时间:" + new Date());Thread.sleep(1000);rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","rabbitmq delay3,发送时间:" + new Date());return "消息发送成功";
}

在这里插入图片描述

当为队列设置 TTL 的时候,消息会按照进队列的先后顺序进行消费,这是为队列设置 TTL 的情况,为队列设置 TTL 意味着进入该队列中的所有消息都具有过期时间,且过期时间如果没有单独设置消息的过期时间的话,那么该队列中的消息的过期时间都是相同的,那么如果我们想要每个消息的过期时间都不相同的话就需要单独给消息设置过期时间,接下来我们看看给消息单独设置过期时间的情况。

设置消息的 TTL

java">public static final String NORMAL_QUEUE2 = "normal.queue2";

创建出一个没有设置过期时间的普通队列:

java">@Bean("normalQueue2")
public Queue normalQueue2() {return QueueBuilder.durable(Constants.NORMAL_QUEUE2).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();
}@Bean("normalBinding2")
public Binding normalBinding2(@Qualifier("normalExchange") Exchange exchange,@Qualifier("normalQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal2").noargs();
}

设置消息的过期时间:

java">@RequestMapping("delay2")
public String delay2(){rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("10000");return message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("20000");return message;});return "消息发送成功";
}

消费者的代码就不需要改动:

在这里插入图片描述

为消息设置 TTL 就可以时间每个消息的过期时间都不相同,但是这里会有一个问题,就是如果先投递的消息的过期时间如果晚于后面投递的消息的过期时间就会出现问题:

java">@RequestMapping("delay2")
public String delay2(){rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("20000");return message;});rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal2","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setExpiration("10000");return message;});return "消息发送成功";
}

在这里插入图片描述

可以发现设置了过期时间为 10s 的消息也是过了 20s 之后才进入到死信队列然后才被消费。

消息过期之后,不一定会被马上丢弃。因为RabbitMQ只会检查队首消息是否过期,如果过期则丢到死信队列。
此时就会造成一个问题,如果第一个消息的延时时间很长,第二个消息的延时时间很短,那第二个消息并不会优先得到执行。因为我们的普通队列并没有消费者消费其中的消息,我们都知道对消息设置过期时间,只有在消息被快要使用的之前,才会判断它是否过期,但是这里由于队列没有消费者,所以消息也就不会被使用到,那么 RabbitMQ 就只会检查队首的消息是否过期,这样就导致了队首的消息没有过期,那么之后的消息也不会被检查到过期投递到死信队列中。

所以在考虑使用TTL+死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是一致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每一种不同延迟时间的消息建立单独的消息队列。

延迟队列插件

虽然 RabbitMQ 没有直接实现延迟队列,但是 RabbitMQ 提供了一个延迟的插件来实现延迟的功能。

具体的可以看看官方文档 https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq#delaying-messages

安装并且启动插件服务

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
也可以点击这里:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
我们下载的插件的版本需要和 RabbitMQ 的版本兼容:

在这里插入图片描述

版本对应关系:

在这里插入图片描述

下载完成插件之后,可以在这里查看如何配置:https://www.rabbitmq.com/docs/installing-plugins

在这里插入图片描述
我们将下载完成的文件放在 RabbitMQ 的插件目录下:

我们使用的是 Linux,所以需要将插件放在 /usr/lib/rabbitmq/plugins 或者 /usr/lib/rabbitmq/lib/rabbitmq_server-version/plugins文件下,如果没有这个路径可以自己创建:

在这里插入图片描述
下载完成插件并且将其放在指定目录下之后,我们就可以启动插件了:

使用 rabbitmq-plugins list 查看 rabbitmq 的插件:
在这里插入图片描述

如果我们之前启动了一个错误版本的插件,可以使用 rabbitmq-plugins disbale 插件名称 来禁用插件:

在这里插入图片描述

使用 rabbitmq-plugins enable 插件名称 来启动插件服务:

在这里插入图片描述
如何验证插件已经安装并且启动成功,我们在 rabbitmq 的管理页面新建一个交换机的时候,看看交换机的类型是否有 x-delayed-message 这个类型:

我们刷新一下管理页面,然后在新建交换机的时候查看交换机的类型:

在这里插入图片描述
这就说明我们的延迟插件启动成功了。

使用插件实现延迟功能

安装并且启动延迟插件之后,我们来看看代码如何实现一个延迟功能的队列:

java">public static final String DELAY_EXCHANGE = "delay.exchange";
public static final String DELAY_QUEUE = "delay.queue";

声明交换机、对立和绑定关系:

java">@Configuration
public class DelayConfig {@Bean("delayExchange")public Exchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("delay").noargs();}
}

生产者代码:

java">@RequestMapping("delay3")
public String delay3(){rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setDelay(20000);return message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE,"delay","rabbitmq delay,发送时间" + new Date(),message -> {message.getMessageProperties().setDelay(10000);return message;});return "消息发送成功";
}

消费者代码:

java">@RabbitListener(queues = Constants.DELAY_QUEUE)
public void listener2(String message, Channel channel) {System.out.println("时间:" + new Date() + ",接收到消息:" + message + channel);
}

启动项目之后,会创建一个 x-delay-message 类型的交换机:

在这里插入图片描述
在这里插入图片描述
使用延迟插件的话,就可以使得队列中的消息可以按照延迟时间到达消费者。


http://www.ppmy.cn/news/1547692.html

相关文章

24.11.15 Vue3

let newJson new Proxy(myJson,{get(target,prop){console.log(在读取${prop}属性);return target[prop];},set(target,prop,val){console.log(在设置${prop}属性值为${val});if(prop"name"){document.getElementById("myTitle").innerHTML val;}if(prop…

基于Python爬虫大屏可视化的热门旅游景点数据分析系统

作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏:…

C++初阶——list

一、什么是list list是一个可以在序列的任意位置进行插入和删除的容器,并且可以进行双向迭代。list的底层是一个双向链表,双向链表可以将它们包含的每个元素存储在不同且不相关的存储位置。通过将每个元素与前一个元素的链接和后一个元素的链接关联起来&…

LSTM(长短期记忆网络)详解

1️⃣ LSTM介绍 标准的RNN存在梯度消失和梯度爆炸问题,无法捕捉长期依赖关系。那么如何理解这个长期依赖关系呢? 例如,有一个语言模型基于先前的词来预测下一个词,我们有一句话 “the clouds are in the sky”,基于&…

精密机加工 —— 工业制造的核心力量!

在现代工业制造的广袤领域中,精密机加工犹如一颗璀璨的明珠,是推动整个工业体系向前发展的核心力量。 精密机加工是一种利用高精度的机械设备和先进工艺技术,对原材料进行精细加工,从而制造出符合特定精度要求和复杂形状零部件的制…

ctfshow-web入门-JWT(web345-web350)

目录 1、web345 2、web346 3、web347 4、web348 5、web349 6、web350 1、web345 F12 看到提示 但是访问 admin 会重定向到 index.php 提取 jwt 解码 eyJhbGciOiJOb25lIiwidHlwIjoiand0In0.W3siaXNzIjoiYWRtaW4iLCJpYXQiOjE3MzE2NDYzOTQsImV4cCI6MTczMTY1MzU5NCwibmJmIj…

pom中无法下载下来的类外部引用只给一个jar的时候

比如jar在桌面上放着,操作步骤如下: 选择桌面,输入cmd ,执行mvn install:install-file -DgroupIdcom -DartifactIdaspose-words -Dversion15.8.0 -Dpackagingjar -Dclassifierjdk11 -Dfilejar包名称 即可把jar包引入成功。

JAVA接入WebScoket行情接口

Java脚好用的库很多,开发效率一点不输Python。如果是日内策略,需要更实时的行情数据,不然策略滑点太大,容易跑偏结果。 之前爬行情网站提供的level1行情接口,实测平均更新延迟达到了6秒,超过10只股票并发请…