从ScheduledThreadPoolExecutor的任务突然不再跑了, 再看线程池ThreadLocalPool基本原理
- 背景
- 问题解决
- 是超过了间隔时间导致的吗
- 异常没抛出
- 线程池ThreadPoolExecutor基本原理
- 为什么submit没抛出异常
- 为什么ScheduledThreadPoolExecutor的scheduleAtFixedRate任务突然不再跑
- 结论
背景
写了个简单的定时任务来监控线程池情况:
Executors.newFixedThreadPool(1).scheduleWithFixedDelay(this::run,0, dtpProperties.getIntervalSec(), TimeUnit.SECONDS);
但是发现有些时候跑着跑着, 就没有执行的日志继续打印出来了,也就是不知道为什么线程池突然挂了, 猜测是因为异常了导致的.
问题解决
是超过了间隔时间导致的吗
首先以为是线程池任务执行时间过长, 导致超过了fixed时间,
写了个demo试了下, sleep5秒, 间隔3秒, 是可以正常执行的.
要搞清楚为啥FixedThreadPool可以间隔一段时间跑一次, 之前看线程池源码可知, 对于线程池重要的两个方法, 一个是take一个poll, 分别表示是否允许淘汰worker, 来从等待队列获取任务, poll的情况表示允许淘汰worker
从下面的源码可以看到最重要的一个地方, getDelay
, 获取的是距离下一次预定执行时间的剩余时间. 如果>0, 则直接返回null, 也就解释了为啥一定要等一段间隔执行
public RunnableScheduledFuture<?> poll() {final ReentrantLock lock = this.lock;lock.lock();try {RunnableScheduledFuture<?> first = queue[0];if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn finishPoll(first);} finally {lock.unlock();}}
因此它不管任务执行多久, 它是根据这次任务计算下次预定执行时间, 然后判断剩余时间来决定要不要执行的
异常没抛出
其实应该想到, 线程池对于线程的异常处理, 其实是有坑的, 比如常常见到的线程池submit不打印异常
线程池ThreadPoolExecutor基本原理
- 首先线程池有一个阻塞等待队列
- 线程池会将 runable任务, 封装成一个worker, 这个worker继承了Runable接口, 也有run方法.
我们看看ThreadLocalPool的execute方法,可以看到创建新线程都用到了关键方法addWorker
.
public void execute(Runnable command) {....//比较现在线程数是不是小于核心线程,是则创建新线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//如果现在线程数>=核心线程,尝试放到队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}//否则继续创建新线程else if (!addWorker(command, false))reject(command);}
- 看看
addWorkder
方法, 里面创建了一个worker, 可以看到worker实现了Runable类, 自己创建后, 成员变量初始化了Worker线程
private boolean addWorker(Runnable firstTask, boolean core) {...try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {...if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
看看Worker类, 它有run
方法
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}//重要的run方法public void run() {runWorker(this);}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;...try {while (task != null || (task = getTask()) != null) {...try {...try {task.run();} catch (XXXException x) {thrown = x; throw x;} ...}} finally {...}}...} finally {...}}private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {...// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果设置了超时时间,或者当前线程>核心线程数,队列里一直为空拿不到任务,就返回null, runWorker方法中getTask=null跳出循环, 即实现了销毁线程if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//从队列中poll或take拿任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}}
- worker的run方法中, 调用runWorker, 里面是一个循环, 一直
getTask
, 即我们的Runable任务来执行, 这个任务可能是创建worker的时候携带过来的, 也可能是后续从等待队列中拿到的 - 再回到外层, 原来创建一个Worker里面有这么多文章, 那么他既然是线程类, 从哪里启动的呢? 回到
addWorker
方法, 原来在这
if (workerAdded) {t.start();workerStarted = true;}
为什么submit没抛出异常
从runWorker里我们看到, 实际上它虽然catch了异常, 但是是有抛出的
} catch (XXXException x) {thrown = x; throw x;}
别急, 我们看看 submit方法
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}
外层包了一个FutureTask
, 再看它的run方法
public void run() {...try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {...}} protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}}
关键来了, try-catch了异常, 然后没有抛出
为什么ScheduledThreadPoolExecutor的scheduleAtFixedRate任务突然不再跑
首先看下这个方法
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {...//外层包装成了future类ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}
ScheduledFutureTask
这个类同样的继承了 FutureTask<V>
看下run
方法, 可以看到调用了父类的run方法, 同样的是FutureTask
罪魁祸首没有抛出异常
public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();//重点来了,调用了super(即FutureTask的run方法)else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}
同样的, 由于ScheduledFutureTask.super.runAndReset()
返回false, 所以也就不会调用reExecutePeriodic
将任务放入下一次周期执行
结论
线程池 execute 可以抛出异常
线程池 submit 不会抛出异常, 同理, 使用FutureTask
作为任务执行类的不会抛出异常, 其中就包括了ScheduledThreadPoolExecutor
的scheduleAtFixedRate