AQS原理(AbstractQueuedSynchronizer)

news/2024/10/4 7:44:08/

本篇为 [并发与多线程系列] 的第四篇,对应Java知识体系脑图中的 并发与多线程 模块。

这一系列将对Java中并发与多线程的内容来展开。



AQS原理(AbstractQueuedSynchronizer)

  • AQS原理(AbstractQueuedSynchronizer)
    • AQS整体结构
      • state状态值
      • Node对象
    • CLH队列锁
      • AQS 中的变体CLH
    • AQS中的两种模式
      • AQS 两种模式对比
      • AQS 的独占模式
        • 获取acquire
        • 释放release
      • AQS 的共享模式
        • 获取acquireShared
        • 释放releaseShared
    • 锁的属性:公平性,响应中断,超时
        • 公平性
        • 响应中断
        • 超时
    • 扩展设计:ConditionObject与等待队列
        • Condition中的操作
      • await 等待
      • signal 唤醒
    • AQS设计模式巧思:模版方法
    • AQS的应用



AQS原理(AbstractQueuedSynchronizer)

在Java多线程中,AQS(抽象队列同步器)是一个用来实现同步锁以及其他涉及到同步功能的核心组件,Java中的锁Lock就是基于AbstractQueuedSynchronizer来实现的。AQS的出现是用于优化锁的效率,替代synchronize关键字的解决方案。不过,Java1.6版本起synchronize经过锁优化之后,效率差距已经不大。

但是,AQS里很多的设计思路,都值得学习和借鉴,理解AQS原理才能更好的理解JUC(Java.util.concurrent)包内关于多线程的实现。它也是面试中高频关键知识点之一。

注:本文以下源码基于JDK1.8。


AQS中的重点

  • AQS整体结构、状态值、Node对象
  • 数据结构:CLH队列
  • 锁的模式:独占模式与共享模式(含源码篇幅较长)
  • 锁的属性:公平性,响应中断,超时
  • 扩展设计:ConditionObject与等待队列(含源码篇幅较长)
  • 设计模式巧思:模板方法
  • AQS的应用:ReentrantLock等

AQS整体结构

首先,我们先从AQS的类图看一下整体结构和设计。

AQS类图


由类图可以看出,AQS(AbstractQueuedSynchronizer)继承了AbstractOwnableSynchronizer,包含了Node、ConditionObject对象。

  • AbstractOwnableSynchronizer提供了实现独占模式的线程的方法,由AQS继承。

  • AQS有一些内置变量,status、head、tail等,这些都是AQS重要的结构设计,status是锁资源的标识;head、tail是CLH结构的设计。

  • 从AQS的方法内看出,提供了独占模式的方法,也提供了共享模式(shared)的方法,还包含有响应中断(interruptibly)的方法;

  • Node对象,是AQS基础对象,是一个最小资源单元对象,一个Node持有一个线程引用(thread),用Node来表示线程的状态;

  • ConditionObject对象,是AQS里的扩展设计,同样用了变体CLH的数据结构,实现了等待队列。


state状态值

在AQS中,同步状态标识status设置为全局变量,并用volatile关键字修饰,可以由多个线程及时获取到此时的锁的状态。

我们看一下state的源码:

java">     /**	* The synchronization state.	* 同步器的状态*/	private volatile int state;	/**	* getter方法,获取state	*/	protected final int getState() {	return state;	}	/**	* Setter方法,设置state	*/	protected final void setState(int newState) {	state = newState;	}	/**	* 原子修改当前值,且expect的值等于旧值时修改成功。即CAS设置	*/	protected final boolean compareAndSetState(int expect, int update) {	return unsafe.compareAndSwapInt(this, stateOffset, expect, update);	}

state有几个特点

  • state用volatile修饰,保证多线程中的可见性。
  • getState() 和 setState() 方法采用final修饰,限制AQS的子类重写它们两。
  • compareAndSetState()方法采用乐观锁思想的CAS算法,也是采用final修饰的,不允许子类重写。

例如在独占模式中,每次增加节点即调用获取方法acquire(1),其中头结点会CAS将state从0设为1,用state来表示锁的持有状态。因此state是用来标记锁资源的重要标识。


Node对象

在AQS中,Node对象是最小资源单元对象,是构成变体CLH的重要结构,一个Node持有一个线程引用(thread),用Node来表示线程的状态。Node的源码结构如下:

java">static final class Node {// 共享模式下等待的标识static final Node SHARED = new Node();// 独占模式下等待的标识static final Node EXCLUSIVE = null;// 节点的几种状态static final int CANCELLED =  1;static final int SIGNAL    = -1;static final int CONDITION = -2;static final int PROPAGATE = -3;// 此时节点处于 上面状态中的某个值 (-3 -- 1)volatile int waitStatus;// 指向前驱节点volatile Node prev;// 指向后继节点volatile Node next;// 线程引用volatile Thread thread;// 指向等待获取condition 唤醒条件的队列节点Node nextWaiter;}

在Node节点的定义中,存在五种等待状态:

java">				// 节点的等待状态static final int CANCELLED =  1;static final int SIGNAL    = -1;static final int CONDITION = -2;static final int PROPAGATE = -3;/***   CANCELLED: 因为超时、中断 被取消;节点进入取消状态永远不能再变化(即不能重新唤醒)。而且,取消状态的节点永远不会再被阻塞。*   SIGNAL: 此节点的后续节点已经被park方法阻塞,当此节点释放/取消时需要通过unpark方法唤醒后续节点。* 为了避免竞争,acquire方法会先提供一个信号,然后后续节点尝试原子获取(cas),如果失败就阻塞。*   CONDITION:  此节点正处于条件队列中(condition queue)等待获取条件,除非节点被移出,移出的同时会设置节点状态为0。*   PROPAGATE:  共享模式头结点的无条件传播。引入这个状态是为了优化锁竞争,让队列中的节点能有序的一个个唤醒。*   0: 以上四个之外的状态,一般是节点的初始态。*/volatile int waitStatus;

AQS-Node状态


CLH队列锁

在AbstractQueuedSynchronizer源码的注释中写道:AQS是CLH队列锁的变体,CLH队列锁通常使用的是自旋锁。

CLH锁是一种基于链表的可扩展、高性能的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。

在传统的CLH队列中,头结点拥有锁,后续节点会一直自旋等待状态改变,发现前驱节点释放锁并修改状态值后,结束自旋并尝试去争取锁,头结点出队。

CLH新增的节点Node会将自己的prev指向之前的 尾节点tail,并修改tail指向新节点,加入队尾。

因此CLH是一种头结点出列,新增插入尾节点的一种FIFO(先进先出)单链表队列,等待节点会自旋获取状态值。
CLH队列结构

AQS 中的变体CLH

在AQS中,数据结构使用的是变体的CLH,每个Node节点结构变得更为复杂一些。

在AQS中,Node是构成变体CLH的重要结构,一个Node持有一个线程引用(thread),在CLH基础上,多出了双向指针用于形成双向连边,每个Node对象都存在prev、next两个指针,形成了双向链表。因此AQS中的CLH结构入下:
AQS-变体CLH队列结构


我们可以看出,AQS中的变体仍然保持CLH的一些特性,FIFO,头结点持有锁,运行结束后出列;新增节点插入队尾。

与传统CLH不同的是,在Node结构中增加next指针,形成双向队列,可以循环遍历。而AQS的成员变量中存在head、tail,分别代表了队列的头节点、尾节点,加上双向队列的设计,持有他们AQS即持有了整个队列。


AQS中的两种模式

在AQS的类图中,AQS继承了AbstractOwnableSynchronizer,AbstractOwnableSynchronizer提供了实现独占模式的线程的方法。

java">/*** A synchronizer that may be exclusively owned by a thread.  This* class provides a basis for creating locks and related synchronizers* that may entail a notion of ownership.  The* {@code AbstractOwnableSynchronizer} class itself does not manage or* use this information. However, subclasses and tools may use* appropriately maintained values to help control and monitor access* and provide diagnostics.* * 一种可能被线程独占的同步器。这个类提供了创建锁和关联同步器的基础,* 意味着同步器可能存在所有权的概念。 这个类{@code AbstractOwnableSynchronizer}* 本身不管理 或者拥有这些信息,然而,子类和工具可能使用适当的状态值(state)去帮助控制和* 监视所有权,并提供诊断(conditionObject)。** @since 1.6* @author Doug Lea*/
public abstract class AbstractOwnableSynchronizerimplements java.io.Serializable {/*** Empty constructor for use by subclasses.* 供子类使用的空构造函数*/protected AbstractOwnableSynchronizer() { }/*** The current owner of exclusive mode synchronization.* 当前持有独占模式同步器的线程*/private transient Thread exclusiveOwnerThread;/*** set方法,设置独占线程*/protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;}/*** get方法,获取独占线程*/protected final Thread getExclusiveOwnerThread() {return exclusiveOwnerThread;}
}

