MQ高级(二)死信交换机

news/2024/11/26 4:20:49/

一、初识死信交换机(P159)

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter)

(1)消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false

(2)消息是一个过期消息,超时无人消费

(3)要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

 

二、TTL

TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:

(1)消息所在的队列设置了存活时间

(2)消息本身设置了存活时间

 

我们声明一组死信交换机和队列,基于注解方式:

@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dl.queue", durable = "true"),exchange = @Exchange(name = "dl.direct"),key = "dl"))public void listenDlQueue(String msg) {log.info("消费者接收到了dl.queue的延迟消息");}}

要给队列设置超时时间,需要在声明队列时配置 x-message-ttl 属性:

@Configuration
public class TTLMessageConfig {@Beanpublic DirectExchange ttlDirectExchange(){return new DirectExchange("ttl.direct");}@Beanpublic Queue ttlQueue(){return QueueBuilder.durable("ttl.queue").ttl(10000).deadLetterExchange("dl.direct").deadLetterRoutingKey("dl").build();}@Beanpublic Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");}
}

发送消息时,给消息本身设置超时时间

    @Testpublic void testTTLMessage() {// 1.准备消息Message message = MessageBuilder.withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setExpiration("5000").build();// 2.发送消息rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);// 3.记录日志log.info("消息已经成功发送!");}

队列和消息设置 ttl 属性,两者共存时,以时间短的 ttl 为准。

三、延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

(1)延迟发送短信

(2)用户下单,如果用户在15 分钟内未支付,则自动取消

(3)预约工作会议,20分钟后自动通知所有参会人员

因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。

1. SpringAMQP使用延迟队列插件

DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。

基于注解方式:

@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"))public void listenDelayExchange(String msg) {log.info("消费者接收到了delay.queue的延迟消息");}
}

基于java代码的方式:

 

然后我们向这个delay为true的交换机中发送消息,一定要给消息添加一个header:x-delay,值为延迟的时间,单位为毫秒:

    @Testpublic void testSendDelayMessage() throws InterruptedException {// 1.准备消息Message message = MessageBuilder.withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setHeader("x-delay", 5000).build();// 2.准备CorrelationDataCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.发送消息rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);log.info("发送消息成功");}

判断是否是延迟消息。是一个延迟消息,忽略这个错误提示

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 判断是否是延迟消息Integer receivedDelay = message.getMessageProperties().getReceivedDelay();if (receivedDelay != null && receivedDelay > 0) {// 是一个延迟消息,忽略这个错误提示return;}// 记录日志log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有需要的话,重发消息});}
}

延迟队列插件的使用步骤包括哪些?

(1)声明一个交换机,添加delayed属性为true

(2)发送消息时,添加x-delay头,值为超时时间


http://www.ppmy.cn/news/2351.html

相关文章

ccf寻宝!大冒险!python满分(敲开心~)

寻宝 大冒险 题目传送:http://118.190.20.162/view.page?gpidT147 思路及代码: 核心点就是哈希。 之前刷的是70分,找不到之前的代码了,大概是建了一个很大的表,然后一点点比较吧。 今天再刷,上来就是…

外贸小白,一直不出单怎么办?

米贸搜今天,试着给新人一些方法和技巧,让你尽快在公司立足! 事实上,规定几个月内下单的公司,往往都是平台有投资,去展会了,有大量营销费用的公司。当然,老板急着收回成本。对于有足…

HCIA OSI参考模型

一、前言 OSI七层模型是我们耳熟能详的,其实没有太多可以说的地方,我这里就按自己的理解做一下汇总。 二、OSI 七层模型 OSI七层模型是由“国际标准化组织”制定的“参考”模型。 1、物理层 实际上就是对网线、光纤等“连接”介质进行规定&#xff…

数组实现单链表和双链表

全文目录😀 数组实现的优势🤔 单链表😕 初始化😕 头插😕 在下标 k 后面插入元素😕 删除下标 k 后面的元素😕 遍历😵‍💫 双链表🤨 初始化🤨 插入&…

什么事Jupyter Notebook?

Jupyter Notebook是基于网页的用于交互计算的应用程序。其可被应用于全过程计算:开发、文档编写、运行代码和展示结果。 简而言之,Jupyter Notebook是以网页的形式打开,可以在网页页面中直接编写代码和运行代码,代码的运行结果也…

如何使用Python连接数据库

数据分析离不开数据库,如何使用python连接数据库呢?听我娓娓道来哈 补充:文末增加Oracle数据库的连接方式,大同小异。 背景: 我是在Anaconda notebook中进行连接实验的,环境Python3.6,当然也…

手把手教你打造一款个人专属Android桌面

实现方式两种 1.从头到尾写一个apk然后把系统的属性加上去,然后启动的时候默认就指定到这个apk的包名,他就启动, 2.我们基于Androidlauncher3的源码去做一个定制化的修改 分析一下这两种的区别, 自定义,要有丰富的…

软件测试——用例

一、测试用例 1.定义 向被测试系统发起的一组集合,包含测试环境、测试步骤、测试数据、预期结果 2.为什么软件测试人员要写测试用例? (1)测试用例是测试执行的依据 (2)测试用例可以复用,在进行回…