通过Redisson构建延时队列并实现注解式消费

embedded/2025/2/3 21:39:41/

目录

  • 一、序言
  • 二、延迟队列实现
    • 1、Redisson延时消息监听注解和消息体
    • 2、Redisson延时消息发布器
    • 3、Redisson延时消息监听处理器
  • 三、测试用例
  • 四、结语

一、序言

两个月前接了一个4万的私活,做一个线上商城小程序,在交易过程中不可避免的一个问题就是用户下单后的订单自动取消。

目前成熟的方案有通过RabbitMQ+死信队列RabbitMQ+延迟消息插件RocketMQ定时消息推送Redisson延时队列来实现。

考虑到商城的定位和用户体量,以及系统维护成本,其实完全没有必要引入消息中间件,借助Redis其实就可以轻松实现这个需求。

加上Redisson客户端本身就已经实现了很多分布式集合工具类,借助阻塞队列和延时队列就可轻松搞定。

当然,为了使用方便以及团队协作,顺便模仿@RabbitListener封装了一套基于注解的消息消费,废话不多说,直接上代码。


二、延迟队列实现

Redisson_15">1、Redisson延时消息监听注解和消息体

延迟消息监听器定义:

/*** Redisson延时队列监听器** @author Nick Liu* @date 2024/11/13*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedissonDelayedQueueListener {/*** 队列名称* @return*/String queueName();
}

消息体定义:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RedisDelayedMsgDTO {/*** 消息内容*/private String msg;/*** 队列名称*/private String queueName;/*** 延时时间*/private long delayTime;private TimeUnit timeUnit;
}

Redisson_62">2、Redisson延时消息发布器

@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedMsgPublisher {private final RedissonClient redissonClient;/*** 发布延时信息* @param delayedMsgDTO*/public void publishDelayedMsg(RedisDelayedMsgDTO delayedMsgDTO) {log.info("开始发布延迟消息: {}", FastJsonUtils.toJsonString(delayedMsgDTO));RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(delayedMsgDTO.getQueueName());RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);delayedQueue.offer(delayedMsgDTO.getMsg(), delayedMsgDTO.getDelayTime(), delayedMsgDTO.getTimeUnit());}
}

这里我们借助RBlockingQueueRDelayedQueue来实现,只有当延迟消息快到期时,消费者才能从阻塞队列拉取到消息,否则消费者将一直阻塞。

Redisson_86">3、Redisson延时消息监听处理器

这里我们定义了一个BeanPostProcessor 的实现,目的就是为了扫描Spring容器中所有带RedissonDelayedQueueListener注解的Bean实例和方法。

/*** Redisson延迟队列Bean后处理器* @author Nick Liu* @date 2025/1/3*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedQueuePostProcessor implements BeanPostProcessor {private final RedissonClient redissonClient;@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 获取最终的目标运行时对象Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);Method[] methods = clazz.getDeclaredMethods();for (Method m : methods) {if (!m.isAnnotationPresent(RedissonDelayedQueueListener.class)) {continue;}// 如果Bean上的方法有Redisson队列监听注解,则启动一个线程监听队列RedissonDelayedQueueListener annotation = m.getAnnotation(RedissonDelayedQueueListener.class);CompletableFuture.runAsync(() -> {log.info("开始监听Redisson延时队列[{}]消息", annotation.queueName());while (true) {RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(annotation.queueName());redissonClient.getDelayedQueue(blockingQueue);try {String msg = blockingQueue.take();MDC.put(CommonConst.X_REQUEST_ID, SerialNoUtils.generateSimpleUUID());log.info("监听到队列[{}]延时消息: {}", annotation.queueName(), msg);m.invoke(bean, msg);MDC.remove(CommonConst.X_REQUEST_ID);} catch (Exception e) {log.error(e.getMessage(), e);}}});}return bean;}}

这里我们扫描到指定Bean的方法后,会开启一个异步线程,并轮询拉取延时消息,如果消息没过期,异步线程将会一直阻塞等待。


三、测试用例

/*** @author Nick Liu* @date 2025/2/2*/
@Slf4j
@RestController
@RequiredArgsConstructor
public class RedissonDelayedMsgController {private static final String DELAYED_QUEUE = "redisson:delayed:queue";private final RedissonDelayedMsgPublisher redissonDelayedMsgPublisher;@GetMapping("/delayed/msg")public ResponseEntity<RedisDelayedMsgDTO> publishDelayedMsg() {RedisDelayedMsgDTO redisDelayedMsgDTO = new RedisDelayedMsgDTO();redisDelayedMsgDTO.setQueueName(DELAYED_QUEUE);redisDelayedMsgDTO.setMsg("This is a delayed msg");redisDelayedMsgDTO.setDelayTime(10);redisDelayedMsgDTO.setTimeUnit(TimeUnit.SECONDS);redissonDelayedMsgPublisher.publishDelayedMsg(redisDelayedMsgDTO);return ResponseEntity.ok(redisDelayedMsgDTO);}@RedissonDelayedQueueListener(queueName = DELAYED_QUEUE)public void handleDelayedMsg(String msg) {log.info("Received delayed msg: {}", msg);}
}

