Redis实现延迟队列

server/2024/9/24 11:11:14/

最近用到一个延迟消息的功能,第一时间想到使用MQ或者MQ的插件,因为数据量不大,所以尝试使用Redis来实现了,毕竟Redis也天生支持类似MQ的队列消费,所以,在这里总结了一下Redis实现延迟消息队列的方式。

一、监听key过期时间

处理流程:当redis的一个key过期时,redis会生成一个事件,通知订阅了该事件的客户端(KeyExpirationEventMessageListener),然后在客户端的回调方法中处理逻辑。
1)新建SpringBoot项目,maven依赖及yml如下
maven依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>

yml文件

server:port: 8000spring:redis:database: 0host: xxxxport: 6379password: xxxxxxlettuce:pool:#最大连接数max-active: 8#最大阻塞等待时间max-wait: -1#最大空闲max-idle: 8#最小空闲min-idle: 0#连接超时时间timeout: 5000

2)修改redis.conf文件开启事件通知配置
默认的配置:notify-keyspace-events “”
修改为:notify-keyspace-events Ex,该配置表示监听key的过期事件

3)设置Redis监听配置,注入Bean RedisMessageListenerContaine

@Configuration
public class RedisTimeoutConfiguration {@Autowiredprivate RedisConnectionFactory redisConnectionFactory;@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer() {RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);return redisMessageListenerContainer;}@Beanpublic KeyExpiredListener keyExpiredListener() {return new KeyExpiredListener(this.redisMessageListenerContainer());}
}

4)创建监听器类,重写key过期回调方法onMessage

@Slf4j
public class KeyExpiredListener extends KeyExpirationEventMessageListener {@Autowiredpublic RedisTemplate<String, String> redisTemplate;public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}@Overridepublic void onMessage(Message message, byte[] bytes) {String channel = new String(message.getChannel(), StandardCharsets.UTF_8);//过期的keyString key = new String(message.getBody(), StandardCharsets.UTF_8);log.info("redis key 过期:bytes={},channel={},key={}", new String(bytes), channel, key);}
}

5)编写测试接口:写入一个带过期时间的key

@RestController
@RequestMapping("/demo")
public class BasicController {@Autowiredpublic RedisTemplate<String, String> redisTemplate;@GetMapping(value = "/test")public void redisTest() {redisTemplate.opsForValue().set("test", "5s后过期", 5, TimeUnit.SECONDS);}
}

执行后,onMessage监听方法打印结果:

 redis key 过期:bytes=__keyevent@*__:expired,channel=__keyevent@0__:expired,key=test

该方案缺点:可靠性问题,Redis 是一个内存数据库,尽管它提供了数据持久化选项(如 RDB 和 AOF),但在某些情况下(如意外崩溃或重启),可能会丢失一些未处理的过期事件。

二、zset + score

基本思路是将消息按需发送的时间作为分数存储在有序集合zset中,然后定期检查并处理到期的消息。代码例子如下:
1)创建 DelayedMessageService 类

@Slf4j
@Service
public class DelayedMessageService {private static final String DELAYED_MESSAGES_ZSET = "delayed:messages";@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void addMessage(String message, long delayMillis) {long score = System.currentTimeMillis() + delayMillis;redisTemplate.opsForZSet().add(DELAYED_MESSAGES_ZSET, message, score);}@Scheduled(fixedRate = 1000)public void processMessages() {long now = System.currentTimeMillis();Set<ZSetOperations.TypedTuple<String>> messages = redisTemplate.opsForZSet().rangeByScoreWithScores(DELAYED_MESSAGES_ZSET, 0, now);if (messages != null && !messages.isEmpty()) {for (ZSetOperations.TypedTuple<String> message : messages) {String msg = message.getValue();long score = message.getScore().longValue();if (score <= now) {// Process the messageSystem.out.println("Processing message: " + msg);// Remove the message from the zsetredisTemplate.opsForZSet().remove(DELAYED_MESSAGES_ZSET, msg);}}}else{log.info("定时任务执行~");}}}

2)编写Controller接口测试,初始化zset内容

@RestController
@RequestMapping("/demo")
public class BasicController {@Autowiredprivate DelayedMessageService delayedMessageService;@GetMapping(value = "/test2")public void redisZsetTest() {// Add some messages with delaysdelayedMessageService.addMessage("Message 1", 5000); // 5 seconds delaydelayedMessageService.addMessage("Message 2", 10000); // 10 seconds delaydelayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay}
}

说明:

  • redisZsetTest接口通过调用DelayedMessageServiceaddMessage方法,将消息及其到期时间添加到 Redis 的 zset 中
  • 开启一个定时任务,定期检查和处理到期的消息。使用 @Scheduled 注解定期执行,每秒检查一次,注意这里使用@Scheduled,不要忘了启动类上添加@EnableScheduling注解,否则定时任务不会生效。fixedRate 属性表示以固定的频率(毫秒为单位)执行方法。即方法执行完成后,会立即等待指定的毫秒数,然后再次执行。
  • 通过 redisTemplate.opsForZSet().rangeByScoreWithScores 方法按时间范围获取到期的消息,消息处理完成后,从zset 中移除处理过的消息

三、Redisson框架

利用 Redisson 提供的数据结构RDelayedQueueRBlockingDeque,可以自动处理过期的任务并将它们移动到阻塞队列中,这样我们就可以从阻塞队列中获取任务并进行消费处理。例子如下:
1)添加依赖

<!-- Redisson 依赖项 -->
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>2.15.1</version>
</dependency>

2)创建DelayedMessageService

@Slf4j
@Service
public class DelayedMessageService {@Autowiredprivate RedissonClient redissonClient;private RBlockingDeque<String> blockingDeque;private RDelayedQueue<String> delayedQueue;@PostConstructpublic void init() {this.blockingDeque = redissonClient.getBlockingDeque("delayedQueue");this.delayedQueue = redissonClient.getDelayedQueue(blockingDeque);Executors.newSingleThreadExecutor().submit(this::processMessages);}public void addMessage(String message, long delayMillis) {delayedQueue.offer(message, delayMillis, TimeUnit.MILLISECONDS);}public void processMessages() {try {while (true) {String message = blockingDeque.take();// Process the messagelog.info("消息被处理: " + message);// ..业务逻辑处理}} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("中断异常",e);}}}

3)测试接口

@GetMapping(value = "/test3")public void redisQueueTest() {// Add some messages with delaysdelayedMessageService.addMessage("Message 1", 5000); // 5 seconds delaydelayedMessageService.addMessage("Message 2", 10000); // 10 seconds delaydelayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay}

说明:

  1. RDelayedQueue 是 Redisson 提供的延迟队列,它将消息存储在指定的队列中,直到消息到期才会被转移到该队列。它的主要作用包括:
    • 延迟消息管理:我们可以使用 RDelayedQueueoffer 方法将消息添加到延迟队列,并指定延迟时间,消息在延迟时间到期前一直保留在 RDelayedQueue 中。
    • 消息转移:一旦消息到期,RDelayedQueue 会自动将消息转移到指定的RBlockingDeque 中。
  2. RBlockingQueue是 Redisson 提供的阻塞队列,它支持阻塞操作。主要作用包括:
    • 阻塞操作:支持阻塞的 take 操作,如果队列中没有元素,会一直阻塞直到有元素可供消费。

总结
个人推荐使用Redisson 的RDelayedQueue 方式,感觉更加可靠和简单一些,当然zset+score也可以是个不错选择,毕竟更加灵活,延迟消息还有其他不同的方案,比如rocketmq、rabbitmq插件等,假如项目中用了redis,又不想引入更多的中间件,可以尝试使用redis来实现,为了测试,这里例子都比较简单,在实际使用过程中,还要考虑补偿机制、幂等性等问题。

参考:
1.https://blog.csdn.net/qq_34826261/article/details/120598731

2.https://github.com/redisson/redisson/wiki/7.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%9B%86%E5%90%88#715-%E5%BB%B6%E8%BF%9F%E9%98%9F%E5%88%97delayed-queue


http://www.ppmy.cn/server/46374.html

相关文章

vi和vim有什么不同?

vi 和 vim 都是流行的文本编辑器&#xff0c;它们之间有以下主要区别&#xff1a; 历史&#xff1a; vi 是一个非常古老的文本编辑器&#xff0c;最初由 Bill Joy 在 1976 年为 Unix 系统编写。vim&#xff08;Vi IMproved&#xff09;是 vi 的一个增强版&#xff0c;由 Bram M…

2024HBCPC:C Goose Goose Duck

题目描述 Iris 有 n n n 个喜欢玩鹅鸭杀的朋友&#xff0c;编号为 1 ∼ n 1∼n 1∼n。 假期的时候&#xff0c;大家经常会在群里问有没有人玩鹅鸭杀&#xff0c;并且报出现在已经参与的人数。 但是每个人对于当前是否加入游戏都有自己的想法。 具体的来说&#xff0c;对于第…

【MySQL】 1130 -Host ‘14.*.**.*‘ is not allowed to connect to this MySQL server

这个错误表明MySQL服务器拒绝了来自IP地址为14.*.**.*的主机的连接请求。这通常是由于MySQL用户的主机限制引起的。 要解决这个问题&#xff0c;你需要在MySQL中允许指定主机的连接。你可以按照以下步骤操作&#xff1a; 使用具有足够权限的用户&#xff08;如root用户&#x…

Strategy设计模式

Strategy设计模式举例。 看图&#xff1a; 代码实现&#xff1a; #include <iostream>using namespace std;class FlyBehavior { public:virtual void fly() 0; };class QuackBehavior { public:virtual void quack() 0; };class FlyWithWings :public FlyBehavior …

Java高级---Spring Boot---4核心概念

4 核心概念 4.1 Spring Boot的自动配置详解 自动配置 是 Spring Boot 的核心特性之一&#xff0c;它允许框架根据项目中添加的依赖自动配置应用程序。 EnableAutoConfiguration: 这个注解是自动配置的入口点&#xff0c;它告诉 Spring Boot 根据类路径上的库来自动配置 Spri…

c++学习----初识类和对象(上)

1.面向过程和面向对象初步认识 C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出求解问题的步骤&#xff0c;通过函数调用逐步解决问题。 C是基于面向对象的&#xff0c;关注的是对象&#xff0c;将一件事情拆分成不同的对象&#xff0c;靠对象之间的交互完 成。…

Python自然语言处理(NLP)库之NLTK使用详解

概要 自然语言处理(NLP)是人工智能和计算机科学中的一个重要领域,涉及对人类语言的计算机理解和处理。Python的自然语言工具包(NLTK,Natural Language Toolkit)是一个功能强大的NLP库,提供了丰富的工具和数据集,帮助开发者进行各种NLP任务,如分词、词性标注、命名实体…

江苏大信环境科技有限公司:环保领域的开拓者与引领者

2009 年&#xff0c;江苏大信环境科技有限公司在宜兴环保科技工业园成立。自创立之始&#xff0c;该公司便笃定坚守“诚信为本、以质量求生存、以创新谋发展”这一经营理念&#xff0c;全力以赴为客户构建专业的工业有机废气治理整体解决方案&#xff0c;进而成为国家高新技术企…