从ScheduledThreadPoolExecutor的任务突然不再跑了, 再看线程池ThreadLocalPool基本原理

news/2025/2/23 3:10:50/

从ScheduledThreadPoolExecutor的任务突然不再跑了, 再看线程池ThreadLocalPool基本原理

    • 背景
    • 问题解决
      • 是超过了间隔时间导致的吗
      • 异常没抛出
        • 线程池ThreadPoolExecutor基本原理
        • 为什么submit没抛出异常
        • 为什么ScheduledThreadPoolExecutor的scheduleAtFixedRate任务突然不再跑
    • 结论

背景

写了个简单的定时任务来监控线程池情况:

Executors.newFixedThreadPool(1).scheduleWithFixedDelay(this::run,0, dtpProperties.getIntervalSec(), TimeUnit.SECONDS);

但是发现有些时候跑着跑着, 就没有执行的日志继续打印出来了,也就是不知道为什么线程池突然挂了, 猜测是因为异常了导致的.

问题解决

是超过了间隔时间导致的吗

首先以为是线程池任务执行时间过长, 导致超过了fixed时间,
写了个demo试了下, sleep5秒, 间隔3秒, 是可以正常执行的.

要搞清楚为啥FixedThreadPool可以间隔一段时间跑一次, 之前看线程池源码可知, 对于线程池重要的两个方法, 一个是take一个poll, 分别表示是否允许淘汰worker, 来从等待队列获取任务, poll的情况表示允许淘汰worker

从下面的源码可以看到最重要的一个地方, getDelay, 获取的是距离下一次预定执行时间的剩余时间. 如果>0, 则直接返回null, 也就解释了为啥一定要等一段间隔执行

public RunnableScheduledFuture<?> poll() {final ReentrantLock lock = this.lock;lock.lock();try {RunnableScheduledFuture<?> first = queue[0];if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn finishPoll(first);} finally {lock.unlock();}}

因此它不管任务执行多久, 它是根据这次任务计算下次预定执行时间, 然后判断剩余时间来决定要不要执行的

异常没抛出

其实应该想到, 线程池对于线程的异常处理, 其实是有坑的, 比如常常见到的线程池submit不打印异常

线程池ThreadPoolExecutor基本原理

  1. 首先线程池有一个阻塞等待队列
  2. 线程池会将 runable任务, 封装成一个worker, 这个worker继承了Runable接口, 也有run方法.
    我们看看ThreadLocalPool的execute方法,可以看到创建新线程都用到了关键方法addWorker.
 public void execute(Runnable command) {....//比较现在线程数是不是小于核心线程,是则创建新线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//如果现在线程数>=核心线程,尝试放到队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}//否则继续创建新线程else if (!addWorker(command, false))reject(command);}
  1. 看看addWorkder方法, 里面创建了一个worker, 可以看到worker实现了Runable类, 自己创建后, 成员变量初始化了Worker线程
  private boolean addWorker(Runnable firstTask, boolean core) {...try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {...if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

看看Worker类, 它有run方法

    private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}//重要的run方法public void run() {runWorker(this);}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;...try {while (task != null || (task = getTask()) != null) {...try {...try {task.run();} catch (XXXException x) {thrown = x; throw x;} ...}} finally {...}}...} finally {...}}private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {...// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果设置了超时时间,或者当前线程>核心线程数,队列里一直为空拿不到任务,就返回null, runWorker方法中getTask=null跳出循环, 即实现了销毁线程if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//从队列中poll或take拿任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}}
  1. worker的run方法中, 调用runWorker, 里面是一个循环, 一直getTask, 即我们的Runable任务来执行, 这个任务可能是创建worker的时候携带过来的, 也可能是后续从等待队列中拿到的
  2. 再回到外层, 原来创建一个Worker里面有这么多文章, 那么他既然是线程类, 从哪里启动的呢? 回到addWorker方法, 原来在这
 if (workerAdded) {t.start();workerStarted = true;}

为什么submit没抛出异常

从runWorker里我们看到, 实际上它虽然catch了异常, 但是是有抛出的

} catch (XXXException x) {thrown = x; throw x;} 

别急, 我们看看 submit方法

    public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}

外层包了一个FutureTask, 再看它的run方法

  public void run() {...try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {...}}    protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final statefinishCompletion();}}

