Rabbitmq--延迟消息

server/2025/3/14 17:24:39/

13.延迟消息

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息

延迟任务:一定时间之后才会执行的任务

image-20250310204548904

1.死信交换机

当一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter):

  • 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false

  • 过期消息(达到了队列或消息本身设置的过期时间),消息超时后无人消费

  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

image-20250310204712321

利用死信交换机的特点,可以实现发送延迟消息的功能

2.延迟消息插件(推荐使用)

1.下载并安装延迟插件

RabbitMQ 的官方推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后,可以将消息暂存一段时间,时间到了之后再将消息投递到队列中

插件的下载地址:rabbitmq-delayed-message-exchange

image-20250310204809121

下载完插件后,运行以下指令,在输出信息中找到 Mounts ,再找到 RabbitMQ 的插件的安装目

sudo docker inspect rabbitmq

image-20250310204857664

然后进入 RabbitMQ 的插件的安装目录,将刚才下载的插件上传到该目录下

sudo chmod +rx -R /var/lib/docker//接着打开/var/lib/docker/volumes/rabbitmq-plugins/_data目录的写权限(如果修改权限不生效,请切换到 root 用户执行指令)
sudo chmod 777 /var/lib/docker/volumes/rabbitmq-plugins/_data//将刚才下载的插件上传到/var/lib/docker/volumes/rabbitmq-plugins/_data目录
//上传成功后将/var/lib/docker/volumes/rabbitmq-plugins/_data目录的权限复原
sudo chmod 755 /var/lib/docker/volumes/rabbitmq-plugins/_data//最后进入容器内部,运行指令安装插件,安装完成后退出容器内部
sudo docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit

看到以下信息,说明插件安装成功了

image-20250310205159598

2.在 Java 代码中发送延迟消息

声明延迟交换机

@Bean
public DirectExchange delayExchange() {return ExchangeBuilder.directExchange("delay.direct").delayed().build();
}

声明队列和延迟交换机,并将队列和延迟交换机绑定在一起

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue"),exchange = @Exchange(name = "delay.direct", delayed = "true", type = ExchangeTypes.DIRECT),key = "delay"
))
public void listenDelayQueue(String message) {SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("消费者收到了 delay.queue的消息: " + message + ",时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}

编写测试方法,测试发送延迟消息

@Test
void testSendDelayMessage() {rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10000); // 毫秒return message;}});SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}

发送延迟消息的本质是在消息头属性中添加 x-delay 属性

image-20250310205359715

3. 延迟消息的原理和缺点
  • RabbitMQ 的延迟消息是怎么实现的呢?RabbitMQ 会自动维护一个时钟,这个时钟每隔一秒就跳动一次,如果对时钟的精度要求比较高的,可能还要精确到毫秒,甚至纳秒

  • RabbitMQ 会为发送到交换机的每一条延迟消息创建一个时钟,时钟运行的过程中需要 CPU 不断地进行计算。发送到交换机的延迟消息数越多,RabbitMQ 需要维护的时钟就越多,对 CPU 的占用率就越高(Spring 提供的定时任务的原理也是类似)

  • 定时任务属于 CPU 密集型任务,中间涉及到的计算过程对 CPU 来说压力是很大的,所以说,采用延迟消息会给服务器的 CPU 带来更大的压力。当交换机中有非常多的延迟消息时,对 CPU 的压力就会特别大

4.取消超时订单

设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题

  1. 如果并发较高,30分钟可能堆积消息过多,对 MQ 压力很大
  2. 大多数订单在下单后 1 分钟内就会支付,但消息需要在 MQ 中等待30分钟,浪费资源

image-20250310205533364

image-20250310205550036

5.发送延迟检测订单的消息

我们定义一个实体类,用于记录延迟消息的内容和延迟消息的延迟时间列表(该实体类也是延迟消息的类型)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;public class MultipleDelayMessage<T> {private T data;private List<Long> delayMillis;public MultipleDelayMessage() {}public MultipleDelayMessage(T data, Long... delayMillis) {this.data = data;this.delayMillis = new ArrayList<>(Arrays.asList(delayMillis));}public MultipleDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultipleDelayMessage<T> of(T data, Long... delayMillis) {return new MultipleDelayMessage<>(data, new ArrayList<>(Arrays.asList(delayMillis)));}public static <T> MultipleDelayMessage<T> of(T data, List<Long> delayMillis) {return new MultipleDelayMessage<>(data, delayMillis);}public boolean hasNextDelay() {return !delayMillis.isEmpty();}public Long removeNextDelay() {return delayMillis.remove(0);}public T getData() {return data;}public void setData(T data) {this.data = data;}public List<Long> getDelayMillis() {return delayMillis;}public void setDelayMillis(List<Long> delayMillis) {this.delayMillis = delayMillis;}@Overridepublic String toString() {return "MultipleDelayMessage{" +"data=" + data +", delayMillis=" + delayMillis +'}';}}

我们再定义一个发送延迟消息的消息处理器,供所有服务使用

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;public class DelayMessagePostProcessor implements MessagePostProcessor {private final Integer delay;public DelayMessagePostProcessor(Integer delay) {this.delay = delay;}@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}}

改造后的发送延迟消息的测试方法

  • “delay.direct”:交换机的名称

  • “delay”:路由键(routing key),交换机会将消息发送到绑定到这个路由键的队列

  • “Hello, DelayQueue!”:实际要发送的消息内容

  • new DelayMessagePostProcessor(10000):消息后处理器(Message Post Processor),用于在消息发送之前对消息进行修改

@Test
void testSendDelayMessage() {rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new DelayMessagePostProcessor(10000));SimpleDateFormat simpleDateFormat = new SimpleDateFormat();simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}

