【并发编程】ThreadPoolExecutor任务提交与停止流程及底层实现【新手探索版】

news/2024/10/17 23:31:05/

文章目录

  • 1. ThreadPoolExecutor任务提交
  • 2. 线程池状态[这部分是难点呀]
    • 2.1. addWorker添加worker线程
    • 2.2. 内部类Worker
    • 2.3. runWorker():执行任务
    • 2.4. getTask():获取任务
    • 2.5. processWorkerExit():worker线程退出
  • 3.3. 关闭线程池
    • 3.3.1. shutdown方法
    • 3.3.2. shutdownNow方法
  • 4. 其他方法
    • 4.1. reject方法
  • 5. 总结
  • 6. 参考

测试代码:

// 创建只含有单个线程的线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
// 提交任务
executor.submit(()->{System.out.println("C");
});
// 优雅停机
executor.shutdown();

1. ThreadPoolExecutor任务提交

我们看submit方法

// AbstractExecutorService
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;
}
  • newTaskFor:把Runnable构建为FutureTask任务
  • execute:执行任务
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 1、线程数量小于核心线程,则添加Workerif (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 2、插入任务到队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3、队列也满了,则添加非核心workelse if (!addWorker(command, false))// 4、添加非核心Worker失败则"拒绝"reject(command);
}

从以上可以看到线程池的处理原则是:核心线程数 > 队列 > 非核心线程 > 拒绝策略

1、先添加核心线程来处理

2、核心线程不够,则添加到队列中

3、队列也不够,则添加“非核心线程”

4、非核心线程也不够,则执行拒绝策略

2. 线程池状态[这部分是难点呀]

public class ThreadPoolExecutor extends AbstractExecutorService {// 核心变量:该变量前3位表示"状态",后29为表示线程数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 该变量控制ctl变量用多少bit位表示状态【bit位的分界线】private static final int COUNT_BITS = Integer.SIZE - 3;// 表示线程的最大容量【即0001 1111 1111...】private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 以下变量用前3位控制线程池的状态private static final int RUNNING    = -1 << COUNT_BITS; // [即111]private static final int SHUTDOWN   =  0 << COUNT_BITS; // [即000]private static final int STOP       =  1 << COUNT_BITS; // [即001]private static final int TIDYING    =  2 << COUNT_BITS; // [即010]private static final int TERMINATED =  3 << COUNT_BITS; // [即011]// 计算ctl变量的前3位,表示“状态”private static int runStateOf(int c)     { return c & ~CAPACITY; }// 计算ctl变量的后29位,表示"工作线程数量"private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }
}
  • ctl:是control(控制)的缩写。是线程池的最核心变量,该int变量即包含状态也包含线程的数量
  • COUNT_BITS:该变量ctl变量bit位的分界线
  • CAPACITY:表示线程池最大线程数量
  • 各种状态:用ctl的前3个bit位表示状态
  • runStateOf、workerCountOf方法:计算状态、线程数量

2.1. addWorker添加worker线程

private boolean addWorker(Runnable firstTask, boolean core) {// <1>retry:for (;;) {int c = ctl.get();// 获取状态int rs = runStateOf(c);// <1.1> 状态判断if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//增加c变量的线程数if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctl// 一旦状态改变,则需要返回外层循环重新判断状态if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// <2>boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask); // 创建Workerfinal Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();// 添加"工作线程需要上锁"try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();//启动工作线程workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

addWorker代码分2个部分

  • 维护ctl变量(在<1>部分)

用cas给ctl变量表示线程数量的位置值加一。

<1.1>处,状态判断我是感觉写法挺烧脑子的。

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))return false;

要使整体条件成立则rs >= SHUTDOWN成立且后面括号的判断为false。后面括号分3种情况:

1、如果rs == SHUTDOWN为false(隐含条件有:rs >= SHUTDOWN成立。

rs == SHUTDOWN,则rs可能得值有stop、tidying、terminated其中一个,即线程池已经终止,则表示终止状态下不必添加Worker线程。

2、如果firstTask == null为false(隐含的条件有:rs >= SHUTDOWN,rs == SHUTDOWN成立。

线程池是shutdown状态,且firstTask == null为false,则表示shutdown状态下不必添加Worker线程。

3、如果workQueue.isEmpty()为true(隐含条件:rs=SHUTDOWN,firstTask == null成立。

线程池是shutdown状态,且提交的任务也是null,且队列是空的,则不必添加Worker线程。【如果队列不是空的还是有必要添加Worker?】

备注:第3个条件真的有必要吗???因为workQueue某些任务挺耗时,添加添加Worker是为了加快workQueue中任务的处理?

如此复杂的条件判断的目的:特定条件下即使shutdown还是会添加Worker线程,并不是只要shutdown就不必添加Worker线程。

批判:完全没有必要把条件搞这么复杂,只能说作者太追求极致了。完全可以只要状态>=shutdown就返回false。而且firstTask=null怎么会发生呢,这个方法是private啊。

极致的条件是什么:①rs>=shutdown且②rs=shutdown且③firstTask=null且④workQueue非空,即这些条件都满足时还是可以添加Worker线程的。

  • 添加启动线程(在<2>部分)

1、创建Worker

firstTask变量表示Worker中的第一个任务

每一个Worker表示一个工作线程。

2、添加Worker到workers

由线程池的workers变量统一维护所有线程

3、添加线程的过程需要“上锁”,添加完线程之后调用t.start方法启动线程

2.2. 内部类Worker

/*** Worker类大体上管理着运行线程的中断状态 和 一些指标* Worker类投机取巧的继承了AbstractQueuedSynchronizer来简化在执行任务时的获取、释放锁* 这样防止了中断在运行中的任务,只会唤醒(中断)在等待从workQueue中获取任务的线程* 解释:*   为什么不直接执行execute(command)提交的command,而要在外面包一层Worker呢??*   主要是为了控制中断....重点主要原因啦*   用什么控制??*   用AQS锁,当运行时上锁,就不能中断,TreadPoolExecutor的shutdown()方法中断前都要获取worker锁*   只有在等待从workQueue中获取任务getTask()时才能中断** worker实现了一个简单的不可重入的互斥锁,而不是用ReentrantLock可重入锁* 因为我们不想让在调用比如setCorePoolSize()这种线程池控制方法时可以再次获取锁(重入)* 解释:*   setCorePoolSize()时可能会interruptIdleWorkers(),在对一个线程interrupt时会要w.tryLock()*   如果可重入,就可能会在对线程池操作的方法中中断线程,类似方法还有:*   setMaximumPoolSize()*   setKeppAliveTime()*   allowCoreThreadTimeOut()*   shutdown()* 此外,为了让线程真正开始后才可以中断,初始化lock状态为负值(-1),在开始runWorker()时将state置为0,而state>=0才可以中断* * Worker继承了AQS,实现了Runnable,说明其既是一个可运行的任务,也是一把锁(不可重入)*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{final Thread thread; //利用ThreadFactory和 Worker这个Runnable创建的线程对象Runnable firstTask;volatile long completedTasks;Worker(Runnable firstTask) {//设置AQS的同步状态private volatile int state,是一个计数器,大于0代表锁已经被获取setState(-1);// 在调用runWorker()前,禁止interrupt中断,//在interruptIfStarted()方法中会判断 getState()>=0this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); //根据当前worker创建一个线程对象//当前worker本身就是一个runnable任务,也就是不会用参数的firstTask创建线程,//而是调用当前worker.run()时调用firstTask.run()}public void run() {runWorker(this); //runWorker()是ThreadPoolExecutor的方法}// Lock methods// The value 0 represents the unlocked state. 0代表“没被锁定”状态// The value 1 represents the locked state. 1代表“锁定”状态protected boolean isHeldExclusively() {return getState() != 0;}/*** 尝试获取锁* 重写AQS的tryAcquire(),AQS本来就是让子类来实现的*/protected boolean tryAcquire(int unused) {//尝试一次将state从0设置为1,即“锁定”状态,//但由于每次都是state 0->1,而不是+1,那么说明不可重入//且state==-1时也不会获取到锁if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread()); //设置exclusiveOwnerThread=当前线程return true;}return false;}/*** 尝试释放锁* 不是state-1,而是置为0*/protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null); setState(0);return true;}public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }/*** 中断(如果运行)* shutdownNow时会循环对worker线程执行* 且不需要获取worker锁,即使在worker运行时也可以中断*/void interruptIfStarted() {Thread t;//如果state>=0、t!=null、且t没有被中断//new Worker()时state==-1,说明不能中断if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}
}

Worker类本身既实现了Runnable,又继承了AbstractQueuedSynchronizer,所以其既是一个可执行的任务,又可以达到锁的效果

主要是几个问题:

  • 为什么会有一把锁

肯定不是为了控制并发的,因为只有一个线程。主要是为了控制中断。用AQS锁,当运行时上锁,就不能中断,TreadPoolExecutor的shutdown()方法中断前都要获取worker锁,只有在等待从workQueue中获取任务getTask()时才能中断

  • 控制中断体现在哪里

1、初始AQS状态为-1,此时不允许中断interrupt(),只有在worker线程启动了,执行了runWoker(),将state置为0,才能中断

不允许中断体现在:

  1. shutdown()线程池时,会对每个worker tryLock()上锁,而Worker类这个AQS的tryAcquire()方法是固定将state从0->1,故初始状态state=-1时tryLock()失败,没办法中断interrupt()
  2. shutdownNow()线程池时,不用tryLock()上锁,但调用worker.interruptIfStarted()终止worker,interruptIfStarted()也有state>=0才能interrupt的逻辑,而初始state=-1

2、为了防止某种情况下,在运行中的worker被中断,runWorker()每次运行任务时都会lock()上锁,而shutdown()这类可能会终止worker的操作需要先获取worker的锁,这样就防止了中断正在运行的线程。Worker实现的AQS为不可重入锁,为了是在获得worker锁的情况下再进入其它一些需要加锁的方法

  • 有没有响应中断

很明显ThreadPoolExecutor没有响应中断(即没有检测和处理中断)。没有响应中断与控制中断是两回事。至于为什么要多搞一层Work来控制中断作者也不清楚。

  • 没有中断、 没有锁行不行

个人猜测:如果只是我们自己写代码完全可以,但这是大神写的代码,不可以

  • 为什么要自己继承AQS,而不是使用ReentrantLock

ReentrantLock是可重入的,而ThreadPoolExecute要求不可重入

2.3. runWorker():执行任务

看一下Worker如何执行任务的

// java.util.concurrent.ThreadPoolExecutor.Worker
public void run() {runWorker(this);
}
final void runWorker(Worker w) {Thread wt = Thread.currentThread();// <1>Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// <2> 获取任务【重点】while (task != null || (task = getTask()) != null) {// <3> 上锁,不是为了防止并发执行任务,为了在shutdown()时不终止正在运行的workerw.lock();// <4> 极端判断条件是:由Running或shutdown状态,突然变成stop状态if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))&& !wt.isInterrupted())wt.interrupt();// <5>try {// <5.1>beforeExecute(wt, task);Throwable thrown = null;try {// <5.2>task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// <5.3>afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}// <6> completedAbruptly变量表示是否是"暴力结束线程",执行到此处说明线程是因为taskTask返回为nullcompletedAbruptly = false;} finally {// <7> worker线程的退出【重点】processWorkerExit(w, completedAbruptly);}
}
  • <1>

在创建Worker时添加的任务为第一个任务(firstTask)

  • <2>

调用getTaskworkQueue取出任务。详情请看getTask方法详解。

  • <3>处,为什么要上锁

是为了控制中断,是控制,控制,不是响应中断,线程池没有响应中断。

  • <4>

此时的极端条件是什么:由Running或shutdown状态,突然变成stop状态

1、如果状态达到stop级别,必须要对worker中断。

2、可能暂时没有达到stop级别,但是别处立马调用了shutdownNow则立马达到stop级别,也要立刻中断

注意:线程池本身不响应中断,是否中断用户自行在task.run方法决定

  • <5>

1、<5.1>是任务执行开始之前的动作,默认空实现

2、<5.2>是执行任务,即调用run方法

3、<5.3>是任务结束之后的动作,默认空实现

  • <6>

completedAbruptly变量表示是否是暴力完成任务的,执行到<4>处说明是正常完成,不是暴力完成。

代码执行到<4>处位置,说明getTask方法取出任务为null,也就是任务不足了,就需要考虑销毁不必要的线程

  • <7>

此时需要终止工作线程。这一段后面有详解。

2.4. getTask():获取任务

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// <1> 极端条件是:rs == SHUTDOWN,且 workQueue不为空if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// <2> 是否允许取出任务超时boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// <3> 极端条件:wc <= maximumPoolSize 且 wc > corePoolSize 且 超时 且 队列为空,此时需要销毁当前线程if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// <2> 取出任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;// <3> 到这里说明当前线程出现了等待超时,则可能要销毁非核心线程timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}
  • <1>

此时条件复杂,我们看极端情况,极端条件是:【rs == SHUTDOWN且 workQueue不为空】,说明了即使是SHUTDOWN状态只要队列workQueue不为空,还是需要等待workQueue执行。

SHUTDOWN是优雅停机,会先等workQueue执行完毕

  • <2>处,

timed变量主要是控制当线程空闲时是否需要销毁线程。默认如果线程数量>corePoolSize且空闲,则销毁当前线程;如果线程<=corePoolSize但空闲是不会销毁线程的。(是否销毁默认取决于线程数量)

  • <3>

极端条件:wc <= maximumPoolSize 且 wc > corePoolSize 且 超时 且 队列为空,此时线程数量太多又没有任务所以需要销毁当前线程。

  • <4>处,取出任务。

1、如果允许取出任务超时,则调用限时的poll方法。如果超时返回null

2、如果不允许取出任务超时,则调用take方法阻塞获取task。

也就是说如果返回null则一定就是**当前线程出现了等待的情况,则考虑是否需要销毁不必要的线程了

2.5. processWorkerExit():worker线程退出

注意:这才是worker工作线程真正退出的地方,而不是调用线程池的shutdown方法工作线程就会停止,工作线程会等待所有任务都执行完成后才会停止。

private void processWorkerExit(Worker w, boolean completedAbruptly) {/*** 1、worker数量-1* 如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,*那么正在工作的worker线程数量需要-1* 如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了*/if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 代码和注释正好相反啊decrementWorkerCount();/*** 2、从Workers Set中移除worker*/final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks; //把worker的完成任务数加到线程池的完成任务数workers.remove(w); //从HashSet<Worker>中移除} finally {mainLock.unlock();}/*** 3、在对线程池有负效益的操作时,都需要“尝试终止”线程池* 主要是判断线程池是否满足终止的状态* 如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程* 没有线程了,更新状态为tidying->terminated*/tryTerminate();/*** 4、是否需要增加worker线程* 线程池状态是running 或 shutdown* 如果当前线程是突然终止的,addWorker()* 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()* 故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程*/int c = ctl.get();//如果状态是running、shutdown,即tryTerminate()没有成功终止线程池,尝试再添加一个workerif (runStateLessThan(c, STOP)) {//不是突然完成的,即没有task任务可以获取而完成的,计算min,并根据当前worker数量判断是否需要addWorker()if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //allowCoreThreadTimeOut默认为false,即min默认为corePoolSize//如果min为0,即不需要维持核心线程数量,且workQueue不为空,至少保持一个线程if (min == 0 && ! workQueue.isEmpty())min = 1;//如果线程数量大于最少数量,直接返回,否则下面至少要addWorker一个if (workerCountOf(c) >= min)return; // replacement not needed}//添加一个没有firstTask的worker//只要worker是completedAbruptly突然终止的,或者线程数量小于要维护的数量,就新添一个worker线程,即使是shutdown状态addWorker(null, false);}
}

processWorkerExit(Worker w, boolean completedAbruptly)

worker:要结束的worker

completedAbruptly:是否突然完成(是否因为异常退出)

执行流程:

1、worker数量-1

A、如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1B、如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了

2、从Workers Set中移除worker,删除时需要上锁mainlock
3、tryTerminate()。大概逻辑:判断线程池是否满足终止的状态

tryTerminate虽然很长,但是只有一个作用,通过设置线程池状态为terminal来真正终止线程,终止的前提条件是:线程池状态为shutdown + 队列是空的 + 工作线程为0

操作:更新状态为tidying->terminated

4、是否需要增加worker线程,如果线程池还没有完全终止,仍需要保持一定数量的线程

如:我们自定义的任务task.run方法发生了异常导致线程退出,此时线程池需要补充新的线程进来

故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程

3.3. 关闭线程池

注意:关闭线程池并不能让线程池中的线程停止,只是对其发送了中断信号,仅仅是发送了信号而已。而ThreadPoolExecute没有响应中断信号,所以最终只有在所有的task都被执行后才会真正的结束worker线程。

3.3.1. shutdown方法

shutdown()后线程池将变成shutdown状态,此时不接收新任务**,但会处理完正在运行的和在阻塞队列中等待处理的任务**。

/*** 中断在等待任务的线程(没有上锁的),中断唤醒后,可以判断线程池状态是否变化来决定是否继续*/
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); //上锁 try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne) break;}} finally {mainLock.unlock(); //解锁 } }}
}

shutdown()执行流程:

1、上锁,mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock

2、使用CAS操作将线程池状态设置为shutdown,shutdown之后将不再接收新任务

3、中断所有空闲线程 interruptIdleWorkers()

正在运行中的线程不会被中断

4、onShutdown(),ScheduledThreadPoolExecutor中实现了这个方法,可以在shutdown()时做一些处理

5、解锁

6、尝试终止线程池 tryTerminate()

可以看到shutdown()方法最重要的几个步骤是:更新线程池状态为shutdown中断所有空闲线程tryTerminated()尝试终止线程池

interruptIdleWorkers() 中断空闲线程过程如下:

    /*** 中断在等待任务的线程(没有上锁的),中断唤醒后,可以判断线程池状态是否变化来决定是否继续*/private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); //上锁 try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne) break;}} finally {mainLock.unlock(); //解锁 } }}}

