引入
之前我们探究了常见的阻塞队列的特点,在本文我们就以 ArrayBlockingQueue 为例,首先分析
BlockingQueue ,也就是阻塞队列的线程安全原理,然后再看看它的兄弟——非阻塞队列的并发安全原理。
ArrayBlockingQueue 源码分析
我们首先看一下 ArrayBlockingQueue 的源码,ArrayBlockingQueue 有以下几个重要的属性:
java"> /*** 用于存储队列元素的数组。该数组是固定大小的,一旦创建,其容量就不能再改变。* 数组中的元素类型为 Object,因为队列可以存储任意类型的元素。*/final Object[] items;/*** 下一次执行 take、poll、peek 或 remove 操作时,从数组中获取元素的索引位置。* 这是一个循环数组,当 takeIndex 达到数组的末尾时,会重新回到数组的起始位置。*/int takeIndex;/*** 下一次执行 put、offer 或 add 操作时,将元素插入到数组中的索引位置。* 同样,这是一个循环数组,当 putIndex 达到数组的末尾时,会重新回到数组的起始位置。*/int putIndex;/*** 当前队列中元素的数量。* 该值始终小于或等于数组的容量,用于跟踪队列中实际存储的元素数量。*/int count;
第一个就是最核心的、用于存储元素的 Object 类型的数组;然后它还会有两个位置变量,分别是takeIndex 和 putIndex,这两个变量就是用来标明下一次读取和写入位置的;另外还有一个 count 用来计数,它所记录的就是队列中的元素个数。
另外,我们再来看下面这三个变量:
java"> /*** 主锁,用于保护对队列的所有访问操作。* 所有对队列的读写操作都需要先获取这个锁,以确保线程安全。*/final ReentrantLock lock;/*** 用于等待 take 操作的条件对象。* 当队列中没有元素时,尝试从队列中取元素的线程会在此条件上等待。* 当有新元素被添加到队列中时,会通过这个条件唤醒等待的线程。*/private final Condition notEmpty;/*** 用于等待 put 操作的条件对象。* 当队列已满时,尝试向队列中添加元素的线程会在此条件上等待。* 当有元素从队列中被移除时,会通过这个条件唤醒等待的线程。*/private final Condition notFull;
这三个变量也非常关键,第一个就是一个 ReentrantLock,而下面两个 Condition 分别是由ReentrantLock 产生出来的,这三个变量就是我们实现线程安全最核心的工具。
ArrayBlockingQueue 实现并发同步的原理就是利用 ReentrantLock 和它的两个 Condition,读操作和写操作都需要先获取到 ReentrantLock 独占锁才能进行下一步操作。进行读操作时如果队列为空,线程就会进入到读线程专属的 notEmpty 的 Condition 的队列中去排队,等待写线程写入新的元素;同理,如果队列已满,这个时候写操作的线程会进入到写线程专属的 notFull 队列中去排队,等待读线程将队列元素移除并腾出空间。
下面,我们来分析一下最重要的 put 方法:
java"> /*** 将指定元素插入此队列的尾部,如果队列已满,则等待空间可用。** @param e 要插入的元素* @throws InterruptedException 如果在等待过程中当前线程被中断* @throws NullPointerException 如果指定的元素为 null*/public void put(E e) throws InterruptedException {// 检查传入的元素是否为 null,若为 null 则抛出空指针异常checkNotNull(e);// 获取用于控制队列访问的可重入锁final ReentrantLock lock = this.lock;// 以可中断的方式获取锁,允许线程在等待锁的过程中被中断lock.lockInterruptibly();try {// 当队列中的元素数量达到数组容量时,线程进入等待状态// 等待其他线程从队列中取出元素,释放空间while (count == items.length)notFull.await();// 当队列有空间时,将元素插入队列尾部enqueue(e);} finally {// 无论插入操作是否成功,最后都要释放锁lock.unlock();}}
在 put 方法中,首先用 checkNotNull 方法去检查插入的元素是不是 null。如果不是 null,我们会用ReentrantLock 上锁,并且上锁方法是 lock.lockInterruptibly()。在获取锁的同时是可以响应中断的,这也正是我们的阻塞队列在调用 put 方法时,在尝试获取锁但还没拿到锁的期间可以响应中断的底层原因。
紧接着 ,是一个非常经典的 try finally 代码块,finally 中会去解锁,try 中会有一个 while 循环,它会检查当前队列是不是已经满了,也就是 count 是否等于数组的长度。如果等于就代表已经满了,于是我们便会进行等待,直到有空余的时候,我们才会执行下一步操作,调用 enqueue 方法让元素进入队列,最后用 unlock 方法解锁。
和 ArrayBlockingQueue 类似,其他各种阻塞队列如 LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、DelayedWorkQueue 等一系列 BlockingQueue 的内部也是利用了 ReentrantLock 来保证线程安全,只不过细节有差异,比如 LinkedBlockingQueue 的内部有两把锁,分别锁住队列的头和尾,比共用同一把锁的效率更高,不过总体思想都是类似的。
非阻塞队列ConcurrentLinkedQueue
看完阻塞队列之后,我们就来看看非阻塞队列 ConcurrentLinkedQueue。
我们先看看它的源码注释:
An unbounded thread-safe queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection. Like most other concurrent collection implementations, this class does not permit the use of null elements.
This implementation employs an efficient non-blocking algorithm based on one described in Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott.
Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw java. util. ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.
Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal. Additionally, the bulk operations addAll, removeAll, retainAll, containsAll, equals, and toArray are not guaranteed to be performed atomically. For example, an iterator operating concurrently with an addAll operation might view only some of the added elements.
This class and its iterator implement all of the optional methods of the Queue and Iterator interfaces.
Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a ConcurrentLinkedQueue happen-before actions subsequent to the access or removal of that element from the ConcurrentLinkedQueue in another thread.
This class is a member of the Java Collections Framework.翻译:
这是一个基于链表节点的无界线程安全队列。该队列按照先进先出(FIFO)的顺序对元素进行排序。队列头部的元素是在队列中存在时间最长的元素,队列尾部的元素是在队列中存在时间最短的元素。新元素会被插入到队列尾部,而队列的检索操作则从队列头部获取元素。当有多个线程需要共享访问一个公共集合时,ConcurrentLinkedQueue 是一个合适的选择。和大多数其他并发集合实现一样,此类不允许使用 null 元素。
此实现采用了一种高效的非阻塞算法,该算法基于 Maged M. Michael 和 Michael L. Scott 所著的《Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms》(简单、快速且实用的非阻塞和阻塞并发队列算法)中描述的算法。
迭代器具有弱一致性,它返回的元素反映了迭代器创建时或创建之后某个时刻队列的状态。迭代器不会抛出 java.util.ConcurrentModificationException 异常,并且可以与其他操作并发进行。自迭代器创建以来,队列中包含的元素将恰好被返回一次。
请注意,与大多数集合不同,size 方法并非常量时间操作。由于这些队列具有异步特性,确定当前元素的数量需要遍历元素,因此如果在遍历过程中集合被修改,可能会报告不准确的结果。此外,批量操作(如 addAll、removeAll、retainAll、containsAll、equals 和 toArray)不能保证以原子方式执行。例如,与 addAll 操作并发执行的迭代器可能只会看到部分被添加的元素。
此类及其迭代器实现了 Queue 接口和 Iterator 接口的所有可选方法。
内存一致性效果:与其他并发集合一样,一个线程将对象放入 ConcurrentLinkedQueue 之前的操作,先行发生于另一个线程从该 ConcurrentLinkedQueue 中访问或移除该元素之后的操作。
此类是 Java 集合框架的成员之一。
顾名思义,ConcurrentLinkedQueue 是使用链表作为其数据结构的,我们来看一下关键方法 offer 的源码:
java"> /*** 将指定元素插入此队列的尾部。由于队列是无界的,此方法将永远不会返回 {@code false}。** @param e 要插入的元素* @return {@code true}(由 {@link Queue#offer} 指定)* @throws NullPointerException 如果指定的元素为 null*/public boolean offer(E e) {// 检查插入的元素是否为 null,如果为 null 则抛出 NullPointerExceptioncheckNotNull(e);// 创建一个新的节点,用于存储要插入的元素final Node<E> newNode = new Node<E>(e);// 从尾节点开始,尝试将新节点插入到队列中for (Node<E> t = tail, p = t;;) {// 获取当前节点的下一个节点Node<E> q = p.next;// 如果下一个节点为 null,说明当前节点是队列的最后一个节点if (q == null) {// p 是最后一个节点// 尝试使用 CAS 操作将新节点设置为当前节点的下一个节点if (p.casNext(null, newNode)) {// 成功的 CAS 操作是元素 e 成为此队列元素的线性化点,// 也是新节点 newNode 成为“活跃”节点的线性化点// 如果当前节点不是尾节点,尝试更新尾节点if (p != t) // 一次跳跃两个节点// 尝试更新尾节点为新节点,失败也没关系casTail(t, newNode); return true;}// 与其他线程的 CAS 竞争失败;重新读取下一个节点} // 如果当前节点的下一个节点是自身,说明我们已经脱离了链表else if (p == q)// 我们已经脱离了链表。如果尾节点没有改变,// 它也会脱离链表,在这种情况下,我们需要跳转到头节点,// 因为所有活跃节点总是可以从头节点到达。否则,新的尾节点是更好的选择。p = (t != (t = tail)) ? t : head;else// 经过两次跳跃后检查尾节点的更新情况p = (p != t && t != (t = tail)) ? t : q;}}
在这里我们不去一行一行分析具体的内容,而是把目光放到整体的代码结构上,在检查完空判断之后,可以看到它整个是一个大的 for 循环,而且是一个非常明显的死循环。在这个循环中有一个非常亮眼的 p.casNext 方法,这个方法正是利用了 CAS 来操作的,而且这个死循环去配合 CAS 也就是典型的乐观锁的思想。
我们就来看一下 p.casNext 方法的具体实现,其方法代码如下:
java"> /*** 尝试使用CAS(比较并交换)操作将当前节点的next引用从cmp更新为val。** @param cmp 期望的当前next引用值* @param val 要设置的新的next引用值* @return 如果CAS操作成功,则返回true;否则返回false*/boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}
可以看出这里运用了 UNSAFE.compareAndSwapObject 方法来完成 CAS 操作,而compareAndSwapObject 是一个 native 方法,最终会利用 CPU 的 CAS 指令保证其不可中断。
可以看出,非阻塞队列 ConcurrentLinkedQueue 使用 CAS 非阻塞算法 + 不停重试,来实现线程安全,适合用在不需要阻塞功能,且并发不是特别剧烈的场景。
总结
我们最后来做一下总结。通过我们对阻塞队列和非阻塞队列的并发安全原理的分析,可以知道,其中阻塞队列最主要是利用了 ReentrantLock 以及它的 Condition 来实现,而非阻塞队列则是利用 CAS 方法实现线程安全。