第十四章 RabbitMQ延迟消息之延迟队列

ops/2024/10/20 6:33:18/

目录

一、引言

二、死信队列

三、核心代码实现

四、运行效果 

五、总结


一、引言

什么是延迟消息?

发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间后收到消息。

什么是延迟任务?

设置在一定时间之后才执行的任务。

延迟消息使用场景

我们在实际项目中经常会有一些场景,需要延迟指定时间后发送消息,比如在电商或者外卖平台中订单10分钟后自动取消功能等。

对于上述延迟消息的场景,我们该怎么实现呢?

RabbitMQ 官方并没有直接内置延迟消息的功能,但是可以通过 TTL(Time-To-Live)和死信队列(Dead Letter Exchanges)的组合来实现延迟消息的效果,另外RabbitMQ 也可以通过安装延迟消息插件的方式来实现。

二、死信队列

电商购物中,针对用户下单扣减库存的服务逻辑,我们希望删除10分钟后状态为未支付的订单。在过去的项目中,我们可能第一时间会想到通过定时任务定期查询未支付的订单并做删除来实现:

定时任务会有两个问题:

1. 当针对订单量特别大的电商项目而言,定时任务间断性地查询整个订单数据会极大增加订单服务的压力。

2. 定时任务存在时间上的滞后性。

通过使用RabbitMQ延迟消息,我们可以在完成需求的同时,有效的避免上述问题。如下图所示,用户通过交易服务下单(状态为未支付),随后交易服务调用商品服务扣减库存。 用户在调用交易服务的同时发送一个延迟消息到RabbitMQ,10分钟后交易服务收到消息,此时如果订单还是未支付状态,则取消订单。

RabbitMQ中的死信队列,就是一种可以实现延迟消息的方式。当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

1. 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false

2. 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费

3. 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

三、核心代码实现

package com.example.consumer;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 常规的RabbitMQ 交换机/队列绑定配置类*/
@Configuration
public class RabbitMQConfig {@BeanQueue normalQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("normal.queue").deadLetterExchange("dead.direct").deadLetterRoutingKey("dead").build();}@BeanDirectExchange normalDirect() {return ExchangeBuilder.directExchange("normal.direct").build();}@BeanBinding bindingNormal(Queue normalQueue, DirectExchange normalDirect) {return BindingBuilder.bind(normalQueue).to(normalDirect).with("normal");}@BeanQueue deadQueue() {// 使用 QueueBuilder 创建一个持久化队列return QueueBuilder.durable("dead.queue").build();}@BeanDirectExchange deadDirect() {return ExchangeBuilder.directExchange("dead.direct").build();}@BeanBinding bindingDead(Queue deadQueue, DirectExchange deadDirect) {return BindingBuilder.bind(deadQueue).to(deadDirect).with("dead");}
}
package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;import java.nio.charset.StandardCharsets;/*** 生产者*/
@Slf4j
@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid test() {String content = "生活不易,所以保持足够的努力,对自己要有信心,积极地去面对工作生活的挑战!";Message message = MessageBuilder.withBody(content.getBytes(StandardCharsets.UTF_8)).setExpiration("10000").build();rabbitTemplate.convertAndSend("normal.direct","normal", message);}
}
package com.example.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;/*** 消费者*/
@Slf4j
@Component
public class SimpleListener {@RabbitListener(queues = "dead.queue")public void listener1(Message message) throws Exception {String msg = new String(message.getBody(), StandardCharsets.UTF_8); ;System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");}
}

四、运行效果 

五、总结

虽然我们通过RabbitMQ的死信队列能够实现延迟消息的功能,但是通过代码我们可以看到,这种实现方式相对来说比较繁琐。而且关键是RabbitMQ提供死信队列的初衷并不是让我们用来发送延迟消息的,而是为了作为兜底方案,来接收没有消费的死信的,便于定位问题。因此,后续章节会给大家讲解更优的解决方案,即延迟插件。


http://www.ppmy.cn/ops/124907.html

相关文章

蓝桥杯【物联网】零基础到国奖之路:十八. 扩展模块之光敏和AS312

蓝桥杯【物联网】零基础到国奖之路:十八.扩展模块之光敏和AS312 第一节 硬件解读第二节 CubeMX配置第二节 代码 第一节 硬件解读 光敏和AS312如下图: 光敏电阻接到了扩展模块的5号引脚,5号引脚接了2个电阻,R8和光敏电阻。我们通过ADC读取这…

【C语言】使用结构体实现位段

文章目录 一、什么是位段二、位段的内存分配1.位段内存分配规则练习1练习2 三、位段的跨平台问题四、位段的应用五、位段使用的注意事项 一、什么是位段 在上一节中我们讲解了结构体,而位段的声明和结构是类似的,它们有两个不同之处,如下&…

Meta 发布 Quest 3S 头显及 AR 眼镜原型:开启未来交互新视界

简介 在科技的浪潮中,Meta 始终站在创新的前沿,不断为我们带来令人惊叹的虚拟现实和增强现实体验。2024 年 10 月 6 日,让我们一同聚焦 Meta 最新发布的 Quest 3S 头显及 AR 眼镜原型(Orion),探索这两款产品…

Facebook直播障碍的多重原因及解决方案

在社交媒体迅猛发展的今天,直播已经成为用户与观众互动、分享内容的重要方式。然而,作为全球最大的社交平台之一,Facebook在直播过程中常常遇到各种障碍,使得用户无法顺利进行直播。本文将深入探讨Facebook无法直播的原因&#xf…

追加word,返回中第 k 个字符的值

Alice 和 Bob 正在玩一个游戏。最初,Alice 有一个字符串 word "a"。 给定一个正整数 k。 现在 Bob 会要求 Alice 执行以下操作 无限次 : 将 word 中的每个字符 更改 为英文字母表中的 下一个 字符来生成一个新字符串,并将其 追加 到原始的…

您是否也在寻找免费的 PDF 编辑器工具?10个备选PDF 编辑器工具

您是否也在寻找免费的 PDF 编辑器工具? 如果是,那么您在互联网上处于最佳位置! 本指南中提到的所有 10 大免费 PDF 编辑器工具都易于使用,可以允许您添加文本、更改图像、添加图形、填写表格、添加签名等等。 因此,…

HarmonyOS NEXT 应用开发实战(三、ArkUI页面底部导航TabBar的实现)

在开发HarmonyOS NEXT应用时,TabBar是用户界面设计中不可或缺的一部分。本文将通过代码示例,带领大家一同实现一个常用的TabBar,涵盖三个主要的内容页:首页、知乎日报和我的页面。以模仿知乎日报的项目为背景驱动,设定…

《Linux从小白到高手》综合应用篇:深入理解Linux进程调优

本篇深入介绍Linux进程调优. 1. Linux系统进程类型: Linux的进程可能有成千上万个: ‌新建状态‌:进程刚刚被创建,但尚未运行。 ‌就绪状态‌:进程已经准备好运行,等待CPU分配。 ‌运行状态‌&#xff1…