关键是:worker.tryLock() 是否成功

如果worker是空闲状态(即tryLock成功),AQS的state从0–>1,既然是空闲线程那么中断空闲线程。

3.3.2. shutdownNow方法

shutdownNow()后线程池将变成stop状态,此时不接收新任务**,不再处理在阻塞队列中等待的任务,还会尝试中断正在处理中的工作线程**。

无论如何线程池ThreadPoolExecute本身没有响应中断,至于用户是否响应中断时用户自己的事情。

/*** 尝试停止所有活动的正在执行的任务,停止等待任务的处理,并返回正在等待被执行的任务列表* 这个任务列表是从任务队列中排出(删除)的* <p>* 这个方法不用等到正在执行的任务结束,要等待线程池终止可使用awaitTermination()* <p>* 除了尽力尝试停止运行中的任务,没有任何保证* 取消任务是通过Thread.interrupt()实现的,所以任何响应中断失败的任务可能永远不会结束*/
public List <Runnable> shutdownNow() {List <Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock(); //上锁try {//判断调用者是否有权限shutdown线程池checkShutdownAccess();//CAS+循环设置线程池状态为stopadvanceRunState(STOP);//中断所有线程,包括正在运行任务的interruptWorkers();tasks = drainQueue(); //将workQueue中的元素放入一个List并返回} finally {mainLock.unlock(); //解锁}//尝试终止线程池tryTerminate();return tasks; //返回workQueue中未执行的任务
}