实际上,AQS提供了独占模式共享模式两种方式。我们可以从方法名称看到AQS内部提供的两套实现,其中包含了可中断的阻塞方法:

java">// 独占模式方法
# acquire(int arg) : void -- 不可中断的加锁
# acquireInterruptibly(int arg) : void --可中断的加锁
# tryAcquireNanos(int arg, long nanosTimeout): boolean --带超时时长的可中断加锁# release(int arg) : boolean --解锁// 共享模式方法
# acquireShared(int arg) : void
# acquireSharedInterruptibly(int arg) : void
# tryAcquireSharedNanos(int arg, long nanosTimeout) : boolean# releaseShared(int arg) : boolean

AQS 两种模式对比

AQS中的主流程:加锁、解锁,分别存在两种模式:共享模式、独占模式;这两种的主操作流程其实没有区别,主要区别在于,独占模式只允许一个线程获得资源,而共享模式允许多个线程获得资源,但是获得资源是原子性的,即全都成功才成功。
AQS-加锁流程图
AQS-解锁流程图


AQS 的独占模式

AQS使用变体的CLH — 双向链表来管理请求同步的Node,新的Node将会被插到链表的尾端,而链表的head总是代表着获得锁的线程,链表头的线程释放了锁之后,后面的节点会监视到状态,去竞争共享变量state。下面看一下AQS是如何实现独占模式下的acquire和release的。

获取acquire

首先看一下获取acquire方法的整体流程。
AQS-独占模式加锁

获取方法即将节点插入同步队尾,自旋等待,直到获取到锁的操作。以下贴出整个调用链路的源码:

    // 获取acquire方法完整调用链路public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 用中断的方法,让线程可以退出等待状态,变为就绪selfInterrupt();}// tryAcquire提供了一个模板方法,AQS中没有具体实现,交由子类实现// CAS尝试竞争state状态值,state特点中说过:AQS中用final修饰state的get/set,因此子类需自行实现,这种交由子类实现的特性,让实现同步器的方法可以自行定制独占/共享模式protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}// CAS将节点加入队尾private Node addWaiter(Node mode) {// Node EXCLUSIVE = null -> mode(nextWaiter)Node node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}// 循环重试一直到成功加入队尾,返回前置节点,即旧的尾节点private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // 队尾为空即队列没有元素,必须初始化if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}// 自旋竞争state,竞争state失败,证明其他线程持有锁;竞争成功,让前驱节点出队final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();// 前驱节点为头结点 且此节点CAS竞争state成功if (p == head && tryAcquire(arg)) {// 将当前节点设为头结点,并切断前驱结点,让其出队setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 设置前驱节点的状态为-1,并unpark此节点if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)// 最终失败,取消竞争操作cancelAcquire(node);}}private void setHead(Node node) {head = node;node.thread = null;node.prev = null;}// 检测并设置前驱节点的状态,设置前驱成功,则park阻塞当前线程private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus; if (ws == Node.SIGNAL)// SIGNAL=-1,前驱节点 成为了 头结点/获取到锁的节点return true;if (ws > 0) {// CANCELLED=1,前驱节点已被取消,跳过前驱节点并重试do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 基本上是从初始态0,CAS尝试设置前驱节点状态为-1 SIGNAL,标识当前节点正在等待被唤醒// waitStatus必须是0或者-3,保证不会在park之前被唤醒compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}// 阻塞当前线程,并检查中断状态private final boolean parkAndCheckInterrupt() {// 阻塞当前线程LockSupport.park(this);return Thread.interrupted();}

在以上源码中我们需要注意的是,在 acquireQueued方法中节点会自旋等待获取锁,而结束自旋的条件如下:

  • 自旋到前驱节点获得锁的时候,当前节点可以安心被park阻塞,interrupted=true;则当前驱节点释放锁,可以unpark当前节点,返回interrupted的值 (true)。
  • 前驱节点成为了头节点,头结点即拥有锁的节点;且tryAcquire 返回true,意味着当前CAS竞争state成功,前驱节点释放锁。
  • 当前节点竞争到锁,设置为头结点,将运行完毕的旧头结点指针设为空,成功出队。即节点的 出/入 同步队列都是在acquire方法中操作。

释放release

接下来我们看看release释放方法。
AQS-独占模式解锁

