Java多线程与高并发专题——阻塞和非阻塞队列的并发安全原理是什么?

devtools/2025/3/13 9:22:30/

引入

之前我们探究了常见的阻塞队列的特点,在本文我们就以 ArrayBlockingQueue 为例,首先分析
BlockingQueue ,也就是阻塞队列的线程安全原理,然后再看看它的兄弟——非阻塞队列的并发安全原理。

ArrayBlockingQueue 源码分析

我们首先看一下 ArrayBlockingQueue 的源码,ArrayBlockingQueue 有以下几个重要的属性:

java">    /*** 用于存储队列元素的数组。该数组是固定大小的,一旦创建,其容量就不能再改变。* 数组中的元素类型为 Object,因为队列可以存储任意类型的元素。*/final Object[] items;/*** 下一次执行 take、poll、peek 或 remove 操作时,从数组中获取元素的索引位置。* 这是一个循环数组,当 takeIndex 达到数组的末尾时,会重新回到数组的起始位置。*/int takeIndex;/*** 下一次执行 put、offer 或 add 操作时,将元素插入到数组中的索引位置。* 同样,这是一个循环数组,当 putIndex 达到数组的末尾时,会重新回到数组的起始位置。*/int putIndex;/*** 当前队列中元素的数量。* 该值始终小于或等于数组的容量,用于跟踪队列中实际存储的元素数量。*/int count;

第一个就是最核心的、用于存储元素的 Object 类型的数组;然后它还会有两个位置变量,分别是takeIndex 和 putIndex,这两个变量就是用来标明下一次读取和写入位置的;另外还有一个 count 用来计数,它所记录的就是队列中的元素个数。

另外,我们再来看下面这三个变量:

java">    /*** 主锁,用于保护对队列的所有访问操作。* 所有对队列的读写操作都需要先获取这个锁,以确保线程安全。*/final ReentrantLock lock;/*** 用于等待 take 操作的条件对象。* 当队列中没有元素时,尝试从队列中取元素的线程会在此条件上等待。* 当有新元素被添加到队列中时,会通过这个条件唤醒等待的线程。*/private final Condition notEmpty;/*** 用于等待 put 操作的条件对象。* 当队列已满时,尝试向队列中添加元素的线程会在此条件上等待。* 当有元素从队列中被移除时,会通过这个条件唤醒等待的线程。*/private final Condition notFull;

这三个变量也非常关键,第一个就是一个 ReentrantLock,而下面两个 Condition 分别是由ReentrantLock 产生出来的,这三个变量就是我们实现线程安全最核心的工具。

ArrayBlockingQueue 实现并发同步的原理就是利用 ReentrantLock 和它的两个 Condition,读操作和写操作都需要先获取到 ReentrantLock 独占锁才能进行下一步操作。进行读操作时如果队列为空,线程就会进入到读线程专属的 notEmpty 的 Condition 的队列中去排队,等待写线程写入新的元素;同理,如果队列已满,这个时候写操作的线程会进入到写线程专属的 notFull 队列中去排队,等待读线程将队列元素移除并腾出空间。

下面,我们来分析一下最重要的 put 方法:

java">    /*** 将指定元素插入此队列的尾部,如果队列已满,则等待空间可用。** @param e 要插入的元素* @throws InterruptedException 如果在等待过程中当前线程被中断* @throws NullPointerException 如果指定的元素为 null*/public void put(E e) throws InterruptedException {// 检查传入的元素是否为 null,若为 null 则抛出空指针异常checkNotNull(e);// 获取用于控制队列访问的可重入锁final ReentrantLock lock = this.lock;// 以可中断的方式获取锁,允许线程在等待锁的过程中被中断lock.lockInterruptibly();try {// 当队列中的元素数量达到数组容量时,线程进入等待状态// 等待其他线程从队列中取出元素,释放空间while (count == items.length)notFull.await();// 当队列有空间时,将元素插入队列尾部enqueue(e);} finally {// 无论插入操作是否成功,最后都要释放锁lock.unlock();}}

在 put 方法中,首先用 checkNotNull 方法去检查插入的元素是不是 null。如果不是 null,我们会用ReentrantLock 上锁,并且上锁方法是 lock.lockInterruptibly()。在获取锁的同时是可以响应中断的,这也正是我们的阻塞队列在调用 put 方法时,在尝试获取锁但还没拿到锁的期间可以响应中断的底层原因。

紧接着 ,是一个非常经典的 try  finally 代码块,finally 中会去解锁,try 中会有一个 while 循环,它会检查当前队列是不是已经满了,也就是 count 是否等于数组的长度。如果等于就代表已经满了,于是我们便会进行等待,直到有空余的时候,我们才会执行下一步操作,调用 enqueue 方法让元素进入队列,最后用 unlock 方法解锁。

