延迟队列调研

embedded/2024/11/27 13:08:35/

调研延迟队列的实现方案

  • 使用 RocketMQ 设置延迟时间级别延时投递的延时队列

  • 使用 Redisson 提供的 DelayedQueue

  • 使用 Redis 的过期监听 -- key过期事件的时效性问题(惰性清除、定时随机删除)

  • 使用 RabbitMQ 的死信队列 --

死信队列的设计目的是为了存储没有被正常消费的消息,便于排查和重新投递。死信队列没有对投递时间做出保证,在第一条消息成为死信之前,后面的消息即使过期也不会投递为死信。

为了解决这个问题,Rabbit 官方推出了延迟投递插件 rabbitmq-delayed-message-exchange ,推荐使用官方插件来做延时消息

  • 使用非持久化的时间轮

参考:https://mp.weixin.qq.com/s/dopgOC5zdvyoy5phjAlH3A

  • 利用 Redis 的 sorted set + list + 轮询来自研延时队列

参考:https://tech.youzan.com/queuing_delay/

综合考虑决定:采用了Redisson实现延迟队列,Redisson命令都采用Lua表达式执行,保证原子性,线程安全。

第一个任务延迟的问题,Redisson方案理论上是没有延迟的,但是当消息数量增加,消费者消费缓慢这个情况下可能会导致延迟任务消费的延迟。

第二个丢消息的问题,Redisson方案很大程度上减轻了丢消息的可能性,因为所有的任务都是存在list和sorted set两种数据类型中,Redis有持久化机制,就算Redis宕机,也就可能会丢一点点数据。

第三个广播消费任务的问题,这个是不会出现的,因为每个客户端都是从同一个目标队列中获取任务的。

实现代码

RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue(delayQueueCode);RDelayedQueue<Object> delayQueue = redissonClient.getDelayedQueue(blockingQueue);delayQueue.offer(task, seconds, TimeUnit.SECONDS);Object task = blockingQueue.poll();

版本配套 :https://blog.csdn.net/Mr_lqh325/article/details/132557086

Redisson延迟队列实现原理

                               

pgDelayQueue前面的前缀都是固定的,Redisson创建的时候会拼上前缀。

  • redisson_delay_queue_timeout:pgDelayQueue,sorted set数据类型,存放所有延迟任务,按照延迟任务的到期时间戳(提交任务时的时间戳 + 延迟时间)来排序的,所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务。

  • redisson_delay_queue:pgDelayQueue,list数据类型,也是存放所有的任务,主要用来复制数据和提供阻塞实现。

  • pgDelayQueue,list数据类型,被称为目标队列,这个里面存放的任务都是已经到了延迟时间的,可以被消费者获取的任务,RBlockingQueue的take方法和pull方法是从这个目标队列中获取到任务。

  • redisson_delay_queue_channel:pgDelayQueue,是一个channel,用来通知客户端开启一个延迟任务

核心流程

写入数据

当调用 offer 时,系统会自动计算到期时间戳:System.currentTimeMillis() + timeUnit.toMillis(delay)

利用 ThreadLocalRandom 生成随机数,和消息共同生成二进制数据。有序集合中不允许写入重复数据,利用随机数来解决相同消息投递问题。

将生成的二进制数据同时写入有序集合和列表中,然后通过 Redis 发布订阅功能发布超时时间戳的消息。

计时

redisson#getDelayedQueue(blockingFairQueue) 创建时,系统会订阅 redisson_delay_queue_channel:{channelName} channel,接收到上面发送的时间戳。

将时间戳写入 Netty 时间轮中,等待时间到时触发。

触发时会去 zset 默认查询 100 条消息,查询维度是 0 到当前时间戳,这样就能查出所有应当过期的消息,遍历消息,将 zset、list 中的该消息删除,并将消息写入到阻塞队列中。

消费数据

客户端通过 blpop 命令持续监听阻塞队列,当有消息过期时,获取值,进行逻辑判断。

总结

  1. Redisson 利用 两个list + 一个 sorted set + pub/sub + Netty实现的时间轮来实现延时队列

  2. Redisson 利用 Redis 的发布/订阅(pub/sub)机制或阻塞队列(如 BLPOP)等特性,实现等待/通知的行为,避免了长时间的轮询。

  3. Redisson 基于发布订阅做消息触发,基于Netty实现的时间轮做计时。

Redisson 3.9.1版本原码

