从零搭建微服务项目Pro(第1-2章——Quartz实现定时任务模块优化)

server/2025/3/4 3:41:42/

前言:

在企业项目中,往往有定时任务发布的需求,比如每天晚9点将今日数据备份一次,或每月一号将上月的销售数据邮件发送给对应的工作人员。显然这些操作不可能是人工到时间点调用一次接口,需要编写专门的模块完成任务的调度。

前一章中已经实现一种基于Quartz的定时任务模块,只需要将定时任务所需要的参数定义在数据库中,模块可从数据库中生成对应的定时任务并进行维护。但正如上章最后提到尽管基本功能已经实现,但仍然存在一些bug,且任务状态机的转化设计的不够明确,导致任务管理仍然存在过耦合的问题。

从零搭建微服务项目Pro(第1-1章——Quartz实现定时任务模块)-CSDN博客https://blog.csdn.net/wlf2030/article/details/145785349在本章为解决上章遗留问题,重新设计模块,做了八项调优(见加粗)。任务池管理内核重构,仅对外提供启动和暂停任务接口,简化scheduler调度操作。通过mvc设计模式完成任务池,任务数据层的分离,由manager类统一管理保证内存和数据库信息统一,并在manager类中规范任务自动机的转换。通过引入事件机制,实现任务的异步调用,防止队列阻塞,将原有执行类直接拿取任务实例转化为从事件中拿取任务唯一标识,进而从数据库中实例化任务,进一步保证内存和数据库信息统一,并一定程度防止接口攻击导致错误的任务执行。重新划分异常类型为执行错误和状态转化错误便于后期aop对响应体的设置,以及添加日志输出,将每次任务执行结果和异常信息序列化至数据库方便朔源。

具体类图如下:

本章几乎对上章所有文件均有修改,因此无法提供一步步修改的指导,建议下载本章源码结合类图和上章讲解理解模块。源码链接如下:

wlf728050719/SpringCloudPro1-2https://github.com/wlf728050719/SpringCloudPro1-2以及在后续的1-3章会将quartz模块整合至之前的微服务项目中,感兴趣的小伙伴可以提前了解下,方便后续章节模块移植,微服务项目链接如下:

从零搭建微服务项目Base(第0章——微服务项目结构搭建)_从0创建微服务项目-CSDN博客https://blog.csdn.net/wlf2030/article/details/145206361以及本专栏会持续更新微服务项目,每一章的项目都会基于前一章项目进行功能的完善,欢迎小伙伴们关注!同时如果只是对单章感兴趣也不用从头看,只需下载前一章项目即可,每一章都会有前置项目准备部分,跟着操作就能实现上一章的最终效果,当然如果是一直跟着做可以直接跳过这一部分。专栏目录链接如下,其中Base篇为基础微服务搭建,Pro篇为复杂模块实现。

从零搭建微服务项目(全)-CSDN博客https://blog.csdn.net/wlf2030/article/details/145799620


一、效果演示

quartz定时模块演示

最终数据库记录日志:(会记录失败message以及exception的cause)

最终数据库中任务状态(task1启动,执行成功;task2暂停,执行失败)

其中使用postman请求体如下(name和group为必需项,用于指定task,status字段无效代码控制)

AddTest1:      Post     http://localhost:6699/quartz/add    (type为2即使用bean必传项)

javascript">{"taskName": "task1","taskGroup": "group1","type": 2,"beanName": "test","methodName": "test","params": "test1","cronExpression": "*/5 * * * * ?"
}

AddTest2:      Post      http://localhost:6699/quartz/add  (type为1即使用class必传项)

javascript">{"taskName": "task2","taskGroup": "group1","type": 1,"className": "cn.bit.pro1_2.service.TestJavaService","methodName": "test","params": "test","cronExpression": "*/5 * * * * ?"
}

Start:      Put    http://localhost:6699/quartz/start

javascript">{"taskName": "task1","taskGroup": "group1"
}

Pause:    Put  http://localhost:6699/quartz/pause

javascript">{"taskName": "task1","taskGroup": "group1"
}

Delete:    Delete   http://localhost:6699/quartz/delete

javascript">{"taskName": "task1","taskGroup": "group1"
}

二、数据库sql

可自行添加外键限制,以及选择task_name和task_group一对字段作为主键

