RabbitMQ延迟消息——DelayExchange插件

news/2024/12/22 10:07:08/

什么是死信以及死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信

        1. 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false

        2. 消息是一个过期消息,超时无人消费

        3. 要投递的队列消息满了,无法投递

如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息

  2. 收集那些因队列满了而被拒绝的消息

  3. 收集因TTL(有效期)到期的消息

为什么这里会介绍死信交换机呢,举个例子,我们在购买车票的时候会有一个支付时间,8分钟没有支付就会销毁订单,返回车票。mq不可能时刻监控客户有没有支付,可以使用延迟消息,延迟8分钟,八分钟后再去发送消息到mq,在查看支付情况。

DelayExchange插件

官网下载地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins

[{"CreatedAt": "2024-06-19T09:22:59+08:00","Driver": "local","Labels": null,"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data","Name": "mq-plugins","Options": null,"Scope": "local"}
]

 插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange 

 

 具体使用

声明交换机,基于@Bean:

java">import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称.delayed() // 设置delay的属性为true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("sdgstu.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}

基于注解:

java">@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "stusdg.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}", msg);
}

发送消息:

java">@Test
void testPublisherDelayMessage() {// 1.创建消息String message = "hello, delayed message";// 2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}});
}


http://www.ppmy.cn/news/1525172.html

相关文章

美国洛杉矶ip有哪些独特优势

美国洛杉矶的IP地址独特优势主要体现在以下几个方面,rak小编为您整理发布美国洛杉矶的IP地址独特优势,希望 对您选择服务器有帮助。 1. 丰富的IP资源:美国洛杉矶多IP服务器提供的IP数量从几十到几百不等,最多可提供多达511个独立I…

使用Django 搭建自动化平台

由于本人python 环境已安装,就不重复安装了,博客中有python的安装说明; 1 Django 的安装 安装很简单: pip install django 但是国内的网络环境,你很难成功,此处省略一些字。。。。。 问题总要解决&#…

QT QObject源码学习(二)

一、全局函数 1、qt_qFindChildren_helper函数 在给定的父对象下,查找所有匹配指定条件的子对象,并将它们添加到一个列表中。 (1)声明 /*** brief 在给定的父对象下,查找所有匹配指定条件的子对象,并将它…

Leetcode3275. 第 K 近障碍物查询

Every day a Leetcode 题目来源:3275. 第 K 近障碍物查询 解法1:大根堆 维护前 k 小元素,可以用最大堆。 遍历数组 queries,计算点 (x,y) 到原点的曼哈顿距离 d∣x∣∣y∣。 把 d 入堆,如果堆大小超过 k&#xff…

clickhouse 保证幂等性

在分布式数据库系统 ClickHouse 中,幂等性通常涉及到在相同的操作被重复执行时,保证结果不会因为多次执行而发生变化。为了确保幂等性,ClickHouse 采用了一些机制来避免数据重复插入或处理。 以下是 ClickHouse 保证幂等性的一些关键机制&am…

SpringBoot的Web开发支持

使用spring-boot-starter-web启动器,开始web支持,内嵌一个Tomcat,添加了对于SpringMVC的支持。Spring Boot默认servlet容器为tomcat。 常用的服务器配置 配置端口号Spring Boot 默认端口是8080,如果想要进行更改的话,…

使用docker Desktop docker build 报错 无法拉取 nginx 镜像

具体报错信息:ERROR: failed to solve: nginxinc/nginx-unprivileged:alpine: failed to resolve source metadata for docker.io/nginxinc/nginx-unprivileged:alpine: failed to authorize: failed to fetch oauth token: Post "https://auth.docker.io/toke…

支持iPhone 16新品预售,饿了么同步上线专人配送等特色服务

9月10日凌晨,2024年 Apple 秋季新品发布会上正式揭晓iPhone 16新机。9月10日一早,饿了么同步宣布:今年将携手近4000家Apple 授权专营店,支持iPhone 16新品预售及现货的同步开售。新机现货首发当日,饿了么消费者最快半小…