关键来了, try-catch了异常, 然后没有抛出

为什么ScheduledThreadPoolExecutor的scheduleAtFixedRate任务突然不再跑

首先看下这个方法

 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {...//外层包装成了future类ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}

ScheduledFutureTask这个类同样的继承了 FutureTask<V>
看下run方法, 可以看到调用了父类的run方法, 同样的是FutureTask罪魁祸首没有抛出异常

 public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();//重点来了,调用了super(即FutureTask的run方法)else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}

同样的, 由于ScheduledFutureTask.super.runAndReset() 返回false, 所以也就不会调用reExecutePeriodic将任务放入下一次周期执行

结论

线程池 execute 可以抛出异常
线程池 submit 不会抛出异常, 同理, 使用FutureTask作为任务执行类的不会抛出异常, 其中就包括了ScheduledThreadPoolExecutorscheduleAtFixedRate


http://www.ppmy.cn/news/224899.html

相关文章

常用的LED显示屏驱动芯片和控制系统

常用的LED显示屏驱动芯片包括以下几种&#xff1a; TPIC6B595&#xff1a;这是一种串行输入、并行输出的LED显示屏驱动芯片&#xff0c;适用于驱动7段数码管等简单的LED显示屏。 MAX7219/MAX7221&#xff1a;这是一种常用的LED显示屏驱动器&#xff0c;可驱动8x8点阵LED显示屏。…

C++之stack容器

一、概念 概念: stack是一种先进后出(First In Last Out,FILO)的数据结构&#xff0c;它只有一个出口&#xff1b; 二、代码 #include <iostream> #include <stack>using namespace std;// 栈数据操作 概念: stack是一种先进后出(First In Last Out,FILO)的数据结…

空军一号多少钱

空军一号多少钱&#xff1f; 入秋了&#xff0c;天气也是一天比一天凉爽了&#xff0c;很多小伙伴开始为秋季准备新衣了&#xff0c;af1小麦色鞋款是不错的选择&#xff0c;特别是高帮的空军一号小麦色&#xff0c;颜值高&#xff0c;出门也百搭&#xff0c;符合这个季节的定义…

一张纸(5毫米)折叠多少次可以达到珠穆朗玛峰的高度(8848米)?

折叠次数计算&#xff1a;一张纸&#xff08;5毫米&#xff09;折叠多少次可以达到珠穆朗玛峰的高度(8848米)&#xff1f; 实现代码&#xff1a; a0.005 for i in range(1,100):aa*2if(a>8848):print(i)quit()运行演示&#xff1a;

奥运金牌金镶玉到底值多少钱

奥运会开幕以来&#xff0c;中国运动员夺金势头令世人瞩目&#xff0c;一个个冠军诞生&#xff0c;一块块金镶玉(金牌)被中国代表团收入囊中。此时&#xff0c;人们开始关注、议论、猜测&#xff1a;拿到奥运会冠军的运动员能得到多少奖金&#xff1f;一块奥运金牌金镶玉到底能…

第四章 部署远程访问服务

♥️作者介绍&#xff1a;奇妙的大歪 ♥️个人名言&#xff1a;但行前路&#xff0c;不负韶华&#xff01; ♥️个人简介&#xff1a;云计算网络运维专业人员 目录 一.什么是远程访问&#xff1f; 二.远程访问的组成 三.远程访问的方式有哪些&#xff1f; 一、远程访问软件…

8848高端商务手机,双系统配置让工作生活轻松切换

到8848这组数字你会想到啥? 收到8848 M3钛金手机当天&#xff0c;我在微信朋友圈发了一帖子。评论中最多提及的是珠穆朗玛峰&#xff0c;其次是手机品牌&#xff0c;也有朋友提及第一代互联网人老榕。 概念篇 8848是珠穆朗玛峰的海拔&#xff0c;借此巅峰可以导引入山水之境&a…

我国最高山峰是珠穆朗玛峰,8848米。现在我有一张足够大的纸,它的厚度是0.01米。请问,我折叠多少次,可以折成珠穆朗玛峰的高度?

package cn.zxj.com; public class Demo { public static void main(String[] args) { double zhufeng 8848; double paperInit 0.01; //折叠次数 int count 0; boolean flag true; while(flag){ paperInit*2; i…