该方法会中断所有的线程,包括正在运行的线程,用户是否响应是用户自己的事情

线程池中队列中等待执行的task会被移除,并返回给用户。

shutdownNow() 和 shutdown()的大体流程相似,差别是:

1、将线程池更新为stop状态

2、调用**interruptWorkers()**中断所有线程,包括正在运行的线程

interruptWorkers先对mainLock加锁,然后循环调用interruptIfStarted方法中断Worker的所有线程,其中会判断worker的AQS state是否大于0,即worker是否已经开始运作,再调用t.interrupt()。

3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务

4. 其他方法

4.1. reject方法

private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();final void reject(Runnable command) {handler.rejectedExecution(command, this);
}public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());
}

默认的handler是AbortPolicy,它的处理方式是“抛出异常”

其它的handler:

  • DiscardPolicy

直接丢弃

  • DiscardOldestPolicy

丢弃”最老的“

  • CallerRunsPolicy

调用者线程来执行。但是如果线程池被shutdown,则任务会被丢弃。

5. 总结

1、把握int型ctl变量的高3位表示线程池的状态,低29为表示线程池中线程数量是前提

2、能了解线程池执行任务的总体流程(先core,在队列,在max,在拒绝)

3、能了解为什么要有Work类(主要是为了中断)

4、worker线程满足什么条件才会退出