释放方法即将拥有资源的头结点设为初始态0,并唤醒条件适合的第一个后继节点,等待将头节点出队释放。以下贴出整个调用链路的源码:

    // 释放release方法完整调用链路public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;// 头结点 不为空 且 为-1 表明后继节点等待被唤醒if (h != null && h.waitStatus != 0)// 将节点CAS设置为0,并唤醒后继条件合适的一个节点unparkSuccessor(h);return true;}return false;}// tryRelease 同样也是提供了一个模板方法,AQS中没有具体实现,交由子类实现// ReentrantLock中的实现,释放state,CAS设置为0,保证资源的安全性protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}// 将节点CAS设置为0,并唤醒后继条件合适的一个节点private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0) //CAS 设置为0,验证后继节点是否在等待被唤醒compareAndSetWaitStatus(node, ws, 0);Node s = node.next;// 如果头结点的 下一个节点为空 或者已取消(只有取消状态=1 >0)if (s == null || s.waitStatus > 0) {s = null;// 双向链表的用处:// 自尾节点往前遍历,遍历到头结点后面第一个符合唤醒条件的节点,unpark唤醒它for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 唤醒指定的线程if (s != null)LockSupport.unpark(s.thread);}

释放的方法比入队简单的多,理解起来也不复杂。

以上就是独占模式下的出入队,若是希望线程可以在竞争的时候被中断,可以使用acquireInterruptibly。如果希望加上获取锁的时间限制,也可以使用tryAcquireNanos(int, long)方法来获取。他们两个方法不再赘述,可参考独占模式下的出入队原理对照理解即可。


AQS 的共享模式

需要注意的是,共享模式和独占模式的区别在于,独占模式只允许一个线程获得资源,而共享模式允许多个线程获得资源,但是获得资源是原子性的,即全都成功才成功。


获取acquireShared

我们看一下获取acquireShared方法,以下贴出整个调用链路的源码:

    // 获取acquireShared方法完整调用链路public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}// 提供了一个模板方法,AQS中没有具体实现,交由子类实现// Semaphore中的实现,比较当前可运行的 线程资源 和 入队个数,返回 线程资源-入队个数protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}// 自旋等待竞争stateprivate void doAcquireShared(int arg) {// 将节点加入同步队尾 Node SHARED = new Node() -> mode(nextWaiter)final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {// 前驱节点是头结点,CAS发现仍有可竞争的资源int r = tryAcquireShared(arg);if (r >= 0) {// 将当前节点放到队头 状态设置为0,且切断与头结点的连接setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)// 使用中断的方法,让线程退出等待状态,变为就绪selfInterrupt();failed = false;return;}}// 直到前驱节点获得锁,park当前节点if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}// CAS将节点加入队尾,如果失败,enq方法自旋直至成功加入队尾private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 自旋加入队尾直至成功enq(node);return node;}// 当前节点设置为0,取消绑定的线程,取消前驱指针private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // 记录头结点用于检查后继节点setHead(node);// 无条件传播 或旧头结点为空 或旧头结点为-1 SIGNAL 或新头结点为空 或新头结点为-1 SIGNAL 则取新头结点后继节点if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;// 后继节点为空 或后继节点也是共享模式节点if (s == null || s.isShared())doReleaseShared();}}private void setHead(Node node) {head = node;node.thread = null;node.prev = null;}// 头结点设置为0并唤醒后继节点,若期间头结点被改变则设置其为无条件传播,下次可以跳过此节点private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {// CAS将运行的节点 -1设置为0,检测后继节点是否正等待唤醒if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // 再次检查头结点cas设置为0,并唤醒后继第一个条件合适的节点unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))// 设置头结点为无条件传播,下次可以跳过,保证传播性continue;   //CAS失败继续循环,继续循环,直到节点 = -1运行   }// 如果头结点被改变了继续循环,设置为无条件传播,进入循环获取新的头结点继续判断if (h == head)  break;}}// 再次cas设置头结点为0,并唤醒后继第一个条件合适的节点private void unparkSuccessor(Node node) {int ws = node.waitStatus;// 再次检查节点是否设置为0,未设置CAS设置为0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;// 后继节点为空 或取消了 从同步队尾往前遍历取最靠近头结点的第一个条件合适的节点if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 唤醒指定的线程if (s != null)LockSupport.unpark(s.thread);}// 检测前驱节点的状态,返回当前节点是否竞争失败,失败后需park阻塞private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus; if (ws == Node.SIGNAL)// 前驱节点 成为了 头结点/获取到锁的节点return true;if (ws > 0) {// 前驱节点已被取消,跳过前驱节点并重试do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// CAS尝试设置前驱节点状态为-1 SIGNAL,标识当前节点正在等待被唤醒// waitStatus必须是0或者-3,保证不会在park之前被唤醒compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}// park阻塞当前线程,并检查中断状态private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}

