重学SpringBoot3-集成Redis(八)之限时任务(延迟队列)

news/2024/12/31 0:00:34/

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》
期待您的点赞👍收藏⭐评论✍

重学SpringBoot3-集成Redis(八)之限时任务(延迟队列)

  • 1. 延迟任务的场景
  • 2. Redis Sorted Set基本原理
  • 3. 使用 Redis Sorted Set 实现延迟队列
    • 3.1. 引入依赖
    • 3.2. 配置 Redis
    • 3.3. 延迟队列的任务存储和处理
      • 3.3.1 任务存储
      • 3.3.2 任务处理
    • 3.4. 测试效果
      • 3.4.1 测试添加任务
      • 3.4.2 存储格式
  • 4. 优化建议
    • 4.1. 任务幂等性
    • 4.2. 持久化
    • 4.3. 高并发处理
  • 5. 总结

在分布式系统中,延迟任务(或限时任务)是一种常见的需求,通常用于实现延迟执行、定时处理或消息超时等场景。Redis 作为高性能的内存数据库,具备非常灵活的 Sorted Set(有序集合) 数据结构,可以很容易地实现延迟队列,满足限时任务的需求。

在本篇文章中,我们将介绍如何通过 RedisSpring Boot 3 来实现 限时任务(也称为延迟任务或延迟队列),让你能够轻松管理任务的延时执行。

1. 延迟任务的场景

延迟任务的应用场景非常广泛,包括但不限于以下场景:

  1. 订单超时取消:用户下单后未支付,超过一定时间自动取消订单。
  2. 消息超时重发:当消息发送失败,可以延迟重试。
  3. 定时提醒:比如发送通知或定时邮件。

这些场景都需要任务在一段时间后自动执行,因此我们需要一种灵活、高效的解决方案来处理这类限时任务。

2. Redis Sorted Set基本原理

Redis Sorted Set(有序集合)是一种数据结构,它将元素存储在一个有序的集合中,每个元素都有一个唯一的分数(score)与之关联。Sorted Set 的基本原理如下:

  1. 元素和分数:每个元素都有一个唯一的分数与之关联,分数可以是整数或浮点数。
  2. 有序集合:元素按照分数的大小顺序存储在集合中,分数越小的元素越靠近集合的头部。
  3. 唯一性:集合中每个元素的分数必须是唯一的,如果两个元素的分数相同,则后一个元素会覆盖前一个元素。
  4. 插入和删除:元素可以通过 ZADD 命令插入到集合中,通过 ZREM 命令删除元素。
  5. 范围查询:可以通过 ZRANGE 命令查询集合中某个范围内的元素,范围可以是分数范围或索引范围,ZRANGEBYSCORE 可以根据 score 范围查找元素。
  6. 分数更新:可以通过 ZINCRBY 命令更新元素的分数。

Sorted Set 的底层实现使用了跳跃表(Skip List)数据结构,跳跃表是一种高效的有序数据结构,它可以在 O(log n) 的时间复杂度内进行插入、删除和查找操作。

3. 使用 Redis Sorted Set 实现延迟队列

在实现延迟任务时,我们可以将任务的执行时间作为 Sorted Set 中的 score,然后按时间顺序处理任务,确保在指定时间执行。

3.1. 引入依赖

首先,在 pom.xml 中引入 Redis 相关依赖,相关配置请参考重学SpringBoot3-集成Redis(一)基本使用:

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

3.2. 配置 Redis

application.yml 中配置 Redis 的连接信息:

spring:data:redis:host: 1.94.26.81port: 6379            # Redis 端口password: redis123456 # 如果有密码可以在这里配置lettuce:pool:max-active: 100    # 最大并发连接数max-idle: 50       # 最大空闲连接数min-idle: 10       # 最小空闲连接数

3.3. 延迟队列的任务存储和处理

接下来,我们通过 Redis 的 Sorted Set 来存储任务,并定时检查任务是否到期。

3.3.1 任务存储

每当有新的任务需要延迟执行时,我们将其加入到 Redis 的 Sorted Set 中,score 为该任务的执行时间戳。