task发生异常会退出但是又会补充新的线程进来

只有任务都执行完了且调用了shutdown方法,才会真正退出

5、shutdown方法对worker线程的影响

shutdown会对空闲线程interrupt

shutdown会改变线程池状态为shutdown,worker线程会被状态作出反应

  • shutdown之后不能添加新的task(体现在addWorker中)

  • worker线程执行完任务之后就退出了,而不是在workQueue队列上等待新任务

6、温故而知新,还是要经常看看经常想想

6. 参考

https://blog.csdn.net/qq_41573860/article/details/123291943


http://www.ppmy.cn/news/1132040.html

相关文章

Feign接口调用GET请求@RequestParam传参丢失

文章目录 问题现象排查解决GET加注解解决使用POST方式解决 时间戳传参失败 问题现象 项目使用的是Spring Cloud微服务&#xff0c;服务间调用使用的是Feign在一次服务调用时&#xff0c;发现GET传参丢失&#xff0c;没有传递过去任何参数加了RequestParam注解&#xff0c;发现…

ORA-01034: ORACLE not available?一文解决

1.情况描述 oracle用户sqlplus登陆数据库&#xff08;11gR2 单机asm&#xff09;&#xff0c;进去查询一些基本的视图发现报错 ORA-01034: ORACLE not available&#xff0c;详细如下 [oracleoomcserver db_1]$ sqlplus / as sysdba SQL*Plus: Release 11.2.0.4.0 Production…

