ScheduledThreadPoolExecutor的源码剖析
本篇主要用于加强自己的线程池相关知识,涉及到底层,有些枯燥
跟下去.还是那句话,自己边看边码,事半功倍!!加油!!~
文章目录
- ScheduledThreadPoolExecutor的源码剖析
- 1.核心属性
- 内部类,核心类之一
- 内部类,核心类之一
- 内部任务执行的方法
- 1.schedule方法
1.核心属性
后面业务均会涉及
/*** 针对任务取消时一些业务判断会用到的标记*/
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
private volatile boolean removeOnCancel = false;/*** 计算器,若两个任务的执行时间节点一模一样,根据这个序列来判断谁先执行*/
private static final AtomicLong sequencer = new AtomicLong();/**
* 获取当前系统时间的毫秒值
*/
final long now() {return System.nanoTime();
}private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {
内部类,核心类之一
private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {//全局唯一的序列,如果两个任务时间一致,基于当前属性判断private final long sequenceNumber;//任务执行的时间,单位是纳秒private long time;/*** period==0: 执行一次的延迟任务* period > 0: 代表的AT,* period < 0: 代表的是with*/private final long period;//outerTask: 周期性执行时,需将任务重新仍回阻塞队列,基于当前属性拿到任务,方便仍回阻塞队列RunnableScheduledFuture<V> outerTask = this;/*** Index into delay queue, to support faster cancellation.*/int heapIndex;/*** Creates a one-shot action with given nanoTime-based trigger time.*/ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);this.time = ns;this.period = 0;this.sequenceNumber = sequencer.getAndIncrement();}/*** 构建schedule方法的任务*/ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}/*** 构建AT和with的有参构造*/ScheduledFutureTask(Callable<V> callable, long ns) {super(callable);//任务要执行的时间this.time = ns;//等于0,代表任务是延迟执行,不是周期执行this.period = 0;//基于atomicLOng生成的序列this.sequenceNumber = sequencer.getAndIncrement();}public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);}public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}/*** Returns {@code true} if this is a periodic (not a one-shot) action.** @return {@code true} if periodic*/public boolean isPeriodic() {return period != 0;}/*** Sets the next time to run for a periodic task.*/private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);}public boolean cancel(boolean mayInterruptIfRunning) {boolean cancelled = super.cancel(mayInterruptIfRunning);if (cancelled && removeOnCancel && heapIndex >= 0)remove(this);return cancelled;}/*** Overrides FutureTask version so as to reset/requeue if periodic.*/public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}
}
内部类,核心类之一
这个类就是delayQueue
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
内部任务执行的方法
1.schedule方法
概念: 延迟任务的执行方法
execute
方法也是调用的schedule
方法,只不过传入的延迟时间是0纳秒
public ScheduledFuture<?> schedule(Runnable command,//任务long delay,//延迟时间TimeUnit unit) {//延迟时间的单位//健壮性校验if (command == null || unit == null)throw new NullPointerException();//将任务和延迟时间封装到一起,最终组成ScheduledFutureTask//此处要分成三个方法去看://1.triggerTime:计算延迟时间 最终的返回结果:当前系统时间 + 延迟时间;其实就是将延迟时间转换为纳秒并且加上当前系统时间,再做一些健壮性校验//2.ScheduledFutureTask有参构造:将任务以及延迟时间封装到一起,并设置任务执行的方式//3.decorateTask: 让用户基于自身情况可以动态修改和扩展的接口RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));// 任务封装好,执行delayedExecute方法,执行任务delayedExecute(t);//返回FutureTaskreturn t;
}
triggerTime
做的事情
/***外部方法:对延迟时间做校验,若小于0,直接设置为0 *并且转换为纳秒单位*/
private long triggerTime(long delay, TimeUnit unit) {return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}/*** 将延迟时间加上当前系统时间* 后面的三元校验:是为了避免延迟时间超过Long的取值范围*/
long triggerTime(long delay) {return now() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
delayedExecute: 执行延迟任务的操作
private void delayedExecute(RunnableScheduledFuture<?> task) {//查看当前线程池是否是running状态,不是running,进入if (isShutdown())//执行拒绝策略reject(task);else {//线程池状态是running//直接将任务扔到延迟的阻塞队列中super.getQueue().add(task);//DCL操作,再次查看线程池的状态//若线程池在添加任务到阻塞队列后,状态不是runningif (isShutdown() &&//task.isPeriodic(): 返回的是false,因为我们这个是延迟执行代码,非周期执行//默认情况:延迟队列中的延迟任务,可以执行!canRunInCurrentRunState(task.isPeriodic()) &&//从阻塞队列中移除任务remove(task))//其实也是移除任务task.cancel(false);else//线程池状态正常,任务可以正常执行ensurePrestart();}
}
ensurePrestart
:任务可以正常执行后,做的操作
void ensurePrestart() {//拿到工作线程个数int wc = workerCountOf(ctl.get());//若工作线程个数小于核心线程数if (wc < corePoolSize)//添加核心线程去处理阻塞队列任务addWorker(null, true);else if (wc == 0)//若工作线程数为0,核心线程数也为0,这时添加一个非核心线程去处理阻塞队列任务addWorker(null, false);
}
canRunInCurrentRunState
:线程池状态不为running,查看任务是否可以执行
延迟执行: periodic==false
周期执行:periodic==true
continueExistingPeriodicTasksAfterShutdown
:周期执行任务,默认false
executeExistingDelayedTasksAfterShutdown
: 延迟执行任务,默认true可以执行
boolean canRunInCurrentRunState(boolean periodic) {return isRunningOrShutdown(periodic ?continueExistingPeriodicTasksAfterShutdown :executeExistingDelayedTasksAfterShutdown);
}
isRunningOrShutdown
:
shutdownOK为true:向下执行
final boolean isRunningOrShutdown(boolean shutdownOK) {int rs = runStateOf(ctl.get());//若状态是running,正常执行,返回true//若状态是shutdown,根据shutdownOK来决定return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}