package com.coderjia.boot310redis.service;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;import java.util.concurrent.TimeUnit;/*** @author CoderJia* @create 2024/10/7 下午 05:10* @Description**/
@Service
public class DelayedTaskService {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String DELAYED_QUEUE_KEY = "delayedQueue";// 添加任务到延迟队列public void addTaskToQueue(String taskId, long delayInSeconds) {long executeTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delayInSeconds);redisTemplate.opsForZSet().add(DELAYED_QUEUE_KEY, taskId, executeTime);System.out.println("Added task " + taskId + " to the queue, will be executed in " + delayInSeconds + " seconds.");}
}

3.3.2 任务处理

为了定期检查是否有任务到期,我们使用 Spring 的 @Scheduled 注解创建一个定时任务,定时从 Redis 中获取即将到期的任务并执行。

package com.coderjia.boot310redis.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;import java.util.Set;/*** @author CoderJia* @create 2024/10/7 下午 05:10* @Description**/
@Slf4j
@Service
public class DelayedTaskProcessor {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String DELAYED_QUEUE_KEY = "delayedQueue";// 定时任务,检查是否有任务到期@Scheduled(fixedRate = 5000) // 每隔5秒执行一次public void processDelayedTasks() {long currentTime = System.currentTimeMillis();Set<String> tasks = redisTemplate.opsForZSet().rangeByScore(DELAYED_QUEUE_KEY, 0, currentTime);if (tasks != null && !tasks.isEmpty()) {for (String taskId : tasks) {// 执行任务executeTask(taskId);// 从队列中移除已执行的任务redisTemplate.opsForZSet().remove(DELAYED_QUEUE_KEY, taskId);log.info("Task " + taskId + " is executed.");}}}// 模拟任务执行private void executeTask(String taskId) {// 执行任务log.info("Task " + taskId + " is executing...");}
}

在这个示例中:

  • addTaskToQueue(String taskId, long delayInSeconds) 方法将任务加入到 Redis 的 Sorted Set 中,延迟 delayInSeconds 秒执行。
  • @Scheduled(fixedRate = 5000) 每隔 1 秒扫描一次 Redis,查找是否有任务的执行时间已到期。如果有,则执行该任务,并从队列中移除。

3.4. 测试效果

在你的业务逻辑中调用上面创建的 addTaskToQueue 方法添加任务到延迟队列中去。

package com.coderjia.boot310redis.demos.web;import com.coderjia.boot310redis.service.DelayedTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** @author CoderJia* @create 2024/10/7 下午 05:18* @Description**/
@Slf4j
@RestController
public class DelayedTaskController {@Autowiredprivate DelayedTaskService delayedTaskService;@GetMapping("/addDelayedTask")public String addDelayedTask(@RequestParam("taskId") String taskId, @RequestParam("delay") Long delay) {log.info("Adding task to the queue, taskId:{}, delay:{}", taskId, delay);delayedTaskService.addTaskToQueue(taskId, delay);return "Added task success";}
}

3.4.1 测试添加任务

执行以下请求添加 3 条任务到延迟队列中,

http://localhost:8080/addDelayedTask?taskId=3&delay=20

添加任务

任务执行

新增2条任务,先添加任务的后执行,测试效果。

先添加任务的后执行

3.4.2 存储格式

存储格式

4. 优化建议

4.1. 任务幂等性

在分布式环境中,任务可能会被多个节点同时执行。确保任务的幂等性非常重要,可以通过 Redisson 分布式锁来保证同一时刻只有一个节点在执行任务。

        // 获取分布式锁RLock lock = redissonClient.getLock("scheduledTaskLock");try {// 尝试获取锁,最多等待 1 秒,锁定时间 10 秒if (lock.tryLock(1, 10, TimeUnit.SECONDS)) {try {// 执行任务System.out.println("Executing distributed scheduled task...");} finally {lock.unlock(); // 释放锁}} else {System.out.println("Another instance is executing the task...");}} catch (InterruptedException e) {e.printStackTrace();}

4.2. 持久化

延迟队列可以与持久化存储结合起来,确保任务在 Redis 失败或重启时不会丢失。可以使用 Redis 持久化功能或将任务信息存储在数据库中。

4.3. 高并发处理