Unity把UGUI再World模式下显示到相机最前方

Unity把UGUI再World模式下显示到相机最前方 通过脚本修改Shader 再VR里有时候要把3D的UI显示到相机最前方&#xff0c;加个UI相机会坏事&#xff0c;可以通过修改unity_GUIZTestMode来解决。 测试用例 测试用例如下&#xff1a; 场景包含一个红色的盒子&#xff0c;一个UI…

前端JavaScript入门到精通,javascript核心进阶ES6语法、API、js高级等基础知识和实战 —— Web APIs(三)

思维导图 全选案例 大按钮控制小按钮 小按钮控制大按钮 css伪类选择器checked <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><…

【C++】vector的介绍 | 常见接口的使用

目录 vector的介绍 常见接口 构造函数 尾插push_back() vector的遍历 1.用方括号下标 遍历&#xff1a; 2.调用at()来访问&#xff1a; 3.用迭代器遍历&#xff1a; 4.范围for遍历&#xff1a; vector的空间修改 vector增删查改 覆盖assign() 查找find() 插入ins…

算法练习8——有序三元组中的最大值

LeetCode 100088 有序三元组中的最大值 I LeetCode 100086 有序三元组中的最大值 II 给你一个下标从 0 开始的整数数组 nums 。 请你从所有满足 i < j < k 的下标三元组 (i, j, k) 中&#xff0c;找出并返回下标三元组的最大值。如果所有满足条件的三元组的值都是负数&am…

在Qt中,怎么获取到在mainwindow.ui文件中添加的控件

2023年9月30日&#xff0c;周六晚上 假设我在mainwindow.ui中添加了一个名为textEdit的QTextEdit对象 在mainwindow.cpp中&#xff0c;可以通过ui对象来获取到这个控件

《Jetpack Compose从入门到实战》 第二章 了解常用UI组件

目录 常用的基础组件文字组件图片组件按钮组件选择器组件对话框组件进度条组件 常用的布局组件布局Scaffold脚手架 列表 书附代码 Google的图标库 常用的基础组件 文字组件 Composable fun TestText() {Column(modifier Modifier.verticalScroll(state rememberScrollState…