可以看到,共享模式下的获取acquire方法整体流程与独占模式的大体一致,区别点仅有两个地方:

  • tryAcquireShared的实现,在Semaphore中,此方法实现大概为获取当前可运行的线程个数,返回available - acquires,即可运行的线程数 - 入队阻塞个数。以此方法实现多线程共享模式。
  • addWaiter(mode) 传入的mode不同,共享模式下 SHARED = new Node()。
  • 共享模式可以获得多个资源,在自旋获取state时,若前驱节点已经获得资源,且资源数量>=0,则切断头结点,将当前节点设为新的头结点且状态设为0,用中断的方式由等待态变为就绪。

释放releaseShared

我们看一下releaseShared释放方法,以下贴出整个调用链路的源码:

    // 释放releaseShared方法完整调用链路public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}// tryReleaseShared 同样也是提供了一个模板方法,AQS中没有具体实现,交由子类实现// Semaphore中的实现是,CAS设置 上一次剩余可运行线程数 为 当前可运行线程数 + 释放个数protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}// 确保一个释放行为的传播性,即使有其他正在进行的获取/释放操作;这个方法通常会尝试唤醒头结点的后继节点,如果它处于等待唤醒的状态,如果不是,那么设置为 PROPAGATE -3 传播状态,保证在此释放结束之内,传播能继续下去。此外,我们必须循环,以防在执行此操作时添加新节点。另外,与其他的unparkSuccessor用法不同,我们需要知道CAS重置状态是否失败,如果是,则重新检查。private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {// 头结点cas设置为0,检验后继阶段是否正在等待唤醒 -1 -> 0if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;   // 循环检查// unpark唤醒后继节点unparkSuccessor(h);}// cas设置为-3 无条件传播else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;      // cas失败继续循环}if (h == head)         // 如果头结点变更,取当前新头结点状态继续循环break;}}private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0) //CAS 设置为0compareAndSetWaitStatus(node, ws, 0);Node s = node.next;// 如果头结点的 下一个节点为空 或者已取消(只有取消状态=1 >0)if (s == null || s.waitStatus > 0) {s = null;// 双向链表的用处:// 自尾节点往前遍历,遍历到头结点后面第一个符合唤醒条件的节点,unpark唤醒它for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

锁的属性:公平性,响应中断,超时

接下来说一下锁的几个属性,公平性,响应中断,超时,这几个属性也都分别对应了一个方法。


公平性

为何会有公平性这一说呢,在AQS源码中并没有看到,但是到在应用工具类里,即继承了AQS的实现中,例如在ReentrantLock里,可以看到Sync和FairSync这一组继承AQS的实现类,里面存在nonfairTryAcquire和fairTryAcquire两个方法。

因此公平和非公平这是在应用层所衍生出来的锁属性。

那么它们有什么差异呢,总结来说,非公平性即立即尝试插一次队,我们从源码入手:
AQS-公平性源码对比
由源码可以看出,他俩只有一点差异,即非公平性的锁会立即尝试一次竞争锁,如果成功则插队;否则后续逻辑与公平性的锁一样,自旋等待前一节点成为头结点后才竞争锁。

响应中断

响应中断和超时,都是AQS里加锁操作时的异常处理场景,可以视为应用级别的锁属性,我们把AQS中存在的方法体列出来,可以看到存在可中断、带超时时长的加锁方法。

java">// 独占模式方法
# acquire(int arg) : void -- 不可中断的加锁
# acquireInterruptibly(int arg) : void --可中断的加锁
# tryAcquireNanos(int arg, long nanosTimeout): boolean --带超时时长的可中断加锁# release(int arg) : boolean --解锁// 共享模式方法
# acquireShared(int arg) : void --不可中断的加锁
# acquireSharedInterruptibly(int arg) : void --可中断的加锁
# tryAcquireSharedNanos(int arg, long nanosTimeout) : boolean --带超时时长的可中断加锁# releaseShared(int arg) : boolean --解锁

简单来说,可中断即对线程的检查, 若线程已中断,抛出异常进行中断结束任务。以下是acquireInterruptibly的源码,我们来看看:

java">public final void acquireInterruptibly(int arg)throws InterruptedException {// 检查线程的状态,若已中断抛出异常结束操作if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))// 和加锁acquire主流程一致,已经忘记了的话可以往前翻翻流程图doAcquireInterruptibly(arg);}

超时

和可中断相对应的另一种场景,就是超时,AQS允许设置超时时长的加锁,若超时同样抛出异常进行中断结束任务。我们看看tryAcquireNanos的源码:

