深入剖析 Java 中的 AbstractQueuedSynchronizer(AQS)

embedded/2024/9/23 7:50:47/

简介

AQS是抽象类AbstractQueueSynchronizer的简称,翻译为抽象的队列同步器,它定义了一套多线程访问共享资源的同步器框架,许多同步器的实现都依赖于它,如ReentrantLockCountDownLatchSemaphore等。

数据结构

在这里插入图片描述
AQS 实现锁和同步工具依赖的核心数据结构:

java">public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizer {private transient volatile Node head;private transient volatile Node tail;private volatile int state;
}public abstract class AbstractOwnableSynchronizer {private transient Thread exclusiveOwnerThread;
}

state

AQS中的state的作用类似于ObjectMonitor中的_owner字段。只不过_owner字段是一个指针,存储的是获取锁的线程,而state是一个int类型的变量,存储0、1等整型值。

  • 0:表示锁没有被占用;
  • 1:表示锁已经被占用;
  • 大于 1:表示重入的次数。

当多个线程竞争锁时,它们会通过compareAndSetState的CAS操作来更新state的值,即先检查state的值是否为0,如果state值为0的话,将state值设置为1。谁设置成功,谁就获取了这个锁。

java">protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

state的访问方式有三种:

  • getState();
  • setState();
  • compareAndSetState()

exclusiveOwnerThread

exclusiveOwnerThread成员变量存储持有锁的线程,它配合state成员变量,可以实现锁的重入机制。

head 和 tail

AQS 使用双向链表来实现等待队列,用来存储等待锁的线程和等待条件变量的线程。双向链表的节点如下所示:

java">static final class Node {static final Node SHARED = new Node();static final Node EXCLUSIVE = null;static final int CANCELLED =  1; // 表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入此状态后的结点将不再变化。static final int SIGNAL    = -1; // 表示后继结点在等待当前结点唤醒。后继结点入队时,会将当前结点的状态更新为SIGNALstatic final int CONDITION = -2; // 表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。static final int PROPAGATE = -3; // 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。volatile Thread thread;volatile Node prev;volatile Node next;volatile int waitStatus;Node nextWaiter;
}

Node结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,变量waitStatus则表示当前Node结点的等待状态,共五种取值CANCELLEDSIGNALCONDITIONPROPAGATE0(新节点入队时的默认状态)

负值表示结点处于有限等待状态,而正值表示结点已被取消。所以源码中很多地方用>0,<0来判断结点的状态是否正常。

AQS中的headtail两个成员变量分别为双向链表的头指针和尾指针。

原理

AQS 定义了 8 个模板方法,分别用于 AQS 的两种工作模式:独占模式共享模式
关于模板方法设计模式,可参考 【模板方法】设计模式:构建可扩展软件的基石
在这里插入图片描述

Lock锁为排它锁,因此,Lock锁的底层实现只会用到AQS的独占模式;

ReadWriteLock锁中的读锁为共享锁,写锁为排它锁,因此,ReadWriteLock锁的底层实现既会用到AQS的独占模式,又会用到AQS的共享模式;

Semaphore、CountdownLatch这些同步工具只会用到AQS的共享模式。

AQS又定义了 4 个抽象方法。
在这里插入图片描述

ReentrantLock

ReentrantLock为例,说明下AQS 是怎么用的。
在这里插入图片描述
ReentrantLock定义了两个继承自AQS的子类:NonfairSyncFairSync,分别用来实现非公平锁和公平锁。因为 NonfairSync 和 FairSync 释放锁的逻辑是一样的,所以,NonfairSync 和FairSync 又抽象出了一个公共的父类Sync

java">public class ReentrantLock implements Lock {private final Sync sync;abstract static class Sync extends AbstractQueuedSynchronizer { ... }static final class NonfairSync extends Sync { ... }static final class FairSync extends Sync { ... }public ReentrantLock() {sync = new NonfairSync();}public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();} public void lock() {sync.acquire(1);}public void unlock() {sync.release(1);}//...省略其他方法...
}

ReentrantLock中的lock()函数调用AQSacquire()模板方法来实现,unlock()函数调用AQSrelease()模板方法来实现。接下来,我们就来看下acquire()release()的底层实现原理。

