文章目录
- 【README】
- 【1】spring任务执行器(TaskExecutor)抽象
- 【1.1】TaskExecutor实现类(执行器)
- 【1.2】使用TaskExecutor代码实践
- 【2】spring任务调度抽象(TaskScheduler)
- 【2.1】Trigger触发器接口
- 【2.1.1】 Trigger实现类
- 【2.2】任务调度器(TaskScheduler)实现
- 【3】任务调度与任务异步执行的注解支持
- 【3.1】启用调度注解
- 【3.1.1】@Scheduled与@Async注解源码
- 【3.2】@Scheduled注解
- 【2.4.1】以固定频率执行调度任务
- 【2.4.2】以固定频率及初始延迟执行调度任务
- 【2.4.3】单次执行的任务设置初始延时
- 【2.4.4】使用cron表达式配置调度任务
- 【3.3】@Async注解
- 【2.5.1】@Async中执行器限定
- 【2.5.2】使用@Async进行异常管理
- 【4】xml配置文件task命名空间
- 【4.1】task:scheduler元素
- 【4.2】task:executor元素
- 【4.3】task:scheduled-tasks元素
- 【5】任务调度与任务异步执行代码实践
- 【5.1】任务调度代码实践
- 【5.1.1】task:scheduler元素创建调度器底层原理
- 【5.2】任务异步执行代码实践
- 【6】cron表达式(非常重要)
- 【6.1】cron表达式宏指令
- 【6.2】使用cron表达式执行调度任务代码实践
- 【7】使用 Quartz调度器
- 【7.1】使用JobDetailFactoryBean
- 【7.2】使用MethodInvokingJobDetailFactoryBean
- 【7.3】使用Trigger与SchedulerFactoryBean关联job
- 【7.4】 spirng集成Quartz调度器代码实践
- 【8】Quartz调度器与java调度线程池ScheduledExecutorService的区别(重要)
- 【8.1】ScheduledExecutorService调度任务回顾
- 【8.1.1】每2s执行1次但业务逻辑耗时5s
- 【8.1.2】每2s执行1次但业务逻辑耗时1s
【README】
本文内容总结自spring官方文档 spring integrates task execution and scheduling, 墙裂推荐;
本文代码,参见: github springDiscover repo 的【chapter31schedule】章节
spring框架使用 TaskExecutor 与 TaskScheduler 接口提供了对任务异步执行与调度的抽象;
springTaskExecutor_15">【1】spring任务执行器(TaskExecutor)抽象
1)执行器概念:执行器是JDK为线程池概念取的名字。执行器名字源于一个事实,即无法保证底层实现是一个池子。一个执行器可能是单线程甚至是同步执行的。spring的抽象隐藏了javase与Jakarta ee环境的细节。
spring的TaskExecutor接口与java.util.concurrent.Executor接口是相同的,前者继承后者。该接口有一个方法 (execute(Runnable task)
) 用于接收一个任务在线程池语义与配置下执行。
public interface TaskExecutor extends Executor {void execute(Runnable task);
}
2)起初,TaskExecutor是为了给spring组件提供线程池抽象而创建的,如ApplicationEventMulticaster, JMS’s AbstractMessageListenerContainer,以及Quartz集成都使用TaskExecutor对池化线程进行抽象。
3)所以:简单理解,spring的任务执行器TaskExecutor,就当做 jdk的Executor来使用 ;
【1.1】TaskExecutor实现类(执行器)
1)spring提供了多个TaskExecutor实现类。
- SyncTaskExecutor:同步运行任务;即每次调用发生在主线程;
- SimpleAsyncTaskExecutor:不会复用任何线程,每次调用都会开启新线程;(不推荐使用;线程新建与回收非常耗费资源)
- ConcurrentTaskExecutor: 它是 Executor实例的一个适配器;作为ThreadPoolTaskExecutor的替代品通过bean属性公开Executor配置参数;一般不选择ConcurrentTaskExecutor;然而,当ThreadPoolTaskExecutor不够灵活时,ConcurrentTaskExecutor可以是一个替代品;
- ThreadPoolTaskExecutor: 最通用的任务执行器(推荐);它公开bean属性以配置ThreadPoolExecutor,并包装ThreadPoolExecutor。如果你想要适配不同的Executor,推荐使用ConcurrentTaskExecutor;
- DefaultManagedTaskExecutor: 它在JSR-236兼容运行时环境中使用JNDI获得的ManagedExecutorService (如Jakarta ee应用服务器),目的是替换CommonJ WorkManager;
2)从6.1开始,ThreadPoolTaskExecutor通过spring生命周期管理提供了暂停与恢复功能,优雅关闭功能。
【SyncTaskExecutor】同步任务执行器;
public class SyncTaskExecutor implements TaskExecutor, Serializable {public SyncTaskExecutor() {}public void execute(Runnable task) {Assert.notNull(task, "Runnable must not be null");task.run();}
}
【SimpleAsyncTaskExecutor】简单异步任务执行器
- 继承CustomizableThreadCreator,顾名思义,要自行创建线程,所以本文不推荐SimpleAsyncTaskExecutor ;
- 实现AsyncListenableTaskExecutor,AsyncListenableTaskExecutor继承 AsyncTaskExecutor,AsyncTaskExecutor继承TaskExecutor;
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncListenableTaskExecutor, Serializable, AutoCloseable {public static final int UNBOUNDED_CONCURRENCY = -1;public static final int NO_CONCURRENCY = 0;private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();@Nullableprivate VirtualThreadDelegate virtualThreadDelegate;@Nullableprivate ThreadFactory threadFactory;@Nullableprivate TaskDecorator taskDecorator;private long taskTerminationTimeout;@Nullableprivate Set<Thread> activeThreads;private volatile boolean active = true;// ...
}
【ConcurrentTaskExecutor】内部封装了jdk的Executor
public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {private static final Executor STUB_EXECUTOR = (task) -> {throw new IllegalStateException("Executor not configured");};@Nullableprivate static Class<?> managedExecutorServiceClass;private Executor concurrentExecutor; // 内部封装了jdk的Executor private TaskExecutorAdapter adaptedExecutor;@Nullableprivate TaskDecorator taskDecorator; public ConcurrentTaskExecutor(@Nullable Executor executor) {this.concurrentExecutor = STUB_EXECUTOR;this.adaptedExecutor = new TaskExecutorAdapter(STUB_EXECUTOR);if (executor != null) {this.setConcurrentExecutor(executor);}}// ......
}
【ThreadPoolTaskExecutor】线程池任务执行器, 封装了ThreadPoolExecutor
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {private final Object poolSizeMonitor = new Object();private int corePoolSize = 1;private int maxPoolSize = Integer.MAX_VALUE;private int keepAliveSeconds = 60;private int queueCapacity = Integer.MAX_VALUE;private boolean allowCoreThreadTimeOut = false;private boolean prestartAllCoreThreads = false;private boolean strictEarlyShutdown = false;@Nullableprivate TaskDecorator taskDecorator;@Nullableprivate ThreadPoolExecutor threadPoolExecutor;private final Map<Runnable, Object> decoratedTaskMap;public ThreadPoolTaskExecutor() {this.decoratedTaskMap = new ConcurrentReferenceHashMap(16, ReferenceType.WEAK);}// ......
}
【SchedulingTaskExecutor】调度任务执行器
public interface SchedulingTaskExecutor extends AsyncTaskExecutor {default boolean prefersShortLivedTasks() {return true;}
}
【AsyncTaskExecutor】异步任务执行器,继承自 TaskExecutor
public interface AsyncTaskExecutor extends TaskExecutor {// ...... default Future<?> submit(Runnable task) {FutureTask<Object> future = new FutureTask(task, (Object)null);this.execute(future);return future;}default <T> Future<T> submit(Callable<T> task) {FutureTask<T> future = new FutureTask(task);this.execute(future, Long.MAX_VALUE);return future;}default CompletableFuture<Void> submitCompletable(Runnable task) {return CompletableFuture.runAsync(task, this);}default <T> CompletableFuture<T> submitCompletable(Callable<T> task) {return FutureUtils.callAsync(task, this);}
}
【小结】任务执行器的类层次关系:(TaskExecutor继承jdk的Executor)
- SyncTaskExecutor实现了 TaskExecutor;
- ConcurrentTaskExecutor:实现了SchedulingTaskExecutor, SchedulingTaskExecutor继承自AsyncTaskExecutor, AsyncTaskExecutor继承自TaskExecutor;
- ThreadPoolTaskExecutor:实现了SchedulingTaskExecutor, …同上;
【1.2】使用TaskExecutor代码实践
【TaskExecutorExample】业务场景:异步打印消息
public class TaskExecutorExample {private class MessagePrinterTask implements Runnable {private String message;public MessagePrinterTask(String message) {this.message = message;}public void run() {System.out.println("线程id=" + Thread.currentThread().getId() + ", message=" + message);try {TimeUnit.MILLISECONDS.sleep(200);} catch (InterruptedException e) {throw new RuntimeException(e);}}}private TaskExecutor taskExecutor;public TaskExecutorExample(TaskExecutor taskExecutor) {this.taskExecutor = taskExecutor;}public void printMessages() {for(int i = 0; i < 25; i++) {taskExecutor.execute(new MessagePrinterTask("Message" + i));}}// main函数: 案例执行入口public static void main(String[] args) {ThreadPoolTaskExecutor threadPoolTaskExecutor = buildThreadPoolTaskExecutor();new TaskExecutorExample(threadPoolTaskExecutor).printMessages();threadPoolTaskExecutor.shutdown();}private static ThreadPoolTaskExecutor buildThreadPoolTaskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();// 设置属性前,必须先调用initialize()初始化Executor,实际底层调用 ThreadPoolTaskExecutor#initializeExecutor()threadPoolTaskExecutor.initialize();threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); // 等待任务执行完成后再关闭执行器threadPoolTaskExecutor.setCorePoolSize(2);threadPoolTaskExecutor.setMaxPoolSize(3);threadPoolTaskExecutor.setKeepAliveSeconds(0);threadPoolTaskExecutor.setQueueCapacity(100);return threadPoolTaskExecutor;}
}
【执行效果】
线程id=15, message=Message0
线程id=16, message=Message1
线程id=16, message=Message2
线程id=15, message=Message3
线程id=16, message=Message5
线程id=15, message=Message4
线程id=16, message=Message6
线程id=15, message=Message7
线程id=16, message=Message8
线程id=15, message=Message9
线程id=16, message=Message10
线程id=15, message=Message11
线程id=16, message=Message12
线程id=15, message=Message13
线程id=15, message=Message14
线程id=16, message=Message15
线程id=15, message=Message16
线程id=16, message=Message17
线程id=15, message=Message18
线程id=16, message=Message19
线程id=15, message=Message20
线程id=16, message=Message21
线程id=15, message=Message22
线程id=16, message=Message23
线程id=15, message=Message24
【代码解说】
- ThreadPoolTaskExecutor的setter方法设置的属性值: 可以在xml文件中通过bean属性进行设置;
- 把ThreadPoolTaskExecutor作为TaskExecutor,ThreadPoolTaskExecutor内部封装了ThreadPoolExecutor ;
springTaskScheduler_280">【2】spring任务调度抽象(TaskScheduler)
1)spring有一个TaskScheduler接口,定义了很多方法用于调度任务在未来某个时间点执行;
public interface TaskScheduler {default Clock getClock() {return Clock.systemDefaultZone();} ScheduledFuture<?> schedule(Runnable task, Trigger trigger); // 只有这一个方法带有 触发器Trigger ScheduledFuture<?> schedule(Runnable task, Instant startTime);ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period);ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period);ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay);ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay);// ...
}
2)上述方法中:
-
最简单的方法是名为schedule(Runnable,Instant)的方法,仅带有Runnable与Instant共计2个参数。该方法会使得任务在指定时间后运行1次。
-
而其他方法都可以调度任务重复执行。 固定执行频率或固定延迟时间适用于简单,周期性执行的场景;但接收Trigger参数的方法会更加灵活;
- Trigger触发器:定义任务执行的触发规则 ;
【2.1】Trigger触发器接口
1)trigger的基本思想:任务执行时间根据过去的执行结果或任意条件来决定;如果任务触发条件需要考虑前一次执行结果,上一次执行的结果信息可以从TriggerContext触发器上下文中获取到。
2)Trigger接口如下:(Trigger接口有一个重要方法:nextExecution(TriggerContext))
public interface Trigger {/** @deprecated */@Deprecated(since = "6.0" )@Nullabledefault Date nextExecutionTime(TriggerContext triggerContext) {Instant instant = this.nextExecution(triggerContext);return instant != null ? Date.from(instant) : null;}@NullableInstant nextExecution(TriggerContext triggerContext);
}
TriggerContext是最重要的部分,它封装了所有的相关数据以及对扩展开放。
public interface TriggerContext {default Clock getClock() {return Clock.systemDefaultZone();}@NullableInstant lastScheduledExecution();@NullableInstant lastActualExecution();@NullableInstant lastCompletion();
}
【2.1.1】 Trigger实现类
1)spring提供了两种Trigger实现类:
- CronTrigger:Cron表达式触发器;它使用Cron表达式调度任务;
- PeriodicTrigger: 周期性触发器;接受固定周期参数,可选初始延迟时间,以及一个bool值用于指定周期应该被解释为固定频率还是固定延迟。
- 你可以在依赖Trigger的组件中使用PeriodicTrigger;
【2.2】任务调度器(TaskScheduler)实现
1)TaskScheduler的主要好处是:应用程序的调度需求与部署环境相分离;
- 当把应用部署到服务器环境,这种抽象分离非常重要。因为应用本身不应该直接创建线程; 在这种场景下,spring在JakartaEE环境提供的DefaultManagedTaskScheduler把任务调度委派给JSR-236的ManagedScheduledExecutorService ;
2)当不需要外部线程管理时,一个简单的替代方案是在应用程序本地安装ScheduledExecutorService ,它可以通过spring的ConcurrentTaskScheduler进行适配;为了简单,spring同时也提供了ThreadPoolTaskScheduler,ThreadPoolTaskScheduler内部把任务调度委派给ScheduledExecutorService,以提供与ThreadPoolTaskScheduler类似的通用bean样式的配置 。这些工作能够很好应用在本地嵌入式线程池设置。
3)从spring6.1开始, ThreadPoolTaskScheduler根据spring生命周期管理提供了暂停及恢复功能,优雅关闭功能。
- 这里有一个叫做SimpleAsyncTaskScheduler的新选项,这与jdk21的虚拟线程是一致的,使用单个调度器线程,但每次调度任务执行都开启一个新线程(每次执行都新建线程,不推荐);
4)总结:任务调度器实现类
- DefaultManagedTaskScheduler;
- ThreadPoolTaskScheduler;
- ConcurrentTaskScheduler
- SimpleAsyncTaskScheduler:
【DefaultManagedTaskScheduler】默认管理的任务调度器,继承自 ConcurrentTaskScheduler并发任务调度器;
public class DefaultManagedTaskScheduler extends ConcurrentTaskScheduler implements InitializingBean {private final JndiLocatorDelegate jndiLocator = new JndiLocatorDelegate();private String jndiName = "java:comp/DefaultManagedScheduledExecutorService";public DefaultManagedTaskScheduler() {super((ScheduledExecutorService)null);}// ... public void afterPropertiesSet() throws NamingException {ScheduledExecutorService executor = (ScheduledExecutorService)this.jndiLocator.lookup(this.jndiName, ScheduledExecutorService.class);this.setConcurrentExecutor(executor);this.setScheduledExecutor(executor);}
}
【ThreadPoolTaskScheduler】线程池任务调度器 ( 封装了ScheduledExecutorService调度线程池实例,实现SchedulingTaskExecutor调度任务执行器 与 TaskScheduler任务调度器 )
public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler { @Nullable private ScheduledExecutorService scheduledExecutor;
// ....public ThreadPoolTaskScheduler() { this.listenableFutureMap = new ConcurrentReferenceHashMap(16, ReferenceType.WEAK);}// ...
}
【ConcurrentTaskScheduler】 并发任务调度器,继承ConcurrentTaskExecutor并发任务执行器( 封装了ScheduledExecutorService调度线程池实例) ;
public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements TaskScheduler {@Nullableprivate ScheduledExecutorService scheduledExecutor;// ... public ConcurrentTaskScheduler(@Nullable ScheduledExecutorService scheduledExecutor) {super(scheduledExecutor);if (scheduledExecutor != null) {this.initScheduledExecutor(scheduledExecutor);}}public ConcurrentTaskScheduler(Executor concurrentExecutor, ScheduledExecutorService scheduledExecutor) {super(concurrentExecutor);this.initScheduledExecutor(scheduledExecutor);}private void initScheduledExecutor(ScheduledExecutorService scheduledExecutor) {this.scheduledExecutor = scheduledExecutor;this.enterpriseConcurrentScheduler = managedScheduledExecutorServiceClass != null && managedScheduledExecutorServiceClass.isInstance(scheduledExecutor);}//...
}
【SimpleAsyncTaskScheduler】简单异步任务调度器
public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements TaskScheduler, ApplicationContextAware, SmartLifecycle, ApplicationListener<ContextClosedEvent> { @Nullableprivate Executor targetTaskExecutor;
// ... public SimpleAsyncTaskScheduler() {this.lifecycleDelegate = new ExecutorLifecycleDelegate(this.scheduledExecutor);this.clock = Clock.systemDefaultZone();this.phase = Integer.MAX_VALUE;}// ...
}
【3】任务调度与任务异步执行的注解支持
1)区别开 任务调度 与 任务异步执行 这两者不同概念 ;
- 任务调度:有一定触发规则,可以执行1次,也可以重复执行多次;
- 任务异步执行:在异步线程中仅执行1次;
【3.1】启用调度注解
1)为了启用@Scheduled与@Async注解,你可以把@EnableScheduling 与 @EnableAsync 添加到被@Configuration标注的类上,如下所示。
- 你可以选择相关联的注解,不用所有业务场景都使用@EnableScheduling 与 @EnableAsync注解。 如只需要支持@Scheduled注解,可以移除@EnableAsync注解;
- 为了精细化控制,你可以额外实现SchedulingConfigurer接口,AsyncConfigurer接口。
@Configuration
@EnableAsync
@EnableScheduling
public class AppConfig {
}
2)如果你偏爱xml配置,可以使用 <task:annotation-driven> 元素,如下。
<task:annotation-driven executor="myExecutor" scheduler="myScheduler"/>
<task:executor id="myExecutor" pool-size="5"/>
<task:scheduler id="myScheduler" pool-size="10"/>
- 上述xml配置提供的执行器myExecutor(引用其他bean)用来处理任务(被@Async标注的方法,即方法就是任务,参考下Runnable接口的execute方法),调度器myScheduler(引用其他bean)用来管理任务(被@Scheduled标注的方法,方法就是任务)。
【提示】
处理@Async注解的默认建议模式是代理,它允许通过代理拦截所有的方法调用,但同一个类中的本地调用无法被拦截 。对于更高级的拦截模式,考虑切换到 aspectj 模式并结合编译时或加载时织入。
【3.1.1】@Scheduled与@Async注解源码
【@Scheduled注解】 可以标注方法或者注解(无法标注类)
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
@Reflective
public @interface Scheduled {String CRON_DISABLED = "-";String cron() default "";String zone() default "";long fixedDelay() default -1L;String fixedDelayString() default "";long fixedRate() default -1L;String fixedRateString() default "";long initialDelay() default -1L;String initialDelayString() default "";TimeUnit timeUnit() default TimeUnit.MILLISECONDS;String scheduler() default "";
}
【@Async注解】 可以标注类与方法
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Reflective
public @interface Async {String value() default "";
}
【3.2】@Scheduled注解
1)@Scheduled注解:可以带上Trigger元数据(定义触发规则)作为参数标注方法。
【例】下面的方法每5秒触发一次,延迟时间固定; 这意味着执行周期从上一个任务完成时间算起;
@Scheduled(fixedDelay = 5000) // 时间单位默认毫秒 (fixedDelay = 5000就是触发器Trigger规则)
public void doSomething() {// something that should run periodically
}@Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS) // 也可以设置单位为秒
public void doSomething() {// something that should run periodically
}
【2.4.1】以固定频率执行调度任务
1)在注解中使用fixedRate属性设定固定频率执行;
下面例子中,每5秒触发1次(对相邻两次任务执行的开始时间进行测量)
@Scheduled(fixedRate = 5, timeUnit = TimeUnit.SECONDS)
public void doSomething() {// something that should run periodically
}
【2.4.2】以固定频率及初始延迟执行调度任务
1)可以同时在注解中指定固定频率fixedRate与初始延迟initialDelay;
- 初始延迟initialDelay指的是:方法第一次执行前需要等待的时间;
@Scheduled(initialDelay = 1000, fixedRate = 5000)
public void doSomething() {// something that should run periodically
}
【2.4.3】单次执行的任务设置初始延时
1)对于单次执行的任务,你可以仅指定初始延时,表明方法执行前需要等待的时间;
@Scheduled(initialDelay = 1000)
public void doSomething() {// something that should run only once
}
【2.4.4】使用cron表达式配置调度任务
@Scheduled(cron="*/5 * * * * MON-FRI") // 该方法(任务)每5秒执行1次,且仅限于周一到周五
public void doSomething() {// something that should run on weekdays only
}
【注意】
- 注意1)注意到以上调度方法的返回类型必须是void,且不接受任何入参;如果这些方法需要与其他对象交互,通常是通过依赖注入提供这些对象;
- 注意2)@Scheduled可以被作为重复注解使用;如果多个@Scheduled声明在同一个方法,则每一个@Scheduled都会使用各自的触发器独立运行。这会导致该方法被并行执行多次。请确保你指定的cron表达式不会发生重叠。
【3.3】@Async注解
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Reflective
public @interface Async {String value() default "";
}
1)@Async:@Async标注的方法(简称@Async方法)被调用时会触发异步执行;
- 即当调用者调用@Async标注的方法时,调用者会立即返回,@Async方法实际上会在被提交给spring TaskExecutor的任务中执行;
@Async
void doSomething() {// this will be run asynchronously
}
2) @Async标注的方法可以带参数,因为@Async方法是由调用者在运行时的正常调用,不像 @Schedule任务那样被容器管理;
@Async
void doSomething(String s) {// this will be run asynchronously
}
3)即使方法有返回值,也可以被异步调用; 然后,这种方法必须是 Future前缀类型的返回值。调用者可以在调用 Future#get()方法前执行其他任务。
- @Async标注方法的返回类型不仅可以声明为java.util.concurrent.Future,还可以声明为org.springframework.util.concurrent.ListenableFuture,或者是jdk8的java.util.concurrent.CompletableFuture。
@Async
Future<String> returnSomething(int i) {// this will be run asynchronously
}
4)不能将@Async与生命周期回调如@PostConstruct一起使用。
- 为了异步初始化spring bean,必须使用一个不同的初始化方法,然后在目标对象上调用@Aync标注的方法;
public class SampleBeanImpl implements SampleBean {@Asyncvoid doSomething() {// ...}
}public class SampleBeanInitializer {private final SampleBean bean;public SampleBeanInitializer(SampleBean bean) {this.bean = bean;}@PostConstructpublic void initialize() {bean.doSomething();}}
【提示】spring没有提供与@Async注解对等的xml配置。因为@Async方法首先应该被设计为异步执行,而不是在外部重新声明异步。然而,你可以使用spring aop手工设置spring的AsyncExecutionInterceptor。
【2.5.1】@Async中执行器限定
1)默认情况下,当使用@Async注解方法时,使用的默认执行器是启用async时配置的执行器,如“annotation-driven元素或者AsyncConfigurer 实现类。我们可以通过给@Async的value属性赋值来指定执行器(即限定@Async注解使用的执行器)。
@Async("otherExecutor") // otherExecutor 就是spirng容器中执行器bean的名字
void doSomething(String s) {// this will be run asynchronously by "otherExecutor"
}
【2.5.2】使用@Async进行异常管理
1)当 @Async标注的方法有Future类型的返回值,这种方法的异常很好管理,因为它的异常可以通过调用Future.get()方法捕获;
- 但是对于返回值是void类型的@Async方法,主线程(或调用方)是无法捕获到异常的;你可以提供一个异常处理器AsyncUncaughtExceptionHandler来处理这种异常 ;
- 可以使用AsyncConfigurer 或 <task:annotation-driven/> xml元素来配置自定义的AsyncUncaughtExceptionHandler;
public class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {@Overridepublic void handleUncaughtException(Throwable ex, Method method, Object... params) {// handle exception}
}
【4】xml配置文件task命名空间
1)从spring3.0开始,spring就提供了配置 TaskExecutor与TaskScheduler的xml命名空间,允许开发者以便捷的方式配置带触发器的调度任务。
【4.1】task:scheduler元素
1)以下xml元素创建了一个ThreadPoolTaskScheduler实例;
- task:scheduler元素的id属性:会作为线程池创建的线程名前缀;
- 如果不提供poo-size,则默认为1,即单线程;(task:scheduler元素只有2个配置属性)
<task:scheduler id="scheduler" pool-size="10"/>
【4.2】task:executor元素
1)以下xml元素创建了一个 ThreadPoolTaskExecutor 示例;
<task:executor id="executor" pool-size="10"/>
2)task:executor元素的其他属性
<task:executorid="executorWithPoolSizeRange"pool-size="5-25" <!-- 线程数量范围,最小5,最大25(即核心线程数=5,最大线程数=25) ;若pool-size配置为1值,则核心线程数与最大线程数相等 -->queue-capacity="100"/> <!-- 阻塞队列大小 -->
3)ThreadPoolTaskExecutor处理任务的逻辑是(线程池运行任务的步骤):
- 当活跃线程数小于核心线程数,则使用空闲线程执行提交的任务;
- 如果核心线程数满了(活跃线程数等于核心线程数),则把任务添加到阻塞队列,只要阻塞队列没有满;
- 若阻塞队列满了,且活跃线程数没有达到最大线程数,则创建非核心线程执行任务;
- 当活跃线程数达到最大线程数,则ThreadPoolTaskExecutor拒绝任务的提交(调用阻塞策略);
- 若阻塞队列满了,且活跃线程数没有达到最大线程数,则创建非核心线程执行任务;
- 问题:默认情况下,阻塞队列是无限大的;这会造成如下问题。(所以实际开发中必须配置阻塞队列的队列大小)
- 问题1: 阻塞队列无限大, 则任务可以无限提交,任务就会无限占用系统内容, 可能导致oom;
- 问题2:阻塞队列无限大,则最大线程数参数设置没有意义,即永远不会创建非核心线程;
4)带拒绝策略的task:executor 元素:(jdk线程池有4种拒绝策略,包括 AbortPolicy,DiscardPolicy, DiscardOldestPolicy, CallerRunsPolicy)
<task:executorid="executorWithCallerRunsPolicy"pool-size="5-25"queue-capacity="100"rejection-policy="CALLER_RUNS"/> <!-- 枚举类型,拒绝策略,CALLER_RUNS表示由调用方(主线程)执行提交的任务;异步变同步了 -->
5)task:executor元素配置keep-alive属性: keep-alive属性设置非核心线程没有执行任务的最大存活时间;即在keep-alive时间内,非核心线程没有执行任务则被销毁;
<task:executorid="executorWithKeepAlive"pool-size="5-25"keep-alive="120"/>
【4.3】task:scheduled-tasks元素
1)spring任务命名空间最强大的功能:是支持在spring容器中配置调度任务。
- task:scheduled-tasks元素的ref属性可以指定spring管理的对象;method属性提供了该对象被调用的方法名
<task:scheduled-tasks scheduler="myScheduler"><task:scheduled ref="beanA" method="methodA" fixed-delay="5000"/>
</task:scheduled-tasks><task:scheduler id="myScheduler" pool-size="10"/>
2)调度器引用自外部元素,调度器中每个任务都包含各自的触发器元数据配置;
3)触发器属性如下:
- fixed-delay(固定延迟):在上面的例子中,元数据定义了带有固定延迟的周期性触发器,该固定延迟表明每个任务执行完后,下一个任务等待多长时间(单位毫秒)开始执行;
- fixed-rate(固定频率):表明方法多久运行1次,无论上一个任务执行耗时多长时间;
- initial-delay(初始化延迟): 表明方法第一次执行前需要等待多长时间(单位毫秒);
- cron表达式:若需要定义更多灵活的规则,可以通过cron属性提供cron表达式;
<task:scheduled-tasks scheduler="myScheduler"><task:scheduled ref="beanA" method="methodA" fixed-delay="5000" initial-delay="1000"/><task:scheduled ref="beanB" method="methodB" fixed-rate="5000"/><task:scheduled ref="beanC" method="methodC" cron="*/5 * * * * MON-FRI"/>
</task:scheduled-tasks><task:scheduler id="myScheduler" pool-size="10"/>
【5】任务调度与任务异步执行代码实践
【5.1】任务调度代码实践
【SpringOfficialTaskScheduleMain】
public class SpringOfficialTaskScheduleMain {public static void main(String[] args) {ClassPathXmlApplicationContext springContainer =new ClassPathXmlApplicationContext("chapter31schedule/spring-official-schedule.xml");springContainer.registerShutdownHook();}
}
【spring-official-schedule.xml】
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:task="http://www.springframework.org/schema/task"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd" ><!-- 注册bean到spring容器,一般该bean中被@Scheduled标注的方法被调度执行 --><bean class="com.tom.springnote.chapter31schedule.springofficial.task.TomDiyScheduleTask"/><!-- task:annotation-driven元素 开启对@Async与@Scheduled注解标注的springbean的探测如果存在被@Async与@Scheduled标注的类或方法,则spring生成一个异步执行被标注方法的代理 --><task:annotation-driven scheduler="myScheduler"/><!-- task:scheduler元素 定义一个 ThreadPoolTaskScheduler 类型的线程池任务调度器实例--><!-- 注册 ThreadPoolTaskScheduler 实例,不一定用 task:scheduler元素,可以自定义 --><task:scheduler id="myScheduler" pool-size="10" /></beans>
【TomDiyTask】 定义一个任务类, 使用@Scheduled标注调度方法,以便task:annotation-driven元素可以探测到被@Scheduled标注的方法,并生成一个代理对象异步执行调度方法
public class TomDiyScheduleTask {@Scheduled(fixedRate = 2000, initialDelay = 3000)public void diySchedule() {System.out.printf("TomDiyTask#diySchedule(): 线程id=%s 当前时间=%s \n", Thread.currentThread().getId(), BusiDatetimeUtils.getNowText());try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
【执行结果】
TomDiyTask#diySchedule(): 线程id=15 当前时间=2024-10-20 08:05:33.311
TomDiyTask#diySchedule(): 线程id=15 当前时间=2024-10-20 08:05:36.329
TomDiyTask#diySchedule(): 线程id=19 当前时间=2024-10-20 08:05:39.340
TomDiyTask#diySchedule(): 线程id=15 当前时间=2024-10-20 08:05:42.343
TomDiyTask#diySchedule(): 线程id=20 当前时间=2024-10-20 08:05:45.357
【有个问题】调度触发器规则为:fixedRate = 2000, initialDelay = 3000;表示初始延迟3s执行,每个2s重复执行;为什么上述执行结果是每3s执行1次呢?
- 因为业务逻辑diySchedule()方法内部睡眠了3s模拟业务逻辑处理;即无论是固定频率还是固定延时的重复调度任务,若任务执行耗时大于频率间隔或延时,则下一个任务需要等待上一个任务执行完成后才开始执行;(上述代码中,任务执行耗时=3s,频率间隔=fixedRate=2s;所以设置为每2s执行1次,但实际上3s执行1次) ;
【5.1.1】task:scheduler元素创建调度器底层原理
1)上述代码中,spring-official-schedule.xml中task:scheduler元素如下:
<task:scheduler id="myScheduler" pool-size="10" />
查看xsd文件中task:schedule元素描述如下。
Defines a ThreadPoolTaskScheduler instance with configurable pool size. See Javadocfor the org.springframework.scheduling.annotation.EnableScheduling annotation forinformation on a code-based alternative to this XML element.
2)意思是该元素task:scheduler定义了带有可配置线程池大小的ThreadPoolTaskScheduler实例到spring容器。
3) 通过查看ThreadPoolTaskScheduler源码可知: ThreadPoolTaskScheduler内部封装了jdk的ScheduledExecutorService,即实际上是把任务委派给ScheduledExecutorService(jdk调度线程池)执行;ScheduledExecutorService如何实例化 ?
【ThreadPoolTaskScheduler】
public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {this.scheduledExecutor = this.createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler); // this one ScheduledExecutorService var4 = this.scheduledExecutor;if (var4 instanceof ScheduledThreadPoolExecutor threadPoolExecutor) {if (this.removeOnCancelPolicy) {threadPoolExecutor.setRemoveOnCancelPolicy(true);}if (this.continueExistingPeriodicTasksAfterShutdownPolicy) {threadPoolExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);}if (!this.executeExistingDelayedTasksAfterShutdownPolicy) {threadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);}}return this.scheduledExecutor;}protected ScheduledExecutorService createExecutor(int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {// 创建调度线程池执行器 return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler) {protected void beforeExecute(Thread thread, Runnable task) {ThreadPoolTaskScheduler.this.beforeExecute(thread, task);}protected void afterExecute(Runnable task, Throwable ex) {ThreadPoolTaskScheduler.this.afterExecute(task, ex);}};}//...
}
【ScheduledThreadPoolExecutor】 由ScheduledThreadPoolExecutor构造器代码可知,最大线程数传入的是Integer最大值, 阻塞队列传入的是延时工作队列;因为DelayedWorkQueue是无线大的队列,所以最大线程数不会用到,传入Integer.MAX_VALUE是没有问题的 。
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, // 最大线程数传入的是Integer最大值;DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue(), threadFactory, handler); // 阻塞队列传入的是延时工作队列 } // ...
}
【DelayedWorkQueue】延迟工作队列底层原理是堆数据结构,底层是数组。 可以自动扩容,所以队列长度无限大。
static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> { private static final int INITIAL_CAPACITY = 16;private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];private final ReentrantLock lock = new ReentrantLock();private int size; /*** Resizes the heap array. Call only when holding lock.*/private void grow() {int oldCapacity = queue.length;int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%if (newCapacity < 0) // overflownewCapacity = Integer.MAX_VALUE;queue = Arrays.copyOf(queue, newCapacity);} }
【5.2】任务异步执行代码实践
【SpringOfficialTaskAsyncMain】
public class SpringOfficialTaskAsyncMain {public static void main(String[] args) {ClassPathXmlApplicationContext springContainer =new ClassPathXmlApplicationContext("chapter31schedule/spring-official-async.xml");TomDiyAsyncTask tomDiyAsyncTask = springContainer.getBean(TomDiyAsyncTask.class);tomDiyAsyncTask.asyncDoBusi();System.out.println("main(): 线程id=" + Thread.currentThread().getId());// 关闭线程池ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) springContainer.getBean("myExecutor");threadPoolTaskExecutor.shutdown();springContainer.registerShutdownHook();}
}
【spring-official-async.xml】
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:task="http://www.springframework.org/schema/task"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd" ><!-- 注册bean到spring容器,一般该bean中被@Scheduled标注的方法被调度执行 --><bean id="tomDiyAsyncTask" class="com.tom.springnote.chapter31schedule.springofficial.task.TomDiyAsyncTask"/><!-- task:annotation-driven元素 开启对@Async与@Scheduled注解标注的springbean的探测如果存在被@Async与@Scheduled标注的类或方法,则spring生成一个异步执行被标注方法的代理 --><task:annotation-driven executor="myExecutor" /><!--task:executor元素 定义一个 ThreadPoolTaskExecutor 类型的线程池任务执行器实例--><task:executor id="myExecutor" pool-size="5" queue-capacity="100" keep-alive="0" rejection-policy="ABORT" /></beans>
【TomDiyAsyncTask】
public class TomDiyAsyncTask {@Async("myExecutor")public void asyncDoBusi() {System.out.printf("TomDiyAsyncTask#asyncDoBusi(): 线程id=%s 线程name=%s 当前时间=%s \n",Thread.currentThread().getId(),Thread.currentThread().getName(),BusiDatetimeUtils.getNowText());}
}
【执行结果】
main(): 线程id=1
TomDiyAsyncTask#asyncDoBusi(): 线程id=15 线程name=myExecutor-1 当前时间=2024-10-20 08:01:36.605
【6】cron表达式(非常重要)
1)所有spring的cron表达式都必须遵循统一格式,无论使用@Schedule注解,还是使用task:scheduled-tasks元素;
- 一个良好格式的cron表达式,如 * * * * * * ,包含6个空格分隔的时间与日期字段,每个都有它的取值范围。
┌───────────── second (0-59)│ ┌───────────── minute (0 - 59)│ │ ┌───────────── hour (0 - 23)│ │ │ ┌───────────── day of the month (1 - 31) // 月中某天字段 │ │ │ │ ┌───────────── month (1 - 12) (or JAN-DEC)│ │ │ │ │ ┌───────────── day of the week (0 - 7) // 星期几字段(或周中某天)│ │ │ │ │ │ (0 or 7 is Sunday, or MON-SUN)│ │ │ │ │ │* * * * * *
2)cron表达式遵循的规则:
- 规则1:字段可以是星号(*),表示最小值-最大值(如对于秒字段,*等于0-59); 对于月中某天或星期几字段,可以使用问号 ? 替代星号(*) ;
- 规则2:逗号用于分隔列表项(如 0-7 可以写成 0,1,2,3,4,5,6,7)
- 规则3:横杠分隔的两个数字表示一个范围,且包含边界;(如 0-7 表示大于等于0,且小于等于7的范围)
- 规则4:在一个范围(或星号)后面跟上斜杠(/) :表明时间间隔;(即多久执行1次任务)(如 */5 * * * * * 表示每5秒执行1次)
- 规则5:英文名称可以用于设置月份字段与星期几字段;使用月份或星期几的前3个字母表示(没有大小写要求); (如 * * */1 * * MON 表示每周一每小时执行1次任务)
- 规则6:月中某天或星期几字段可以包含L字符;
- 月中某天字段包含L字符: L字符表示每月的最后一天;若L字符后面跟着负数偏移量 (如 L-n),表示每月的倒数第n天;
- 星期几字段包含L字符:L字符表示每周的最后一天(即周日);如果L字符前面带有数字前缀或3个字母前缀(如dL或DDDL),表示每个月最后一个星期d(或DDD);
- 规则7:月中某天字段可以是nW,表示距离该月第n日最近的工作日;如第n日是周六,则返回周五;如第n日是周日,则返回周一;
- 规则8:如果月中某天字段是LW: 表示该月的最后一个工作日;
- 规则9: 星期几字段可以是d#n(或者 DDD#n),表示每月第d周的星期n(或DDD);
3)cron表达式例子:
0 0 * * * * | 每天每小时执行1次 |
---|---|
*/10 * * * * * | 每10秒执行1次 |
0 0 8-10 * * * | 每天8点,9点,10点执行1次 |
0 0 6,19 * * * | 每天6点,19点执行1次 |
0 0/30 8-10 * * * | 每天8点到10点,每隔30分钟执行1次 |
0 0 9-17 * * MON-FRI | 周一到周五,每天9点,10点,11点,12点,13点,14点,15点,16点,17点执行; |
0 0 0 25 DEC ? | 每年12月25日零点执行1次 |
0 0 0 L * * | 每个月的最后一天零点执行1次 |
0 0 0 L-3 * * | 每个月倒数第3天零点执行1次 |
0 0 0 * * 5L | 每个月最后一个星期五的零点执行1次 |
0 0 0 * * THUL | 每个月最后一个星期五的零点执行1次 |
0 0 0 1W * * | 每个月第1个工作日后执行1次 |
0 0 0 LW * * | 每个月最后1个工作日执行1次 |
0 0 0 ? * 5#2 | 每个月第2个星期五的零点执行1次 |
0 0 0 ? * MON#1 | 每个月第1个星期一的零点执行1次 |
【6.1】cron表达式宏指令
1)背景: cron表达式如 0 0 * * * * 不好理解;为便于可读性,spring支持以下宏指令。可以使用宏指令而不是6个数字定义任务触发规则,如 @Scheduled(cron = “@hourly”)
Macro | 表达含义 |
---|---|
@yearly (or @annually ) | 每年1次 (0 0 0 1 1 * ) |
@monthly | 每月1次 (0 0 0 1 * * ) |
@weekly | 每周1次 (0 0 0 * * 0 ) |
@daily (or @midnight ) | 每天1次 (0 0 0 * * * ) |
@hourly | 每小时1次, (0 0 * * * * ) |
【6.2】使用cron表达式执行调度任务代码实践
【TomDiyScheduleTask】
public class TomDiyScheduleTask {@Scheduled(fixedRate = 2000, initialDelay = 3000) // 固定每2s执行1次,初始延时3s public void diySchedule() {System.out.printf("TomDiyTask#diySchedule(): 线程id=%s 当前时间=%s \n", Thread.currentThread().getId(), BusiDatetimeUtils.getNowText());try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}}@Scheduled(cron = "*/10 * * * * *") // cron表达式,每10s执行1次 public void diySchedule2() {System.out.printf("TomDiyTask#diySchedule2(): 线程id=%s 当前时间=%s \n", Thread.currentThread().getId(), BusiDatetimeUtils.getNowText());try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
【执行效果】
TomDiyTask#diySchedule(): 线程id=15 当前时间=2024-10-20 10:51:24.081
TomDiyTask#diySchedule(): 线程id=16 当前时间=2024-10-20 10:51:27.095
TomDiyTask#diySchedule2(): 线程id=15 当前时间=2024-10-20 10:51:30.002 // cron表达式触发调度任务
TomDiyTask#diySchedule(): 线程id=20 当前时间=2024-10-20 10:51:30.109
TomDiyTask#diySchedule(): 线程id=21 当前时间=2024-10-20 10:51:33.122
TomDiyTask#diySchedule(): 线程id=22 当前时间=2024-10-20 10:51:36.137
TomDiyTask#diySchedule(): 线程id=16 当前时间=2024-10-20 10:51:39.141
TomDiyTask#diySchedule2(): 线程id=20 当前时间=2024-10-20 10:51:40.003 // cron表达式触发调度任务
TomDiyTask#diySchedule(): 线程id=23 当前时间=2024-10-20 10:51:42.149
TomDiyTask#diySchedule(): 线程id=24 当前时间=2024-10-20 10:51:45.164
TomDiyTask#diySchedule(): 线程id=25 当前时间=2024-10-20 10:51:48.177
TomDiyTask#diySchedule2(): 线程id=21 当前时间=2024-10-20 10:51:50.004
TomDiyTask#diySchedule(): 线程id=25 当前时间=2024-10-20 10:51:51.186
【7】使用 Quartz调度器
1)quartz使用Trigger,Job,JobDetail 对象实现调度各种任务。 spring提供了很多类简化了在spring应用中使用Quartz的开发成本;quartz参见: spring集成quartz
2)
【7.1】使用JobDetailFactoryBean
1)JobDetail:包含了job运行的所有信息。spring提供了JobDetailFactoryBean , 以便在xml中以bean样式属性注册。
【spring-official-quartz.xml】 根据JobDetailFactoryBean的定义,jobClass必须是Job的子类 ;
<!-- 使用 JobDetailFactoryBean 注册 JobDetail实例;其中DiyQuartzJob必须是Job的子类 --><bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean"><property name="jobClass" value="com.tom.springnote.chapter31schedule.springofficial.quartz.DiyQuartzJob" /></bean>
【DiyQuartzJob】 自定义quartz作业
public class DiyQuartzJob extends QuartzJobBean {@Overrideprotected void executeInternal(JobExecutionContext context) throws JobExecutionException {System.out.printf("DiyQuartzJob#executeInternal(): 线程id=%s 当前时间=%s \n", Thread.currentThread().getId(), BusiDatetimeUtils.getNowText());try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
【QuartzJobBean】
public abstract class QuartzJobBean implements Job {public QuartzJobBean() {}public final void execute(JobExecutionContext context) throws JobExecutionException {try {BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this);MutablePropertyValues pvs = new MutablePropertyValues();pvs.addPropertyValues(context.getScheduler().getContext());pvs.addPropertyValues(context.getMergedJobDataMap());bw.setPropertyValues(pvs, true);} catch (SchedulerException var4) {throw new JobExecutionException(var4);}this.executeInternal(context);}protected abstract void executeInternal(JobExecutionContext context) throws JobExecutionException;
}
【7.2】使用MethodInvokingJobDetailFactoryBean
1)通常你仅需要调用特定对象的方法。使用MethodInvokingJobDetailFactoryBean可以实现这一点。
- 通过使用MethodInvokingJobDetailFactoryBean,你无需创建仅调用方法的作业;你仅需要创建实际的业务对象及关联的详细信息。
<bean id="jobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"><property name="targetObject" ref="exampleBusinessObject"/><property name="targetMethod" value="doIt"/>
</bean>
2)MethodInvokingJobDetailFactoryBean的并发问题:默认情况下,Quartz作业是无状态的,这可能会导致作业之间相互影响; 如果你为同一个JobDetail指定2个Trigger触发器,可能的情况是第2个触发器的作业在第1个触发器作业执行完成前就开始执行。即导致2个触发器的作业同时执行。 如果JobDetail类实现了Stateful接口,上述情况不会发生,即第2个job在第1个job执行完成前不会执行。
- 解决方法:为使得MethodInvokingJobDetailFactoryBean创建的作业不并发执行,设置concurrent为false,如下。
<bean id="jobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"><property name="targetObject" ref="exampleBusinessObject"/><property name="targetMethod" value="doIt"/><property name="concurrent" value="false"/>
</bean>
【小提示】 默认情况下,作业是以并发方式执行。
【7.3】使用Trigger与SchedulerFactoryBean关联job
1)我们需要使用Trigger触发器与SchedulerFactoryBean调度任务本身。 spring提供了2个Quartz FactoryBean的实现类 。
- CronTriggerFactoryBean :cron表达式触发器工厂bean;
- SimpleTriggerFactoryBean:简单触发器工厂bean;
2)spring提供了一个SchedulerFactoryBean,公开其触发器以便作为属性进行设置;SchedulerFactoryBean使用这些触发器调度实际任务;
<bean id="simpleTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean"><!-- see the example of method invoking job above --><property name="jobDetail" ref="jobDetail"/><!-- 10 seconds --><property name="startDelay" value="10000"/><!-- repeat every 50 seconds --><property name="repeatInterval" value="50000"/>
</bean><bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean"><property name="jobDetail" ref="exampleJob"/><!-- run every morning at 6 AM --><property name="cronExpression" value="0 0 6 * * ?"/>
</bean><!-- 使用触发器列表装配SchedulerFactoryBean -->
<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean"><property name="triggers"><list><ref bean="cronTrigger"/><ref bean="simpleTrigger"/></list></property>
</bean>
【注意】SchedulerFactoryBean的注意事项
- SchedulerFactoryBean可以识别classpath路径下的名为 quartz.properties文件,该文件基于quartz属性键,如常规quartz配置一样。
- 请注意:SchedulerFactoryBean的许多属性可以与quartz配置文件中普通Quartz属性相互影响,因此不推荐在两边都指定属性值(要么在spring中指定SchedulerFactoryBean属性值,要么指定普通Quartz属性值)。
【7.4】 spirng集成Quartz调度器代码实践
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:task="http://www.springframework.org/schema/task"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd" ><!-- 使用 JobDetailFactoryBean 注册 JobDetail实例;其中DiyQuartzJob必须是Job的子类 --><bean id="simpleJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean"><property name="jobClass" value="com.tom.springnote.chapter31schedule.springofficial.quartz.DiyQuartzJob" /></bean><!-- 使用 MethodInvokingJobDetailFactoryBean 注册JobDetail实例,--><bean id="diyCommonJob" class="com.tom.springnote.chapter31schedule.springofficial.quartz.DiyCommonJob" /><bean id="diyCommonJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"><property name="targetObject" ref="diyCommonJob" /> <!-- diyCommonJob无需是Job的子类 --><property name="targetMethod" value="printNow" /> <!-- printNow方法是调度任务执行的业务逻辑 --><property name="concurrent" value="false"/></bean><!-- 注册触发器(定义触发规则) --><bean id="simpleTrigger" class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean"><property name="jobDetail" ref="simpleJobDetail"/><property name="startDelay" value="1000"/> <!--初始延时为1s--><property name="repeatInterval" value="2000"/> <!--每2s执行1次--></bean><bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean"><property name="jobDetail" ref="diyCommonJobDetail"/><property name="cronExpression" value="*/5 * * * * ?" /> <!-- cron表达式定义为每5s执行1次 --></bean><!-- 使用触发器列表装配SchedulerFactoryBean --><bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean"><property name="triggers"><list><ref bean="cronTrigger"/><ref bean="simpleTrigger"/></list></property></bean></beans>
【DiyCommonJob】
public class DiyCommonJob {public void printNow() {System.out.printf("DiyCommonJob#printNow(): 线程id=%s 当前时间=%s \n", Thread.currentThread().getId(), BusiDatetimeUtils.getNowText());}
}
【DiyQuartzJob】
public class DiyQuartzJob extends QuartzJobBean {@Overrideprotected void executeInternal(JobExecutionContext context) throws JobExecutionException {System.out.printf("DiyQuartzJob#executeInternal(): 线程id=%s 当前时间=%s \n", Thread.currentThread().getId(), BusiDatetimeUtils.getNowText());try {TimeUnit.SECONDS.sleep(3); // 睡眠3s} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
【SpringOfficialQuartzTaskScheduleMain】
public class SpringOfficialQuartzTaskScheduleMain {public static void main(String[] args) {ClassPathXmlApplicationContext springContainer =new ClassPathXmlApplicationContext("chapter31schedule/spring-official-quartz.xml");springContainer.registerShutdownHook();}
}
【执行效果】
DiyQuartzJob#executeInternal(): 线程id=15 当前时间=2024-10-20 12:28:27.290 // 简单触发器,每2s执行1次
DiyQuartzJob#executeInternal(): 线程id=16 当前时间=2024-10-20 12:28:29.277
DiyCommonJob#printNow(): 线程id=17 当前时间=2024-10-20 12:28:30.003 // cron表达式触发器,每5s执行1次
DiyQuartzJob#executeInternal(): 线程id=18 当前时间=2024-10-20 12:28:31.277
DiyQuartzJob#executeInternal(): 线程id=19 当前时间=2024-10-20 12:28:33.296
DiyCommonJob#printNow(): 线程id=20 当前时间=2024-10-20 12:28:35.001 // cron表达式触发器,每5s执行1次
DiyQuartzJob#executeInternal(): 线程id=21 当前时间=2024-10-20 12:28:35.278
DiyQuartzJob#executeInternal(): 线程id=22 当前时间=2024-10-20 12:28:37.277
DiyQuartzJob#executeInternal(): 线程id=23 当前时间=2024-10-20 12:28:39.280
DiyCommonJob#printNow(): 线程id=24 当前时间=2024-10-20 12:28:40.006 // cron表达式触发器,每5s执行1次
【8】Quartz调度器与java调度线程池ScheduledExecutorService的区别(重要)
1)由章节【7.4】的执行效果可知:
- Quartz调度器:执行下一个任务,无需等待上一个任务执行完成;只要任务触发规则满足,则下一个任务开始执行; 如DiyQuartzJob睡眠3s,而cron表达式配置是每2s执行1次;最终的效果是每2s执行1次,而不是每3s执行1次;
- jdk调度线程池ScheduledExecutorService: 下一个任务开始执行前,需要等待上一个任务执行完成;即便任务触发规则满足,若上一个任务没有执行完成,则下一个任务不会执行; 如章节【5.1】所示;spring的ThreadPoolTaskScheduler,底层实现是调度线程池ScheduledExecutorService ;
- 补充:jdk定时器Timer也有这个问题;
【8.1】ScheduledExecutorService调度任务回顾
【8.1.1】每2s执行1次但业务逻辑耗时5s
1)业务场景:每2s执行1次但业务逻辑耗时5s;
【SpringOfficialTaskScheduleMain】
public class SpringOfficialTaskScheduleMain {public static void main(String[] args) {ClassPathXmlApplicationContext springContainer =new ClassPathXmlApplicationContext("chapter31schedule/spring-official-schedule.xml");springContainer.registerShutdownHook();}
}
【spring-official-schedule.xml】
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:task="http://www.springframework.org/schema/task"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd" ><!-- 注册bean到spring容器,一般该bean中被@Scheduled标注的方法被调度执行 --><bean class="com.tom.springnote.chapter31schedule.springofficial.task.TomDiyScheduleTask"/><!-- task:annotation-driven元素 开启对@Async与@Scheduled注解标注的springbean的探测如果存在被@Async与@Scheduled标注的类或方法,则spring生成一个异步执行被标注方法的代理 --><task:annotation-driven scheduler="myScheduler"/><!-- task:scheduler元素 定义一个 ThreadPoolTaskScheduler 类型的线程池任务调度器实例--><!-- 注册ThreadPoolTaskScheduler 实例,不一定用 task:scheduler元素,可以自定义 --><task:scheduler id="myScheduler" pool-size="10" /></beans>
【TomDiyScheduleTask】 固定频率每2s执行1次,但业务逻辑耗时5s
public class TomDiyScheduleTask {@Scheduled(fixedRate = 2000, initialDelay = 3000) // 固定频率每2s执行1次public void diySchedule() {System.out.printf("TomDiyTask#diySchedule(): 线程id=%s 当前时间=%s \n", Thread.currentThread().getId(), BusiDatetimeUtils.getNowText());try {TimeUnit.SECONDS.sleep(5); // 睡眠5s,即业务逻辑耗时5s } catch (InterruptedException e) {throw new RuntimeException(e);}}
}
【执行效果】
TomDiyTask#diySchedule(): 线程id=15 当前时间=2024-10-20 12:57:04.820
TomDiyTask#diySchedule(): 线程id=15 当前时间=2024-10-20 12:57:09.835
TomDiyTask#diySchedule(): 线程id=19 当前时间=2024-10-20 12:57:14.843
显然: 调度任务执行频率不是预设的2s,而是每5s执行1次; 显然,spring的ThreadPoolTaskScheduler执行调度任务没有达到我们的预设效果;ThreadPoolTaskScheduler底层是 ScheduledExecutorSerivce
- 解决方法: 把业务逻辑添加到线程池运行,即主线程仅仅是触发任务执行,但不执行具体业务逻辑,业务逻辑由异步线程执行;
【8.1.2】每2s执行1次但业务逻辑耗时1s
【TomDiyScheduleTask】 固定频率每2s执行1次,但业务逻辑耗时1s (只需要把睡眠时间修改为1s即可)
public class TomDiyScheduleTask {@Scheduled(fixedRate = 2000, initialDelay = 3000) // 固定频率每2s执行1次public void diySchedule() {System.out.printf("TomDiyTask#diySchedule(): 线程id=%s 当前时间=%s \n", Thread.currentThread().getId(), BusiDatetimeUtils.getNowText());try {// TimeUnit.SECONDS.sleep(5); // 睡眠5s,即业务逻辑耗时5s TimeUnit.SECONDS.sleep(1); // 睡眠1s } catch (InterruptedException e) {throw new RuntimeException(e);}}
}
【执行效果】 每2s执行1次,与我们预设的固定频率(fixedRate = 2000) 一致;
TomDiyTask#diySchedule(): 线程id=15 当前时间=2024-10-20 13:01:35.466
TomDiyTask#diySchedule(): 线程id=15 当前时间=2024-10-20 13:01:37.449
TomDiyTask#diySchedule(): 线程id=19 当前时间=2024-10-20 13:01:39.446
TomDiyTask#diySchedule(): 线程id=15 当前时间=2024-10-20 13:01:41.458
显然:当任务执行耗时小于执行周期(执行频率,或间隔时间), 任务执行频率与预设的执行频率一致 ;