java">public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {// 检查线程的状态,若已中断抛出异常结束操作if (Thread.interrupted())throw new InterruptedException();return tryAcquire(arg) ||doAcquireNanos(arg, nanosTimeout);}private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;// 计算超时截止时间final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return true;}nanosTimeout = deadline - System.nanoTime();// 若超时直接返回失败if (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);// 检查线程的状态,若已中断抛出异常结束操作if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

扩展设计:ConditionObject与等待队列

在AQS中,存在ConditionObject对象,也是采用的CLH变体结构设计的队列,可以视为AQS队列(文中后续称为Sync Queue)的双生队列,我们暂且称其为Condition Queue。

Condition Queue在AQS里的角色类似于缓冲区,即从Sync Queue移到Condition Queue上,可以自主操控节点处于等待环节;后续节点需要加锁时,可以移回Sync Queue等待系统调度,能操作线程暂停和恢复,从而实现精准顺序唤醒。
AQS与Condition


Condition中的操作

JUC包中存在Condition接口,它提供了await()和signal()两个阻塞和唤醒线程的方法,而ConditionObject就是其实现类。

在AQS中,ConditionObject 提供的await()和signal()方法,能够为多线程之间交互提供帮助,能让线程暂停和恢复,可以实现精准顺序唤醒,是很重要的方法。


await 等待

在Node的节点状态中,有等待状态 CONDITION = -2,此状态代表阶段处于等待状态,插入入ConditionObject 等待队列中。

在AQS中,调用了await方法时,当前正持有锁的头节点会被放入等待队列中。接下来我们看一下完整调用链路:

java">		// await完整调用链路public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 将当前线程绑定到等待节点,插入等待队列尾部,返回当前插入的节点Node node = addConditionWaiter();// 释放当前持有锁的节点,保存当前持有锁的状态state,唤醒后继节点int savedState = fullyRelease(node);// 0 表示等待过程,即在等待队列中未被中断,保证线程的有效性// REINTERRUPT = 1;表示在移出等待队列,恢复到同步队列中时被重新中断// THROW_IE = -1;表示在signal前被cancel中断退出等待队列,并抛出InterruptedExceptionint interruptMode = 0;// 等待节点被锁定直到 要么被cancel中断,要么被重新中断,则break// 要么被signal()方法唤醒,isOnSyncQueue:检查当前节点是否在同步队列中while (!isOnSyncQueue(node)) {LockSupport.park(this);// 不在同步队列中,即在等待队列中,检查是否被中断,保证线程的有效性if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 已成功加入同步队尾,判断线程是被signal唤醒还是被cancel中断了// 尝试自旋竞争state 成功 且 在未signal前被中断if (acquireQueued(node, savedState) && interruptMode != THROW_IE)				// 中断模式设置为 重新中断,此情况与signal 中的unpark对应,由于前驱节点CAS竞争失败,表明前驱节点已成为头结点,而作为后继节点却竞争成功,所以应重新中断interruptMode = REINTERRUPT;if (node.nextWaiter != null) // 此节点的后继仍有等待节点,遍历等待队列清除不是等待的节点,包括当前节点unlinkCancelledWaiters();if (interruptMode != 0)// 再次检查中断状态,根据状态返回结果reportInterruptAfterWait(interruptMode);}// 先判断队尾节点状态,不为等待,则遍历等待队列清除不是等待状态的节点,并将当前节点插入等待队尾private Node addConditionWaiter() {Node t = lastWaiter;// 如果最后一个等待节点已经取消等待,清除它if (t != null && t.waitStatus != Node.CONDITION) {// 遍历等待队列,移出不是等待的节点unlinkCancelledWaiters();t = lastWaiter;}// 将当前线程绑定到Node 节点上,设置类型waiterState =CONDITION -2Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)// 尾节点为空,即等待队列为空,将新标记为等待的节点加入队尾firstWaiter = node;else// 尾节点存在,等待队列有节点,插入队尾t.nextWaiter = node;lastWaiter = node;return node;}// 从等待头节点遍历等待队列,将不是等待状态的节点移出private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) {// 取后继等待节点Node next = t.nextWaiter;if (t.waitStatus != Node.CONDITION) {// 若当前节点已经不为等待状态,将后继指针切断t.nextWaiter = null;// 当前节点的前驱节点也为空,那么后继节点则为头结点if (trail == null)firstWaiter = next;else// 前驱节点不为空,将前驱节点的后继指向后继节点,即当前节点被移出队伍trail.nextWaiter = next;if (next == null)// 如果后继节点也为空,则前驱节点就成为了尾节点lastWaiter = trail;}// 当前节点仍为等待状态,往后继节点继续遍历elsetrail = t; // 指向当前节点t = next; //指向后继节点}}// 将当前运行的线程状态全部释放final int fullyRelease(Node node) {boolean failed = true;try {// 获取当前state,即当前运行线程的状态,保存下来int savedState = getState();// 释放线程(独占模式)if (release(savedState)) {failed = false;// 保存出队前的状态,用于唤醒线程return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}// 验证当前节点是否在同步队列中final boolean isOnSyncQueue(Node node) {if (node.waitStatus == Node.CONDITION || node.prev == null)// 节点状态仍为等待,或者前驱没有节点(同步队列从队尾插入,除了获取锁的头结点,一般都有前驱节点)return false;if (node.next != null) // 如果存在后继节点,它肯定也是在队列里的  return true;// 从同步队尾往前循环遍历,验证节点的存在return findNodeFromTail(node);}// 从尾部循环遍历,是否存在节点private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}}// 检查是否有中断,如果在single()之前中断,则返回-1;如果在single()之后,则返回1;如果没有中断,则返回0。private int checkInterruptWhileWaiting(Node node) {// 在中断的前提下从等待队列移到同步队列成功返回 THROW_IE = -1;表示在single()之前,被cancel中断退出等待队列,并抛出InterruptedException // 在中断的前提下,节点状态已经从CONDITION 变为0了,且在同步队列中,返回 REINTERRUPT = 1;表示在signal()后被重新中断 // 没有被中断返回 0 return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;}// 检测线程的中断状态public static boolean interrupted() {return currentThread().isInterrupted(true);}// 节点取消等待状态将其从等待队列移出,放入同步队尾final boolean transferAfterCancelledWait(Node node) {// 节点仍为等待状态,CAS从等待-2设置为0,设置成功证明等待节点还未被signal(),但是此时在signal前已经中断,则最后要抛出异常InterruptedException来表明节点状况if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {// 转移节点,从等待队列中拿出,循环插入同步队尾直到成功enq(node);return true;}// 节点已经不处于 CONDITION 状态,则节点已为0 ,即是在被single()之后// 在一个转移过程中被取消是罕见的现象,即将节点从等待队列加入到队尾过程,所以自旋等待 signal方法中 enq 方法结束才可以操作,所以此时自旋即可// 自旋判断节点是否已经在同步队列中,未在 则暂停线程让出资源while (!isOnSyncQueue(node))Thread.yield();// 已经在同步队列,节点状态仍为 CONDITION,代表在signal后被重新中断了return false;}// 自旋竞争state,若竞争state失败,证明其他线程持有锁final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();// 前驱节点为头结点 且此节点CAS竞争state成功if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 自旋检测前驱节点state,竞争state,竞争失败执行park阻塞// park的前提是检查到前驱节点拥有锁,释放后可以安心unpark此节点if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)// 最终失败,取消竞争操作cancelAcquire(node);}}// 根据状态返回结果,或者不操作,private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)//若为THROW_IE -1,则抛出中断异常;throw new InterruptedException();else if (interruptMode == REINTERRUPT)// 若为 REINTERRUPT 1,表示再次被中断,则对当前线程进行中断操作selfInterrupt();}

signal 唤醒

与await 方法相对于的,就是signal 唤醒方法,它可以将被await 等待的线程进行唤醒。

接下来我们看一下signal 方法,以下是它的完整调用链路:

java">		//signal 方法完整调用链路public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}// 同样也是提供了一个模板方法,AQS中没有具体实现,交由子类实现// 用于判断当前线程是否持有锁protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}// ReentrantLock中的实现:判断当前线程是否为拥有锁的独占线程protected final boolean isHeldExclusively() {return getExclusiveOwnerThread() == Thread.currentThread();}// 遍历等待队列,将一个合适的等待节点设为0插入同步队尾,并unpark当前线程提供一个竞争机会private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)// 后继等待节点为空,即等待队列已经没有节点了,设置尾部指针为空lastWaiter = null;// 当前等待首节点的后继指针切断,设为空first.nextWaiter = null;// 等待首节点设置为0插入同步队尾失败 且 后继节点不为空,赋值为后继节点继续循环} while (!transferForSignal(first) &&(first = firstWaiter) != null);}// 将等待节点插入队尾,并设置为初始态=0,除了前置节点正在拥有锁的情况下,unpark当前线程提供一个竞争机会final boolean transferForSignal(Node node) {if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))// 设置等待节点状态为0失败return false;// 设置节点为0成功,循环插入同步队尾直至插入成功,返回前置节点,即旧的尾节点Node p = enq(node);int ws = p.waitStatus;// 只有CANCELLED = 1 > 0,如果 前置节点已经被取消,尝试唤醒当前线程// 前置节点设置为 SIGNAL=-1 失败,意味着CAS竞争锁失败,尝试唤醒当前线程if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);// 只要成功插入同步队尾即算作成功return true;}