http://www.ppmy.cn/server/174940.html

相关文章

【Docker】- Windows11 安装和配置

Windows 11 家庭版 安装Docker Docker是什么 Docker 是一个开源的容器化平台&#xff0c;它可以让你在一个独立、轻量的环境中运行应用程序。它主要由以下几个核心概念组成&#xff1a; 镜像&#xff08;Image&#xff09;&#xff1a;Docker 的应用程序模板&#xff0c;包含…

提升工地安全:视觉分析助力挖掘机作业监控

在现代化施工场景中&#xff0c;挖掘机的应用极为广泛&#xff0c;但其作业半径内的安全问题一直是行业关注的重点。传统的安全监管方式往往依赖于人工巡视和物理隔离&#xff0c;但这种方式存在监控盲区大、反应速度慢等不足。随着人工智能和计算机视觉技术的快速发展&#xf…

WPF 制作机械手动画

偶然的机会想做一个双手臂运转的机械手动作动画&#xff0c;重要的是有前辈写好的可以模仿&#xff1a;WPF开发经验-实现一种三轴机械手控件 - 一团静火 - 博客园 shit&#xff0c;公司禁止上传图片了 --------------------------------------------------------------------…

C#中类‌的核心定义

‌C# 类‌是面向对象编程&#xff08;OOP&#xff09;中的核心概念之一&#xff0c;用于定义对象的模板或蓝图&#xff0c;包含数据成员&#xff08;字段、属性&#xff09;和函数成员&#xff08;方法、事件等&#xff09;。类提供了封装机制&#xff0c;将数据和操作数据的方…

c++中cout输出指定位数的int类型数据,输出指定精度的浮点型数据

1、输出指定位数的int型数据&#xff08;包含iomanip文件&#xff09; #include <iostream> #include <iomanip>int main() {int num 123;// 设置输出宽度为5&#xff0c;不足5位在前面补空格std::cout << std::setw(5) << num << std::endl;/…

Spring WebSocket 像写http接口一样处理WebSocket消息(Stomp协议)

简单的WebSocket服务搭建 在聊Stomp协议之前&#xff0c;先看一下Spring boot使用比较原始的方法是怎么搭建WebSocket服务的 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactI…

网络爬虫-1:发送请求+维持会话+代理设置/超时设置

1.基于get发送请求 2.基于post发送请求 3.维持会话 4.代理设置/超时设置 一.基于get发送请求 1.获取网页源码1 使用json库中的json.loads(),将json格式的字符串变为Python的字典形式 以下通过http://httpbin.org/get网址进行基本练习操作 import requests import json urlh…

电力时间同步系统,京准电钟电子助力增效

电力时间同步系统&#xff0c;京准电钟电子助力增效 电力时间同步系统&#xff0c;京准电钟电子助力增效 电力时间同步系统是保障电网稳定运行的关键技术&#xff0c;其核心在于为全网的设备提供统一、高精度的时间基准。以下从技术方案、系统设计要点及挑战与解决方案等方面…