create table tb_task
(id              int auto_incrementprimary key,task_name       varchar(255)  not null,task_group      varchar(255)  not null,type            int           not null,bean_name       varchar(255)  null,class_name      varchar(255)  null,path            varchar(255)  null,method_name     varchar(255)  null,params          varchar(255)  null,cron_expression varchar(255)  not null,description     text          null,status          int default 0 not null,result          int           null
);
create table tb_task_log
(id             int auto_incrementprimary key,task_id        int          not null,start_time     datetime     not null,execute_time   varchar(255) not null,result         tinyint      not null,message        varchar(255) not null,exception_info text         null
);

三、核心代码

1.工厂模式

统一不同bean或不同类的不同方法名、不同形参函数为ITaskHandler的invoke(Task)方法

java">package cn.bit.pro1_2.core.handler;import cn.bit.pro1_2.core.entity.Task;
import cn.bit.pro1_2.core.exception.TaskInvokeException;public interface ITaskHandler {void invoke(Task task) throws TaskInvokeException;
}

类反射实现

java">package cn.bit.pro1_2.core.handler.impl;import cn.bit.pro1_2.core.entity.Task;
import cn.bit.pro1_2.core.enums.Result;
import cn.bit.pro1_2.core.exception.TaskInvokeException;
import cn.bit.pro1_2.core.handler.ITaskHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;@Slf4j
@Component
public class JavaClassTaskHandler implements ITaskHandler {@Overridepublic void invoke(Task task) throws TaskInvokeException {try {Object target;Class<?> clazz;Method method;Result returnValue;clazz = Class.forName(task.getClassName());target = clazz.newInstance();if (task.getParams() == null || task.getParams().isEmpty()) {method = target.getClass().getDeclaredMethod(task.getMethodName());ReflectionUtils.makeAccessible(method);returnValue = (Result) method.invoke(target);} else {method = target.getClass().getDeclaredMethod(task.getMethodName(), String.class);ReflectionUtils.makeAccessible(method);returnValue = (Result) method.invoke(target, task.getParams());}//判断业务是否执行成功if (returnValue == null || Result.FAIL.equals(returnValue))throw new TaskInvokeException("JavaClassTaskHandler方法执行失败",null);} catch (NoSuchMethodException e) {throw new TaskInvokeException("JavaClassTaskHandler找不到对应方法", e);} catch (InvocationTargetException | IllegalAccessException e) {throw new TaskInvokeException("JavaClassTaskHandler执行反射方法异常", e);} catch (ClassCastException e) {throw new TaskInvokeException("JavaClassTaskHandler方法返回值定义错误", e);} catch (ClassNotFoundException e) {throw new TaskInvokeException("JavaClassTaskHandler找不到对应类", e);} catch (InstantiationException e) {throw new TaskInvokeException("JavaClassTaskHandler实例化错误", e);}}
}

bean反射实现

java">package cn.bit.pro1_2.core.handler.impl;import cn.bit.pro1_2.core.entity.Task;
import cn.bit.pro1_2.core.enums.Result;
import cn.bit.pro1_2.core.exception.TaskInvokeException;
import cn.bit.pro1_2.core.handler.ITaskHandler;
import cn.bit.pro1_2.core.util.SpringContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;@Slf4j
@Component
public class SpringBeanTaskHandler implements ITaskHandler {@Overridepublic void invoke(Task task) throws TaskInvokeException {try {Object target;Method method;Result returnValue;//上下文寻找对应beantarget = SpringContextHolder.getApplicationContext().getBean(task.getBeanName());//寻找对应方法if(task.getParams()==null|| task.getParams().isEmpty()){method = target.getClass().getDeclaredMethod(task.getMethodName());ReflectionUtils.makeAccessible(method);returnValue = (Result) method.invoke(target);}else{method = target.getClass().getDeclaredMethod(task.getMethodName(), String.class);ReflectionUtils.makeAccessible(method);returnValue = (Result) method.invoke(target, task.getParams());}//判断业务是否执行成功if(returnValue==null || Result.FAIL.equals(returnValue))throw new TaskInvokeException("SpringBeanTaskHandler方法执行失败", null);}catch (NoSuchBeanDefinitionException e){throw new TaskInvokeException("SpringBeanTaskHandler找不到对应bean",e);} catch (NoSuchMethodException e) {throw new TaskInvokeException("SpringBeanTaskHandler找不到对应方法",e);} catch (InvocationTargetException | IllegalAccessException e) {throw new TaskInvokeException("SpringBeanTaskHandler执行反射方法异常",e);} catch (ClassCastException e) {throw new TaskInvokeException("SpringBeanTaskHandler方法返回值定义错误",e);}}
}

工厂类,根据task类型返回不同的实现类

java">package cn.bit.pro1_2.core.handler;import cn.bit.pro1_2.core.entity.Task;
import cn.bit.pro1_2.core.enums.TaskType;
import cn.bit.pro1_2.core.handler.impl.JavaClassTaskHandler;
import cn.bit.pro1_2.core.handler.impl.SpringBeanTaskHandler;
import cn.bit.pro1_2.core.util.SpringContextHolder;
import org.springframework.stereotype.Component;@Component
public class TaskHandlerFactory {public static ITaskHandler getTaskHandler(Task task) {ITaskHandler taskHandler = null;if(TaskType.SPRING_BEAN.getCode().equals(task.getType())) {taskHandler = SpringContextHolder.getApplicationContext().getBean(SpringBeanTaskHandler.class);}if(TaskType.JAVA_CLASS.getCode().equals(task.getType())) {taskHandler = SpringContextHolder.getApplicationContext().getBean(JavaClassTaskHandler.class);}return taskHandler;}
}

2.事件机制

事件,传递任务唯一标识

java">package cn.bit.pro1_2.core.events.event;import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;@ToString
@Getter
@AllArgsConstructor
public class TaskInvokeEvent {private final String taskName;private final String taskGroup;
}

发布者,同时实现job接口作为schduler调度的任务类:

java">package cn.bit.pro1_2.core.events.publisher;import cn.bit.pro1_2.core.entity.Task;
import cn.bit.pro1_2.core.events.event.TaskInvokeEvent;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;@AllArgsConstructor
@Slf4j
@Component
public class TaskInvokePublisher implements Job {private final ApplicationEventPublisher publisher;@Overridepublic void execute(JobExecutionContext jobExecutionContext){Task task = (Task) jobExecutionContext.getJobDetail().getJobDataMap().get("task");//发布事件异步执行任务TaskInvokeEvent event =new TaskInvokeEvent(task.getTaskName(),task.getTaskGroup());publisher.publishEvent(event);log.info("任务执行事件发布:{}",event);}
}

监听者,监听到事件后执行对应方法并记录日志和异常

java">package cn.bit.pro1_2.core.events.listener;import cn.bit.pro1_2.core.entity.Task;
import cn.bit.pro1_2.core.entity.TaskLog;
import cn.bit.pro1_2.core.enums.Result;
import cn.bit.pro1_2.core.events.event.TaskInvokeEvent;
import cn.bit.pro1_2.core.exception.TaskInvokeException;
import cn.bit.pro1_2.core.handler.ITaskHandler;
import cn.bit.pro1_2.core.handler.TaskHandlerFactory;
import cn.bit.pro1_2.core.service.TaskLogService;
import cn.bit.pro1_2.core.service.TaskService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;import java.util.Date;@Slf4j
@AllArgsConstructor
@Component
public class TaskInvokeListener {private final TaskLogService taskLogService;private final TaskService taskService;@Async@Order@EventListener(TaskInvokeEvent.class)public void notifyTaskInvoke(TaskInvokeEvent event) {//从数据库中拿取任务Task task = taskService.selectTaskByNameAndGroup(event.getTaskName(), event.getTaskGroup());log.info("任务执行事件监听,准备执行任务{}",task);ITaskHandler handler = TaskHandlerFactory.getTaskHandler(task);long startTime = System.currentTimeMillis();TaskLog taskLog = new TaskLog();taskLog.setTaskId(task.getId());taskLog.setStartTime(new Date());boolean success = true;try {handler.invoke(task);} catch (TaskInvokeException e) {log.error("{},Task:{}", e.getMessage(),task);success = false;taskLog.setMessage(e.getMessage());if(e.getException()!=null){taskLog.setExceptionInfo(e.getException().getCause().toString());e.getException().printStackTrace();}}if(success){taskLog.setMessage("执行成功");taskLog.setResult(Result.SUCCESS.getCode());task.setResult(Result.SUCCESS.getCode());taskService.setTaskResult(task);}else{taskLog.setResult(Result.FAIL.getCode());task.setResult(Result.FAIL.getCode());taskService.setTaskResult(task);}long endTime = System.currentTimeMillis();taskLog.setExecuteTime(String.valueOf(endTime-startTime));taskLogService.insert(taskLog);}
}

3.MVC模式

task持久层接口,即model

java">package cn.bit.pro1_2.core.service;import cn.bit.pro1_2.core.entity.Task;import java.util.List;public interface TaskService {List<Task> selectAllTask();int updateTaskInfo(Task task);int updateTaskStatus(Task task);int insertTask(Task task);int deleteTask(Task task);int setTaskResult(Task task);Task selectTaskByNameAndGroup(String taskName, String groupName);
}

任务池,即view

java">package cn.bit.pro1_2.core.mvc;import cn.bit.pro1_2.core.events.publisher.TaskInvokePublisher;
import cn.bit.pro1_2.core.entity.Task;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.stereotype.Service;@Slf4j
@Service
@AllArgsConstructor
public class TaskPool {public static JobKey getJobKey(@NonNull Task task) {return JobKey.jobKey(task.getTaskName(),task.getTaskGroup());}public static TriggerKey getTriggerKey(@NonNull Task task) {return TriggerKey.triggerKey(task.getTaskName(),task.getTaskGroup());}/*** 任务池添加任务* @param task* @param scheduler* @throws SchedulerException*/public void addTask(@NonNull Task task,Scheduler scheduler) throws SchedulerException {JobKey jobKey = getJobKey(task);TriggerKey triggerKey = getTriggerKey(task);JobDetail jobDetail = JobBuilder.newJob(TaskInvokePublisher.class).withIdentity(jobKey).build();jobDetail.getJobDataMap().put("task",task);CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCronExpression());CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();scheduler.scheduleJob(jobDetail, trigger);}/*** 任务池暂停并移除任务* @param task* @param scheduler* @throws SchedulerException*/public void pauseTask(@NonNull Task task,Scheduler scheduler) throws SchedulerException {scheduler.pauseJob(getJobKey(task));scheduler.deleteJob(getJobKey(task));}}

manager,即controller

java">package cn.bit.pro1_2.core.mvc;import cn.bit.pro1_2.core.entity.Task;
import cn.bit.pro1_2.core.enums.TaskStatus;
import cn.bit.pro1_2.core.exception.TaskRepositoryException;
import cn.bit.pro1_2.core.service.TaskService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import java.util.List;@Slf4j
@Service
@AllArgsConstructor
public class TaskManger {private final Scheduler scheduler;private final TaskService taskService;private final TaskPool taskPool;/*** 从数据库中反序列化任务数据,保证服务器重启后恢复任务池状态* @throws SchedulerException*/@PostConstructpublic void init() throws SchedulerException {log.info("TaskManager初始化开始...");List<Task> tasks = taskService.selectAllTask();if(tasks != null && !tasks.isEmpty()) {for (Task task : tasks){if(TaskStatus.RUNNING.getCode().equals(task.getStatus()))taskPool.addTask(task,scheduler);}log.info("初始化加载{}项任务", tasks.size());}log.info("TaskManager初始化结束...");}/*** 添加暂停且未被持久化的新任务* @param task* @throws SchedulerException*/public void addTask(Task task) throws SchedulerException {Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup());if(temp != null)throw new TaskRepositoryException("存在相同任务组和任务名任务",task);if(!TaskStatus.PAUSE.getCode().equals(task.getStatus()))throw new TaskRepositoryException("只能添加暂停的任务",task);taskService.insertTask(task);log.info("添加任务{}", task);}/*** 在任务暂停时更新任务信息* @param task* @throws SchedulerException*/public void updateTask(Task task) throws SchedulerException {Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup());if(temp == null)throw new TaskRepositoryException("不存在对应相同任务组和任务名任务",task);if(!TaskStatus.PAUSE.getCode().equals(temp.getStatus()))throw new TaskRepositoryException("只能暂停时更新任务",task);taskService.updateTaskInfo(task);log.info("更新任务{}", task);}/*** 启动暂停中任务* @param task 只使用name和group字段* @throws SchedulerException*/public void startTask(Task task) throws SchedulerException {Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup());if(temp == null)throw new TaskRepositoryException("不存在对应相同任务组和任务名任务",task);if(!TaskStatus.PAUSE.getCode().equals(temp.getStatus()))throw new TaskRepositoryException("只能启动暂停中任务",task);taskPool.addTask(temp,scheduler);//添加任务池未有异常时持久化数据temp.setStatus(TaskStatus.RUNNING.getCode());taskService.updateTaskStatus(temp);log.info("启动任务{}", temp);}/*** 暂停运行中任务* @param task 只使用name和group字段* @throws SchedulerException*/public void pauseTask(Task task) throws SchedulerException {Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup());if(temp == null)throw new TaskRepositoryException("不存在对应相同任务组和任务名任务",task);if(!TaskStatus.RUNNING.getCode().equals(temp.getStatus()))throw new TaskRepositoryException("只能暂停运行中任务",task);taskPool.pauseTask(temp,scheduler);//添加任务池未有异常时持久化数据temp.setStatus(TaskStatus.PAUSE.getCode());taskService.updateTaskStatus(temp);log.info("暂停任务{}", temp);}/*** 暂停暂停中任务* @param task 只使用name和group字段* @throws SchedulerException*/public void deleteTask(Task task) throws SchedulerException {Task temp = taskService.selectTaskByNameAndGroup(task.getTaskName(),task.getTaskGroup());if(temp == null)throw new TaskRepositoryException("不存在对应相同任务组和任务名任务",task);if(!TaskStatus.PAUSE.getCode().equals(temp.getStatus()))throw new TaskRepositoryException("只能删除暂停中任务",task);taskService.deleteTask(temp);log.info("删除任务{}", temp);}
}

最后:

回头发现无论是本科的软件体系结构课程还是研究生的体系结构居然都是最实用的课程,虽然两门课程最后大作业都很抽象,一个是分析区块链源码,一个是docker部署微服务项目,虽然本科上课一直摸鱼但还是有了解设计模式,这个模块就用到其中比如mvc,工厂模式等,研究生则让我接触了微服务,也算是帮我找了份实习。下一章会把这个模块整合到Base的微服务项目中,之后还会写一篇关于jsr303+springmvc的博客,还请多多支持!


http://www.ppmy.cn/server/171803.html

相关文章

数据集笔记:新加坡LTA MRT 车站出口、路灯 等位置数据集

1 MRT 车站出口 data.gov.sg &#xff08;geojson格式&#xff09; 1.1 kml格式 data.gov.sg 2 路灯 data.govsg ——geojson data.gov.sg——kml 版本 3 道路摄像头数据集 data.gov.sg 4 自行车道网络 data.gov.sg 5 学校区域 data.gov.sg 6 自行车停车架&#xff…

【Oracle专栏】Oracle 之 回收站

Oracle相关文档&#xff0c;希望互相学习&#xff0c;共同进步 风123456789&#xff5e;-CSDN博客 1.概述 回收站的全称叫&#xff1a;Tablespace Recycle Bin。 Oracle回收站是一个逻辑区域&#xff0c;Oracle并没有为它分配物理空间。当表被drop后&#xff0c;如果回收站中空…

WP 高级摘要插件:助力 WordPress 文章摘要精准自定义显示

wordpress插件介绍 “WP高级摘要插件”功能丰富&#xff0c;它允许用户在WordPress后台自定义文章摘要。 可设置摘要长度&#xff0c;灵活调整展示字数&#xff1b;设定摘要最后的显示字符&#xff0c; 如常用的省略号等以提示内容未完整展示&#xff1b;指定允许在摘要中显示…

python把html网页转换成pdf标题没有乱码,正文都乱码

在使用Python将HTML网页转换成PDF时&#xff0c;遇到标题没有乱码但正文乱码的问题&#xff0c;通常是由于字符编码处理不当或字体支持问题导致的。以下是一些可能的原因和解决方案&#xff1a; 原因分析 字符编码不匹配&#xff1a; HTML文件的编码与PDF转换工具或库所使用的…

高防IP能够给网站带来哪些好处?

随着网络攻击的复杂性和频繁攻击&#xff0c;企业需要保障自身网站和数据信息的安全性&#xff0c;确保业务可以持续稳定的运行&#xff0c;高防IP作为一种网络安全服务&#xff0c;可以帮助企业抵御一定的网络攻击&#xff0c;那么高防IP能够给网站带来哪些好处呢&#xff1f;…

解决“request returned Internal Server Error for API route and version xxx”错误

一、问题描述 ragflow/README_zh.md at main infiniflow/ragflowhttps://github.com/infiniflow/ragflow/blob/main/README_zh.md 当我们使用Docker部署ragflow,确认服务器状态时,提示“request returned Internal Server Error for API route and version http://%2F%2F.%…

C++ 设计模式 十二:责任链模式 (读书 现代c++设计模式)

责任链 文章目录 责任链场景指针链代理链总结**责任链模式的核心思想****何时需要使用责任链模式&#xff1f;****责任链模式解决的核心问题****与其他设计模式的协同使用****与其他模式的对比****经典应用场景****实现步骤与关键点****注意事项****总结** 今天是第十二种设计模…

python量化交易——金融数据管理最佳实践——使用qteasy管理本地数据源

文章目录 统一定义的金融历史数据表最重要的数据表数据表的定义交易日历表的定义&#xff1a;交易日历表: trade_calendar qteasy是一个功能全面且易用的量化交易策略框架&#xff0c; Github地址在这里。使用它&#xff0c;能轻松地获取历史数据&#xff0c;创建交易策略并完…