redisson的延时队列机制简述

news/2025/3/13 7:53:13/

概述

业务中经常会遇到一些延迟执行的需求;通常想到的都是rabbitmq或者rocketmq的延迟消息;
但是系统中不一定集成了mq,但为了控制分布式下的并发,一般redis都是有集成的;
rediskey过期监听那个时间不准确,在集群环境下节点挂了也容易丢失;

那么用redisson的延迟队列,正好可以用来解决轻量级的延时消息;
简单的来说就是消费者生产了一个消息任务,塞到ZSet里(用当前时间戳+延迟时间作为分数),等时间到了,就会放到任务List中,然后消费者真正去执行任务都是从任务List中获取任务;

redisson中的消费者并不是一直轮询获取任务;而是有具体时间的延迟任务,时间到了去任务队列中获取任务;

注意点,在消费者监听处如果使用thread相关操作因为redisson的默认线程nameredisson-netty会抛异常,我的处理方式是把相关操作都放到自己的线程池中操作.

官方解释是在netty线程中调用同步方法可能会导致超时;
issue:https://github.com/redisson/redisson/issues/3549

异常见源码

org.redisson.command.CommandAsyncService.get(org.redisson.api.RFuture<V>)

版本
redissonredisson-spring-boot-starter-3.17.6.jar
redis:6.2.7

redisson延时任务机制简述

生产者先将任务pushdelay_queue_timeout等待队列中,延迟时间到了,消费者会把任务从timeout队列挪到SANYOU任务队列中(消费者实际获取任务的队列),然后消费者就能拿到最终要执行的任务了;

这里具体要说的就是客户端通知和获取机制;
消费者在启动时通常都会去get一下队列,达到订阅队列的目的;

RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("SANYOU");
RDelayedQueue<String> delayQueue = redissonClient.getDelayedQueue(blockingQueue);

这样做的目的:
消费者订阅队列,从delay_queue_timeout等待延迟队列中将已经到达时间的任务挪到真正的任务List队列中,然后再将delay_queue_timeout队列中第一个(也就是第一个要执行的)的任务的时间拿到,用这个时间开启一个延迟任务,时间到了之后,会发布一个消息到时间通知channel中;然后客户端监听到这个channel中的消息后,会再次重复上述步骤,让delay_queue_timeout中的任务,可以都放到真正的任务List队列中;

这样有一个好处就是不用一直while扫描等待,客户端的延迟任务时间和delay_queue_timeout中的延迟时间是一样的,可以精准利用cpu,理论上是没有延迟的,但是实际消息数量大量增加,消费者消费比较慢,还是会造成延迟任务消费延迟;

另外由于客户端都是用lua脚本去redis的同一个List队列中获取任务,lua脚本在redis中都是原子任务,而且redis真正的操作是单线程的,所以不会存在任务广播情况(并发获取时,一个任务不会被多个消费者同时拿到);

捞一张图片
在这里插入图片描述

代码Demo


