如何利用CAS实现一个同步框架
- state:共享标记位。利用CAS修改,达到同步管理
- 等待队列:存储需要等待获取锁的线程
共享标记位state=0 表示资源是空闲的;state=1表示有1个线程获取到资源,如何独占模式,判断持有锁的线程是否是当前线程,若是,则state变为2,达到可重入性
如果获取锁失败立即返回,则不需要入队
如果需要不断的尝试,业务侧可循环适用用tryLock不断重试
AQS作用
定义了加锁、释放锁的骨架
AbstractQueuesSynchorizer属性解释
Queues:队列
Synchorizer: 同步阻塞
volatile int state
用于记、判断共享资源正在被占用的情况
为什么适用int而不是boolean?
为了可重入(reentrantLock多次加锁,state需要一致递增,最后需要对称释放锁)
为了共享锁模式
volatile Node head、tail
头尾节点:线程没有获取到锁,需要排队
Node 的属性
- Node prev、next 双向链表
- Thread thread 当前入队等待的线程
- Node nextWaiter
-
int waitStatus 等待状态 1, -1, -2, -3
核心方法
以下方法是骨架,实际的实现由子类完成
tryAcquire(int arg)
tryRelease(int arg)
acquire(int arg)
获取锁
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
获取锁失败,则进入等待队列
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;// 尝试快速入队if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 完整的入队操作enq(node);return node;}
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}问题: compareAndSetTail 是原子的,但是整个if语句不是原子的,为什么不会出问题?
compareAndSetTail是通过原子的方式设置尾节点(此时包含了tail=node:将当前的node节点设置尾tail),pred.next = node;只是设置之前的tail节点的next节点,这个操作不会有其他线程干扰
问题: 为什么要使用 Node pred = tail?
因为tail节点的指针会被其他线程修改,类似写时复制,如果tail节点改变,pred的指针还是指向原来的tail节点
问题: 为什么要先尝试快速入队,而不是直接入队
应该是测试的结果,认为tryAcquire失败,大概率尾节点不为空,先尝试入队,如果成功,则少了一步完整队逻辑中的 tail为null的判断
完整的入队(循环cas直到入队成功)
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
对线程进程挂起与响应(推进队列中的线程进行消费)
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 获取当前节点的上一个节点final Node p = node.predecessor();// 上一个节点是head节点 && 尝试获取锁成功// head节点是一个虚节点,当前节点获取锁后,变成了head节点if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 判断是否需要挂起,避免过度自旋,消耗CPUif (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
acquire完整的流程
- 先tryAcquire获取锁,获取锁失败,则进入等待队列
- 先尝试“快速入队”,失败后在使用“完整入队”。(完整入队比快速入队多判断了一个tail节点是否为null,因为tryAcquire失败,大概率tail不为null,使用快速入队能提高性能)
- 自旋获取锁(判断当前节点的前置节点是否是head节点,并且尝试获取锁,获取失败一直重试)
- 线程挂起(重试一定次数,线程会挂起,避免无效的消耗CPU)
release(int arg)
public final boolean release(int arg) {// 尝试释放锁if (tryRelease(arg)) {Node h = head;// 唤醒等待的线程if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
唤醒线程unpark
private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;// 从后往前找第一个 waitStatus <=0的for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
加锁、入队、出队、挂起、唤醒
问题:为什么从后往前找?
enq入队方法,从tail尾入队,compareAndSetTail将当前节点设置为tail后,FIFO的倒数第二个节点 ->tail还没有创建链接(t.next = node),如果从前往后找,可能会漏掉此节点。这也是为什么 node.prev = t 在 compareAndSetTail 之前的原因
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}