文章目录
Pre
J.U.C - 深入解析ReentrantLock原理&源码
概述
配合synchronized同步锁在同步代码块中调用加锁对象notify和wait方法,可以实现线程间同步,在JUC锁里面也有相似的实现:Condition类,利用条件变量Condition的signal和await方法用来配合JUC锁也可以实现多线程之间的同步。
和synchronized不同的是,一个synchronized锁只能用一个共享变量(锁对象)的notify或wait方法实现同步,而AQS的一个锁可以对应多个条件变量,对线程的等待、唤醒操作更加详细和灵活。
在调用加锁对象的notify和wait方法前必须先获取该共享变量的内置锁。同样,在调用条件变量的signal和await方法前也必须先获取条件变量对应的锁,因此Condition必须要配合锁一起使用,一个Condition的实例必须与一个Lock绑定,Condition一般都是作为Lock的内部实现。
Condition__24">Condition 主要方法
Condition提供了一系列的方法来阻塞和唤醒线程:
- 1)
await()
:造成当前线程在接到信号或被中断之前一直处于等待状态。 - 2)
await(long time,TimeUnit unit)
:当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 - 3)
awaitNanos(long nanosTimeout)
:当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值<= 0,则可以认定它已经超时。 - 4)
awaitUninterruptibly()
:当前线程在接到信号之前一直处于等待状态。注意:该方法对中断不敏感。 - 5)
awaitUntil(Date deadline)
:当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回false。 - 6)
signal()
:唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。 - 7)
signalAll()
:唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。
Condition_45">Condition案例
如何使用 ReentrantLock
和 Condition
实现“等待”和“唤醒”的功能。创建了两个线程:Thread-await
和 Thread-signal
,分别实现等待和唤醒的功能。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class ConditionExample {public static void main(String[] args) {// 创建一个重入独占锁Lock lock = new ReentrantLock();// 使用锁对象创建一个条件变量Condition condition = lock.newCondition();// 创建等待线程Thread awaitThread = new Thread(() -> {lock.lock();try {System.out.println("Thread-await: 已获得锁,开始等待...");// 调用await方法,当前线程会释放锁并被阻塞等待condition.await();System.out.println("Thread-await: 被唤醒,继续执行...");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();System.out.println("Thread-await: 释放锁");}}, "Thread-await");// 创建唤醒线程Thread signalThread = new Thread(() -> {lock.lock();try {// 模拟一些处理时间Thread.sleep(2000);System.out.println("Thread-signal: 已获得锁,唤醒等待线程...");// 调用signal方法,唤醒一个被阻塞的线程condition.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();System.out.println("Thread-signal: 释放锁");}}, "Thread-signal");// 启动等待线程awaitThread.start();// 让等待线程先执行一段时间try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 启动唤醒线程signalThread.start();}
}
-
创建锁和条件变量:
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition();
-
创建等待线程
Thread-await
:- 获取锁。
- 调用
condition.await()
方法,当前线程会释放锁并被阻塞等待。 - 被唤醒后,继续执行并释放锁。
-
创建唤醒线程
Thread-signal
:- 获取锁。
- 模拟一些处理时间(2秒)。
- 调用
condition.signal()
方法,唤醒一个被阻塞的线程。 - 释放锁。
-
启动线程:
- 先启动等待线程
Thread-await
。 - 让等待线程先执行一段时间(1秒)。
- 再启动唤醒线程
Thread-signal
。
- 先启动等待线程
运行过程
-
线程
Thread-await
获得锁并调用await
方法:- 输出:“Thread-await: 已获得锁,开始等待…”
- 当前线程释放锁并被阻塞等待。
-
线程
Thread-signal
获得锁并调用signal
方法:- 模拟处理时间(2秒)。
- 输出:“Thread-signal: 已获得锁,唤醒等待线程…”
- 调用
condition.signal()
方法,唤醒被阻塞的Thread-await
线程。 - 释放锁。
-
线程
Thread-await
被唤醒并重新获得锁:- 输出:“Thread-await: 被唤醒,继续执行…”
- 继续执行并释放锁。
-
线程
Thread-await
释放锁:- 输出:“Thread-await: 释放锁”
Condition_165">Condition的源码解析
在案例中,lock. newCondition()
的作用其实是新建了 一 个ConditionObject
对象,ConditionObject
是AQS的内部类,可以访问AQS内部的变量(例如状态变量state)和方法。
在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程。
/*** 返回一个 {@link Condition} 实例,用于与这个 {@link Lock} 实例一起使用。** <p>返回的 {@link Condition} 实例支持与内置监视器锁一起使用的 {@link Object} 监视器方法({@link Object#wait() wait},* {@link Object#notify notify} 和 {@link Object#notifyAll notifyAll})相同的功能。** <ul>** <li>如果在调用 {@link Condition} 的任何 {@linkplain Condition#await() 等待} 或 {@linkplain Condition#signal 通知} 方法时未持有此锁,* 则会抛出 {@link IllegalMonitorStateException}。** <li>当调用条件的 {@linkplain Condition#await() 等待} 方法时,锁会被释放,并且在这些方法返回之前,锁会被重新获取,锁的持有计数恢复到调用方法时的状态。** <li>如果线程在等待时被 {@linkplain Thread#interrupt 中断},则等待将终止,抛出 {@link InterruptedException},并且线程的中断状态将被清除。** <li>等待的线程按先进先出(FIFO)顺序被通知。** <li>从等待方法返回的线程重新获取锁的顺序与最初获取锁的线程相同,默认情况下未指定,但对于 <em>公平</em> 锁,优先考虑等待时间最长的线程。** </ul>** @return 条件对象*/public Condition newCondition() {return sync.newCondition();}final ConditionObject newCondition() {return new ConditionObject();}
1. 等待:condition. await
调用Condition的await()系列方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。
注意这个Condition队列和AQS队列有所区别,Condition的等待队列是一个单向队列
/*** 实现可中断的条件等待。* <ol>* <li>如果当前线程被中断,抛出 InterruptedException。* <li>保存由 {@link #getState} 返回的锁状态。* <li>使用保存的锁状态作为参数调用 {@link #release},如果失败则抛出 IllegalMonitorStateException。* <li>阻塞直到被信号唤醒或被中断。* <li>通过调用带有保存的锁状态作为参数的 {@link #acquire} 的专门版本来重新获取锁。* <li>如果在第 4 步被阻塞时被中断,抛出 InterruptedException。* </ol>*/public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException(); // 如果当前线程被中断,抛出 InterruptedExceptionConditionNode node = new ConditionNode(); // 创建一个新的条件节点int savedState = enableWait(node); // 保存锁状态LockSupport.setCurrentBlocker(this); // 设置当前阻塞者,用于向后兼容boolean interrupted = false, cancelled = false, rejected = false; // 初始化标志变量while (!canReacquire(node)) { // 循环直到可以重新获取锁if (interrupted |= Thread.interrupted()) { // 检查当前线程是否被中断if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)break; // 如果节点状态为 COND,则中断后退出循环} else if ((node.status & COND) != 0) { // 检查节点状态是否为 CONDtry {if (rejected)node.block(); // 如果被拒绝,调用 block 方法elseForkJoinPool.managedBlock(node); // 否则调用 managedBlock 方法} catch (RejectedExecutionException ex) {rejected = true; // 标记为被拒绝} catch (InterruptedException ie) {interrupted = true; // 标记为中断}} elseThread.onSpinWait(); // 如果在入队过程中被唤醒,进行自旋等待}LockSupport.setCurrentBlocker(null); // 清除当前阻塞者node.clearStatus(); // 清除节点状态acquire(node, savedState, false, false, false, 0L); // 重新获取锁if (interrupted) { // 如果线程被中断if (cancelled) {unlinkCancelledWaiters(node); // 移除已取消的等待者throw new InterruptedException(); // 抛出 InterruptedException}Thread.currentThread().interrupt(); // 重新设置线程的中断状态}}
Condition_signal_279">2. 唤醒Condition. signal
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中
/*** 将等待时间最长的线程(如果存在)从这个条件的等待队列移动到拥有锁的等待队列。** @throws IllegalMonitorStateException 如果 {@link #isHeldExclusively} 返回 {@code false}*/public final void signal() {ConditionNode first = firstWaiter; // 获取等待队列中的第一个节点if (!isHeldExclusively()) // 检查当前线程是否独占持有锁throw new IllegalMonitorStateException(); // 如果没有持有锁,抛出异常if (first != null) // 如果等待队列不为空doSignal(first, false); // 调用 doSignal 方法,将第一个节点从条件等待队列移动到锁的等待队列}
/*** 移除并转移一个或所有等待者到同步队列。*/private void doSignal(ConditionNode first, boolean all) {while (first != null) { // 遍历等待队列ConditionNode next = first.nextWaiter; // 获取下一个等待节点if ((firstWaiter = next) == null) // 更新 firstWaiter,如果 next 为 null,则 lastWaiter 也设为 nulllastWaiter = null;if ((first.getAndUnsetStatus(COND) & COND) != 0) { // 检查节点状态是否为 CONDenqueue(first); // 将节点从条件等待队列移动到同步队列if (!all) // 如果只需要唤醒一个等待者,则退出循环break;}first = next; // 继续遍历下一个节点}}
Condition_320">Condition总结
阻塞:await()
方法中,在线程释放锁资源之后,如果节点不在AQS等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁。
释放:signal()
后,节点会从condition队列移动到AQS等待队列,则进入正常锁的获取流程