文章目录
- 1 基本介绍
- 1.1 为什么使用线程池
- 1.2 一个简单的例子
- 1.3 线程池实现类:以ThreadPoolExecutor为例
- 1.3.1 ThreadPoolExecutor的构造方法
- 1.3.2 常用的阻塞队列:
- 1.3.3 常用的拒绝策略:
- 1.3.2 ThreadPoolExecutor的工作模型
- 1.4 线程池的种类
- 1.5 线程池的状态
- 2 源码梳理
- 3 注意点
- 参考
1 基本介绍
1.1 为什么使用线程池
在多线程开发过程中,会面临以下问题:
- 线程的创建、销毁需要额外的开销,非常消耗系统资源。
- 如果我们不能对线程进行良好的管理,大量创建线程,会给系统带来资源耗尽的风险。
而线程池可以解决上述问题:
- 线程池通过线程复用,减少了线程创建与销毁的额外开销。
- 通过核心线程数、最大线程数等手段限制了线程的个数,能够合理利用系统资源。
- 通过配合不同的队列、拒绝策略等手段,可以满足不同场景的需要。
1.2 一个简单的例子
下面以一个简单的例子说明如何使用线程池:
public class ThreadPoolTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executor = Executors.newSingleThreadExecutor();FutureTask<String> task = new FutureTask<>(new Callable<String>() {@Overridepublic String call() {return "Hello World";}});executor.submit(task);System.out.println(task.get());}
}
- Executors:相当于一个工具类。通过这个工具类,我们不必关注如何创建一个线程池、如何调度线程执行任务,只需要提供一个Runnable对象,在其中实现自己的业务逻辑,然后提交任务即可。
- ExecutorService:是一个接口类,在其中拓展了很多的功能,如关闭线程池、提交任务。
1.3 线程池实现类:以ThreadPoolExecutor为例
根据类图我们可以发现,线程池的最终实现类为ThreadPoolExecutor。实际上还有ScheduledThreadPoolExecutor,而本文是以使用最广泛的ThreadPoolExecutor类进行讲解。
1.3.1 ThreadPoolExecutor的构造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {}
ThreadPoolExecutor的构造函数运用了重载的方式,可以根据需要很灵活地构造出一个ThreadPoolExecutor类。每个参数的具体说明如下:
- corePoolSize:核心线程数,值得一提的是核心线程数不会销毁的,具体如何实现,在源码梳理中会有讲解。
- maximumPoolSize:最大线程数,如果maximumPoolSize大于corePoolSize,那么线程池就会创建“非核心线程”来执行任务,但是“非核心线程”的数量不会超过maximumPoolSize减去corePoolSize的值。
- keepAliveTime和unit:超时时间,如果“非核心线程”在指定的时间内一直处于空闲状态,则将会被销毁。
- workQueue:阻塞队列,ThreadPoolExecutor实际是一个生产消费模型,所创建的任务不会立马执行,而是放入队列中,由线程池消费、执行。
- threadFactory:创建线程的⼯⼚ ,⽤于批量创建线程,统一为线程指定如是否守护线程、线程优先级等参数。如果不指定该参数,会新建⼀个默认的线程⼯⼚。
- handler:拒绝处理策略,如果当线程数量⼤于maximumPoolSize时就会采⽤拒绝处理策略,默认是丢弃消息,并抛出RejectedExecutionException异常。
1.3.2 常用的阻塞队列:
- LinkedBlockingQueue:链式阻塞队列,底层数据结构是链表。
- ArrayBlockingQueue:数组阻塞队列,底层数据结构是数组,需要指定队列的⼤⼩。
- SynchronousQueue:同步队列,内部容量为0,每个put操作必须等待⼀个take操作,反之亦然。
- DelayQueue:延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列
中获取到该元素 。
1.3.3 常用的拒绝策略:
- ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执⾏程序(如果再次失败,重复此过程)。
- ThreadPoolExecutor.CallerRunsPolicy:由调⽤线程处理该任务。
1.3.2 ThreadPoolExecutor的工作模型
1.4 线程池的种类
通过Executors类方法,我们可以创建种线程池:
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
- CachedThreadPool:核心线程数为0,最大线程数为Integer.MAX_VALUE,闲置线程最大存活时间为60秒,阻塞队列为长度0的SynchronousQueue。
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
- FixedThreadPool:核心线程数与最大线程数一致,阻塞队列为LinkedBlockingQueue;
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
- SingleThreadExecutor:核心线程数和最大线程数都为1,阻塞队列为LinkedBlockingQueue,也就是说newFixedThreadPool(1)其实可以看作为newSingleThreadExecutor()。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue());}
- ScheduledThreadPool:创建的是一个可以在指定延迟时间后执行任务的线程池,构造函数可以指定核心线程数,最大线程数为Integer.MAX_VALUE,最大空闲时间为0.01秒,阻塞队列为DelayedWorkQueue的延迟队列。
- WorkStealingPool:这是Jdk1.8中新增的线程池,内部由ForkJoinPool实现,由于能够合理的使用CPU进行对任务进行并行操作,所以适合耗时很长的任务。
1.5 线程池的状态
在线程池内部维护了一个属性:ctl。它的高3位用来表示线程池状态,低29位用来保存线程个数。其中线程池的状态如下:
- RUNNING:可以正常接受新任务并且处理阻塞队列中的任务。
- SHUTDOWN:拒绝新任务但是可以继续处理阻塞队列里的任务。
- STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务。
- TIDYING:在包含阻塞队列中的任务在内的所有任务执行完毕后,线程个数设置为0,然后调用terminnated方法。
- TERMINATED:调用terminated方法后,线程池处于终止状态。
各个状态的转化如下:
* RUNNING -> SHUTDOWN* On invocation of shutdown() //调用了 shutdown方法* (RUNNING or SHUTDOWN) -> STOP* On invocation of shutdownNow() //调用了 shutdownNow方法* SHUTDOWN -> TIDYING* When both queue and pool are empty // 当线程池和任务队列都为空时* STOP -> TIDYING* When pool is empty //当线程池为空时* TIDYING -> TERMINATED* When the terminated() hook method has completed // 当 terminated() hook方法执行完毕后
2 源码梳理
先说明一下,本文所用的源码是Jdk 11版本,网上很多资料是用的Jdk 8,虽有细微差别,但是影响不大。
正如例子中所调用方法:executor.submit(task),这背后的执行逻辑是什么样子的?
ExecutorService是一个接口,而它的实现类是AbstractExecutorService,以下是AbstractExecutorService中的submit方法,也是入口。
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();// 将 Runnable对象封装成 FutureTask,这样就可以通过 get方法获取调用结果了。RunnableFuture<Void> ftask = newTaskFor(task, null);// 调用 ThreadPoolExecutor的 execute方法,提交任务。execute(ftask);return ftask;}
由此可以看出,先将Runnable对象封装成FutureTask对象,然后调用ThreadPoolExecutor的execute方法。execute方法具体如下:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();// 获取ctl的值,高3位是线程池状态、低29位是线程个数int c = ctl.get();// workerCountOf(c)是获取当前线程数。// 如果当前线程数小于核心线程数,则调用 addWorker方法创建一个核心线程执行任务if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 由上方的 return可以判断出,代码执行到这里,说明当前线程数大于等于核心线程数// isRunning(c)是获取当前线程池的状态// workQueue.offer(command) 是将任务添加到 workQueue队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 再次检查线程池状态,防止添加任务到任务队列后,线程池的状态发生了改变// 如果线程池停止了,则调用 remove方法将该任务从执行队列中移除,if (! isRunning(recheck) && remove(command))// 执行拒绝策略reject(command);else if (workerCountOf(recheck) == 0)// 如果线程池未停止,处于running状态,且当前线程池的工作线程数为0,则创建一个新的线程。addWorker(null, false);}// 如果workQueue已经满了,无法放下任务,则新建“非核心线程”来执行该任务。// 如果也无法通过新建“非核心线程”来执行该任务(当线程数量⼤于maximumPoolSize),则执行拒绝策略。else if (!addWorker(command, false))reject(command);}
下面我们便进入addWorker()方法看看线程池是如何新开线程执行任务的。
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c = ctl.get();;) {// 这里源码中注释为:在必要的时候检查 workQueue是否为空。// 这里的必要的时候是指:// 1. 线程池状态位为 STOP、TIDYING和 TERMINATED。// 2. 线程池状态位为 SHUTDOWN并且已经有了一个 firstTask需要执行。// 3. 线程池状态位为 SHUTDOWN并且队列为空。if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {// 如果 core是 true,那说明该要创建的线程是核心线程// 判断线程数是否超出限制,如果超出则失败。if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;// 配合循环,以 CAS的方式添加线程数,成功了则跳出循环if (compareAndIncrementWorkerCount(c))break retry;// 如果 CAS失败了,则在线程池状态正常的情况下,再次重新尝试重试。// 因为这个时候可能已经有任务完成了,工作线程数就减少了。c = ctl.get(); if (runStateAtLeast(c, SHUTDOWN))continue retry;}}//以上代码是为了对线程状态、线程数量做判断,相当于是准备工作。boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 创建一个 Worker对象w = new Worker(firstTask);// 实例化一个 Thread对象final Thread t = w.thread;if (t != null) {// 加上独占锁,这里是为了防止有多个线程调用线程池的 execute方法final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 重新检查线程池状态,避免在获取独占锁之前有其他线程调用了 shutdown方法int c = ctl.get();if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {if (t.isAlive()) throw new IllegalThreadStateException();// 添加任务。需要注意一下,这里是往 HashSet<Worker> workers中添加任务,而非 workQueue。// workers指的是正在工作的线程集合,而workQueue是等待执行的阻塞队列。workers.add(w);int s = workers.size();// 记录已执行的最大任务个数if (s > largestPoolSize)largestPoolSize = s;// 添加任务成功workerAdded = true;}} finally {// 释放锁mainLock.unlock();}if (workerAdded) {//如果任务添加成功那么开始执行任务t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
正如我们调用一个 Thread类的 start方法,在新建线程获得 CPU时间后,就会调用 run方法。下面我们来看一下 Worker类。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {// 将 Runnable对象装饰为 Worker对象Worker(Runnable firstTask) {// 将线程的 state设置为 -1,以防线程被中断。setState(-1); this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}// 这里少一个 @Override注解,所以看的不大明显,其实在新建线程获得 CPU时间后就是调用的该方法。public void run() {// 实际执行委托给外部的 runWorker方法runWorker(this);}}
那么,下面便可以进入runWorker方法了
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;// 将线程的 state设置为 0,这样当调用线程池的 shutdownNow方法时就可以中断 Worker线程了。w.unlock();boolean completedAbruptly = true;try {// 这个循环是关键所在!!!// 如果当前的 firstTask为空,便会调用 getTask方法去从阻塞队列 workQueue中获取任务。while (task != null || (task = getTask()) != null) {w.lock();// 再次检查线程池状态,如果线程池处于中断状态,当前线程将中断。 if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {// 这里便是我们需要执行的业务逻辑代码。task.run();// 任务结束后需要执行的业务逻辑代码。afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {// 执行清理工作,主要是统计整个线程池完成的任务个数,并从工作集合中删除当前 Worker。// 另外判断当前线程个数是否小于 corePoolSize,如果是,则增加线程数。processWorkerExit(w, completedAbruptly);}}
那么问题来了,线程池是如何实现线程复用的呢,实际上就是上述循环判断所调用的getTask()方法中。
private Runnable getTask() {boolean timedOut = false; // 注意这里是一个循环for (;;) {int c = ctl.get();// 依旧是在必要的时候检查 workQueue是否为空。if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}// 获取正在工作的线程数int wc = workerCountOf(c);// allowCoreThreadTimeOut变量是指的是否允许核心线程空闲超时。// allowCoreThreadTimeOut默认是 false,也就是核心线程即使空闲也不会被销毁。// 如果 allowCoreThreadTimeOut为 true。核心线程在 keepAliveTime内仍空闲则会被销毁。 // allowCoreThreadTimeOut为 false,那么如果正在运行的线程数大于核心线程数,那么 timed则为 true,否则为 false。boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 在以下情况下递减 worker数量:// 1. 如果正在工作的线程数超过了最大线程数,但是缓存队列已经空了。// 2. 如果有设置允许线程超时或者线程数量超过了核心线程数量,并且线程在规定时间内均未 poll到任务且队列为空。if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 根据 timed状态从workQueue中获取任务。// 实际上就是如果正在运行的线程数大于核心线程数,就执行 poll方法,该方法设定了超时时间。// 如果正在运行的线程数小于等于核心线程数,就执行 take方法,如果workQueue为空,则当前线程就会阻塞,直到成功获取任务为止,那么核心线程就不会被销毁了。Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
3 注意点
- 《阿里把把开发手册》不建议我们直接使用 Executors类创建线程池,而是通过 ThreadPoolExecutor的方式,因为Executors类中创建的线程池不够灵活,比如设置阻塞队列、拒绝策略等,存在使得系统资源耗尽的风险。
- 创建线程池时需要指定线程名称,这样可以很方便的定位问题。在创建线程池时可以传入自定义的 ThreadFactory,我们可以自定义一个 ThreadFactory,然后加入一个 namePrefix的初始化方法即可。这部分可以参考《Java并发编程之美》第11章的内容。
- 使用线程池的情况下,当程序结束是需要显示调用 shutdown方法。这是因为线程池默认的 ThreadFactory创建的线程是用户线程,如果不调用 shutdown方法,线程池的线程资源将一直不会被释放。
- 当拒绝策略为 ThreadPoolExecutor.DiscardPolicy和 ThreadPoolExecutor.DiscardOldestPolicy时,在被拒绝的任务的FutureTask对象上调用 get方法会导致线程一直阻塞,所以应该尽量使用带超时参数的 get方法。
- 在线程池里设置了ThreadLocal变量,则一定要调用 remove方法清理,因为线程池中的核心线程是一直存在的,如果不清理,会引发内存泄漏
参考
- 电子工业出版社,翟陆续、薛宾田 著,《Java并发编程之美》。
- RedSpider社区,《深入浅出Java多线程》。