第十七章 Java多线程--线程池-ScheduledThreadPoolExecutor

news/2024/10/24 16:48:52/

目录

一、ScheduledThreadPoolExecutor基础概念

主要功能

使用方法

方法

注意事项

二、ScheduledThreadPoolExecutor应用&源码

1 ScheduleThreadPoolExecutor介绍

2 ScheduleThreadPoolExecutor应用

3 ScheduleThreadPoolExecutor源码

3.1 核心属性

3.2 schedule方法

3.3 At和With方法&任务的run方法


一、ScheduledThreadPoolExecutor基础概念

ScheduleThreadPoolExecutor 是 Java 平台上的 java.util.concurrent 包中的一个类,它是一个支持定时及周期性任务执行的线程池。ScheduleThreadPoolExecutor 实现了 ExecutorService 接口,并且扩展了 ThreadPoolExecutor,因此它不仅可以用来执行普通的异步任务,还可以调度任务在给定延迟后运行或定期运行。

主要功能

  1. 延迟执行:可以安排任务在一段时间延迟之后执行。

  2. 周期性执行:可以安排任务定期重复执行。

使用方法

创建 ScheduleThreadPoolExecutor 通常需要指定线程池中核心线程的数量。这些线程用于执行提交的任务。下面是一个简单的示例代码,展示了如何创建一个 ScheduleThreadPoolExecutor 和如何使用它来调度任务:

java">import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ScheduledThreadPoolExample {public static void main(String[] args) {// 创建一个包含3个线程的ScheduledThreadPoolExecutorScheduledThreadPoolExecutor scheduledThreadPool = new ScheduledThreadPoolExecutor(3);// 提交一个Runnable任务,该任务将在5秒后执行scheduledThreadPool.schedule(new Runnable() {public void run() {System.out.println("Task executed after delay");}}, 5, TimeUnit.SECONDS);// 提交一个Runnable任务,该任务首次将在2秒后开始执行,然后每隔4秒执行一次scheduledThreadPool.scheduleAtFixedRate(new Runnable() {public void run() {System.out.println("Periodic task executed");}}, 2, 4, TimeUnit.SECONDS);// 如果不再需要这个ScheduledThreadPoolExecutor,应该关闭它scheduledThreadPool.shutdown();}
}

方法

ScheduleThreadPoolExecutor 提供了一些关键的方法来调度任务:

  • schedule(Runnable command, long delay, TimeUnit unit):延迟指定的时间后执行命令。

  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):首次延迟指定的时间后执行命令,然后按照固定延迟重复执行。

  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):首次延迟指定的时间后执行命令,然后按照固定的频率重复执行。

注意事项

当使用 scheduleAtFixedRatescheduleWithFixedDelay 方法时,需要注意它们在处理任务执行时间超过预定延迟的情况时的行为是不同的。scheduleAtFixedRate 会尝试保持固定的频率,即使这意味着并发地执行任务;而 scheduleWithFixedDelay 则会在前一个任务完成后延迟指定的时间再次执行。

确保合理设置线程池大小,以避免资源过度消耗。如果任务数量超过了线程池的最大容量,那么额外的任务将会被放入队列中等待执行,或者根据拒绝策略进行处理。

二、ScheduledThreadPoolExecutor应用&源码

1 ScheduleThreadPoolExecutor介绍

从名字上就可以看出,当前线程池是用于执行定时任务的线程池。

Java比较早的定时任务工具是Timer类。但是Timer问题很多,串行的,不靠谱,会影响到其他的任务执行。

其实除了Timer以及ScheduleThreadPoolExecutor之外,正常在企业中一般会采用Quartz或者是SpringBoot提供的Schedule的方式去实现定时任务的功能。

ScheduleThreadPoolExecutor支持延迟执行以及周期性执行的功能。

2 ScheduleThreadPoolExecutor应用

定时任务线程池的有参构造

java">public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory, handler);
}

发现ScheduleThreadPoolExecutor在构建时,直接调用了父类的构造方法

ScheduleThreadPoolExecutor的父类就是ThreadPoolExecutor

首先ScheduleThreadPoolExecutor最多允许设置3个参数:

  • 核心线程数

  • 线程工厂

  • 拒绝策略

