基于Spring自带的调度机制实现动态任务管理

ops/2025/2/26 17:57:43/

一、需求背景

移动源需要将数据库中数据依据环保局的对接文档传递给对应省份环保局平台。传递的数据不同存在调度实时性不同,所以,需要一个调度池能够实现配置动态调度,且避免引入第三方调度工具减少维护成本。

二、实现思路

  1. 动态任务注册器:使用ScheduledTaskRegistrar动态注册任务。

  2. 数据库监听:通过定时任务定期检查数据库变化,动态更新调度。

  3. 任务容器:维护一个内存中的任务集合,管理任务状态。

关键说明:

       动态任务注册:

  • 使用 ScheduledTaskRegistrar 和 TaskScheduler 动态注册任务。

  • 通过 ConcurrentHashMap 维护当前运行的任务,确保任务集合的线程安全,在更新任务时,先取消旧任务再注册新任务。

  • 在注册任务前校验cron表达式合法性,避免无效表达式导致调度失败

      数据库变化监听:

  •  定时60秒检测任务change_status配置变化,发现变化后,更新或取消任务

      线程池管理:

  • 配置 ThreadPoolTaskScheduler 提供任务执行线程池。

避免默认单线程池导致任务阻塞。      任务去重:

  • 通过 taskKey 确保每个任务的唯一性,在数据库中设置 task_key 字段为唯一约束

     异常处理:

  •  在任务执行逻辑中添加 try-catch 块,防止任务异常影响调度器,并将异常信息记录于表里error_msg,状态修改为异常

     手动触发任务更新:

  •  提供REST API手动触发任务或强制刷新配置

任务表:

CREATE TABLE `dynamic_task` (`id` int(11) NOT NULL AUTO_INCREMENT,`task_name` varchar(100) NOT NULL COMMENT '任务名',`task_key` varchar(50) CHARACTER SET ujis NOT NULL COMMENT '任务key',`cron_expression` varchar(50) NOT NULL COMMENT '定期策略',`enabled` tinyint(1) DEFAULT '1',`status` tinyint(4) DEFAULT '1' COMMENT '状态:1-执行完成 0-执行中 -1 异常状态',`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',`error_msg` text COMMENT '错误信息',`notify_payload` json DEFAULT NULL,`biz_type` tinyint(4) NOT NULL COMMENT '业务类型看bizTypeEnum',`change_status` tinyint(4) DEFAULT '0' COMMENT '0:修改(待同步),1:已修改',PRIMARY KEY (`id`),UNIQUE KEY `task_key` (`task_key`) USING BTREE,KEY `change_status` (`change_status`),KEY `enabled_status` (`enabled`,`status`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

三、核心代码实现

1、定义任务实体

java">@Getter
@Setter
@Accessors(chain = true)
@NoArgsConstructor
@TableName("dynamic_task")
public class DynamicTask {@TableId(value = "id", type = IdType.AUTO)private Integer id;private String taskName;private String taskKey;private String cronExpression;private Integer enabled;private Integer status;private Date createTime;private Date updateTime;private String notifyPayload;private Integer bizType;
}

2、动态任务管理器

java">@Slf4j
@Configuration
public class DynamicTaskManager implements SchedulingConfigurer {private final DynamicTaskService dynamicTaskService;private final TaskScheduler taskScheduler;private final UploadTargetContext uploadTargetContext;@Value("${uploadTarget}")private UploadTargetEnum uploadTarget;private final Map<String, ScheduledFuture<?>> taskMap = new ConcurrentHashMap<>();public DynamicTaskManager(DynamicTaskServiceImpl dynamicTaskService, TaskScheduler taskScheduler,UploadTargetContext uploadTargetContext) {this.dynamicTaskService = dynamicTaskService;this.taskScheduler = taskScheduler;this.uploadTargetContext = uploadTargetContext;}@Overridepublic void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {// 初始化加载所有启用的任务List<DynamicTask> tasks = dynamicTaskService.taskList(null);List<Integer> taskIdList = new ArrayList<>();for (DynamicTask task : tasks) {Integer taskId = scheduleTask(task);if (taskId != null) {taskIdList.add(taskId);}}if (CollectionUtils.isEmpty(taskIdList)) {return;}dynamicTaskService.processError(taskIdList, "表达式错误");}// 动态注册任务public Integer scheduleTask(DynamicTask task) {if (!isValidCron(task.getCronExpression())) {return task.getId();}String taskKey = task.getTaskKey();cancelTask(taskKey);// 创建新的触发器任务ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(() -> executeBusinessLogic(taskKey, task.getNotifyPayload(), task.getBizType(),task.getId()),new CronTrigger(task.getCronExpression()));// 将任务存入MaptaskMap.put(taskKey, scheduledFuture);return null;}// 执行业务逻辑private void executeBusinessLogic(String taskKey, String notifyPayload, Integer bizType,Integer taskId) {try {log.info("执行任务: " + taskKey + ",时间: " + new Date());UploadTargetAdapter uploadTargetAdapter =uploadTargetContext.resolveRun(uploadTarget);BizTypeEnum byBizType = BizTypeEnum.getByBizType(bizType);if (Objects.nonNull(byBizType)) {if (StringUtil.isEmpty(notifyPayload)) {byBizType.performAction(uploadTargetAdapter, null);} else {Object data = JSON.parseObject(notifyPayload,byBizType.getClazz());byBizType.performAction(uploadTargetAdapter, data);}}} catch (Exception e) {dynamicTaskService.processError(Collections.singletonList(taskId), JSON.toJSONString(e));}}// 定时检查数据库更新(每隔60秒)@Scheduled(fixedRate = 60000)public void checkTaskUpdates() {Date date = new Date();List<DynamicTask> updatedTasks = dynamicTaskService.taskList(date);if (CollectionUtils.isEmpty(updatedTasks)) {return;}List<Integer> taskIdList = new ArrayList<>();updatedTasks.forEach(task -> {if (task.getEnabled() == 1) {scheduleTask(task);} else {cancelTask(task.getTaskKey());}taskIdList.add(task.getId());});// 更新检查状态dynamicTaskService.updateLastCheckTime(taskIdList);}// 取消任务public void cancelTask(String taskKey) {if (taskMap.containsKey(taskKey)) {taskMap.get(taskKey).cancel(false);taskMap.remove(taskKey);}}public boolean isValidCron(String cron) {return CronExpression.isValidExpression(cron);}

3、Spring配置类

java">@Configuration
public class SchedulerConfig {@Beanpublic TaskScheduler taskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();scheduler.setPoolSize(10);scheduler.setThreadNamePrefix("dynamic-task-");HttpGlobalConfig.setTimeout(3000);return scheduler;}
}


http://www.ppmy.cn/ops/161462.html

相关文章

【面试手撕】多线程/并发编程

文章目录 前言三个线程&#xff0c;交替打印A、B、C两个线程1~100交替输出奇数和偶数10个线程&#xff0c;每个线程1w&#xff0c;最终变量到达10w模拟死锁让三个线程怎么串行执行1.使用join方法2.使用CountDownLatch 前言 本文总结面试中常考的手撕多线程问题。 三个线程&am…

UE5网络通信架构解析

文章目录 前言一、客户端-服务器架构&#xff08;C/S Model&#xff09;二、对等网络架构&#xff08;P2P&#xff0c;非原生支持&#xff09;三、混合架构&#xff08;自定义扩展&#xff09;四、UE5网络核心机制 前言 UE5的网络通信主要基于客户端-服务器&#xff08;C/S&am…

git branch

文章目录 1.简介2.格式3.选项4.示例参考文献 1.简介 git branch 用于管理分支&#xff0c;包括查看、创建、删除、重命名和关联。 git branch 是 Git 版本控制系统中用于管理分支的命令。分支是 Git 的核心功能之一&#xff0c;允许开发者在同一个代码库中并行开发不同的功能…

DDL区别:Oracle和Mysql

在数据库管理系统&#xff08;DBMS&#xff09;中&#xff0c;DDL&#xff08;数据定义语言&#xff0c;Data Definition Language&#xff09;用于定义和管理数据库结构&#xff0c;如表、索引、视图等。Oracle 和 MySQL 作为两种主流的关系型数据库&#xff0c;在 DDL 语法和…

WPS计算机二级•文档的页面设置与打印

听说这是目录哦 纸张大小页边距和装订线❤️‍&#x1f525;打印界面讲解❤️缩印&#x1f495;打印作文稿纸&#x1f49e;将文档打印成书籍&#x1f493;限制编辑设置&#x1f497;给文字文档加密&#x1f496;文档导出为 PDF格式&#x1f498;协作编辑模式&#x1f49d;能量站…

HTTP实验(ENSP模拟器实现)

目录 HTTP概述 HTTP实验 HTTPS 实验 HTTP概述 HTTP&#xff08;HyperText Transfer Protocol&#xff0c;超文本传输协议&#xff09;&#xff0c;设计HTTP最初的目的是为了提供一种发布和接收HTML页面的方法。 HTTP定义了多种请求方法&#xff0c;常用的包括&#xff1a; …

word中对插入的图片修改背景色

关于对word中插入的图片修改背景色的问题&#xff0c;网上查了好多都无效&#xff0c;可能是由于word版本的问题&#xff0c;本人word版本为2019版&#xff0c;亲测有效的修改图片背景色为透明的小技巧&#xff1a; 选中图片-设置图片格式-最右面图标&#xff0c;选择图片校正…

kotlin 知识点 七 泛型的高级特性

对泛型进行实化 泛型实化这个功能对于绝大多数Java 程序员来讲是非常陌生的&#xff0c;因为Java 中完全没有这个概 念。而如果我们想要深刻地理解泛型实化&#xff0c;就要先解释一下Java 的泛型擦除机制才行。 在JDK 1.5之前&#xff0c;Java 是没有泛型功能的&#xff0c;…