和 ArrayBlockingQueue 类似,其他各种阻塞队列如 LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、DelayedWorkQueue 等一系列 BlockingQueue 的内部也是利用了 ReentrantLock 来保证线程安全,只不过细节有差异,比如 LinkedBlockingQueue 的内部有两把锁,分别锁住队列的头和尾,比共用同一把锁的效率更高,不过总体思想都是类似的。

非阻塞队列ConcurrentLinkedQueue

看完阻塞队列之后,我们就来看看非阻塞队列 ConcurrentLinkedQueue。

我们先看看它的源码注释:

An unbounded thread-safe queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection. Like most other concurrent collection implementations, this class does not permit the use of null elements.
This implementation employs an efficient non-blocking algorithm based on one described in Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms   by Maged M. Michael and Michael L. Scott.
Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw java. util. ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.
Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal. Additionally, the bulk operations addAll, removeAll, retainAll, containsAll, equals, and toArray are not guaranteed to be performed atomically. For example, an iterator operating concurrently with an addAll operation might view only some of the added elements.
This class and its iterator implement all of the optional methods of the Queue and Iterator interfaces.
Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a ConcurrentLinkedQueue happen-before actions subsequent to the access or removal of that element from the ConcurrentLinkedQueue in another thread.
This class is a member of the Java Collections Framework.

翻译:

这是一个基于链表节点的无界线程安全队列。该队列按照先进先出(FIFO)的顺序对元素进行排序。队列头部的元素是在队列中存在时间最长的元素,队列尾部的元素是在队列中存在时间最短的元素。新元素会被插入到队列尾部,而队列的检索操作则从队列头部获取元素。当有多个线程需要共享访问一个公共集合时,ConcurrentLinkedQueue 是一个合适的选择。和大多数其他并发集合实现一样,此类不允许使用 null 元素。
此实现采用了一种高效的非阻塞算法,该算法基于 Maged M. Michael 和 Michael L. Scott 所著的《Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms》(简单、快速且实用的非阻塞和阻塞并发队列算法)中描述的算法。
迭代器具有弱一致性,它返回的元素反映了迭代器创建时或创建之后某个时刻队列的状态。迭代器不会抛出 java.util.ConcurrentModificationException 异常,并且可以与其他操作并发进行。自迭代器创建以来,队列中包含的元素将恰好被返回一次。
请注意,与大多数集合不同,size 方法并非常量时间操作。由于这些队列具有异步特性,确定当前元素的数量需要遍历元素,因此如果在遍历过程中集合被修改,可能会报告不准确的结果。此外,批量操作(如 addAll、removeAll、retainAll、containsAll、equals 和 toArray)不能保证以原子方式执行。例如,与 addAll 操作并发执行的迭代器可能只会看到部分被添加的元素。
此类及其迭代器实现了 Queue 接口和 Iterator 接口的所有可选方法。
内存一致性效果:与其他并发集合一样,一个线程将对象放入 ConcurrentLinkedQueue 之前的操作,先行发生于另一个线程从该 ConcurrentLinkedQueue 中访问或移除该元素之后的操作。
此类是 Java 集合框架的成员之一。

顾名思义,ConcurrentLinkedQueue 是使用链表作为其数据结构的,我们来看一下关键方法 offer 的源码:

java">    /*** 将指定元素插入此队列的尾部。由于队列是无界的,此方法将永远不会返回 {@code false}。** @param e 要插入的元素* @return {@code true}(由 {@link Queue#offer} 指定)* @throws NullPointerException 如果指定的元素为 null*/public boolean offer(E e) {// 检查插入的元素是否为 null,如果为 null 则抛出 NullPointerExceptioncheckNotNull(e);// 创建一个新的节点,用于存储要插入的元素final Node<E> newNode = new Node<E>(e);// 从尾节点开始,尝试将新节点插入到队列中for (Node<E> t = tail, p = t;;) {// 获取当前节点的下一个节点Node<E> q = p.next;// 如果下一个节点为 null,说明当前节点是队列的最后一个节点if (q == null) {// p 是最后一个节点// 尝试使用 CAS 操作将新节点设置为当前节点的下一个节点if (p.casNext(null, newNode)) {// 成功的 CAS 操作是元素 e 成为此队列元素的线性化点,// 也是新节点 newNode 成为“活跃”节点的线性化点// 如果当前节点不是尾节点,尝试更新尾节点if (p != t) // 一次跳跃两个节点// 尝试更新尾节点为新节点,失败也没关系casTail(t, newNode); return true;}// 与其他线程的 CAS 竞争失败;重新读取下一个节点} // 如果当前节点的下一个节点是自身,说明我们已经脱离了链表else if (p == q)// 我们已经脱离了链表。如果尾节点没有改变,// 它也会脱离链表,在这种情况下,我们需要跳转到头节点,// 因为所有活跃节点总是可以从头节点到达。否则,新的尾节点是更好的选择。p = (t != (t = tail)) ? t : head;else// 经过两次跳跃后检查尾节点的更新情况p = (p != t && t != (t = tail)) ? t : q;}}