对于大量延迟任务,可以通过增加 Redis 集群的规模或使用更高效的数据结构来提升处理性能。

5. 总结

通过 Redis Sorted SetSpring Boot 3,我们可以轻松实现限时任务的调度。Redis 的高性能和有序集合特性为我们提供了实现延迟队列的基础,而 Spring Boot 的定时任务调度则帮助我们定期处理这些任务。

在实际场景中,限时任务的应用非常广泛,比如订单超时处理、消息重发等场景,借助 Redis 我们可以有效管理这些延迟任务并确保系统的高效运行。


希望这篇文章能够帮助你更好地理解如何使用 Spring Boot 3 与 Redis 实现延迟队列。如果你在项目中遇到了相关问题,欢迎在评论区分享你的问题与经验。


http://www.ppmy.cn/news/1536205.html

相关文章

开源 AI 智能名片 2+1 链动模式 S2B2C 商城小程序的数据运营策略与价值创造

一、引言 1.1 研究背景 在当今数字化时代&#xff0c;数据运营已成为企业发展的核心驱动力。开源 AI 智能名片 21 链动模式 S2B2C 商城小程序作为一种创新的营销工具&#xff0c;与数据运营紧密相连。该小程序通过集成人工智能、大数据分析等先进技术&#xff0c;能够实时收集…

随机掉落的项目足迹:Vue3中vite.config.ts配置代理服务器解决跨域问题

跨域问题产生的原因&#xff1a;浏览器同源策略 后面的通俗解释小标题下的内容是便于大家理解同源策略和跨域问题。 而同源策略和跨域问题这两个小标题下的内容虽然比较专业不容易阅读&#xff0c;但是还是建议大家花时间理解并记忆&#xff0c;因为这是前端面试中的常考点。…

db-gpt部署问题

1. 默认安装cpu版torch&#xff0c;可重新安装为gpu版 教程&#xff1a;本地安装torch2.3.1,cuda12.1,python3.10-CSDN博客 2.安装vllm失败&#xff0c;不支持window系统 首次安装缺少dll文件&#xff0c;下载并放到win32目录下解决 PyTorch之loading fbgemm.dll异常的解决办…

企业架构系列(16)ArchiMate第14节:实施和迁移视角

在企业架构中&#xff0c;为了有效地规划和管理架构的变更与实施&#xff0c;通常会使用不同的视角来描述架构的不同方面。本篇涉及到三个主要视角&#xff1a;项目视角、迁移视角以及实施与迁移视角。 一、实施和迁移视角概览 1.项目视角 元素与关系&#xff1a;关注项目本身…

Flutter-->Namespace not specified.

更新Android gradle 7.5.0之后, 运行项目会出现Namespace not specified.问题, 这里出一个我的解决方案. 由于很多库都不可能及时更新适配gradle 7.5.0, 所以可以等pub get将子库拉取到本地之后, 在本地手动添加namespace属性,即可解决本文问题. 作为程序猿,那肯定不可能手动修…

常见的图像处理算法:Sobel边缘检测

Sobel 算子是一个主要用于边缘检测的离散微分算子。它结合了高斯平滑和微分求导&#xff0c;用来计算灰度图像的近似梯度。 Sobel 算子是一个主要用于边缘检测的离散微分算子&#xff08;discrete differentiation operator&#xff09;。它结合了高斯平滑和微分求导&#xff…

招联金融2025秋招内推

【投递方式】 直接扫下方二维码&#xff0c;或点击内推官网https://wecruit.hotjob.cn/SU61025e262f9d247b98e0a2c2/mc/position/campus&#xff0c;使用内推码 igcefb 投递&#xff09; 【招聘岗位】 后台开发 前端开发 数据开发 数据运营 算法开发 技术运维 软件测试 产品策…

【自动驾驶】最近计划看的论文

将对应的论文链接贴出来&#xff0c;当作监督自己。 方向&#xff1a;端到端自动驾驶 方法论文代码UniADhttps://arxiv.org/pdf/2212.10156https://github.com/OpenDriveLab/UniADVADhttps://arxiv.org/pdf/2303.12077https://github.com/hustvl/VADUADhttps://arxiv.org/pdf…