@EnableScheduling 和 @Scheduled 实现定时任务的任务延期问题

embedded/2024/9/25 3:22:43/

前言

在复盘 ieg 一面看到定时任务阻塞的问题时,研究了下 @EnableScheduling 的源码,觉得可以单开一篇文章讲一讲

本文主要讲述了使用 @EnableScheduling 可能出现的线程阻塞导致定时任务延期的问题,也顺便解释了动态定时任务源码上的实现

引用文章:

@Schedule定时任务+分布式环境:@Schedule定时任务+分布式环境,这些坑你一定得注意!!! (qq.com)

java 中的线程池参数:java中四种线程池及poolSize、corePoolSize、maximumPoolSize_maximum-pool-size-CSDN博客

线程池的拒绝策略:线程池的RejectedExecutionHandler(拒绝策略)-CSDN博客

Java 中实现定时任务:Java中实现定时任务,有多少种解决方案?好久没更新博客了,最近上班做了点小东西,总结复盘一下。主要介绍了定时任务的三种 - 掘金 (juejin.cn)

线程阻塞问题

问题根源

Java中 使用 Springboot 自带的定时任务 @EnableScheduling 和 @Scheduled 注解,会装配一个 SchedulingConfiguration 的类

java">@Target(ElementType.TYPE)  
@Retention(RetentionPolicy.RUNTIME)  
@Import(SchedulingConfiguration.class)  
@Documented  
public @interface EnableScheduling {  }
java">@Configuration(proxyBeanMethods = false)  
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)  
public class SchedulingConfiguration {  @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)  public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {  return new ScheduledAnnotationBeanPostProcessor();  }  
}

这个配置类又会创建一个 ScheduledAnnotationBeanPostProcessor 的 Bean

在这个类的无参构造中又初始化了一个 ScheduledTaskRegistrar 的对象

java">public ScheduledAnnotationBeanPostProcessor() {  this.registrar = new ScheduledTaskRegistrar();  
}

在 创建单例或刷新上下文之后,会执行 finishRegistration 方法,最后执行 registrar 的 afterPropertiesSet 方法:

java">@Override  
public void afterSingletonsInstantiated() {  // Remove resolved singleton classes from cache  this.nonAnnotatedClasses.clear();  if (this.applicationContext == null) {  // Not running in an ApplicationContext -> register tasks early...  finishRegistration();  }  
}  @Override  
public void onApplicationEvent(ContextRefreshedEvent event) {  if (event.getApplicationContext() == this.applicationContext) {  // Running in an ApplicationContext -> register tasks this late...  // giving other ContextRefreshedEvent listeners a chance to perform  // their work at the same time (e.g. Spring Batch's job registration).  finishRegistration();  }  
}  private void finishRegistration() {  if (this.scheduler != null) {  this.registrar.setScheduler(this.scheduler);  }  // ...this.registrar.afterPropertiesSet();  
}

ScheduledTaskRegistrar 的成员变量包括任务的执行器以及几种类型的定时任务列表

java">@Nullable  
private TaskScheduler taskScheduler;  @Nullable  
private ScheduledExecutorService localExecutor;  @Nullable  
private List<TriggerTask> triggerTasks;  @Nullable  
private List<CronTask> cronTasks;

afterPropertiesSet 方法会获取一个执行器

java">@Override  
public void afterPropertiesSet() {  scheduleTasks();  
}  /**  
* Schedule all registered tasks against the underlying  
* {@linkplain #setTaskScheduler(TaskScheduler) task scheduler}.  
*/  
@SuppressWarnings("deprecation")  
protected void scheduleTasks() {  if (this.taskScheduler == null) {  this.localExecutor = Executors.newSingleThreadScheduledExecutor();  this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);  }  if (this.triggerTasks != null) {  for (TriggerTask task : this.triggerTasks) {  addScheduledTask(scheduleTriggerTask(task));  }  }  if (this.cronTasks != null) {  for (CronTask task : this.cronTasks) {  addScheduledTask(scheduleCronTask(task));  }  }  if (this.fixedRateTasks != null) {  for (IntervalTask task : this.fixedRateTasks) {  addScheduledTask(scheduleFixedRateTask(task));  }  }  if (this.fixedDelayTasks != null) {  for (IntervalTask task : this.fixedDelayTasks) {  addScheduledTask(scheduleFixedDelayTask(task));  }  }  
}

