一、需求背景
移动源需要将数据库中数据依据环保局的对接文档传递给对应省份环保局平台。传递的数据不同存在调度实时性不同,所以,需要一个调度池能够实现配置动态调度,且避免引入第三方调度工具减少维护成本。
二、实现思路
-
动态任务注册器:使用ScheduledTaskRegistrar动态注册任务。
-
数据库监听:通过定时任务定期检查数据库变化,动态更新调度。
-
任务容器:维护一个内存中的任务集合,管理任务状态。
关键说明:
动态任务注册:
-
使用
ScheduledTaskRegistrar
和TaskScheduler
动态注册任务。 -
通过
ConcurrentHashMap
维护当前运行的任务,确保任务集合的线程安全,在更新任务时,先取消旧任务再注册新任务。 -
在注册任务前校验cron表达式合法性,避免无效表达式导致调度失败
数据库变化监听:
- 定时60秒检测任务change_status配置变化,发现变化后,更新或取消任务
线程池管理:
-
配置
ThreadPoolTaskScheduler
提供任务执行线程池。
避免默认单线程池导致任务阻塞。 任务去重:
- 通过
taskKey
确保每个任务的唯一性,在数据库中设置task_key
字段为唯一约束
异常处理:
- 在任务执行逻辑中添加
try-catch
块,防止任务异常影响调度器,并将异常信息记录于表里error_msg,状态修改为异常
手动触发任务更新:
- 提供REST API手动触发任务或强制刷新配置
任务表:
CREATE TABLE `dynamic_task` (`id` int(11) NOT NULL AUTO_INCREMENT,`task_name` varchar(100) NOT NULL COMMENT '任务名',`task_key` varchar(50) CHARACTER SET ujis NOT NULL COMMENT '任务key',`cron_expression` varchar(50) NOT NULL COMMENT '定期策略',`enabled` tinyint(1) DEFAULT '1',`status` tinyint(4) DEFAULT '1' COMMENT '状态:1-执行完成 0-执行中 -1 异常状态',`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',`error_msg` text COMMENT '错误信息',`notify_payload` json DEFAULT NULL,`biz_type` tinyint(4) NOT NULL COMMENT '业务类型看bizTypeEnum',`change_status` tinyint(4) DEFAULT '0' COMMENT '0:修改(待同步),1:已修改',PRIMARY KEY (`id`),UNIQUE KEY `task_key` (`task_key`) USING BTREE,KEY `change_status` (`change_status`),KEY `enabled_status` (`enabled`,`status`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
三、核心代码实现
1、定义任务实体
java">@Getter
@Setter
@Accessors(chain = true)
@NoArgsConstructor
@TableName("dynamic_task")
public class DynamicTask {@TableId(value = "id", type = IdType.AUTO)private Integer id;private String taskName;private String taskKey;private String cronExpression;private Integer enabled;private Integer status;private Date createTime;private Date updateTime;private String notifyPayload;private Integer bizType;
}
2、动态任务管理器
java">@Slf4j
@Configuration
public class DynamicTaskManager implements SchedulingConfigurer {private final DynamicTaskService dynamicTaskService;private final TaskScheduler taskScheduler;private final UploadTargetContext uploadTargetContext;@Value("${uploadTarget}")private UploadTargetEnum uploadTarget;private final Map<String, ScheduledFuture<?>> taskMap = new ConcurrentHashMap<>();public DynamicTaskManager(DynamicTaskServiceImpl dynamicTaskService, TaskScheduler taskScheduler,UploadTargetContext uploadTargetContext) {this.dynamicTaskService = dynamicTaskService;this.taskScheduler = taskScheduler;this.uploadTargetContext = uploadTargetContext;}@Overridepublic void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {// 初始化加载所有启用的任务List<DynamicTask> tasks = dynamicTaskService.taskList(null);List<Integer> taskIdList = new ArrayList<>();for (DynamicTask task : tasks) {Integer taskId = scheduleTask(task);if (taskId != null) {taskIdList.add(taskId);}}if (CollectionUtils.isEmpty(taskIdList)) {return;}dynamicTaskService.processError(taskIdList, "表达式错误");}// 动态注册任务public Integer scheduleTask(DynamicTask task) {if (!isValidCron(task.getCronExpression())) {return task.getId();}String taskKey = task.getTaskKey();cancelTask(taskKey);// 创建新的触发器任务ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(() -> executeBusinessLogic(taskKey, task.getNotifyPayload(), task.getBizType(),task.getId()),new CronTrigger(task.getCronExpression()));// 将任务存入MaptaskMap.put(taskKey, scheduledFuture);return null;}// 执行业务逻辑private void executeBusinessLogic(String taskKey, String notifyPayload, Integer bizType,Integer taskId) {try {log.info("执行任务: " + taskKey + ",时间: " + new Date());UploadTargetAdapter uploadTargetAdapter =uploadTargetContext.resolveRun(uploadTarget);BizTypeEnum byBizType = BizTypeEnum.getByBizType(bizType);if (Objects.nonNull(byBizType)) {if (StringUtil.isEmpty(notifyPayload)) {byBizType.performAction(uploadTargetAdapter, null);} else {Object data = JSON.parseObject(notifyPayload,byBizType.getClazz());byBizType.performAction(uploadTargetAdapter, data);}}} catch (Exception e) {dynamicTaskService.processError(Collections.singletonList(taskId), JSON.toJSONString(e));}}// 定时检查数据库更新(每隔60秒)@Scheduled(fixedRate = 60000)public void checkTaskUpdates() {Date date = new Date();List<DynamicTask> updatedTasks = dynamicTaskService.taskList(date);if (CollectionUtils.isEmpty(updatedTasks)) {return;}List<Integer> taskIdList = new ArrayList<>();updatedTasks.forEach(task -> {if (task.getEnabled() == 1) {scheduleTask(task);} else {cancelTask(task.getTaskKey());}taskIdList.add(task.getId());});// 更新检查状态dynamicTaskService.updateLastCheckTime(taskIdList);}// 取消任务public void cancelTask(String taskKey) {if (taskMap.containsKey(taskKey)) {taskMap.get(taskKey).cancel(false);taskMap.remove(taskKey);}}public boolean isValidCron(String cron) {return CronExpression.isValidExpression(cron);}
3、Spring配置类
java">@Configuration
public class SchedulerConfig {@Beanpublic TaskScheduler taskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();scheduler.setPoolSize(10);scheduler.setThreadNamePrefix("dynamic-task-");HttpGlobalConfig.setTimeout(3000);return scheduler;}
}