AQS设计模式巧思:模版方法

AQS设计为一个抽象类,但在他的类里却没有任何一个抽象方法,取而代之的是定义了很多抛出UnsupportedOperationException异常的空方法,这是为什么呢?

这就是AQS设计的巧思,模版方法的设计,AQS提供了一套加锁、解锁的模版方法,但是它的子类不需要实现所有方法,可以实现自己需要的方法即可,未实现的方法在调用是就会抛出异常,用这个设计保证了模版里方法的安全性。

这一个实现抽象时很好的设计思路,日常编码时非常值得借鉴。

java">protected boolean tryAcquire(int arg) {// 若子类未实现,调用此方法会抛出异常throw new UnsupportedOperationException();
}

AQS的应用

因为模版方法的设计,我们可以自定义实现自己的线程操作工具,在JUC包中也存在了一些官方提供的实现类工具,例如ReentrantLock、CyclicBarrier、CountDownLatch、Semaphore等常用的工具类,他们都各有特点,后续我们会展开说明。


http://www.ppmy.cn/news/1534295.html

相关文章

关系数据库标准语言SQL(11,12)

目录 带有EXISTS谓词的子查询 exists谓词 例子 not exists谓词 例子 不同形式的查询间的替换 用EXISTS/NOT EXISTS实现全称量词 用EXISTS/NOT EXISTS:实现逻辑蕴涵 集合查询 并操作UNION 交操作INTERSECT 差操作EXCEPT 基于派生表的查询 select语句的基本格式 带有…

C(十)for循环 --- 黑神话情景

前言&#xff1a; "踏过三界宝刹&#xff0c;阅过四洲繁华。笑过五蕴痴缠&#xff0c;舍过六根牵挂。怕什么欲念不休&#xff0c;怕什么浪迹天涯。步履不停&#xff0c;便是得救之法。" 国际惯例&#xff0c;开篇先喝碗鸡汤。 今天&#xff0c;杰哥写的 for 循环相…

【Android 13源码分析】Activity生命周期之onCreate,onStart,onResume-1

忽然有一天&#xff0c;我想要做一件事&#xff1a;去代码中去验证那些曾经被“灌输”的理论。                                                                                  – 服装…

LeetCode hot100---双指针专题(C++语言)

双指针 (1)快慢双指针 适用于使用双指针进行元素移动&#xff0c;覆盖(2)首尾双指针 计算区域面积&#xff0c;三数之和1、移动0 &#xff08;1&#xff09;题目描述以及输入输出 (1)题目描述: 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#…

opencv实战项目(三十):使用傅里叶变换进行图像边缘检测

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 一&#xff0c;什么是傅立叶变换&#xff1f;二&#xff0c;图像处理中的傅立叶变换&#xff1a;三&#xff0c;傅里叶变换进行边缘检测&#xff1a; 一&#xff0c…

JavaScript 数组方法

数组(array)是按次序排列的一组值。每个值的位置都有编号(从0开始)&#xff0c;整个数组用方括号表示。两端的方括号是数组的标志。 var a["a","b","c"]; 除了在定义时赋值&#xff0c;数组也可以先定义后赋值。 var arr[];arr[1]"a"…

分治算法:谈一谈大规模计算框架 MapReduce 中的分治思想

分治算法:谈一谈大规模计算框架 MapReduce 中的分治思想 在当今大数据时代,处理大规模数据的需求日益增长。MapReduce 作为一种广泛应用的大规模计算框架,其核心思想正是分治算法。本文将深入探讨 MapReduce 中的分治思想,并通过具体案例进行说明。 一、分治算法概述 分…

【JavaScript】数组函数汇总

JavaScript数组函数是处理和操作数据的基础&#xff0c;对于JavaScript开发至关重要。函数式编程方法&#xff0c;如map()、filter()和reduce()&#xff0c;能够提高代码的简洁性和功能性。数据不可变性是现代JavaScript开发中的一个重要概念&#xff0c;函数如concat()和slice…