首先没有设置阻塞队列,以及最大线程数和空闲时间以及单位

阻塞队列设置的是DelayedWorkQueue,其实本质就是DelayQueue,一个延迟队列。DelayQueue是一个无界队列。所以最大线程数以及非核心线程的空闲时间是不需要设置的。

代码落地使用

java">public static void main(String[] args) {//1. 构建定时任务线程池ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(5,new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);return t;}},new ThreadPoolExecutor.AbortPolicy());//2. 应用ScheduledThreadPoolExecutor// 跟直接执行线程池的execute没啥区别pool.execute(() -> {System.out.println("execute");});// 指定延迟时间执行System.out.println(System.currentTimeMillis());pool.schedule(() -> {System.out.println("schedule");System.out.println(System.currentTimeMillis());},2, TimeUnit.SECONDS);// 指定第一次的延迟时间,并且确认后期的周期执行时间,周期时间是在任务开始时就计算// 周期性执行就是将执行完毕的任务再次社会好延迟时间,并且重新扔到阻塞队列// 计算的周期执行,也是在原有的时间上做累加,不关注任务的执行时长。System.out.println(System.currentTimeMillis());pool.scheduleAtFixedRate(() -> {System.out.println("scheduleAtFixedRate");System.out.println(System.currentTimeMillis());},2,3,TimeUnit.SECONDS);//        // 指定第一次的延迟时间,并且确认后期的周期执行时间,周期时间是在任务结束后再计算下次的延迟时间System.out.println(System.currentTimeMillis());pool.scheduleWithFixedDelay(() -> {System.out.println("scheduleWithFixedDelay");System.out.println(System.currentTimeMillis());try {Thread.sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}},2,3,TimeUnit.SECONDS);}

3 ScheduleThreadPoolExecutor源码

3.1 核心属性

后面的方法业务流程会涉及到这些属性。