acquire()模板方法
java"> public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

函数流程如下:

  • 先调用tryAcquire方法去竞争锁,如果获取锁成功,则acquire方法直接返回;
  • 竞争锁失败,则执行addWaiter方法,将线程包裹为Node节点放入等待队列的尾部;
  • 最后调用acquireQueued阻塞当前线程;
  • 如果线程在等待过程中被中断过,它是不响应的,只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

acquire()不可中断,因此,在acquire()接收到中断时,继续阻塞等待锁,直到获取到锁之后,才调用selfInterrupt()将中断标记恢复。

tryAcquire()

tryAcquire()是抽象方法,在NonfairSyncFairSync中被实现。

java">static final class NonfairSync extends Sync {// 尝试获取锁,成功返回true,失败返回false。AQS用于实现锁时,acquires=1protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState(); //获取state值if (c == 0) { //1、锁没有被其他线程占用if (compareAndSetState(0, acquires)) { //CAS设置state值为1。此处体现了“非公平”,上来就抢锁setExclusiveOwnerThread(current); //设置exclusiveOwnerThreadreturn true; //获取锁成功}} else if (current == getExclusiveOwnerThread()) { //2、锁可重入int nextc = c + acquires; // state+1if (nextc < 0) //重入次数太多,超过了int最大值,溢出为负数,此情况罕见throw new Error("Maximum lock count exceeded");setState(nextc); // state=state+1,state记录重入的次数,解锁的时候用return true; //获取锁成功}return false; //3、锁被其他线程占用}    
}static final class FairSync extends Sync {protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) { //1、锁没有被占用if (!hasQueuedPredecessors() &&  //此处体现了“公平”,先判断等待队列中没有线程时才获取锁compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}} else if (current == getExclusiveOwnerThread()) { //2、锁可重入int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
}
java">public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}

两个tryAcquire()方法的区别是在获取锁之前,FairSync会调用hasQueuedPredecessors()函数,查看等待队列中是否有线程在排队,如果有,那么tryAcquire()返回false,表示竞争锁失败,从而禁止“插队”行为。

addWaiter

在多线程环境下,往链表尾部添加节点会存在线程安全问题,因此,下面的代码采用自旋+CAS操作来解决这个问题。

java">private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) {node.prev = pred;//尝试快速模式直接放到队尾if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 再次尝试入队尾enq(node);return node;
}private Node enq(final Node node) {//自旋执行CAS操作(添加节点到链表尾部),直到成功为止for (;;) {Node t = tail;if (t == null) { //链表为空,添加虚拟头节点if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;//CAS操作解决了两个线程同时往链表尾部添加节点时的线程安全问题if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}
acquireQueued

acquireQueued()的代码实现如下所示,主要包含两部分逻辑:

  • 使用tryAcquire()函数来竞争锁;
  • 使用park()函数来阻塞线程。
java">final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor(); //拿到前驱节点//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。//如果线程是被中断唤醒的,那么p不一定等于head。只有p==head,线程才能去竞争锁if (p == head && tryAcquire(arg)) {setHead(node);//把node设置成虚拟头节点,也就相当于将它删除p.next = null; // help GCfailed = false;return interrupted;}//如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true; //如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true}} finally {//以上过程只要抛出异常,都要将这个节点标记为CANCELLED,等待被删除if (failed)cancelAcquire(node);}
}
//此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus; //拿到前驱的状态if (ws == Node.SIGNAL)//如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了return true;if (ws > 0) {/** 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就GC回收)!*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}// park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:
// 1)被unpark();2)被interrupt()。
private final boolean parkAndCheckInterrupt() {//LockSupport.park(this)底层调用JVM提供的Unsafe.park()函数来实现LockSupport.park(this); //调用park()使线程进入waiting状态return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
}

整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。

acquireQueued() 方法不可中断,但LockSupport.park()是可以被中断的。

整个acquire()模板方法的流程如下:
在这里插入图片描述

release()模板方法

主要包含两部分逻辑:使用tryRelease()函数释放锁和调用unpark()函数唤醒链首节点(即虚拟头节点的后继节点)对应的线程。

