Spring Boot集成Redisson实现延迟队列

server/2024/9/25 10:33:39/

项目场景:

   在电商、支付等领域,往往会有这样的场景,用户下单后放弃支付了,那这笔订单会在指定的时间段后进行关闭操作,细心的你一定发现了像某宝、某东都有这样的逻辑,而且时间很准确,误差在1s内;那他们是怎么实现的呢?

   一般实现的方法有几种:使用 redisson、rocketmq、rabbitmq等消息队列的延时投递功能。


解决方案:

   一般项目集成redis的比较多,所以我这篇文章就说下redisson延迟队列,如果使用rocketmq或rabbitmq需要额外集成中间件,比较麻烦一点。

1.集成redisson

maven依赖

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.21.1</version>
</dependency>

yml配置,单节点配置可以兼容redis的配置方式

# redis配置
spring:redis:database: 0host: 127.0.0.1password: redis@passport: 6001

 更详细的配置参考:Spring Boot整合Redisson的两种方式-CSDN博客

2.配置多线程

因为延迟队列可能会多个任务同时执行,所以需要多线程处理。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;@Configuration
@EnableAsync
public class ExecutorConfig {/*** 异步任务自定义线程池*/@Bean(name = "taskExecutor")public ThreadPoolTaskExecutor asyncServiceExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//配置核心线程数executor.setCorePoolSize(50);//配置最大线程数executor.setMaxPoolSize(500);//配置队列大小executor.setQueueCapacity(300);//允许线程空闲时间executor.setKeepAliveSeconds(60);//配置线程池中的线程的名称前缀executor.setThreadNamePrefix("taskExecutor-");// rejection-policy:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//调用shutdown()方法时等待所有的任务完成后再关闭executor.setWaitForTasksToCompleteOnShutdown(true);//等待所有任务完成后的最大等待时间executor.setAwaitTerminationSeconds(60);return executor;}
}

3.具体业务

比如消息通知、关闭订单等 ,这里加上了@Async注解,可以异步执行

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import java.text.SimpleDateFormat;
import java.util.Date;@Service
public class AsyncService {@Asyncpublic void executeQueue(Object value) {System.out.println();System.out.println("当前线程:"+Thread.currentThread().getName());System.out.println("执行任务:"+value);//打印时间方便查看SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("执行任务的时间:"+sdf.format(new Date()));//自己的业务逻辑,可以根据id发送通知消息等//......}
}

4.延迟队列(关键代码)

这里包括添加延迟队列,和消费延迟队列,@PostConstruct注解的意思是服务启动加载一次,参考Spring Boot中多个PostConstruct注解执行顺序控制_多个postconstruct执行顺序-CSDN博客

import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;@Service
public class TestService {@Resourceprivate AsyncService asyncService;@Resourceprivate ThreadPoolTaskExecutor executor;@Autowiredprivate RedissonClient redissonClient;/*** 添加延迟任务*/public void addQueue() {//获取延迟队列RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue("delayedQueue");RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);for (int i = 1; i <= 10; i++) {long delayTime = 5+i; //延迟时间(秒)
//			long delayTime = 5; //这里时间统一,可以测试并发执行delayedQueue.offer("延迟任务"+i, delayTime, TimeUnit.SECONDS);}//打印时间方便查看SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("添加任务的时间:"+sdf.format(new Date()));}/***	服务启动时加载,开始消费延迟队列*/@PostConstructpublic void consumer() {System.out.println("服务启动时加载>>>>>>");//获取延迟队列RBlockingQueue<Object> delayedQueue = redissonClient.getBlockingQueue("delayedQueue");//启用一个线程来消费这个延迟队列executor.execute(() ->{while (true){try {
//					System.out.println("while中的线程:"+Thread.currentThread().getName());//获取延迟队列中的任务Object value = delayedQueue.poll();if(value == null){//如果没有任务就休眠1秒,休眠时间根据业务自己定义Thread.sleep(1000);	//这里休眠时间越短,误差就越小continue;}//异步处理延迟队列中的消息asyncService.executeQueue(value);} catch (Exception e) {e.printStackTrace();}}});}
}

5.测试接口 

import com.test.service.TestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/test")
public class TestController {@Autowiredprivate TestService testService;/** 添加延迟任务*/@GetMapping(value = "/addQueue")public String addQueue() {testService.addQueue();return "success";}}

6.测试结果


