尚硅谷(140-155)
18 AQS
前置知识
- 公平锁和非公平锁
- 可重入锁
- 自旋思想
- LockSupport
- 双向链表
- 设计模式——模块设计
18.1 AQS入门级别理论知识
AQS一般指的是 AbstractQueuedSynchronized
AQS 是用来实现锁或者其他同步器组件的公共基础部分的抽象实现,是重量级基础架构以及整个JUC体系的基石,主要用于解决锁分配给谁的问题。
整体就是一个FIFO队列来完成资源获取线程的排队工作,并通过一个int类型变量表示持有锁的状态。
与AQS有关的:ReentrantLock
、CountDownLatch
、ReentrantReadWriteLock
、Semaphore
既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是 CLH队列(双向链表) 的变体实现的.将暂时获取不到锁的线程加入到队列中,这个队列就是AQS同步队列的抽象表现。它将要请求共享资源的线程及自身的等待状态封装成队列的结点对象(Node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果。
AbstractQueuedSynchronized
部分源码
- Node 等待线程的保存位置
- head 前指针
- tail 后指针(所以是双向队列)
- state 线程状态位
18.2 AQS源码分析前置知识储备
AQS内部体系架构
18.2.1 AQS 自身
state
- 0,线程可以进入
- 大于等于1,有人占用资源,需要等待
CLH 队列
- Node 等待线程的保存位置
- head 前指针
- tail 后指针(所以是双向队列)
总结:有阻塞就需要排队,实现排队必然需要队列,state 变量 + CLH 双端队列
18.2.2 ASQ 内部 Node 类
static final class Node {// 表示线程以共享的模式等待锁static final Node SHARED = new Node();// 表示线程正在以独占的方式等待锁static final Node EXCLUSIVE = null;// 表示当前线程获取锁的请求已经取消了static final int CANCELLED = 1;// 表示当前线程已经准备好了,就等资源释放了static final int SIGNAL = -1;// 表示结点在等待队列中,结点线程等待唤醒static final int CONDITION = -2;// 当前线程处在SHARED情况下,该字段才会使用,传播static final int PROPAGATE = -3;// 当前结点在队列中的状态(上面四种)volatile int waitStatus;// 前驱指针volatile Node prev;// 后继指针volatile Node next;// 表示醋鱼该结点的线程volatile Thread thread;// 指向下一个处于condition状态的结点Node nextWaiter;final boolean isShared() {return nextWaiter == SHARED;}final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() { }Node(Thread thread, Node mode) {this.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus;this.thread = thread;}}
18.3 AQS源码深度讲解和分析
Lock 接口的实现类,基本都是通过【聚合】一个【队列同步器】的子类完成线程访问控制的
ReentrantLock的原理
默认使用的是非公平锁,传入的是true的话则是公平锁
public ReentrantLock() {sync = new NonfairSync();
}public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
}
进入非公平锁
static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;final void lock() {// 我们希望是没有线程占领资源的,如果没有占领资源,把0置为1if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());// 如果有线程占用,acquire置1,去排队elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
}
公平锁:直接置1
static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}...
}
所以,创建代码的时候
// 非公平
Lock lock = new ReentrantLock();
// 公平
Lock lock = new ReentrantLock(true);
// 非公平
Lock lock = new ReentrantLock(false);
在尝试获得锁的过程中 公平锁和非公平锁,就差异了一个 !hasQueuedPredecessors()
,有没有前驱对象。
如果公平锁中有前驱对象,需要在队列中等待;非公平锁则需要去抢夺
// 公平锁
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}// 非公平锁
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) { // 由 0 变 1 setExclusiveOwnerThread(current); // 设置当前的独占线程return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
由于 FairSync 和 NonfairSync 都是要调用 acquire
方法,置 1 表示有线程抢到了锁
lock()方法
以 NonfairSync 为例,AQS acquire 主要的三条流程
// ReentrantLock 中的 lock 方法
public void lock() {sync.lock();
}
abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L;// Sync中的抽象方法abstract void lock();...
}
// FairSync 和 NonfairSync 两个方法都继承了 Sync方法的 lock 操作,下面的代码是 NonfairSync 内部类的 lock 方法实现
final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);
}
// 进入acquire方法
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // Node.EXCLUSIVE 独占的 node 结点selfInterrupt();
}
1、调用tryAcquire
// 由AQS中的tryAcquire方法
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}// 在非公平的NonfairSync中的实现
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 从希望值的0 修改为1if (compareAndSetState(0, acquires)) {// 第一个线程抢占到了setExclusiveOwnerThread(current);return true;}}// 如果是被占用的线程就进入,不是的话返回falseelse if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
2、调用addWaiter方法:入队
private Node addWaiter(Node mode) {// 封装一个当前线程,mode 是 排他模式Node node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) { // 等待队列中有线程在等待了node.prev = pred; // 把当前队列中的尾指针指向的结点赋值给新进线程的前驱结点if (compareAndSetTail(pred, node)) { // 将新进的node设置为尾指针pred.next = node; // 把之前尾结点的后继结点设置为新进结点nodereturn node;}}// 进入结点enq(node);return node;
}private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node())) // 入队的结点不是传入的结点(虚拟节点&哨兵节点,作用就是占位),new Node(){Thread = null;waitStatus = 0;},而是一个新的结点,设置为头/尾结点tail = head;} else {node.prev = t; // 把 t(也就是伪指针,指的是新的node)赋值给 node (传入的结点)的前指针if (compareAndSetTail(t, node)) { // 把node设置为尾指针t.next = node;return t;}}}
}
3、调用 acquireQueued 方法
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {// 正常的入队出队boolean interrupted = false;for (;;) {final Node p = node.predecessor(); // B结点的前驱结点是虚拟结点,也是头结点if (p == head && tryAcquire(arg)) { // 如果当前还有线程占用,就往下走,如果没有进入代码块。tryAcquire(arg) 可能来夺锁setHead(node); // 将当前结点设置为头结点p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) && // B结点的前驱结点也就是虚拟结点,node就是B结点parkAndCheckInterrupt()) interrupted = true;}} finally {// 意外中断,取消排队if (failed)cancelAcquire(node);}
}
private void setHead(Node node) {head = node; // 将当前结点B设置为头结点node.thread = null; // 将头结点的thread置为null,没有前驱指针node.prev = null;
}// shouldParkAfterFailedAcquire(p, node) 的源码,前驱结点为SIGNAL返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus; // 获取虚拟结点在队列中的状态,有0(默认值),1,-1,-2,-3等状态if (ws == Node.SIGNAL) // 如果当前状态为-1,就是线程准备好了,返回truereturn true;if (ws > 0) { // 如果线程获取锁的活动取消了do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 前置节点(头结点)设置 waitStatus 为 Node.SIGNAL(-1)}return false;
}
// parkAndCheckInterrupt() 方法,也就是this就是B(NonfairSync对象)转着圈,等待这争取锁
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();
}
unlock() 方法
unlock --> release --> tryRelease/unparkSuccessor
// ReentrantLock 中的 unlock 方法
public void unlock() {sync.release(1);
}public final boolean release(int arg) {if (tryRelease(arg)) { // 释放锁Node h = head; // h 为虚拟结点if (h != null && h.waitStatus != 0) // 等待队列中有线程再等待同时 waitStatus = 0unparkSuccessor(h);return true;}return false;
}protected final boolean tryRelease(int releases) {int c = getState() - releases; // 1 - 1 = 0 = cif (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c); // 设置 state = 0;说明已经解锁return free; // 返回 true
}private void unparkSuccessor(Node node) {int ws = node.waitStatus; // 头结点为-1if (ws < 0)compareAndSetWaitStatus(node, ws, 0); // 设置头结点(虚拟结点)status为0Node s = node.next; // 来到真实的结点Bif (s == null || s.waitStatus > 0) { // 队列中没有等待的线程或者当前线程获取锁的请求已经取消了s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null) // 有线程在等待LockSupport.unpark(s.thread);
}
// 对队列中的等待进行唤醒
public static void unpark(Thread thread) {if (thread != null)UNSAFE.unpark(thread);
}
cancelAcquire()
cancelAcquire 方法取消流程
private void cancelAcquire(Node node) { // 比如5号节点if (node == null)return;node.thread = null; // 线程为nullNode pred = node.prev; // 5号节点的前驱结点就是4号结点while (pred.waitStatus > 0) // 如果4号结点也取消,指针继续前移node.prev = pred = pred.prev;Node predNext = pred.next; node.waitStatus = Node.CANCELLED; // 5号结点的waitStatus设置为1if (node == tail && compareAndSetTail(node, pred)) { // 5号结点是尾结点并且把尾指针指向4号结点compareAndSetNext(pred, predNext, null); // 把4号结点的后继结点设为null} else { // 删除的不是尾结点int ws; if (pred != head && // 前驱不是头结点((ws = pred.waitStatus) == Node.SIGNAL || // 如果前驱结点的 waitStatus 是 -1 (随时待命)(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 线程没有被取消,把前驱结点的线程的waitStatus置为-1pred.thread != null) { // 前驱结点的thread不为空Node next = node.next; // 把5号结点的后继结点给nextif (next != null && next.waitStatus <= 0) // 后继结点为不空compareAndSetNext(pred, predNext, next);} else {unparkSuccessor(node); // }node.next = node; // help GC}
}//
private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0); // 设为默认值Node s = node.next;if (s == null || s.waitStatus > 0) { // 取消线程s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread); // 出去抢夺锁
}