启动服务后,Bean后处理器会启动异步线程监听延时消息,如下:

2025-02-02 16:46:04.271 INFO  [] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():44] - 开始监听Redisson延时队列[redisson:delayed:queue]消息

浏览器直接输入http://localhost:8000/delayed/msg发布延时消息,10s后消费者进行处理,如下:

2025-02-02 16:43:11.107 INFO  [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():51] - 监听到队列[redisson:delayed:queue]延时消息: This is a delayed msg
2025-02-02 16:43:11.108 INFO  [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [com.xlyj.contoller.RedissonDelayedMsgController.handleDelayedMsg():40] - Received delayed msg: This is a delayed msg

四、结语

虽说通过Redisson实现的延迟队列也能实现支付订单的自动取消,但是可用性相比专业的消息中间件还是尚有不足的。

比如消息生产者发送消息没有确认机制,消息消费也没有确认机制,这两个环节都有可能导致消息丢失。

当然我们可以通过其它保障机制去补偿,比如再加上定时任务扫表,把扫描时间可以设置长一点,保证最终的一致性。

在大型项目中还是优先推荐专业的消息中间件去实现延时消息消费。


http://www.ppmy.cn/embedded/159275.html

相关文章

Vue 响应式渲染 - 列表布局和v-html

Vue 渐进式JavaScript 框架 基于Vue2的学习笔记 - Vue 响应式渲染 - 列表布局和v-html 目录 列表布局 简单渲染列表 显示索引值 点击变色 V-html 作用 注意 采用策略 应用 总结 列表布局 简单渲染列表 Data中设置状态&#xff0c;是一个数组格式的默认信息。 然后…

AAPM:基于大型语言模型代理的资产定价模型,夏普比率提高9.6%

“AAPM: Large Language Model Agent-based Asset Pricing Models” 论文地址&#xff1a;https://arxiv.org/pdf/2409.17266v1 Github地址&#xff1a;https://github.com/chengjunyan1/AAPM 摘要 这篇文章介绍了一种利用LLM代理的资产定价模型&#xff08;AAPM&#xff09;…

计算机网络 应用层 笔记1(C/S模型,P2P模型,FTP协议)

应用层概述&#xff1a; 功能&#xff1a; 常见协议 应用层与其他层的关系 网络应用模型 C/S模型&#xff1a; 优点 缺点 P2P模型&#xff1a; 优点 缺点 DNS系统&#xff1a; 基本功能 系统架构 域名空间&#xff1a; DNS 服务器 根服务器&#xff1a; 顶级域…

LeetCode - #196 删除重复的电子邮件并保留最小 ID 的唯一电子邮件

网罗开发 &#xff08;小红书、快手、视频号同名&#xff09; 大家好&#xff0c;我是 展菲&#xff0c;目前在上市企业从事人工智能项目研发管理工作&#xff0c;平时热衷于分享各种编程领域的软硬技能知识以及前沿技术&#xff0c;包括iOS、前端、Harmony OS、Java、Python等…

【C++篇】哈希表

目录 一&#xff0c;哈希概念 1.1&#xff0c;直接定址法 1.2&#xff0c;哈希冲突 1.3&#xff0c;负载因子 二&#xff0c;哈希函数 2.1&#xff0c;除法散列法 /除留余数法 2.2&#xff0c;乘法散列法 2.3&#xff0c;全域散列法 三&#xff0c;处理哈希冲突 3.1&…

VSCode插件HTML CSS Support

1、打开VSCode软件&#xff0c;找到应用商城&#xff0c;搜索并安装插件“HTML CSS Support”。 2、“HTML CSS Support”插件提供代码片段快速插入功能&#xff1a;如输入 div.main 后按enter键会自动生成一个带有 class 为 main 的 div 标签。 3、“HTML CSS Support”插件…

使用 Numpy 自定义数据集,使用pytorch框架实现逻辑回归并保存模型,然后保存模型后再加载模型进行预测,对预测结果计算精确度和召回率及F1分数

1. 导入必要的库 首先&#xff0c;导入我们需要的库&#xff1a;Numpy、Pytorch 和相关工具包。 import numpy as np import torch import torch.nn as nn import torch.optim as optim from sklearn.metrics import accuracy_score, recall_score, f1_score2. 自定义数据集 …

Git进阶之旅:Git 配置信息 Config

Git 配置级别&#xff1a; 仓库级别&#xff1a;local [ 优先级最高 ]用户级别&#xff1a;global [ 优先级次之 ]系统级别&#xff1a;system [ 优先级最低 ] 配置文件位置&#xff1a; git 仓库级别对应的配置文件是当前仓库下的 .git/configgit 用户级别对应的配置文件时用…