 总结:

  1. Redisson的的RDelayedQueue是基于Redis实现的,而Redis本身并不保证数据的持久性。如果Redis服务器宕机,那么所有在RDelayedQueue中的数据都会丢失。因此,我们需要在应用层面进行持久化设计,例如定期将RDelayedQueue中的数据持久化到数据库。
  2. 在设计延迟任务时,我们应该根据实际需求来合理设置延迟时间,避免设置过长的延迟时间导致内存占用过高。

http://www.ppmy.cn/server/16641.html

相关文章

C++之AVL树的使用以及原理详解

1.AVL树 1.1AVL树的概念 1.2AVL树的定义 1.3AVL树的插入 1.4AVL树的旋转 1. 右单旋 2. 左单旋 3. 左右双旋 4. 右左双旋 1.5AVL树的验证 1.6AVL的实现 在之前对map/multimap/set/multiset进行了简单的介绍&#xff08;C之map_set的使用-CSDN博客&#xff09;&#xff0c;…

【计算机网络】成功解决 ARP项添加失败:请求的操作需要提升

最近在用Wireshark做实验时候&#xff0c;需要清空本机ARP表和DNS缓存&#xff0c;所以在cmd窗口输入以下命令&#xff0c; 结果发生了错误&#xff1a;ARP项添加失败&#xff1a;请求的操作需要提升 一开始我还以为是操作的命令升级了&#xff0c;但是后面发现其实只是给的权…

为什么分类问题不能使用mse损失函数,更容易理解版本

分类问题通常不适合使用均方误差&#xff08;Mean Squared Error&#xff0c;MSE&#xff09;损失函数&#xff0c;原因如下&#xff1a; 1.输出差异&#xff1a; 输出差异的度量不同&#xff1a;MSE损失函数是基于预测值和真实值之间的差异的平方和进行计算的&#xff0c;适…

1小时学会SpringBoot3+Vue3前后端分离开发

首发于Enaium的个人博客 引言 大家可能刚学会Java和Vue之后都会想下一步是什么&#xff1f;那么就先把SpringBoot和Vue结合起来&#xff0c;做一个前后端分离的项目吧。 准备工作 首先你需要懂得Java和Vue的基础知识&#xff0c;环境这里就不多说了&#xff0c;直接开始。 …

DAC音频解码芯片DP7398立体声数模转换芯片

DP7398 Pin TO Pin CS4398和CS43122&#xff0c;同轴光纤DAC解码&#xff0c;支持HIFI播放器。 产品介绍 DP7398 是一个立体声 24 位/1 92kHz 数模转换芯片。 该 D/A 系统包括数字去加重、半分贝步长音量控制、 ATAP I 通道混频、可选择的快速和慢速数字插补滤波器和过采样多位…

【第六章】STM32 - 软件I2C读取MPU6050

关联&#xff1a; STM32总结超全笔记【秋招自用】https://blog.csdn.net/Xiaoxuexxxxx/article/details/137509422?csdn_share_tail%7B%22type%22%3A%22blog%22%2C%22rType%22%3A%22article%22%2C%22rId%22%3A%22137509422%22%2C%22source%22%3A%22Xiaoxuexxxxx%22%7D 【MPU…

Android kotlin 协程异步async与await介绍与使用

一、介绍 在kotlin语言中&#xff0c;协程是一个处理耗时的操作&#xff0c;但是很多人都知道同步和异步&#xff0c;但是不知道该如何正确的使用&#xff0c;如果处理不好&#xff0c;看似异步&#xff0c;其实在runBloacking模块中使用的结果是同步的。 针对如何同步和如何异…

linux 系统文件目录颜色及特殊权限对应的颜色

什么决定文件目录的颜色和背景&#xff1f; 颜色 说明 栗子 权限白色表示普通文件 蓝色表示目录 绿色表示可执行文件 浅蓝色链接文件 黄色表示设备文件 红色 表示压缩文件 红色闪烁表示链接的文件有问题 灰色 表示其它文件 可以用字符表示文件的类型&am…