进入 newSingleThreadScheduledExecutor 可以看到,默认使用了一个 corePoolSize 为 1, maximumPoolSize 为 Integer.MAX_VALUE 的线程池

java">public static ScheduledExecutorService newSingleThreadScheduledExecutor() {  return new DelegatedScheduledExecutorService  (new ScheduledThreadPoolExecutor(1));  
}
java">public ScheduledThreadPoolExecutor(int corePoolSize) {  super(corePoolSize, Integer.MAX_VALUE,  DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,  new DelayedWorkQueue());  
}
java">public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,  long keepAliveTime,  TimeUnit unit,  BlockingQueue<Runnable> workQueue) {  this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  Executors.defaultThreadFactory(), defaultHandler);  
}

而线程池主要有几个重要的参数分别是:

  1. corePoolSize:线程池的基本大小。
  2. maximumPoolSize:线程池中允许的最大线程数。
  3. poolSize:线程池中当前线程的数量。

当提交一个新任务时,若

  1. poolSize < corePoolSize : 创建新线程处理该任务
  2. poolSize = corePoolSize : 将任务置于阻塞队列中
  3. 阻塞队列的容量达到上限,且这时 poolSize < maximumPoolSize :
  4. 阻塞队列满了,且 poolSize = maximumPoolSize : 那么线程池已经达到极限,会根据饱和策略 RejectedExecutionHandler 拒绝新的任务,默认是 AbortPolicy 会丢掉任务并抛出异常

解决方案

注入自己编写的线程池,自行设置参数:

java">@Configuration  public class MyTheadPoolConfig {  @Bean  public TaskExecutor taskExecutor() {  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  //设置核心线程数  executor.setCorePoolSize(10);  //设置最大线程数  executor.setMaxPoolSize(20);  //缓冲队列200:用来缓冲执行任务的队列  executor.setQueueCapacity(200);  //线程活路时间 60 秒  executor.setKeepAliveSeconds(60);  //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池  // 这里我继续沿用 scheduling 默认的线程名前缀  executor.setThreadNamePrefix("nzc-create-scheduling-");  //设置拒绝策略  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  executor.setWaitForTasksToCompleteOnShutdown(true);  return executor;  }  }

在定时任务的类上再加一个 @EnableAsync 注解,给方法添加一个 @Async 即可

java">@Slf4j  
@Component  
@EnableAsync  
@EnableScheduling  
public class ScheduleService {  @Autowired  TaskExecutor taskExecutor;  @Async(value = "taskExecutor")  @Scheduled(cron = "0/5 * * * * ? ")  public void testSchedule() {  try {  Thread.sleep(10000);  log.info("当前执行任务的线程号ID===>{}", Thread.currentThread().getId());  } catch (Exception e) {  e.printStackTrace();  }   }  
}

动态定时任务

上面提到了

@EnableScheduling 导入了 SchedulingConfiguration,SchedulingConfiguration 又创建了 ScheduledAnnotationBeanPostProcessor 的Bean,ScheduledAnnotationBeanPostProcessor 又实例化了 ScheduledTaskRegistrar 对象,即

@EnableScheduling -> SchedulingConfiguration -> ScheduledAnnotationBeanPostProcessor -> ScheduledTaskRegistrar

实际上,在 ScheduledAnnotationBeanPostProcessor 的 finishRegistration 方法中,会先获取所有实现了 SchedulingConfigurer 接口的 Bean,并执行他们的 configureTasks 方法

java">
private void finishRegistration() {  if (this.scheduler != null) {  this.registrar.setScheduler(this.scheduler);  }  if (this.beanFactory instanceof ListableBeanFactory) {  Map<String, SchedulingConfigurer> beans =  ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);  List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());  AnnotationAwareOrderComparator.sort(configurers);  for (SchedulingConfigurer configurer : configurers) {  configurer.configureTasks(this.registrar);  }  }// ...
}

我们可以通过配置一个实现了 SchedulingConfigurer 接口的 Bean,实现动态加载定时任务的执行时间