java">// 这里是针对任务取消时的一些业务判断会用到的标记
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
private volatile boolean removeOnCancel = false;// 计数器,如果两个任务的执行时间节点一模一样,根据这个序列来判断谁先执行
private static final AtomicLong sequencer = new AtomicLong();// 这个方法是获取当前系统时间的毫秒值
final long now() {return System.nanoTime();
}// 内部类。核心类之一。
private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {// 全局唯一的序列,如果两个任务时间一直,基于当前属性判断private final long sequenceNumber;// 任务执行的时间,单位纳秒private long time;/***  period == 0:执行一次的延迟任务*  period > 0:代表是At*  period < 0:代表是With*/private final long period;// 周期性执行时,需要将任务重新扔回阻塞队列,基础当前属性拿到任务,方便扔回阻塞队列RunnableScheduledFuture<V> outerTask = this;/*** 构建schedule方法的任务*/ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);this.time = ns;this.period = 0;this.sequenceNumber = sequencer.getAndIncrement();}/*** 构建At和With任务的有参构造*/  ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}}   // 内部类。核心类之一。
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
// 这个类就是DelayQueue,不用过分关注,如果没看过,看阻塞队列中的优先级队列和延迟队列  

3.2 schedule方法

execute方法也是调用的schedule方法,只不过传入的延迟时间是0纳秒

schedule方法就是将任务和延迟时间封装到一起,并且将任务扔到阻塞队列中,再去创建工作线程去take阻塞队列。

java">// 延迟任务执行的方法。
// command:任务
// delay:延迟时间
// unit:延迟时间的单位
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {// 健壮性校验。if (command == null || unit == null)throw new NullPointerException();// 将任务和延迟时间封装到一起,最终组成ScheduledFutureTask// 要分成三个方法去看// triggerTime:计算延迟时间。最终返回的是当前系统时间 + 延迟时间 // triggerTime就是将延迟时间转换为纳秒,并且+当前系统时间,再做一些健壮性校验// ScheduledFutureTask有参构造:将任务以及延迟时间封装到一起,并且设置任务执行的方式// decorateTask:当前方式是让用户基于自身情况可以动态修改任务的一个扩展口RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));// 任务封装好,执行delayedExecute方法,去执行任务delayedExecute(t);// 返回FutureTaskreturn t;
}// triggerTime做的事情
// 外部方法,对延迟时间做校验,如果小于0,就直接设置为0
// 并且转换为纳秒单位
private long triggerTime(long delay, TimeUnit unit) {return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
// 将延迟时间+当前系统时间
// 后面的校验是为了避免延迟时间超过Long的取值范围
long triggerTime(long delay) {return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}// ScheduledFutureTask有参构造
ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);// time就是任务要执行的时间this.time = ns;// period,为0,代表任务是延迟执行,不是周期执行this.period = 0;// 基于AtmoicLong生成的序列this.sequenceNumber = sequencer.getAndIncrement();
}// delayedExecute 执行延迟任务的操作
private void delayedExecute(RunnableScheduledFuture<?> task) {// 查看当前线程池是否还是RUNNING状态,如果不是RUNNING,进到ifif (isShutdown())// 不是RUNNING。// 执行拒绝策略。reject(task);else {// 线程池状态是RUNNING// 直接让任务扔到延迟的阻塞队列中super.getQueue().add(task);// DCL的操作,再次查看线程池状态// 如果线程池在添加任务到阻塞队列后,状态不是RUNNINGif (isShutdown() &&// task.isPeriodic():现在反回的是false,因为任务是延迟执行,不是周期执行// 默认情况,延迟队列中的延迟任务,可以执行!canRunInCurrentRunState(task.isPeriodic()) &&// 从阻塞队列中移除任务。remove(task))task.cancel(false);else// 线程池状态正常,任务可以执行ensurePrestart();}
}// 线程池状态不为RUNNING,查看任务是否可以执行
// 延迟执行:periodic==false
// 周期执行:periodic==true
// continueExistingPeriodicTasksAfterShutdown:周期执行任务,默认为false
// executeExistingDelayedTasksAfterShutdown:延迟执行任务,默认为true
boolean canRunInCurrentRunState(boolean periodic) {return isRunningOrShutdown(periodic ?continueExistingPeriodicTasksAfterShutdown :executeExistingDelayedTasksAfterShutdown);
}
// 当前情况,shutdownOK为true
final boolean isRunningOrShutdown(boolean shutdownOK) {int rs = runStateOf(ctl.get());// 如果状态是RUNNING,正常可以执行,返回true// 如果状态是SHUTDOWN,根据shutdownOK来决定return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}// 任务可以正常执行后,做的操作
void ensurePrestart() {// 拿到工作线程个数int wc = workerCountOf(ctl.get());// 如果工作线程个数小于核心线程数if (wc < corePoolSize)// 添加核心线程去处理阻塞队列中的任务addWorker(null, true);else if (wc == 0)// 如果工作线程数为0,核心线程数也为0,这是添加一个非核心线程去处理阻塞队列任务addWorker(null, false);
}

3.3 At和With方法&任务的run方法

这两个方法在源码层面上的第一个区别,就是在计算周期时间时,需要将这个值传递给period,基于正负数在区别At和With

所以查看一个方法就ok,查看At方法

java">// At方法,
// command:任务
// initialDelay:第一次执行的延迟时间
// period:任务的周期执行时间
// unit:上面两个时间的单位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {// 健壮性校验if (command == null || unit == null)throw new NullPointerException();// 周期时间不能小于等于0.if (period <= 0)throw new IllegalArgumentException();// 将任务以及第一次的延迟时间,和后续的周期时间封装好。ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));// 扩展口,可以对任务做修改。RunnableScheduledFuture<Void> t = decorateTask(command, sft);// 周期性任务,需要在任务执行完毕后,重新扔会到阻塞队列,为了方便拿任务,将任务设置到outerTask成员变量中sft.outerTask = t;// 和schedule方法一样的方式// 如果任务刚刚扔到阻塞队列,线程池状态变为SHUTDOWN,默认情况,当前任务不执行delayedExecute(t);return t;
}// 延迟任务以及周期任务在执行时,都会调用当前任务的run方法。
public void run() {// periodic == false:一次性延迟任务// periodic == true:周期任务boolean periodic = isPeriodic();// 任务执行前,会再次判断状态,能否执行任务if (!canRunInCurrentRunState(periodic))cancel(false);// 判断是周期执行还是一次性任务else if (!periodic)// 一次性任务,让工作线程直接执行command的逻辑ScheduledFutureTask.super.run();// 到这个else if,说明任务是周期执行else if (ScheduledFutureTask.super.runAndReset()) {// 设置下次任务执行的时间setNextRunTime();// 将任务重新扔回线程池做处理reExecutePeriodic(outerTask);}
}
// 设置下次任务执行的时间
private void setNextRunTime() {// 拿到period值,正数:At,负数:Withlong p = period;if (p > 0)// 拿着之前的执行时间,直接追加上周期时间time += p;else// 如果走到else,代表任务是With方式,这种方式要重新计算延迟时间// 拿到当前系统时间,追加上延迟时间,time = triggerTime(-p);
}// 将任务重新扔回线程池做处理void reExecutePeriodic(RunnableScheduledFuture<?> task) {// 如果状态ok,可以执行if (canRunInCurrentRunState(true)) {// 将任务扔到延迟队列super.getQueue().add(task);// DCL,判断线程池状态if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false);else// 添加工作线程ensurePrestart();}
}


