1、阻塞队列
阻塞队列:当队列已满的时候,向队列中添加元素的操作会被阻塞;当队列为空的时候,从队列中取元素的操作会被阻塞。
Java 中用 BlockingQueue 接口表示阻塞队列。BlockingQueue 接口作为 Queue 的子接口,主要作用不是作为容器,而是作为线程同步的工具。它的特征是生产者线程试图向 BlockingQueue 放入元素时,如果该队列已满,则该线程被阻塞;消费者线程试图从 BlockingQueue 中取出元素时,如果该队列已空,则该线程被阻塞。
由于 BlockingQueue 继承自 Queue,而 Queue 又继承自 Collection,所以 BlockingQueue 内并不是所有方法都是阻塞方法,并且通常是成对出现的:
1. add()、remove():队列已满时添加、队列已空时取出都会抛出异常。
2. offer()、poll():队列已满时添加、队列已空时取出,操作成功返回 true 否则返回 false。
3. put()、take():队列已满时添加、队列已空时取出,方法会被阻塞。
4. element()、peek():队列头取出元素但不删除,当队列已空时分别抛出异常、返回 false。
BlockingQueue 的常用实现类,都是线程安全的:
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。放入元素时会进行排序,默认按照自然顺序升序排列,也可以在构造方法中传入 Comparator 实例或者自己实现 compareTo() 规定排序规则。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。放入队列中的元素必须实现 Delayed 接口,而 Delay 继承自 Comparable<Delayed> 且内部唯一方法 getDelay() 表示该元素的剩余时长。DelayQueue 会按照元素的剩余时长进行排列,剩余时间短的排在队列前边,只有剩余时间为 0 了才能取出这个元素,否则拿不出来,利用这一点可以实现单机缓存系统。
- SynchronousQueue:一个不存储元素的阻塞队列。内部没有容器,用来实现数据的直接传递。生产者把数据放入 SynchronousQueue 后,必须由消费者取出该数据后,消费者才能再一次存放数据。主要作用是解开了生产者和消费者之间的耦合。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。比其它阻塞队列多了“直接传送”、“尝试传送”的机制。通过调用 transfer() 可以让生产者产生的数据不放入阻塞队列,而是直接传递给消费者,前提是此时有消费者来拿数据,如果没有,transfer() 就被阻塞(不把数据放入队列,而是就一直等着)。而 tryTransfer() 则是“尝试传送”,如果有消费者来拿数据,那么生产者直接把数据给它;如果没有就还是把数据放到阻塞队列中。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
有界是指队列存放的数量有上限,到达上限后再向队列中存放元素就会被阻塞。如 ArrayBlockingQueue:
java"> /*** Creates an {@code ArrayBlockingQueue} with the given (fixed)* capacity and default access policy.* * @param capacity the capacity of this queue* 上方注释已经说明,capacity 是 fixed,即固定的* @throws IllegalArgumentException if {@code capacity < 1}*/public ArrayBlockingQueue(int capacity) {// 第二个参数表示是否公平访问队列,true 表示公平,即先阻塞的先访问。// false 是指不公平访问,阻塞线程争夺访问队列的资格。类似于公平锁的机制。this(capacity, false);}
而无界队列,虽然构造方法也需要传入一个容量,但是这个容量是可以改变的:
java"> /*** Creates a {@code PriorityBlockingQueue} with the specified* initial capacity that orders its elements according to their* {@linkplain Comparable natural ordering}.** @param initialCapacity the initial capacity for this priority queue* initial capacity 说明初始容量是可以改变的。* @throws IllegalArgumentException if {@code initialCapacity} is less* than 1*/public PriorityBlockingQueue(int initialCapacity) {this(initialCapacity, null);}
这个“无界”肯定也是有限制的,当容量过大时会造成 OOM。
2、线程池
2.1 为什么使用线程池
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。系统启动一个线程的成本较高,因为涉及到与操作系统交互,因此使用线程池可以很好的提高性能。在开发过程中,合理地使用线程池能够带来3个好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的(对 CPU 和内存的)消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。这样就缩短了线程执行任务的总时间,这个总时间是指 T1 创建线程所需时间、T2 执行线程任务的时间和 T3 销毁线程所需时间的总和。线程池会把 T1,T3 分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样就相当于没有 T1、T3 的时间开销了。
- 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以统一分配、调优和监控。尤其是创建大量生存期很短的线程时,更应该考虑使用线程池。
2.2 线程池的构造方法与工作机制
线程池对象一般是通过 ThreadPoolExecutor 的构造方法创建出来的:
java"> /*** @param corePoolSize 核心线程数:保存在线程池中线程的数量,即使这些线程处于Idle状态,* 除非设置了属性 {@code allowCoreThreadTimeOut}* @param maximumPoolSize 非核心线程数:线程池允许的最大线程数* @param keepAliveTime 当线程的数量大于核心线程数时,超出数量的空闲线程在被终止之前,等待新任务* 到来的最长时间 * @param unit 参数 {@code keepAliveTime} 的时间单位* @param workQueue 用来存储被执行之前的任务的队列。这个队列只保持由 {@code execute} 方法提交的* {@code Runnable} 类型的任务。(最好还是使用有界的阻塞队列避免 OOM)* @param threadFactory executor用来创建新线程的工厂* @param handler 当到达线程数边界和队列容量上限时,线程池的执行会被阻塞,这时要用handler处理* @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}* {@code keepAliveTime < 0}* {@code maximumPoolSize <= 0}* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}* or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
各个参数的作用:
corePoolSize:核心线程池的大小,如果核心线程池有空闲位置,这时新的任务就会被核心线程池新建一个线程执行,执行完毕后不会销毁线程,线程会进入缓存队列等待再次被运行。
maximunPoolSize:线程池能创建最大的线程数量(包含核心线程数在内)。如果核心线程池和缓存队列都已经满了(注意这个顺序),新的任务进来就会创建新的线程来执行。但是数量不能超过 maximumPoolSize,否则会采取拒绝接受任务策略,该策略由 RejectedExecutionHandler 执行。
keepAliveTime:非核心线程能够空闲的最长时间,超过这个时间,线程会被终止。这个参数默认只有在线程数量超过核心线程池大小时才会起作用。只要线程数量不超过核心线程大小,就不会起作用。
unit:时间单位,和 keepAliveTime 配合使用。
workQueue:一个阻塞队列,用来缓存等待被执行的任务(例如任务数有100,但是核心线程池的容量为10,那剩下的90个任务要存在 workQueue 中等待被执行,如果 workQueue 放不下,就启动新线程执行 workQueue 无法缓存的任务,如果线程数超过了 maximumPoolSize,就根据 handler 的值执行拒绝处理策略),一般有三种选择策略:
ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue;
threadFactory:线程工厂,用来创建线程。
handler:拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略,四种策略为:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常,这是默认策略。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试当前执行任务(重复此过程)。 ThreadPoolExecutor.CallerRunsPolicy:让调用者所在的线程处理该任务。
结合图片加深对以上参数含义和线程池工作流程的理解:
流程大致如下:
- 主线程创建线程池对象并通过 execute() 方法把任务提交给线程池。
- 线程池根据当前状态决定如何处理该任务,优先将该任务送到核心线程池 corePool 中执行。
- 如果核心线程池中所有的线程都在执行任务,那么就把该任务送到阻塞队列中缓存。
- 倘若阻塞队列也满了,就在 maximumPool 中创建新的线程执行该任务。
- 如果 maximumPool 中的线程数已经达到上限,就要根据 RejectedExecutionHandler 的拒绝策略处理该任务。如果使用的是 ThreadPoolExecutor.CallerRunsPolicy 策略,该任务会重新回到主线程中执行。
2.3 创建并合理配置线程池对象
可以直接使用上面介绍的 ThreadPoolExecutor 的构造方法及其另外3个重载构造方法创建线程池对象。也可以使用 Executors 中的静态方法:
java"> // 缓存线程池,缓冲池最大容量为 Integer.MAX_VALUEExecutorService executorService1 = Executors.newCachedThreadPool();// 容量为1的缓冲池ExecutorService executorService2 = Executors.newSingleThreadExecutor();// 容量为固定大小的缓冲池ExecutorService executorService3 = Executors.newFixedThreadPool(2);
这些方法的内部实际上还是调用了 ThreadPoolExecutor 的构造方法:
java"> public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
OKHttp 内部使用了缓冲线程池:
java"> okhttp3.Dispatcher.java:public synchronized ExecutorService executorService() {if (executorService == null) {executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); // false 代表不是守护线程}return executorService;}okhttp3.internal.Util.java:public static ThreadFactory threadFactory(final String name, final boolean daemon) {return new ThreadFactory() {@Overridepublic Thread newThread(Runnable runnable) {Thread result = new Thread(runnable, name);result.setDaemon(daemon);return result;}};}
创建线程池时,合理配置线程池中线程数量和阻塞队列的大小是十分重要的。在配置之前,先要弄清楚任务的特性,一般有 CPU 密集型(计算操作居多)、IO 密集型(网络通信、读写磁盘操作多)和混合型(前两者混合)三种。
CPU 密集型(频繁从内存取数据计算),最大线程数不应该超过(CPU 核心数 + 1),在 Java 中 CPU 核心数可以通过 Runtime.getRuntime().availableProcessors() 获取到。如果最大线程数量大于 CPU 核心数,CPU 在满载计算的同时还要切换线程,这样会有额外开销。
+1 是因为内存有限,操作系统会把磁盘的一部分划分出来当作内存来用,也就是虚拟内存。如果某个线程的数据存在了虚拟内存,执行时还是需要把这部分数据从磁盘调度到真实的内存中,但由于磁盘的读写速度远远低于内存,数据调度的这段时间,线程无法执行,从而出现页缺失的状态。为了尽量减少 CPU 的等待时间,额外+1个线程在执行数据调度的 CPU 上执行,尽量保证在任意时刻 CPU 都是在执行任务的。
IO 密集型(网络通信、从磁盘读写数据)最大线程数不应该超过(CPU 核心数 * 2),这是个经验值。线程等待数据到来时,不应该占用 CPU,真正开始读写数据时才占用。
混合型中,如果存在的 CPU 密集型任务和 IO 密集型任务执行时间相差不大的话,考虑把它们拆分为两个线程池,一个专门做 CPU 密集型任务,另一个专门做 IO 密集型任务。但如果执行时间相差很大的话,比如 IO 操作 5s,而 CPU 操作只需 10ms,那么就干脆视为 IO 密集型。
2.4 提交任务与关闭线程池
提交任务有两个方法,execute() 和 submit()。
execute() 接收 Runnable 类型的参数,没有返回值,而 submit() 可以接收 Runnable、Callable 类型的参数,有返回值,返回值类型为 Future。
关闭线程池也有两个方法,shutdown() 和 shutdownNow()。
shutdown() 尝试关闭线程池,把尚未执行的线程进行中断。shutdownNow() 不论线程是否在执行都尝试中断。由于线程的中断是一种协作机制,能否中断成功要看各个线程内部是如何处理中断信号的。
2.5 源码分析
进入源码,需要先了解线程池的状态控制相关的成员与方法:
java"> /*** ctl 表示线程池的控制状态,是一个原子整数,包含两个字段:* workerCount:有效的线程数量* runState:线程池的运行状态,如运行中、关闭等* * 为了把它们组合成一个 int 类型的整数,我们把 workerCount 限制为 (2^29)-1(大概5亿)* 而不是 (2^31)-1(大概20亿)* * workerCount 是已经允许启动并且不允许停止的线程数量,可能会出现短暂地与实际线程数量* 不同的情况,比如请求 ThreadFactory 创建线程失败时……* * runState 提供了生命周期控制,取值如下:* RUNNING:接受新任务并处理排队的任务* SHUTDOWN:不接受新任务但处理排队的任务* STOP:不接受新任务、不处理排队任务,中断正在执行的任务* TIDYING:所有任务已被终止,workerCount 为0,转换到 TIDYING 状态的线程* 将运行 terminated() 钩子方法* TERMINATED:terminated() 执行完毕* * 为了允许有序比较,这些值之间的数字顺序很重要。runState 单调递增,但是不必* 经历每一种状态转换,这些转换是:* RUNNING -> SHUTDOWN:调用了 shutdown(),可能是在 finalize() 中隐式调用* (RUNNING or SHUTDOWN) -> STOP:调用了 shutdownNow()* SHUTDOWN -> TIDYING:当队列和线程池都空了时* STOP -> TIDYING:当线程池空了时* TIDYING -> TERMINATED:当 terminated() 钩子方法执行完时* 此外,在 awaitTermination() 中等待的线程将在状态到达 TERMINATED 时返回 */private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3; // 29 位private static final int CAPACITY = (1 << COUNT_BITS) - 1; // (2^29)-1// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;// Packing and unpacking ctlprivate static int runStateOf(int c) { return c & ~CAPACITY; }private static int workerCountOf(int c) { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean isRunning(int c) {return c < SHUTDOWN;}
注释是把源码的英文翻译了,写了一堆,其实只需要记住 ctl 是一个原子整数,高 3 位表示线程池状态,低 29 位表示线程数即可。然后再来看 ThreadPoolExecutor 如何执行一个任务:
java"> public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 1.如果运行中的线程数量小于核心线程数if (workerCountOf(c) < corePoolSize) {// 创建一个新的核心线程执行任务if (addWorker(command, true))return;c = ctl.get();}// 2.如果核心线程满载了,并且任务被成功添加到队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// double-check 一下是否应该新建一个线程执行这个任务,因为可能会有一个// 之前存在的线程,自上次检查之后挂掉了,也可能线程池变为 SHUTDOWN 状态,// 这样的话还需要回滚入队操作,并拒绝这个任务if (!isRunning(recheck) && remove(command))reject(command);// 如果线程池中一个线程都没有,那就新起一个线程执行任务else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3.任务不能入队,先尝试启动一个非核心新线程,如果失败,则只能拒绝该任务else if (!addWorker(command, false))reject(command);}
addWorker() :
java"> /*** 根据当前线程池的状态和给定的边界(corePoolSize 或 maximumPoolSize)检查能否创建一个新的* 线程。如果能,则相应地调整线程计数,并且创建一个新线程,让 firstTask 作为该线程的第一个* 任务执行。* * 如果线程池已经停止或者符合关闭条件,则此方法返回 false。在被调用时,如果线程工厂创建线程* 失败,也会返回 false。如果线程创建失败,不论是因为线程工厂返回 null 还是因为发生了异常* (典型的是 Thread.start() 中的 OutOfMemoryError),我们都会干净地回滚。* * firstTask:新线程应该首先运行的任务(如果为 none 则没有)。当线程数量小于 corePoolSize * 时(这种情况下我们总是启动一个新的),或者队列满了时(这种情况下必须绕过队列),* 会使用初始的第一个任务创建线程。初始就空闲的线程通常是通过 prestartCoreThread* 创建的,或者是替换其他即将死亡的工作线程。* core:如果为 true 就用 corePoolSize 作为边界,否则用 maximumPoolSize 作为边界。*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 判断线程池当前能否添加线程,有两个条件:// 1.线程池状态已经到了 SHUTDOWN 状态,不能添加// 2.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))// 工作线程数超出线程池容量,或者超出了指定边界 corePoolSize 或// maximumPoolSize 都会返回 falsereturn false;if (compareAndIncrementWorkerCount(c))// 如果原子操作增加线程数成功的话,跳出到 retrybreak retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// Worker 实际上就是封装了 firstTask 和通过线程工厂创建的线程w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
2.6 ThreadPoolExecutor 的类关系
Java 1.5 新增 Executors 工厂类产生线程池,它有几个静态工厂方法:
前三个方法返回 ExecutorService 对象,代表线程池;中间两个方法返回 ScheduledExecutorService,它是 ExecutorService 的子类,可以在指定的延迟后执行线程任务;最后两个方法是 Java8 新增的,可充分利用多 CPU 并行的能力,它们生成的 work stealing 池,相当于后台线程池,如果所有前台线程都死亡了,work stealing 池中的线程会自动死亡。
Executor 代表尽快执行的线程池,提供了如下三个方法:
ScheduledExecutorService 代表可在指定延迟后或周期性的执行线程任务的线程池,提供如下四个方法:
线程池用完后应该使用 shutdown() 关闭,此时线程池不再接收新任务,但会把以前所有已经提交过的任务执行完。当线程池中所有线程都执行完成后,池中的所有线程都会死亡;另外也可以使用 shutdownNow() 关闭线程池,它会试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
ForkJoinPool
Java7 新增的 ForkJoinPool 支持将一个任务拆分成多个“小任务”并进行计算,再把多个“小任务”的结果合并成总的计算结果。它是 ExecutorService 的实现类,是一种特殊的线程池,提供了如下构造方法:
Java8 增强了 ForkJoinPool 的功能,增加了通用池功能。通过如下两个静态方法提供通用池功能:
ForkJoinPool 的使用方法:
结合实例。假如现在我们要对容量很大的数组进行累加求和:
java">// 任务要继承 RecursiveTask,泛型是任务的返回值类型
public class CalTask extends RecursiveTask<Integer> {private static final int THRESHOLD = 20;private int[] arr;private int start;private int end;public CalTask(int[] arr, int start, int end) {this.arr = arr;this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;if (end - start < THRESHOLD) {for (int i = start; i < end; i++) {sum += arr[i];}return sum;} else {// 累加个数超过阈值,就二分成两个小任务,递归执行int middle = (start + end) / 2;CalTask left = new CalTask(arr, start, middle);CalTask right = new CalTask(arr, middle, end);// 分别执行两个小任务left.fork();right.fork();// 把两个小任务的结果合并起来return left.join() + right.join();}}public static void main(String[] args) {// 先构造一个数组并计算出总和以便校验int[] arr = new int[100];Random random = new Random(System.currentTimeMillis());int result = 0;for (int i = 0; i < arr.length; i++) {arr[i] = random.nextInt(20);result += arr[i];}// 开始执行任务ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();ForkJoinTask<Integer> future = forkJoinPool.submit(new CalTask(arr, 0, 100));try {System.out.println(result + "," + future.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}// 关闭线程池forkJoinPool.shutdown();}
}