首先,在AQS中,等待队列是通过Node类来表示的,每个Node节点包含了等待线程的信息以及等待状态。下面是Node类的部分源码:
static final class Node {// 等待状态volatile int waitStatus;// 前驱节点volatile Node prev;// 后继节点volatile Node next;// 等待线程volatile Thread thread;// 等待条件Node nextWaiter;// ...省略其他代码...
}
从上面的代码可以看出,每个Node节点都有一个指向前驱节点和后继节点的指针,这样可以在O(1)时间内查找前驱和后继节点。
接下来,我们来看看AQS是如何使用双向链表来管理等待队列的。AQS内部有一个成员变量volatile Node head,它表示等待队列的头节点。当一个线程需要等待锁或条件时,它会创建一个Node节点并插入到等待队列的尾部。这个过程是通过以下方法实现的:
private Node addWaiter(Node mode) {// 创建一个Node节点,表示当前线程Node node = new Node(Thread.currentThread(), mode);// 尝试通过CAS操作将Node节点插入到等待队列的尾部Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;
}
在上面的代码中,首先创建了一个Node节点表示当前线程,然后尝试通过CAS操作将它插入到等待队列的尾部。如果CAS操作失败,说明有其他线程正在修改等待队列,此时会调用enq()方法来将节点插入到队列中。
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // 如果队列为空,需要先初始化队列if (compareAndSetHead(new Node()))tail = head;} else { // 如果队列不为空,将节点插入到队列尾部node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}
在enq()方法中,首先获取等待队列的尾节点,如果尾节点为空,说明队列还没有初始化,需要先创建一个空的头节点来初始化队列。如果尾节点不为空,就将新的节点插入到尾节点的后面。如果CAS操作失败,说明有其他线程正在修改等待队列,这时需要重新尝试。
当一个线程持有锁的线程释放锁时,它会将等待队列的头节点出队并唤醒它的后继节点,这个过程是通过以下方法实现的:
private void setHead(Node node) {head = node;node.thread = null;node.prev = null;
}private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);
}
在上面的代码中,首先将头节点设置为当前节点,然后将头节点的prev指针置为null,表示它已经出队了,将头节点的thread指针置为null,表示当前线程不再持有锁。接下来,通过unparkSuccessor()方法唤醒后继节点。这个方法中,首先检查当前节点的等待状态,如果它的等待状态小于0,说明它是一个被取消的节点,将它的等待状态置为0。然后,找到当前节点的后继节点,如果它不存在或者它的等待状态大于0,说明它不能被唤醒,这时就需要从等待队列的尾部开始往前找,找到一个等待状态小于等于0的节点来唤醒。最后,通过LockSupport.unpark()方法唤醒后继节点的线程。
从双向链表的特性来看,我认为AQS使用双向链表有三个方面的考虑。
第一个方面,没有竞争到锁的线程加入到阻塞队列,并且阻塞等待的前提是,当前线程所在节点的前置节点是正常状态,
这样设计是为了避免链表中存在异常线程导致无法唤醒后续线程的问题。
所以线程阻塞之前需要判断前置节点的状态,如果没有指针指向前置节点,就需要从head节点开始遍历,性能非常低。
第二个方面,在Lock接口里面有一个,lockInterruptibly()方法,这个方法表示处于锁阻塞的线程允许被中断。
也就是说,没有竞争到锁的线程加入到同步队列等待以后,是允许外部线程通过interrupt()方法触发唤醒并中断的。
这个时候,被中断的线程的状态会修改成CANCELLED。
被标记为CANCELLED状态的线程,是不需要去竞争锁的,但是它仍然存在于双向链表里面。
意味着在后续的锁竞争中,需要把这个节点从链表里面移除,否则会导致锁阻塞的线程无法被正常唤醒。
在这种情况下,如果是单向链表,就需要从Head节点开始往下逐个遍历,找到并移除异常状态的节点。
同样效率也比较低,还会导致锁唤醒的操作和遍历操作之间的竞争。
第三个方面,为了避免线程阻塞和唤醒的开销,所以刚加入到链表的线程,首先会通过自旋的方式尝试去竞争锁。
但是实际上按照公平锁的设计,只有头节点的下一个节点才有必要去竞争锁,后续的节点竞争锁的意义不大。
否则,就会造成羊群效应,也就是大量的线程在阻塞之前尝试去竞争锁带来比较大的性能开销。
所以为了避免这个问题,加入到链表中的节点在尝试竞争锁之前,需要判断前置节点是不是头节点,如果不是头节点,就没必要再去触发锁竞争的动作。
所以这里会涉及到前置节点的查找,如果是单向链表,那么这个功能的实现会非常复杂。