import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@Component
public class RedissonDelayQueueConfig implements InitializingBean {@Resourceprivate RedissonClient redissonClient;//延时队列mapprivate final Map<String, RDelayedQueue<DelayMessageDTO>> delayQueueMap = new ConcurrentHashMap<>(16);/*** 消费者初始化所有队列,订阅对应的队列,并开启第一个过期任务的过期时间对应的延迟任务*/@PostConstructpublic void reScheduleDelayedTasks() {DelayQueueEnum[] queueEnums = DelayQueueEnum.values();for (DelayQueueEnum queueEnum : queueEnums) {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueEnum.getCode());RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);}}@Overridepublic void afterPropertiesSet() {// 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumerDelayQueueEnum[] queueEnums = DelayQueueEnum.values();for (DelayQueueEnum queueEnum : queueEnums) {DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueEnum.getBeanName());if (delayQueueConsumer == null) {throw new ServiceException("queueName=" + queueEnum.getBeanName() + ",delayQueueConsumer=null,请检查配置...");}// Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后,// 该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。RBlockingQueue<DelayMessageDTO> rBlockingQueue = redissonClient.getBlockingDeque(queueEnum.getCode());//消费者初始化队列RDelayedQueue<DelayMessageDTO> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);//set到map中方便获取delayQueueMap.put(queueEnum.getCode(), rDelayedQueue);// 订阅新元素的到来,调用的是takeAsync(),异步执行rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute);}}public RedissonClient getRedissonClient() {return redissonClient;}public Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {return delayQueueMap;}
}import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;import javax.annotation.Resource;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class DelayQueueUtil {private static RedissonDelayQueueConfig redissonDelayQueueConfig;@Resourcepublic void setRedissonDelayQueueConfig(RedissonDelayQueueConfig redissonDelayQueueConfig) {DelayQueueUtil.redissonDelayQueueConfig = redissonDelayQueueConfig;}private static Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {if(null == redissonDelayQueueConfig) return Collections.emptyMap();return redissonDelayQueueConfig.getDelayQueueMap();}private static RedissonClient getRedissonClient() {if(null == redissonDelayQueueConfig) return null;return redissonDelayQueueConfig.getRedissonClient();}/*** 添加延迟消息*/public static void addDelayMessage(DelayMessageDTO delayMessage) {log.info("delayMessage={}", delayMessage);Assert.isTrue(getDelayQueueMap().containsKey(delayMessage.getQueueName()), "队列不存在");delayMessage.setCreateTime(DateUtil.now());if(null == delayMessage.getTimeUnit()){delayMessage.setTimeUnit(TimeUnit.SECONDS);}RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());//移除相同的消息rDelayedQueue.remove(delayMessage);//添加消息rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit());}/*** 移除指定队列中的消息*/public static void removeDelayMessage(DelayMessageDTO delayMessage) {log.info("取消:delayMessage={}", delayMessage);if (!getDelayQueueMap().containsKey(delayMessage.getQueueName())) {log.error("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName());return;}RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());rDelayedQueue.remove(delayMessage);removeDelayQueue(delayMessage);}/*** 从所有队列中删除消息*/public static void removeDelayQueue(DelayMessageDTO value) {DelayQueueEnum[] queueEnums = DelayQueueEnum.values();for (DelayQueueEnum queueEnum : queueEnums) {RBlockingDeque<Object> blockingDeque = getRedissonClient().getBlockingDeque(queueEnum.getCode());RDelayedQueue<Object> delayedQueue = getRedissonClient().getDelayedQueue(blockingDeque);delayedQueue.remove(value);}}}

参考了大佬的博文
https://lhalcyon.com/delay-task/index.html


http://www.ppmy.cn/news/1331141.html

相关文章

notepad++ v8.5.3 安装插件,安装失败怎么处理?下载进度为0怎么处理?

notepad v8.5.3 安装插件&#xff0c;安装失败&#xff1f;下载进度为0&#xff0c;怎么处理&#xff1f; 安装 进度 进度条没有进度 &#xff0c;然后就退出了&#xff0c;自动打开程序&#xff0c;不知道什么问题&#xff0c;插件管理下面也没有插件显示 找到问题了&#x…

网络协议与攻击模拟_07UDP协议

一、简单概念 1、UDP协议简介 UDP&#xff08;用户数据报&#xff09;协议&#xff0c;是传输层的协议。不需要建立连接&#xff0c;直接发送数据&#xff0c;不会重新排序&#xff0c;不需要确认。 2、UDP报文字段 源端口目的端口UDP长度UDP校验和 3、常见的UDP端口号 5…

百度网盘网页无法调起客户端 目前未解决 2024年1月出现BUG

最近一周突然发现百度网盘网页无法调起客户端&#xff0c;多方式排查还是没找出问题所在&#xff0c;怀疑是客户端更新了什么东西导致bug。先记录下各种排查法和复现情况。 具体问题是&#xff1a; *任意网盘链接&#xff0c;都无法唤醒客户端。 *客户端在退出状态网页点击下…

HCIA-Datacom实验指导手册:3、OSPF 路由协议基础实验

HCIA-Datacom实验指导手册:3、OSPF 路由协议基础实验 一、实验目的:二、实验拓扑:三、实验目的:四、配置步骤:步骤 1 掌握 OSPF 的基本配置命令步骤 2 掌握如何查看 OSPF 的运行状态步骤 3 掌握如何通过 Cost 控制 OSPF 的选路步骤 4 掌握 OSPF 发布默认路由的方法步骤 5 …

网络爬虫原理:探秘数字世界的信息猎手

欢迎来到这个关于网络爬虫原理的小小冒险之旅&#xff01;今天&#xff0c;我们将揭开数字世界的面纱&#xff0c;深入了解那些神秘的程序&#xff0c;它们如何在互联网的海洋中搜寻并捕获有用的信息。如果你对计算机世界的奥秘充满好奇&#xff0c;那么跟着我一起走进这个让人…

【Py/Java/C++三种语言详解】LeetCode每日一题240121【二分查找】LeetCode410、分割数组的最大值

文章目录 题目链接题目描述解题思路为什么使用二分贪心子问题 代码PythonJavaC时空复杂度 华为OD算法/大厂面试高频题算法练习冲刺训练 题目链接 LeetCode410、分割数组的最大值 题目描述 给定一个非负整数数组 nums 和一个整数 k &#xff0c;你需要将这个数组分成 k 个非空…

数据仓库-相关概念

简介 数据仓库是一个用于集成、存储和管理大量数据的系统。它用于支持企业决策制定过程中的数据分析和报告需求。数据仓库从多个来源收集和整合数据&#xff0c;并将其组织成易于查询和分析的结构。 数据仓库的主要目标是提供高性能的数据访问和分析能力&#xff0c;以便…

基于Springboot的大学生心理健康管理系统(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的大学生心理健康管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体…