java">public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head; //找到头结点if (h != null && h.waitStatus != 0)unparkSuccessor(h);//唤醒等待队列里的下一个线程,内部调用unpark()函数,唤醒排在h后面的线程return true;}return false;
}

tryRelease()是抽象方法。不管是公平锁还是非公平锁,tryRelease()释放锁的逻辑相同。

java">// 释放锁,成功返回true,失败返回false。AQS用于实现锁时,releases=1
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c); //state-1 != 0,说明锁被重入多次,还不能解锁。return free;
}

总结

AQS 提供了一个强大而灵活的框架,用于构建锁和其他同步器。通过理解其内部原理和状态管理机制,我们可以创建出高效且线程安全的并发工具。

参考资料
《Java 编程之美》
《 Java并发之AQS详解》


http://www.ppmy.cn/embedded/110287.html

相关文章

文件IO相关知识

补充&#xff1a;一个头文件中可以包含多个内容 1.创建一个头文件&#xff0c;例如myhead.h 2.将能用到的所有头文件都放入该文件中 3.将myhead.h文件&#xff0c;移动到根目录下的usr目录中的include目录下 sudo mv myhead.h /usr/include/ 4.常用的头文件&#xff1a;,#inclu…

高精度监测土体压应力变化量的最佳选择 GEO系列振弦式土压力计

高精度监测土体压应力变化量的最佳选择 GEO系列振弦式土压力计 GEO系列振弦式土压力计广泛适用于长期测量土石坝、防波堤、护岸、码头岸壁、高层建筑、管道基础、桥墩、挡础所受土体的压应力&#xff0c;是了解土体对土中构筑物压应力变化量的有效监测设备。同时&#xff0c;它…

汤臣倍健,三七互娱,得物,顺丰,快手,游卡,oppo,康冠科技,途游游戏,埃科光电25秋招内推

汤臣倍健&#xff0c;三七互娱&#xff0c;得物&#xff0c;顺丰&#xff0c;快手&#xff0c;游卡&#xff0c;oppo&#xff0c;康冠科技&#xff0c;途游游戏&#xff0c;埃科光电25秋招内推 ①得物 【八大职类】技术、供应链、产品、运营、设计、职能、商品研究、风控等大类…

轻松发高分的好思路:GNN+时间序列预测!新SOTA效率翻了5倍

在时序预测领域&#xff0c;通过合理构建和应用图结构&#xff0c;GNN能有效捕捉时空数据中的复杂依赖关系&#xff0c;轻松提高预测的准确性。因此GNN时序预测在学术界和工业界都广受欢迎。 最近这个方向出现了很多效果很好的研究&#xff0c;比如GraFITi模型&#xff0c;利用…

每日一练:合并区间

一、题目要求 以数组 intervals 表示若干个区间的集合&#xff0c;其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间&#xff0c;并返回 一个不重叠的区间数组&#xff0c;该数组需恰好覆盖输入中的所有区间 。 示例 1&#xff1a; 输入&#xff1a;in…

贪心算法求解拆楼房问题

这是一道典型的贪心算法问题&#xff0c;首先遍历找到一个高度大于0的楼房&#xff0c;然后以此为基准&#xff0c;划分一个区间&#xff0c;找到楼房内高度最小的楼房&#xff0c;每次都减去这个高度最小的值。 后续重复一样&#xff0c;再找减去后楼房高度的最小值&#xff…

matter中的Fabric(网络结构)

什么是Fabric&#xff1f; Fabric可以被理解为一组相互信任的设备和控制器&#xff0c;它们共享一个共同的信任域。这意味着在同一个Fabric中的设备和控制器之间可以进行安全的通信&#xff0c;而无需额外的身份验证或安全检查。每个Fabric有一个唯一的标识&#xff0c;确保Fab…

不同vlan之间的通信方法

1.通过路由器的物理接口 1.给PC1,PC2配置IP地址&#xff0c;网关2.进入交换机配置vlan&#xff0c;交换机所有口都配置access口并绑定vlan3.配置路由器&#xff0c;进入路由器的两个接口配置网关IP和掩码缺点&#xff1a;成本高&#xff0c;每增加一个vlan就需要一个物理端口和…