在这里我们不去一行一行分析具体的内容,而是把目光放到整体的代码结构上,在检查完空判断之后,可以看到它整个是一个大的 for 循环,而且是一个非常明显的死循环。在这个循环中有一个非常亮眼的 p.casNext 方法,这个方法正是利用了 CAS 来操作的,而且这个死循环去配合 CAS 也就是典型的乐观锁的思想。

我们就来看一下 p.casNext 方法的具体实现,其方法代码如下:

java">        /*** 尝试使用CAS(比较并交换)操作将当前节点的next引用从cmp更新为val。** @param cmp 期望的当前next引用值* @param val 要设置的新的next引用值* @return 如果CAS操作成功,则返回true;否则返回false*/boolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}

可以看出这里运用了 UNSAFE.compareAndSwapObject 方法来完成 CAS 操作,而compareAndSwapObject 是一个 native 方法,最终会利用 CPU 的 CAS 指令保证其不可中断。

可以看出,非阻塞队列 ConcurrentLinkedQueue 使用 CAS 非阻塞算法 + 不停重试,来实现线程安全,适合用在不需要阻塞功能,且并发不是特别剧烈的场景。

总结

我们最后来做一下总结。通过我们对阻塞队列和非阻塞队列的并发安全原理的分析,可以知道,其中阻塞队列最主要是利用了 ReentrantLock 以及它的 Condition 来实现,而非阻塞队列则是利用 CAS 方法实现线程安全。


http://www.ppmy.cn/devtools/166730.html

相关文章

【网络协议详解】——QOS技术(学习笔记)

目录 QoS简介 QoS产生的背景 QoS服务模型 基于DiffServ模型的QoS组成 MQC简介 MQC三要素 MQC配置流程 优先级映射配置(DiffServ域模式) 优先级映射概述 优先级映射原理描述 优先级映射 PHB行为 流量监管、流量整形和接口限速简介 流量监管 流量整形 接口限速…

什么样的场景适用redis?redis缓存是什么?

基于 Java SSH 老项目、数据量大、查询慢、尽量少改动的现状&#xff0c;如果加入 Redis&#xff0c;可以从哪些场景切入&#xff1a; 1. 高频读取、低频更新的数据 场景示例&#xff1a; 商品信息、用户基础资料&#xff08;每日读取百万次&#xff0c;每周更新一次&#xff…

vue3自定义指令实现输入框值范围大小限制

// 自定义指令代码 export default (vue: any) > {const handler ($event: Event) > {const inputEl $event.target as HTMLInputElement;let maxValue inputEl.max ? parseFloat(inputEl.max) : 0;let minValue inputEl.min ? parseFloat(inputEl.min) : 0;let va…

【js逆向】iwencai国内某金融网站实战

地址&#xff1a;aHR0cHM6Ly93d3cuaXdlbmNhaS5jb20vdW5pZmllZHdhcC9ob21lL2luZGV4 在搜索框中随便输入关键词 查看请求标头&#xff0c;请求头中有一个特殊的 Hexin-V,它是加密过的&#xff1b;响应数据包中全是明文。搞清楚Hexin-V的值是怎么生成的&#xff0c;这个值和cooki…

学习网络安全需要哪些基础?

&#x1f345; 点击文末小卡片 &#xff0c;免费获取网络安全全套资料&#xff0c;资料在手&#xff0c;涨薪更快 学习网络安全&#xff0c;对于想要进入IT行业的朋友们来说是一件非常重要的事情。尤其是在当今社会&#xff0c;互联网已经渗透到工作和生活的方方面面&#xff0…

计算机网络开发(3)——端口复用、I\O多路复用

端口复用 由于有一个MSL&#xff0c;所以上一秒关闭的服务器&#xff0c;可能之前的端口还未释放&#xff1b;又或者是程序突然退出系统没有释放端口&#xff0c;导致端口被占用。 当有新的服务想要用这个端口的时候&#xff0c;会出现错误&#xff1a;服务会出现Bind error:A…

编写Dockerfile制作tomcat镜像,生成镜像名为tomcat:v1,并推送到私有仓库。

1.具体要求如下&#xff1a; 基于rockylinux:8基础镜像&#xff1b; 指定作者为openlab&#xff1b; 安装tomcat服务&#xff0c;暴露8080端口&#xff1b; 设置服务自启动。 总结步骤&#xff1a;基于rockylinux:8&#xff0c;安装Java环境&#xff0c;安装Tomcat&a…

批量将多个 CSV 合并成单个文件|按文件夹批量合并 CSV 文件

文档合并的拆分需求在我们工作当中是非常常见的&#xff0c;CSV 格式的文档也是我们工作当中经常要面对要处理的一种文档格式。它相对于 Excel 文档来说更加的轻便&#xff0c;更加的灵活。前面我们也介绍过如何将 CSV 格式的文档拆分成多个文档&#xff0c;那今天我们要介绍的…