Rabbitmq--延迟消息

devtools/2025/3/11 9:34:05/

13.延迟消息

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息

延迟任务:一定时间之后才会执行的任务

image-20250310204548904

1.死信交换机

当一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter):

  • 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false

  • 过期消息(达到了队列或消息本身设置的过期时间),消息超时后无人消费

  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

image-20250310204712321

利用死信交换机的特点,可以实现发送延迟消息的功能

2.延迟消息插件(推荐使用)

1.下载并安装延迟插件

RabbitMQ 的官方推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后,可以将消息暂存一段时间,时间到了之后再将消息投递到队列中

插件的下载地址:rabbitmq-delayed-message-exchange

image-20250310204809121

下载完插件后,运行以下指令,在输出信息中找到 Mounts ,再找到 RabbitMQ 的插件的安装目

sudo docker inspect rabbitmq

image-20250310204857664

然后进入 RabbitMQ 的插件的安装目录,将刚才下载的插件上传到该目录下

sudo chmod +rx -R /var/lib/docker//接着打开/var/lib/docker/volumes/rabbitmq-plugins/_data目录的写权限(如果修改权限不生效,请切换到 root 用户执行指令)
sudo chmod 777 /var/lib/docker/volumes/rabbitmq-plugins/_data//将刚才下载的插件上传到/var/lib/docker/volumes/rabbitmq-plugins/_data目录
//上传成功后将/var/lib/docker/volumes/rabbitmq-plugins/_data目录的权限复原
sudo chmod 755 /var/lib/docker/volumes/rabbitmq-plugins/_data//最后进入容器内部,运行指令安装插件,安装完成后退出容器内部
sudo docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit

看到以下信息,说明插件安装成功了

image-20250310205159598

2.在 Java 代码中发送延迟消息

声明延迟交换机

@Bean
public DirectExchange delayExchange() {return ExchangeBuilder.directExchange("delay.direct").delayed().build();
}

声明队列和延迟交换机,并将队列和延迟交换机绑定在一起

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue"),exchange = @Exchange(name = "delay.direct", delayed = "true", type = ExchangeTypes.DIRECT),key = "delay"
))
public void listenDelayQueue(String message) {SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("消费者收到了 delay.queue的消息: " + message + ",时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}

编写测试方法,测试发送延迟消息

@Test
void testSendDelayMessage() {rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10000); // 毫秒return message;}});SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}

发送延迟消息的本质是在消息头属性中添加 x-delay 属性

image-20250310205359715

3. 延迟消息的原理和缺点
  • RabbitMQ 的延迟消息是怎么实现的呢?RabbitMQ 会自动维护一个时钟,这个时钟每隔一秒就跳动一次,如果对时钟的精度要求比较高的,可能还要精确到毫秒,甚至纳秒

  • RabbitMQ 会为发送到交换机的每一条延迟消息创建一个时钟,时钟运行的过程中需要 CPU 不断地进行计算。发送到交换机的延迟消息数越多,RabbitMQ 需要维护的时钟就越多,对 CPU 的占用率就越高(Spring 提供的定时任务的原理也是类似)

  • 定时任务属于 CPU 密集型任务,中间涉及到的计算过程对 CPU 来说压力是很大的,所以说,采用延迟消息会给服务器的 CPU 带来更大的压力。当交换机中有非常多的延迟消息时,对 CPU 的压力就会特别大

4.取消超时订单

设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题

  1. 如果并发较高,30分钟可能堆积消息过多,对 MQ 压力很大
  2. 大多数订单在下单后 1 分钟内就会支付,但消息需要在 MQ 中等待30分钟,浪费资源

image-20250310205533364

image-20250310205550036

5.发送延迟检测订单的消息

我们定义一个实体类,用于记录延迟消息的内容和延迟消息的延迟时间列表(该实体类也是延迟消息的类型)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;public class MultipleDelayMessage<T> {private T data;private List<Long> delayMillis;public MultipleDelayMessage() {}public MultipleDelayMessage(T data, Long... delayMillis) {this.data = data;this.delayMillis = new ArrayList<>(Arrays.asList(delayMillis));}public MultipleDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultipleDelayMessage<T> of(T data, Long... delayMillis) {return new MultipleDelayMessage<>(data, new ArrayList<>(Arrays.asList(delayMillis)));}public static <T> MultipleDelayMessage<T> of(T data, List<Long> delayMillis) {return new MultipleDelayMessage<>(data, delayMillis);}public boolean hasNextDelay() {return !delayMillis.isEmpty();}public Long removeNextDelay() {return delayMillis.remove(0);}public T getData() {return data;}public void setData(T data) {this.data = data;}public List<Long> getDelayMillis() {return delayMillis;}public void setDelayMillis(List<Long> delayMillis) {this.delayMillis = delayMillis;}@Overridepublic String toString() {return "MultipleDelayMessage{" +"data=" + data +", delayMillis=" + delayMillis +'}';}}

我们再定义一个发送延迟消息的消息处理器,供所有服务使用

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;public class DelayMessagePostProcessor implements MessagePostProcessor {private final Integer delay;public DelayMessagePostProcessor(Integer delay) {this.delay = delay;}@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}}

改造后的发送延迟消息的测试方法

  • “delay.direct”:交换机的名称

  • “delay”:路由键(routing key),交换机会将消息发送到绑定到这个路由键的队列

  • “Hello, DelayQueue!”:实际要发送的消息内容

  • new DelayMessagePostProcessor(10000):消息后处理器(Message Post Processor),用于在消息发送之前对消息进行修改

@Test
void testSendDelayMessage() {rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new DelayMessagePostProcessor(10000));SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}

