Redisson-DelayedQueue-原理

server/2024/10/22 7:36:46/

归档

Unit-Test

常规测试

@Test
public void testCommon() throws InterruptedException {RBlockingQueue<String> destinationQueue = redisson.getBlockingQueue("delay_queue"); // 目标队列RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(destinationQueue); // 只是对目标队列的一个封装destinationQueue.offer("22_1");destinationQueue.offer("22_2");delayedQueue.offer("1_1_1", 2, TimeUnit.SECONDS);delayedQueue.offer("1_1_2", 3, TimeUnit.SECONDS);for (int i = 0; i < 4; i++) {String e0 = destinationQueue.poll();System.out.println("=========> e0: " + e0);}Thread.sleep(2000);System.out.println("------------------");for (int i = 0; i < 2; i++) {String e1 = destinationQueue.poll();System.out.println("=========> e1: " + e1);}Thread.sleep(2000);System.out.println("------------------");for (int i = 0; i < 2; i++) {String e2 = destinationQueue.poll();System.out.println("=========> e2: " + e2);}
}// 输出
=========> e0: 22_1
=========> e0: 22_2
=========> e0: null
=========> e0: null
------------------
// 等待 2s 之后,才获取到
=========> e1: 1_1_1
=========> e1: null
------------------
// 再等 2s 之后,才获取到
=========> e2: 1_1_2
=========> e2: null

说明

/*** 构造器,将目标队列转入,并启用定时转移任务 */
protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {super(codec, commandExecutor, name);channelName = prefixName("redisson_delay_queue_channel", getRawName());queueName = prefixName("redisson_delay_queue", getRawName());timeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName());QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {@Overrideprotected RFuture<Long> pushTaskAsync() {return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "+ "if #expiredValues > 0 then "+ "for i, v in ipairs(expiredValues) do "+ "local randomId, value = struct.unpack('dLc0', v);"+ "redis.call('rpush', KEYS[1], value);" // 添加到目标队列里面去+ "redis.call('lrem', KEYS[3], 1, v);" // 删除缓存队列+ "end; "+ "redis.call('zrem', KEYS[2], unpack(expiredValues));" // 删除延迟排序的任务+ "end; "// get startTime from scheduler queue head task+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "+ "if v[1] ~= nil then "+ "return v[2]; "+ "end "+ "return nil;",Arrays.asList(getRawName(), timeoutSetName, queueName),System.currentTimeMillis(), 100);}...};// 开启任务queueTransferService.schedule(queueName, task);this.queueTransferService = queueTransferService;
}/*** 添加延时的队列元素 */
@Override
public void offer(V e, long delay, TimeUnit timeUnit) {get(offerAsync(e, delay, timeUnit));
}/*** Lua 添加延时的队列元素 */
@Override
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {...long delayInMs = timeUnit.toMillis(delay);long timeout = System.currentTimeMillis() + delayInMs;long randomId = ThreadLocalRandom.current().nextLong();return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" + "redis.call('zadd', KEYS[2], ARGV[1], value);" // 添加到 ZSet 排序+ "redis.call('rpush', KEYS[3], value);" // 添加到缓存队列// if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); "+ "if v[1] == value then "+ "redis.call('publish', KEYS[4], ARGV[1]); "+ "end;",Arrays.asList(getRawName(), timeoutSetName, queueName, channelName),timeout, randomId, encode(e));
}

http://www.ppmy.cn/server/51474.html

相关文章

Linux使用——查看发行版本、内核、shell类型等基本命令

先做快照 虚拟机中编辑网络 关机 普通账户和管理员账户 互相对照 localhost相当于IP 参数: 短格式:以减号(-)开头&#xff0c;参数字母 长格式:以2个减号(--)后跟上完整的参数单词 当前发行版本 [rootserver ~]# cat /etc/redhat-release Red Hat Enterprise Linux release 9.…

网络分层之7层讲解

网络分层 网络分层就是将网络节点所要完成的数据的发送或转发、打包或拆包&#xff0c;控制信息的加载或拆出等工作&#xff0c;分别由不同的硬件和软件模块去完成。 一、物 理 层(Physical Layer) 要传递信息就要利用一些物理媒体&#xff0c;如双纽线、同轴电缆等&#xff…

html--404页面

<!DOCTYPE html> <html> <head> <meta http-equiv"Content-Type" content"text/html; charsetUTF-8"> <meta http-equiv"X-UA-Compatible" content"IEedge,chrome1"> <title>404 错误页面不存在&…

代码随想录算法训练营刷题复习10:二叉树、二叉搜索树复习2

二叉树、二叉搜索树 力扣题复习 110. 平衡二叉树257. 二叉树的所有路径404. 左叶子之和513. 找树左下角的值112.路径之和113.路经总和ii450. 删除二叉搜索树中的节点701. 二叉搜索树中的插入操作 110. 平衡二叉树 左右子树高度差要小于1 ->递归调用&#xff08;need新的函…

【html】用html5+css3+JavaScript制作一个计数器

目录 简介&#xff1a; 效果图&#xff1a; 源码&#xff1a; html: CSS: JS: 源码解析&#xff1a; 简介&#xff1a; 在日常生活当中很多事情都需要用到计数器特别是在体育运动当中&#xff0c;可以我们那么我们可不可以通过网页来制作一个计数器呢答案是肯定的我们需要利…

el-table多选分页回显

el-table多选分页回显 1.多选项添加 :reserve-selection"true" <el-table-column type"selection" align"center" width"55" :reserve-selection"true" ></el-table-column>reserve-selection : 仅对 typesel…

(一)Kafka 安全之使用 SASL 进行身份验证 —— JAAS 配置、SASL 配置

目录 一. 前言 二. JAAS 配置&#xff08;JAAS configuration&#xff09; 2.1. Kafka Broker 的 JAAS 配置 2.2. Kafka 客户端的 JAAS 配置 2.2.1. 使用客户端配置属性的 JAAS 配置 2.2.2. 使用静态配置文件的 JAAS 配置 三. SASL 配置&#xff08;SASL configuration&…

STM32单片机PWR电源控制详解

文章目录 1. PWR概述 2. 电源结构框图 3. 上电复位和掉电复位 4. 可编程电压监测器 5. 低功耗模式 6. 模式选择 6.1 睡眠模式 6.2 停止模式 6.3 待机模式 7. 代码示例 1. PWR概述 PWR&#xff08;Power Control&#xff09;电源控制&#xff0c;负责管理STM32内部的…