java">@Data  
@Slf4j  
@Component  
@RequiredArgsConstructor  
@PropertySource("classpath:task-config.ini")  
public class ScheduleTask implements SchedulingConfigurer {  // private Long timer = 100 * 1000L;  @Value("${printTime.cron}")  private String cron;  @Override  public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {  // 间隔触发的任务  taskRegistrar.addTriggerTask(new Runnable() {  @Override  public void run(){  // ...}}, new Trigger() {  @Override  public Date nextExecutionTime(TriggerContext triggerContext) {  // 使用CronTrigger触发器,可动态修改cron表达式来操作循环规则  CronTrigger cronTrigger = new CronTrigger(cron);  Date nextExecutionTime = cronTrigger.nextExecutionTime(triggerContext);  return nextExecutionTime;  // 使用PerodicTrigger触发器,修改timer变量指定操作间隔,单位为毫秒// PeriodicTrigger periodicTrigger = new PeriodicTrigger(timer);  // Date nextExecutionTime = periodicTrigger.nextExecutionTime(triggerContext);  // return nextExecutionTime;  }  });  }  
}

http://www.ppmy.cn/embedded/116417.html

相关文章

虎先锋,你也喜欢线程控制嘛

讲讲线程控制捏 线程创建 这是创建线程调用的接口&#xff1a; #include <pthread.h> int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); 这个接口上一篇文章已经介绍过了 线程等待 那么我们来看看…

前端面试题(一)

1. HTML 和 CSS 面试题 如何在 HTML 中嵌入 CSS&#xff1f; CSS 可以通过三种方式嵌入&#xff1a;外部样式表&#xff08;<link>标签&#xff09;&#xff0c;内部样式&#xff08;<style>标签&#xff09;&#xff0c;和行内样式&#xff08;通过元素的style属性…

【趣学Python算法100例】打鱼还是晒网

问题描述 中国有句俗语叫“三天打鱼两天晒网”。某人从1990年1月1日起便开始“三天打鱼两天晒网”&#xff0c;问这个人在以后的某一天中是“打鱼”还是“晒网”。 问题分析 根据题意可以将解题过程分为以下三步&#xff1a; 计算从1990年1月1日开始至指定日期共有多少天。…

【C语言】猜数字游戏

个人主页 &#xff1a; zxctscl 如有转载请先通知 文章目录 前言1. 随机数生成1.1 rand1.2 srand1.3 time1.4 设置随机数的范围 2. 猜数字游戏实现2.1 游戏菜单2.2 主函数部分2.3 game函数部分2.4 附代码2.5 优化代码 前言 前面学习的这些知识&#xff0c;我们就可以写一些稍微…

数据库性能优化之分表

markdown # 1 背景 在生产站点&#xff0c;我们发现 MySQL 任务表的数据超过了 1700 万行&#xff0c;占用了高达 23G 的空间&#xff0c;导致数据库性能急剧下降&#xff0c;并出现了大量的 504 错误。分析数据后发现&#xff0c;有两个客户疯狂地创建任务&#xff0c;其中一…

Python闭包与装饰日高级概念

在Python中&#xff0c;闭包&#xff08;Closure&#xff09;和装饰器&#xff08;Decorator&#xff09;是两个高级且强大的编程概念&#xff0c;它们在函数式编程和面向对象编程中扮演着重要角色。下面将详细讲解这两个概念。 一、闭包&#xff08;Closure&#xff09; 1. …

如何使用ssm实现基于VUE的儿童教育网站的设计与实现+vue

TOC ssm676基于VUE的儿童教育网站的设计与实现vue 第一章 课题背景及研究内容 1.1 课题背景 信息数据从传统到当代&#xff0c;是一直在变革当中&#xff0c;突如其来的互联网让传统的信息管理看到了革命性的曙光&#xff0c;因为传统信息管理从时效性&#xff0c;还是安全…

vue3开发中易遗漏的常见知识点

文章目录 组件样式的特性Scoped CSS之局部样式的泄露Scoped CSS之深度选择器CSS Modules在CSS中使用v-bind 非props属性继承组件通信父子组件的相互通信props/$emit父组件传递数据给子组件子组件传递数据给父组件 非父子组件的相互通信Provide/inject全局事件总线 组件插槽作用…