http://www.ppmy.cn/devtools/166250.html

相关文章

调试正常 ≠ 运行正常:Keil5中MicroLIB的“量子态BUG”破解实录

调试正常 ≠ 运行正常&#xff1a;Keil5中MicroLIB的“量子态BUG”破解实录——从勾选一个选项到理解半主机模式&#xff0c;嵌入式开发的认知升级 &#x1f4cc; 现象描述&#xff1a;调试与烧录的诡异差异 在线调试时 程序正常运行 - 独立运行时 设备无响应 ! 编译过程 0 Err…

MoonSharp 文档三

MoonSharp 文档一-CSDN博客 MoonSharp 文档二-CSDN博客 MoonSharp 文档四-CSDN博客 MoonSharp 文档五-CSDN博客 7.Proxy objects(代理对象) 如何封装你的实现,同时又为脚本提供一个有意义的对象模型 官方文档:MoonSharp 在现实世界的场景中,脚本往往会超出你的控制范…

RISC-V医疗芯片工程师复合型转型的路径与策略

从RISC-V到医疗芯片:工程师复合型转型的路径与策略 一、引言 1.1 研究背景 在科技快速发展的当下,芯片技术已然成为推动各行业进步的核心驱动力之一。其中,RISC-V 架构作为芯片领域的新兴力量,正以其独特的优势迅速崛起,对整个芯片产业的格局产生着深远影响。RISC-V 架…

【Linux docker 容器】关于想要让虚拟机在开机时候也docker自己启动,容器也自己启动,省去要自己开docker和容器

确认 Docker 服务状态&#xff1a; 首先&#xff0c;你需要确保 Docker 服务已经在虚拟机上安装并正确配置。你可以使用如下命令来检查 Docker 服务的状态&#xff1a; systemctl status docker.service 如果服务没有运行&#xff0c;你可以使用以下命令启动它&#xff1a; s…

Flink-DataStreamAPI-生成水印

下面我们将学习Flink提供的用于处理事件时间戳和水印的API&#xff0c;也会介绍有关事件时间、流转时长和摄取时间&#xff0c;下面就让我们跟着官网来学习吧 一、水印策略介绍 为了处理事件时间&#xff0c;Flink需要知道事件时间戳&#xff0c;这意味着流中的每个元素都需要…

Rust 模式匹配中的可反驳性与不可反驳性

1. 什么是可反驳模式和不可反驳模式&#xff1f; 1.1.不可反驳模式&#xff08;Irrefutable Patterns&#xff09; 不可反驳模式是 总能匹配任何可能值 的模式。例如&#xff0c;下面的 let 语句&#xff1a; let x 5;x 是一个不可反驳模式&#xff0c;它匹配 任何值&#…

#函数探幽

c内联函数 内联函数与其他函数的区别&#xff08;这必须深入到程序的内部&#xff09;&#xff1a;编译的最终产品是可执行程序-----它是由机械语言指令组成。运行时程序&#xff0c;操作系统会把这些指令载入到计算机内存中&#xff0c;分配内存逐步执行。在调用函数时&#x…

深入解析K8s VolumeMounts中的subPath字段及其应用

文章目录 前言一、什么是subPath二、subPath使用场景三、场景一示例1.资源准备2.使用subPath字段 四、场景二示例1.资源准备2.测试 前言 在Kubernetes中&#xff0c;挂载存储卷是容器化应用的常见需求。然而当我们将整个卷挂载到容器中的某个目录时&#xff0c;可能会覆盖目标…