http://www.ppmy.cn/news/1541621.html

相关文章

【表情识别】Python+卷积神经网络算法+人工智能+深度学习+Django网页界面+算法模型+TensorFlow

表情识别系统&#xff0c;本系统使用Python作为主要编程语言&#xff0c;通过TensorFlow搭建ResNet50卷积神经算法网络模型&#xff0c;通过对7种表情图片数据集&#xff08;‘Neutral’, ‘Anger’, ‘Disgust’, ‘Fear’, ‘Happy’, ‘Sad’, ‘Surprise’&#xff09;进行…

Tailscale自建中转服务器derper搭建笔记(基于docker)

自己搭建derper服务器&#xff0c;让Tailscale中转更流畅。 Tailscale是很好的远程组网工具&#xff0c;在两台机器P2P打洞成功的情况下可以实现网络直连&#xff0c;但如果打洞失败就会进行数据中转&#xff0c;我们的数据要跑到国外再跑回来&#xff0c;这样速度就很慢了。 …

单片机STC8H8K64U开发板_RA6809开发板 驱动彩屏显示

单片机STC8H8K64U开发板&#xff0c;型号RT8H8K001 预留Type C接口&#xff0c;可供电SWD下载&#xff1a; RA6809开发板&#xff0c;型号RT6809CNN01 预留Type C接口供电&#xff0c;预留MCU接口、电容触摸屏接口、液晶屏接口&#xff1a; 双臂合一&#xff0c;驱动和控…

从图像识别到聊天机器人:Facebook AI的多领域应用

随着人工智能技术的快速发展&#xff0c;Facebook已在多个领域内广泛应用AI技术&#xff0c;以提升用户体验、提高效率并推动创新。从图像识别到聊天机器人&#xff0c;Facebook的AI应用涵盖了社交媒体的方方面面&#xff0c;下面我们将深入探讨这些应用的具体实现及其对用户生…

《YOLO 目标检测》—— YOLO v3 详细介绍

&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;还未写完&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xf…

WebGL 添加背景图

1. 纹理坐标&#xff08;st坐标&#xff09;简介 ST纹理坐标&#xff08;也称为UV坐标&#xff09;是一种二维坐标系统&#xff0c;用于在三维模型的表面上精确地定位二维纹理图像。这种坐标系统通常将纹理的左下角映射到(0,0)&#xff0c;而右上角映射到(1,1)。 S坐标&#x…

python 结构作业

基础练习 练习目标 if-else判断语句 while循环语句 01. 计算车费 题目描述 小红打车&#xff0c;起步价8元(3公里), 每公里收费 2 元&#xff0c;她打车行驶了 n 公里&#xff0c;计算车费 输入描述 输入一个公里数 输出描述 输出应付车费 示例 输入&#xff1a; 5 …

使用gpt2-medium基座说明模型微调

预训练与微调的背景 预训练&#xff1a;在大规模数据集上训练模型&#xff0c;以捕捉通用的特征和模式。例如&#xff0c;GPT-2 模型在大量文本上进行训练&#xff0c;学习语言的基本结构和语法。微调&#xff1a;在特定领域或任务的数据上对预训练模型进行训练&#xff0c;以…