订阅:redisson_delay_queue_channel:pgDelayQueue

pushTask() : 主要是监听到期消息,移动到期消息到延迟队列pgDelayQueue中

scheduleTask(startTime) : 主要是监听新增的消息到期时间戳,判断如果离到期小于等于10毫秒则直接执行puahTask(),如果大于10秒,则创建时间轮节点并加入到Netty的时间轮中,等待到期

参考:

https://mp.weixin.qq.com/s/zyE8zodlI36SZNPbE5wkrg?poc_token=HBuJRWejK_-Am_011w_rTMLV9ZHKso9h7PbyBrkl

https://mp.weixin.qq.com/s/7WUaNibS0YUJdwsuaHlRuw

https://mp.weixin.qq.com/s/o_psQXGYFTRtp4VLfYk_Wg

https://mp.weixin.qq.com/s/9S59l2-nhYLlsH9hedlOwg

https://tech.youzan.com/queuing_delay/

https://mp.weixin.qq.com/s/dopgOC5zdvyoy5phjAlH3A


http://www.ppmy.cn/embedded/140904.html

相关文章

【C++】顺序容器(二):顺序容器操作

9.3 顺序容器操作 9.3.1 向顺序容器添加元素 除 array 外&#xff0c;所有标准库容器都提供了灵活的内存管理。在运行时可以动态添加或删除元素来改变容器大小。 使用 push_back 除 array 和 forward_list 之外&#xff0c;每个顺序容器&#xff08;包括 string 类型&#…

腾讯云OCR车牌识别实践:从图片上传到车牌识别

在当今智能化和自动化的浪潮中&#xff0c;车牌识别&#xff08;LPR&#xff09;技术已经广泛应用于交通管理、智能停车、自动收费等多个场景。腾讯云OCR车牌识别服务凭借其高效、精准的识别能力&#xff0c;为开发者提供了强大的技术支持。本文将介绍如何利用腾讯云OCR车牌识别…

Opencv+ROS实现颜色识别应用

目录 一、工具 二、原理 概念 本质 三、实践 添加发布话题 主要代码 四、成果 五、总结 一、工具 opencvros ubuntu18.04 摄像头 二、原理 概念 彩色图像&#xff1a;RGB&#xff08;红&#xff0c;绿&#xff0c;蓝&#xff09; HSV图像&#xff1a;H&#xff0…

构建与优化数据仓库-实践指南

数仓构建流程 下图为MaxCompute数据仓库构建的整体流程。 基本概念 在正式学习本教程之前&#xff0c;您需要首先理解以下基本概念&#xff1a; 业务板块&#xff1a;比数据域更高维度的业务划分方法&#xff0c;适用于庞大的业务系统。 维度&#xff1a;维度建模由Ralph Ki…

数据结构--AVL树(平衡二叉树)

✅博客主页:爆打维c-CSDN博客​​​​​​ &#x1f43e; &#x1f539;分享c、c知识及代码 &#x1f43e; &#x1f539;Gitee代码仓库 五彩斑斓黑1 (colorful-black-1) - Gitee.com 一、AVL树是什么&#xff1f;&#xff08;含义、性质&#xff09; 1.AVL树的概念 AVL树是最…

Linux sed基本命令

sed基础 流编辑器&#xff08;编辑器&#xff1a;进行增删改查动作&#xff09;&#xff0c;可以对sed每一行进行增删改查 sed [-option] 模式 操作指令 文件名 示例&#xff1a; 第三行会重复打印一次 P&#xff08;打印操作&#xff09; 自上而下&#xff0c;一行一行读…

pikachu平台xss漏洞详解

声明&#xff1a;文章只是起演示作用&#xff0c;所有涉及的网站和内容&#xff0c;仅供大家学习交流&#xff0c;如有任何违法行为&#xff0c;均和本人无关&#xff0c;切勿触碰法律底线 文章目录 概述&#xff1a;什么是xss一、反射型XSS1. get2. post 二、存储型XSS三、DOM…

Sui 链游戏开发实战:用 Move 写一个链上剪刀石头布游戏!

系列文章目录 Task1&#xff1a;hello move&#x1f6aa; Task2&#xff1a;move coin&#x1f6aa; Task3&#xff1a;move nft&#x1f6aa; Task4&#xff1a;move game&#x1f6aa; 更多精彩内容&#xff0c;敬请期待&#xff01;✌️ 文章目录 系列文章目录前言什么是 …