目录
- Doug Lea — JUC并发包的作者
- 锁框架
- Lock和Condition接口
- 可重入锁
- 公平锁与非公平锁
- 读写锁
- 锁降级和锁升级
- 队列同步器AQS
- 底层实现
- 公平锁一定公平吗?
- Condition实现原理
———————————————————————————————
在前面,我们了解了多线程的底层运作机制,我们终于知道,原来多线程环境下存在着如此之多的问题。在JDK5之前,我们只能选择synchronized关键字来实现锁,而JDK5之后,由于volatile关键字得到了升级(具体功能就是上一章所描述的),所以并发框架包便出现了,相比传统的synchronized关键字,我们对于锁的实现,有了更多的选择。
Doug Lea — JUC并发包的作者
如果IT的历史,是以人为主体串接起来的话,那么肯定少不了Doug Lea。这个鼻梁挂着眼镜,留着德王威廉二世的胡子,脸上永远挂着谦逊腼腆笑容,服务于纽约州立大学Oswego分校计算机科学系的老大爷。
说他是这个世界上对Java影响力最大的一个人,一点也不为过。因为两次Java历史上的大变革,他都间接或直接的扮演了举足轻重的角色。2004年所推出的Tiger。Tiger广纳了15项JSRs(Java Specification Requests)的语法及标准,其中一项便是JSR-166。JSR-166是来自于Doug编写的util.concurrent包。
锁框架
在JDK 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,Lock接口提供了与synchronized关键字类似的同步功能,但需要在使用时手动获取锁和释放锁。
Lock和Condition接口
使用并发包中的锁和我们传统的synchronized锁不太一样,这里的锁我们可以认为是一把真正意义上的锁,每个锁都是一个对应的锁对象,我只需要向锁对象获取锁或是释放锁即可。我们首先来看看,此接口中定义了什么:
public interface Lock {//获取锁,拿不到锁会阻塞,等待其他线程释放锁,获取到锁后返回void lock();//同上,但是等待过程中会响应中断void lockInterruptibly() throws InterruptedException;//尝试获取锁,但是不会阻塞,如果能获取到会返回true,不能返回falseboolean tryLock();//尝试获取锁,但是可以限定超时时间,如果超出时间还没拿到锁返回false,否则返回true,可以响应中断boolean tryLock(long time, TimeUnit unit) throws InterruptedException;//释放锁void unlock();//暂时可以理解为替代传统的Object的wait()、notify()等操作的工具Condition newCondition();
}
这里我们可以演示一下,如何使用Lock类来进行加锁和释放锁操作:
public class Main {private static int i = 0;public static void main(String[] args) throws InterruptedException {Lock testLock = new ReentrantLock(); //可重入锁ReentrantLock类是Lock类的一个实现,我们后面会进行介绍Runnable action = () -> {for (int j = 0; j < 100000; j++) { //还是以自增操作为例testLock.lock(); //加锁,加锁成功后其他线程如果也要获取锁,会阻塞,等待当前线程释放i++;testLock.unlock(); //解锁,释放锁之后其他线程就可以获取这把锁了(注意在这之前一定得加锁,不然报错)}};new Thread(action).start();new Thread(action).start();Thread.sleep(1000); //等上面两个线程跑完System.out.println(i);}
}
可以看到,和我们之前使用synchronized相比,我们这里是真正在操作一个"锁"对象,当我们需要加锁时,只需要调用lock()方法,而需要释放锁时,只需要调用unlock()方法。程序运行的最终结果和使用synchronized锁是一样的。
那么,我们如何像传统的加锁那样,调用对象的wait()和notify()方法呢,并发包提供了Condition接口:
public interface Condition {//与调用锁对象的wait方法一样,会进入到等待状态,但是这里需要调用Condition的signal或signalAll方法进行唤醒(感觉就是和普通对象的wait和notify是对应的)同时,等待状态下是可以响应中断的void await() throws InterruptedException;//同上,但不响应中断(看名字都能猜到)void awaitUninterruptibly();//等待指定时间,如果在指定时间(纳秒)内被唤醒,会返回剩余时间,如果超时,会返回0或负数,可以响应中断long awaitNanos(long nanosTimeout) throws InterruptedException;//等待指定时间(可以指定时间单位),如果等待时间内被唤醒,返回true,否则返回false,可以响应中断boolean await(long time, TimeUnit unit) throws InterruptedException;//可以指定一个明确的时间点,如果在时间点之前被唤醒,返回true,否则返回false,可以响应中断boolean awaitUntil(Date deadline) throws InterruptedException;//唤醒一个处于等待状态的线程,注意还得获得锁才能接着运行void signal();//同上,但是是唤醒所有等待线程void signalAll();
}
这里我们通过一个简单的例子来演示一下:
public static void main(String[] args) throws InterruptedException {Lock testLock = new ReentrantLock();Condition condition = testLock.newCondition();new Thread(() -> {testLock.lock(); //和synchronized一样,必须持有锁的情况下才能使用awaitSystem.out.println("线程1进入等待状态!");try {condition.await(); //进入等待状态} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程1等待结束!");testLock.unlock();}).start();Thread.sleep(100); //防止线程2先跑new Thread(() -> {testLock.lock();System.out.println("线程2开始唤醒其他等待线程");condition.signal(); //唤醒线程1,但是此时线程1还必须要拿到锁才能继续运行System.out.println("线程2结束");testLock.unlock(); //这里释放锁之后,线程1就可以拿到锁继续运行了}).start();
}
可以发现,Condition对象使用方法和传统的对象使用差别不是很大。
思考:下面这种情况跟上面有什么不同?
public static void main(String[] args) throws InterruptedException {Lock testLock = new ReentrantLock();new Thread(() -> {testLock.lock();System.out.println("线程1进入等待状态!");try {testLock.newCondition().await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程1等待结束!");testLock.unlock();}).start();Thread.sleep(100);new Thread(() -> {testLock.lock();System.out.println("线程2开始唤醒其他等待线程");testLock.newCondition().signal();System.out.println("线程2结束");testLock.unlock();}).start();
}
通过分析可以得到,在调用newCondition()后,会生成一个新的Condition对象,并且同一把锁内是可以存在多个Condition对象的(实际上原始的锁机制等待队列只能有一个,而这里可以创建很多个Condition来实现多等待队列),而上面的例子中,实际上使用的是不同的Condition对象,只有对同一个Condition对象进行等待和唤醒操作才会有效,而不同的Condition对象是分开计算的。
最后我们再来讲解一下时间单位,这是一个枚举类,也是位于java.util.concurrent包下:
public enum TimeUnit {/*** Time unit representing one thousandth of a microsecond*/NANOSECONDS {public long toNanos(long d) { return d; }public long toMicros(long d) { return d/(C1/C0); }public long toMillis(long d) { return d/(C2/C0); }public long toSeconds(long d) { return d/(C3/C0); }public long toMinutes(long d) { return d/(C4/C0); }public long toHours(long d) { return d/(C5/C0); }public long toDays(long d) { return d/(C6/C0); }public long convert(long d, TimeUnit u) { return u.toNanos(d); }int excessNanos(long d, long m) { return (int)(d - (m*C2)); }},//...}
可以看到时间单位有很多的,比如DAY、SECONDS、MINUTES等,我们可以直接将其作为时间单位,比如我们要让一个线程等待3秒钟,可以像下面这样编写:
public static void main(String[] args) throws InterruptedException {Lock testLock = new ReentrantLock();new Thread(() -> {testLock.lock();try {System.out.println("等待是否未超时:"+testLock.newCondition().await(1, TimeUnit.SECONDS));} catch (InterruptedException e) {e.printStackTrace();}testLock.unlock();}).start();
}
当然,Lock类的tryLock方法也是支持使用时间单位的,各位可以自行进行测试。TimeUnit除了可以作为时间单位表示以外,还可以在不同单位之间相互转换:
public static void main(String[] args) throws InterruptedException {System.out.println("60秒 = "+TimeUnit.SECONDS.toMinutes(60) +"分钟");System.out.println("365天 = "+TimeUnit.DAYS.toSeconds(365) +" 秒");
}
也可以更加便捷地使用对象的wait()方法:
public static void main(String[] args) throws InterruptedException {synchronized (Main.class) {System.out.println("开始等待");TimeUnit.SECONDS.timedWait(Main.class, 3); //直接等待3秒System.out.println("等待结束");}
}
我们也可以直接使用它来进行休眠操作:
public static void main(String[] args) throws InterruptedException {TimeUnit.SECONDS.sleep(1); //休眠1秒钟
}
可重入锁
前面,我们讲解了锁框架的两个核心接口,那么我们接着来看看锁接口的具体实现类,我们前面用到了ReentrantLock,它其实是锁的一种,叫做可重入锁,那么这个可重入代表的是什么意思呢?简单来说,就是同一个线程,可以反复进行加锁操作:
public static void main(String[] args) throws InterruptedException {ReentrantLock lock = new ReentrantLock();lock.lock();lock.lock(); //连续加锁2次new Thread(() -> {System.out.println("线程2想要获取锁");lock.lock();System.out.println("线程2成功获取到锁");}).start();lock.unlock();System.out.println("线程1释放了一次锁");TimeUnit.SECONDS.sleep(1);lock.unlock();System.out.println("线程1再次释放了一次锁"); //释放两次后其他线程才能加锁
}
可以看到,主线程连续进行了两次加锁操作(此操作是不会被阻塞的),在当前线程持有锁的情况下继续加锁不会被阻塞,并且,加锁几次,就必须要解锁几次,否则此线程依旧持有锁。我们可以使用getHoldCount()方法查看当前线程的加锁次数:
public static void main(String[] args) throws InterruptedException {ReentrantLock lock = new ReentrantLock();lock.lock();lock.lock();System.out.println("当前加锁次数:"+lock.getHoldCount()+",是否被锁:"+lock.isLocked());TimeUnit.SECONDS.sleep(1);lock.unlock();System.out.println("当前加锁次数:"+lock.getHoldCount()+",是否被锁:"+lock.isLocked());TimeUnit.SECONDS.sleep(1);lock.unlock();System.out.println("当前加锁次数:"+lock.getHoldCount()+",是否被锁:"+lock.isLocked());
}
可以看到,当锁不再被任何线程持有时,值为0,并且通过isLocked()方法查询结果为false。
实际上,如果存在线程持有当前的锁,那么其他线程在获取锁时,是会暂时进入到等待队列的,我们可以通过getQueueLength()方法获取等待中线程数量的预估值:
public static void main(String[] args) throws InterruptedException {ReentrantLock lock = new ReentrantLock();lock.lock();Thread t1 = new Thread(lock::lock), t2 = new Thread(lock::lock);;t1.start();t2.start();TimeUnit.SECONDS.sleep(1);System.out.println("当前等待锁释放的线程数:"+lock.getQueueLength());System.out.println("线程1是否在等待队列中:"+lock.hasQueuedThread(t1));System.out.println("线程2是否在等待队列中:"+lock.hasQueuedThread(t2));System.out.println("当前线程是否在等待队列中:"+lock.hasQueuedThread(Thread.currentThread()));
}
我们可以通过hasQueuedThread()方法来判断某个线程是否正在等待获取锁状态。
同样的,Condition也可以进行判断:
public static void main(String[] args) throws InterruptedException {ReentrantLock lock = new ReentrantLock();Condition condition = lock.newCondition();new Thread(() -> {lock.lock();try {condition.await();} catch (InterruptedException e) {e.printStackTrace();}lock.unlock();}).start();TimeUnit.SECONDS.sleep(1);lock.lock();System.out.println("当前Condition的等待线程数:"+lock.getWaitQueueLength(condition));condition.signal();System.out.println("当前Condition的等待线程数:"+lock.getWaitQueueLength(condition));lock.unlock();
}
通过使用getWaitQueueLength()方法能够查看同一个Condition目前有多少线程处于等待状态。
公平锁与非公平锁
前面我们了解了如果线程之间争抢同一把锁,会暂时进入到等待队列中,那么多个线程获得锁的顺序是不是一定是根据线程调用lock()方法时间来定的呢,我们可以看到,ReentrantLock的构造方法中,是这样写的:
public ReentrantLock() {sync = new NonfairSync(); //看名字貌似是非公平的
}
其实锁分为公平锁和非公平锁,默认我们创建出来的ReentrantLock是采用的非公平锁作为底层锁机制。那么什么是公平锁什么又是非公平锁呢?
公平锁:多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁。
非公平锁:多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁。
简单来说,公平锁不让插队,都老老实实排着;非公平锁让插队,但是排队的人让不让你插队就是另一回事了。
我们可以来测试一下公平锁和非公平锁的表现情况:
public ReentrantLock() {sync = new NonfairSync(); //看名字貌似是非公平的
}
这里我们选择使用第二个构造方法,可以选择是否为公平锁实现:
public static void main(String[] args) throws InterruptedException {ReentrantLock lock = new ReentrantLock(false);Runnable action = () -> {System.out.println("线程 "+Thread.currentThread().getName()+" 开始获取锁...");lock.lock();System.out.println("线程 "+Thread.currentThread().getName()+" 成功获取锁!");lock.unlock();};for (int i = 0; i < 10; i++) { //建立10个线程new Thread(action, "T"+i).start();}
}
这里我们只需要对比将在1秒后开始获取锁…和成功获取锁!的顺序是否一致即可,如果是一致,那说明所有的线程都是按顺序排队获取的锁,如果不是,那说明肯定是有线程插队了。
运行结果可以发现,在公平模式下,确实是按照顺序进行的,而在非公平模式下,一般会出现这种情况:线程刚开始获取锁马上就能抢到,并且此时之前早就开始的线程还在等待状态,很明显的插队行为。
那么,接着下一个问题,公平锁在任何情况下都一定是公平的吗?有关这个问题,我们会留到队列同步器中再进行讨论。
读写锁
除了可重入锁之外,还有一种类型的锁叫做读写锁,当然它并不是专门用作读写操作的锁,它和可重入锁不同的地方在于,可重入锁是一种排他锁,当一个线程得到锁之后,另一个线程必须等待其释放锁,否则一律不允许获取到锁。而读写锁在同一时间,是可以让多个线程获取到锁的,它其实就是针对于读写场景而出现的。
读写锁维护了一个读锁和一个写锁,这两个锁的机制是不同的。
读锁:在没有任何线程占用写锁的情况下,同一时间可以有多个线程加读锁。
写锁:在没有任何线程占用读锁的情况下,同一时间只能有一个线程加写锁。
读写锁也有一个专门的接口:
public interface ReadWriteLock {//获取读锁Lock readLock();//获取写锁Lock writeLock();
}
此接口有一个实现类ReentrantReadWriteLock(实现的是ReadWriteLock接口,不是Lock接口,它本身并不是锁),注意我们操作ReentrantReadWriteLock时,不能直接上锁,而是需要获取读锁或是写锁,再进行锁操作:
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock = new ReentrantReadWriteLock();lock.readLock().lock();new Thread(lock.readLock()::lock).start();
}
这里我们对读锁加锁,可以看到可以多个线程同时对读锁加锁。
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock = new ReentrantReadWriteLock();lock.readLock().lock();new Thread(lock.writeLock()::lock).start();
}
有读锁状态下无法加写锁,反之亦然:
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock = new ReentrantReadWriteLock();lock.writeLock().lock();new Thread(lock.readLock()::lock).start();
}
并且,ReentrantReadWriteLock不仅具有读写锁的功能,还保留了可重入锁和公平/非公平机制,比如同一个线程可以重复为写锁加锁,并且必须全部解锁才真正释放锁:
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock = new ReentrantReadWriteLock();lock.writeLock().lock();lock.writeLock().lock();new Thread(() -> {lock.writeLock().lock();System.out.println("成功获取到写锁!");}).start();System.out.println("释放第一层锁!");lock.writeLock().unlock();TimeUnit.SECONDS.sleep(1);System.out.println("释放第二层锁!");lock.writeLock().unlock();
}
通过之前的例子来验证公平和非公平:
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);Runnable action = () -> {System.out.println("线程 "+Thread.currentThread().getName()+" 将在1秒后开始获取锁...");lock.writeLock().lock();System.out.println("线程 "+Thread.currentThread().getName()+" 成功获取锁!");lock.writeLock().unlock();};for (int i = 0; i < 10; i++) { //建立10个线程new Thread(action, "T"+i).start();}
}
可以看到,结果是一致的。
锁降级和锁升级
锁降级指的是写锁降级为读锁。当一个线程持有写锁的情况下,虽然其他线程不能加读锁,但是线程自己是可以加读锁的:
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock = new ReentrantReadWriteLock();lock.writeLock().lock();lock.readLock().lock();System.out.println("成功加读锁!");
}
那么,如果我们在同时加了写锁和读锁的情况下,释放写锁,是否其他的线程就可以一起加读锁了呢?
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock = new ReentrantReadWriteLock();lock.writeLock().lock();lock.readLock().lock();new Thread(() -> {System.out.println("开始加读锁!");lock.readLock().lock();System.out.println("读锁添加成功!");}).start();TimeUnit.SECONDS.sleep(1);lock.writeLock().unlock(); //如果释放写锁,会怎么样?
}
可以看到,一旦写锁被释放,那么主线程就只剩下读锁了,因为读锁可以被多个线程共享,所以这时第二个线程也添加了读锁。而这种操作,就被称之为"锁降级"(注意不是先释放写锁再加读锁,而是持有写锁的情况下申请读锁再释放写锁)
注意在仅持有读锁的情况下去申请写锁,属于"锁升级",ReentrantReadWriteLock是不支持的:
public static void main(String[] args) throws InterruptedException {ReentrantReadWriteLock lock = new ReentrantReadWriteLock();lock.readLock().lock();lock.writeLock().lock();System.out.println("所升级成功!");
}
可以看到线程直接卡在加写锁的那一句了。
队列同步器AQS
注意:难度巨大,如果对锁的使用不是很熟悉建议之后再来看!
前面我们了解了可重入锁和读写锁,那么它们的底层实现原理到底是什么样的呢?又是大家看到就想跳过的套娃解析环节。
比如我们执行了ReentrantLock的lock()方法,那它的内部是怎么在执行的呢?
public void lock() {sync.lock();
}
可以看到,它的内部实际上啥都没做,而是交给了Sync对象在进行,并且,不只是这个方法,其他的很多方法都是依靠Sync对象在进行:
public void unlock() {sync.release(1);
}
那么这个Sync对象是干什么的呢?可以看到,公平锁和非公平锁都是继承自Sync,而Sync是继承自AbstractQueuedSynchronizer,简称队列同步器:
abstract static class Sync extends AbstractQueuedSynchronizer {//...
}static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
所以,要了解它的底层到底是如何进行操作的,还得看队列同步器,我们就先从这里下手吧!
底层实现
AbstractQueuedSynchronizer(下面称为AQS)是实现锁机制的基础,它的内部封装了包括锁的获取、释放、以及等待队列。
一个锁(排他锁为例)的基本功能就是获取锁、释放锁、当锁被占用时,其他线程来争抢会进入等待队列,AQS已经将这些基本的功能封装完成了,其中等待队列是核心内容,等待队列是由双向链表数据结构实现的,每个等待状态下的线程都可以被封装进结点中并放入双向链表中,而对于双向链表是以队列的形式进行操作的,它像这样:
AQS中有一个head字段和一个tail字段分别记录双向链表的头结点和尾结点,而之后的一系列操作都是围绕此队列来进行的。我们先来了解一下每个结点都包含了哪些内容:
//每个处于等待状态的线程都可以是一个节点,并且每个节点是有很多状态的
static final class Node {//每个节点都可以被分为独占模式节点或是共享模式节点,分别适用于独占锁和共享锁static final Node SHARED = new Node();static final Node EXCLUSIVE = null;//等待状态,这里都定义好了//唯一一个大于0的状态,表示已失效,可能是由于超时或中断,此节点被取消。static final int CANCELLED = 1;//此节点后面的节点被挂起(进入等待状态)static final int SIGNAL = -1; //在条件队列中的节点才是这个状态static final int CONDITION = -2;//传播,一般用于共享锁static final int PROPAGATE = -3;volatile int waitStatus; //等待状态值volatile Node prev; //双向链表基操volatile Node next;volatile Thread thread; //每一个线程都可以被封装进一个节点进入到等待队列Node nextWaiter; //在等待队列中表示模式,条件队列中作为下一个结点的指针final boolean isShared() {return nextWaiter == SHARED;}final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() {}Node(Thread thread, Node mode) {this.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) {this.waitStatus = waitStatus;this.thread = thread;}
}
在一开始的时候,head和tail都是null,state为默认值0:
private transient volatile Node head;private transient volatile Node tail;private volatile int state;
不用担心双向链表不会进行初始化,初始化是在实际使用时才开始的,先不管,我们接着来看其他的初始化内容:
//直接使用Unsafe类进行操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
//记录类中属性的在内存中的偏移地址,方便Unsafe类直接操作内存进行赋值等(直接修改对应地址的内存)
private static final long stateOffset; //这里对应的就是AQS类中的state成员字段
private static final long headOffset; //这里对应的就是AQS类中的head头结点成员字段
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;static { //静态代码块,在类加载的时候就会自动获取偏移地址try {stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));} catch (Exception ex) { throw new Error(ex); }
}//通过CAS操作来修改头结点
private final boolean compareAndSetHead(Node update) {//调用的是Unsafe类的compareAndSwapObject方法,通过CAS算法比较对象并替换return unsafe.compareAndSwapObject(this, headOffset, null, update);
}//同上,省略部分代码
private final boolean compareAndSetTail(Node expect, Node update) {private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
可以发现,队列同步器由于要使用到CAS算法,所以,直接使用了Unsafe工具类,Unsafe类中提供了CAS操作的方法(Java无法实现,底层由C++实现)所有对AQS类中成员字段的修改,都有对应的CAS操作封装。
现在我们大致了解了一下它的底层运作机制,我们接着来看这个类是如何进行使用的,它提供了一些可重写的方法(根据不同的锁类型和机制,可以自由定制规则,并且为独占式和非独占式锁都提供了对应的方法),以及一些已经写好的模板方法(模板方法会调用这些可重写的方法),使用此类只需要将可重写的方法进行重写,并调用提供的模板方法,从而实现锁功能(学习过设计模式会比较好理解一些)
我们首先来看可重写方法:
//独占式获取同步状态,查看同步状态是否和参数一致,如果返没有问题,那么会使用CAS操作设置同步状态并返回true
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}//独占式释放同步状态
protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}//共享式获取同步状态,返回值大于0表示成功,否则失败
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}//共享式释放同步状态
protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}//是否在独占模式下被当前线程占用(锁是否被当前线程持有)
protected boolean isHeldExclusively() {throw new UnsupportedOperationException();
}
可以看到,这些需要重写的方法默认是直接抛出UnsupportedOperationException,也就是说根据不同的锁类型,我们需要去实现对应的方法,我们可以来看一下ReentrantLock(此类是全局独占式的)中的公平锁是如何借助AQS实现的:
static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;//加锁操作调用了模板方法acquire//为了防止各位绕晕,请时刻记住,lock方法一定是在某个线程下为了加锁而调用的,并且同一时间可能会有其他线程也在调用此方法final void lock() {acquire(1);}...
}
我们先看看加锁操作干了什么事情,这里直接调用了AQS提供的模板方法acquire(),我们来看看它在AQS类中的实现细节:
@ReservedStackAccess //这个是JEP 270添加的新注解,它会保护被注解的方法,通过添加一些额外的空间,防止在多线程运行的时候出现栈溢出,下同
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //节点为独占模式Node.EXCLUSIVEselfInterrupt();
}
首先会调用tryAcquire()方法(这里是由FairSync类实现的),如果尝试加独占锁失败(返回false了)说明可能这个时候有其他线程持有了此独占锁,所以当前线程得先等着,那么会调用addWaiter()方法将线程加入等待队列中:
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// 先尝试使用CAS直接入队,如果这个时候其他线程也在入队(就是不止一个线程在同一时间争抢这把锁)就进入enq()Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//此方法是CAS快速入队失败时调用enq(node);return node;
}private Node enq(final Node node) {//自旋形式入队,可以看到这里是一个无限循环for (;;) {Node t = tail;if (t == null) { //这种情况只能说明头结点和尾结点都还没初始化if (compareAndSetHead(new Node())) //初始化头结点和尾结点tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t; //只有CAS成功的情况下,才算入队成功,如果CAS失败,那说明其他线程同一时间也在入队,并且手速还比当前线程快,刚好走到CAS操作的时候,其他线程就先入队了,那么这个时候node.prev就不是我们预期的节点了,而是另一个线程新入队的节点,所以说得进下一次循环再来一次CAS,这种形式就是自旋}
在了解了addWaiter()方法会将节点加入等待队列之后,我们接着来看,addWaiter()会返回已经加入的节点,acquireQueued()在得到返回的节点时,也会进入自旋状态,等待唤醒(也就是开始进入到拿锁的环节了):
@ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) { //可以看到当此节点位于队首(node.prev == head)时,会再次调用tryAcquire方法获取锁,如果获取成功,会返回此过程中是否被中断的值setHead(node); //新的头结点设置为当前结点p.next = null; // 原有的头结点没有存在的意义了failed = false; //没有失败return interrupted; //直接返回等待过程中是否被中断} //依然没获取成功,if (shouldParkAfterFailedAcquire(p, node) && //将当前节点的前驱节点等待状态设置为SIGNAL,如果失败将直接开启下一轮循环,直到成功为止,如果成功接着往下parkAndCheckInterrupt()) //挂起线程进入等待状态,等待被唤醒,如果在等待状态下被中断,那么会返回true,直接将中断标志设为true,否则就是正常唤醒,继续自旋interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}private final boolean parkAndCheckInterrupt() {LockSupport.park(this); //通过unsafe类操作底层挂起线程(会直接进入阻塞状态)return Thread.interrupted();
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)return true; //已经是SIGNAL,直接trueif (ws > 0) { //不能是已经取消的节点,必须找到一个没被取消的do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node; //直接抛弃被取消的节点} else {//不是SIGNAL,先CAS设置为SIGNAL(这里没有返回true因为CAS不一定成功,需要下一轮再判断一次)compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false; //返回false,马上开启下一轮循环
}
所以,acquire()中的if条件如果为true,那么只有一种情况,就是等待过程中被中断了,其他任何情况下都是成功获取到独占锁,所以当等待过程被中断时,会调用selfInterrupt()方法:
static void selfInterrupt() {Thread.currentThread().interrupt();
}
这里就是直接向当前线程发送中断信号了。
上面提到了LockSupport类,它是一个工具类,我们也可以来玩一下这个park和unpark:
public static void main(String[] args) throws InterruptedException {Thread t = Thread.currentThread(); //先拿到主线程的Thread对象new Thread(() -> {try {TimeUnit.SECONDS.sleep(1);System.out.println("主线程可以继续运行了!");LockSupport.unpark(t);//t.interrupt(); 发送中断信号也可以恢复运行} catch (InterruptedException e) {e.printStackTrace();}}).start();System.out.println("主线程被挂起!");LockSupport.park();System.out.println("主线程继续运行!");
}
这里我们就把公平锁的lock()方法实现讲解完毕了(让我猜猜,已经晕了对吧,越是到源码越考验个人的基础知识掌握,基础不牢地动山摇)接着我们来看公平锁的tryAcquire()方法:
static final class FairSync extends Sync {//可重入独占锁的公平实现@ReservedStackAccessprotected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread(); //先获取当前线程的Thread对象int c = getState(); //获取当前AQS对象状态(独占模式下0为未占用,大于0表示已占用)if (c == 0) { //如果是0,那就表示没有占用,现在我们的线程就要来尝试占用它if (!hasQueuedPredecessors() && //等待队列是否不为空且当前线程没有拿到锁,其实就是看看当前线程有没有必要进行排队,如果没必要排队,就说明可以直接获取锁compareAndSetState(0, acquires)) { //CAS设置状态,如果成功则说明成功拿到了这把锁,失败则说明可能这个时候其他线程在争抢,并且还比你先抢到setExclusiveOwnerThread(current); //成功拿到锁,会将独占模式所有者线程设定为当前线程(这个方法是父类AbstractOwnableSynchronizer中的,就表示当前这把锁已经是这个线程的了)return true; //占用锁成功,返回true}}else if (current == getExclusiveOwnerThread()) { //如果不是0,那就表示被线程占用了,这个时候看看是不是自己占用的,如果是,由于是可重入锁,可以继续加锁int nextc = c + acquires; //多次加锁会将状态值进行增加,状态值就是加锁次数if (nextc < 0) //加到int值溢出了?throw new Error("Maximum lock count exceeded");setState(nextc); //设置为新的加锁次数return true;}return false; //其他任何情况都是加锁失败}
}
在了解了公平锁的实现之后,是不是感觉有点恍然大悟的感觉,虽然整个过程非常复杂,但是只要理清思路,还是比较简单的。
加锁过程已经OK,我们接着来看,它的解锁过程,unlock()方法是在AQS中实现的:
public void unlock() {sync.release(1); //直接调用了AQS中的release方法,参数为1表示解锁一次state值-1
}
@ReservedStackAccess
public final boolean release(int arg) {if (tryRelease(arg)) { //和tryAcquire一样,也得子类去重写,释放锁操作Node h = head; //释放锁成功后,获取新的头结点if (h != null && h.waitStatus != 0) //如果新的头结点不为空并且不是刚刚建立的结点(初始状态下status为默认值0,而上面在进行了shouldParkAfterFailedAcquire之后,会被设定为SIGNAL状态,值为-1)unparkSuccessor(h); //唤醒头节点下一个节点中的线程return true;}return false;
}
private void unparkSuccessor(Node node) {// 将等待状态waitStatus设置为初始值0int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);//获取下一个结点Node s = node.next;if (s == null || s.waitStatus > 0) { //如果下一个结点为空或是等待状态是已取消,那肯定是不能通知unpark的,这时就要遍历所有节点再另外找一个符合unpark要求的节点了s = null;for (Node t = tail; t != null && t != node; t = t.prev) //这里是从队尾向前,因为enq()方法中的t.next = node是在CAS之后进行的,而 node.prev = t 是CAS之前进行的,所以从后往前一定能够保证遍历所有节点if (t.waitStatus <= 0)s = t;}if (s != null) //要是找到了,就直接unpark,要是还是没找到,那就算了LockSupport.unpark(s.thread);
}
那么我们来看看tryRelease()方法是怎么实现的,具体实现在Sync中:
@ReservedStackAccess
protected final boolean tryRelease(int releases) {int c = getState() - releases; //先计算本次解锁之后的状态值if (Thread.currentThread() != getExclusiveOwnerThread()) //因为是独占锁,那肯定这把锁得是当前线程持有才行throw new IllegalMonitorStateException(); //否则直接抛异常boolean free = false;if (c == 0) { //如果解锁之后的值为0,表示已经完全释放此锁free = true;setExclusiveOwnerThread(null); //将独占锁持有线程设置为null}setState(c); //状态值设定为creturn free; //如果不是0表示此锁还没完全释放,返回false,是0就返回true
}
综上,我们来画一个完整的流程图:
这里我们只讲解了公平锁,有关非公平锁和读写锁,还请各位观众根据我们之前的思路,自行解读。
公平锁一定公平吗?
前面我们讲解了公平锁的实现原理,那么,我们尝试分析一下,在并发的情况下,公平锁一定公平吗?
我们再次来回顾一下tryAcquire()方法的实现:
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() && //注意这里,公平锁的机制是,一开始会查看是否有节点处于等待compareAndSetState(0, acquires)) { //如果前面的方法执行后发现没有等待节点,就直接进入占锁环节了setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
所以hasQueuedPredecessors()这个环节容不得半点闪失,否则会直接破坏掉公平性,假如现在出现了这样的情况:
线程1已经持有锁了,这时线程2来争抢这把锁,走到hasQueuedPredecessors(),判断出为 false,线程2继续运行,然后线程2肯定获取锁失败(因为锁这时是被线程1占有的),因此就进入到等待队列中:
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // 线程2进来之后,肯定是要先走这里的,因为head和tail都是nullif (compareAndSetHead(new Node()))tail = head; //这里就将tail直接等于head了,注意这里完了之后还没完,这里只是初始化过程} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) { //由于一开始head和tail都是null,所以线程2直接就进enq()了node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node); //请看上面return node;
}
而碰巧不巧,这个时候线程3也来抢锁了,按照正常流程走到了hasQueuedPredecessors()方法,而在此方法中:
public final boolean hasQueuedPredecessors() {Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;//这里直接判断h != t,而此时线程2才刚刚执行完 tail = head,所以直接就返回false了return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}
因此,线程3这时就紧接着准备开始CAS操作了,又碰巧,这时线程1释放锁了,现在的情况就是,线程3直接开始CAS判断,而线程2还在插入节点状态,结果可想而知,居然是线程3先拿到了锁,这显然是违背了公平锁的公平机制。
一张图就是:
因此公不公平全看hasQueuedPredecessors(),而此方法只有在等待队列中存在节点时才能保证不会出现问题。所以公平锁,只有在等待队列存在节点时,才是真正公平的。
Condition实现原理
通过前面的学习,我们知道Condition类实际上就是用于代替传统对象的wait/notify操作的,同样可以实现等待/通知模式,并且同一把锁下可以创建多个Condition对象。那么我们接着来看看,它又是如何实现的呢,我们先从单个Condition对象进行分析:
在AQS中,Condition有一个实现类ConditionObject,而这里也是使用了链表实现了条件队列:
public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;/** 条件队列的头结点 */private transient Node firstWaiter;/** 条件队列的尾结点 */private transient Node lastWaiter;//...}
这里是直接使用了AQS中的Node类,但是使用的是Node类中的nextWaiter字段连接节点,并且Node的status为CONDITION:
我们知道,当一个线程调用await()方法时,会进入等待状态,直到其他线程调用signal()方法将其唤醒,而这里的条件队列,正是用于存储这些处于等待状态的线程。
我们先来看看最关键的await()方法是如何实现的,为了防止一会绕晕,在开始之前,我们先明确此方法的目标:
只有已经持有锁的线程才可以使用此方法
当调用此方法后,会直接释放锁,无论加了多少次锁
只有其他线程调用signal()或是被中断时才会唤醒等待中的线程
被唤醒后,需要等待其他线程释放锁,拿到锁之后才可以继续执行,并且会恢复到之前的状态(await之前加了几层锁唤醒后依然是几层锁)
好了,差不多可以上源码了:
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException(); //如果在调用await之前就被添加了中断标记,那么会直接抛出中断异常Node node = addConditionWaiter(); //为当前线程创建一个新的节点,并将其加入到条件队列中int savedState = fullyRelease(node); //完全释放当前线程持有的锁,并且保存一下state值,因为唤醒之后还得恢复int interruptMode = 0; //用于保存中断状态while (!isOnSyncQueue(node)) { //循环判断是否位于同步队列中,如果等待状态下的线程被其他线程唤醒,那么会正常进入到AQS的等待队列中(之后我们会讲)LockSupport.park(this); //如果依然处于等待状态,那么继续挂起if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //看看等待的时候是不是被中断了break;}//出了循环之后,那线程肯定是已经醒了,这时就差拿到锁就可以恢复运行了if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //直接开始acquireQueued尝试拿锁(之前已经讲过了)从这里开始基本就和一个线程去抢锁是一样的了interruptMode = REINTERRUPT;//已经拿到锁了,基本可以开始继续运行了,这里再进行一下后期清理工作if (node.nextWaiter != null) unlinkCancelledWaiters(); //将等待队列中,不是Node.CONDITION状态的节点移除if (interruptMode != 0) //依然是响应中断reportInterruptAfterWait(interruptMode);//OK,接着该干嘛干嘛
}
实际上await()方法比较中规中矩,大部分操作也在我们的意料之中,那么我们接着来看signal()方法是如何实现的,同样的,为了防止各位绕晕,先明确signal的目标:
只有持有锁的线程才能唤醒锁所属的Condition等待的线程
优先唤醒条件队列中的第一个,如果唤醒过程中出现问题,接着找往下找,直到找到一个可以唤醒的
唤醒操作本质上是将条件队列中的结点直接丢进AQS等待队列中,让其参与到锁的竞争中
拿到锁之后,线程才能恢复运行
好了,上源码:
public final void signal() {if (!isHeldExclusively()) //先看看当前线程是不是持有锁的状态throw new IllegalMonitorStateException(); //不是?那你不配唤醒别人Node first = firstWaiter; //获取条件队列的第一个结点if (first != null) //如果队列不为空,获取到了,那么就可以开始唤醒操作doSignal(first);
}
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null) //如果当前节点在本轮循环没有后继节点了,条件队列就为空了lastWaiter = null; //所以这里相当于是直接清空first.nextWaiter = null; //将给定节点的下一个结点设置为null,因为当前结点马上就会离开条件队列了} while (!transferForSignal(first) && //接着往下看(first = firstWaiter) != null); //能走到这里只能说明给定节点被设定为了取消状态,那就继续看下一个结点
}
final boolean transferForSignal(Node node) {/** 如果这里CAS失败,那有可能此节点被设定为了取消状态*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;//CAS成功之后,结点的等待状态就变成了默认值0,接着通过enq方法直接将节点丢进AQS的等待队列中,相当于唤醒并且可以等待获取锁了//这里enq方法返回的是加入之后等待队列队尾的前驱节点,就是原来的tailNode p = enq(node);int ws = p.waitStatus; //保存前驱结点的等待状态//如果上一个节点的状态为取消, 或者尝试设置上一个节点的状态为SIGNAL失败(可能是在ws>0判断完之后马上变成了取消状态,导致CAS失败)if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread); //直接唤醒线程return true;
}
其实最让人不理解的就是倒数第二行,明明上面都正常进入到AQS等待队列了,应该是可以开始走正常流程了,那么这里为什么还要提前来一次unpark呢?
这里其实是为了进行优化而编写,直接unpark会有两种情况:
● 如果插入结点前,AQS等待队列的队尾节点就已经被取消,则满足wc > 0
● 如果插入node后,AQS内部等待队列的队尾节点已经稳定,满足tail.waitStatus == 0,但在执行ws >0之后!compareAndSetWaitStatus(p, ws,Node.SIGNAL)之前被取消,则CAS也会失败,满足compareAndSetWaitStatus(p, ws,Node.SIGNAL) == false
如果这里被提前unpark,那么在await()方法中将可以被直接唤醒,并跳出while循环,直接开始争抢锁,因为前一个等待结点是被取消的状态,没有必要再等它了。
所以,大致流程下:
只要把整个流程理清楚,还是很好理解的。
自行实现锁类
既然前面了解了那么多AQS的功能,那么我就仿照着这些锁类来实现一个简单的锁:
要求:同一时间只能有一个线程持有锁,不要求可重入(反复加锁无视即可)
public class Main {public static void main(String[] args) throws InterruptedException {}/*** 自行实现一个最普通的独占锁* 要求:同一时间只能有一个线程持有锁,不要求可重入*/private static class MyLock implements Lock {/*** 设计思路:* 1. 锁被占用,那么exclusiveOwnerThread应该被记录,并且state = 1* 2. 锁没有被占用,那么exclusiveOwnerThread为null,并且state = 0*/private static class Sync extends AbstractQueuedSynchronizer {@Overrideprotected boolean tryAcquire(int arg) {if(isHeldExclusively()) return true; //无需可重入功能,如果是当前线程直接返回trueif(compareAndSetState(0, arg)){ //CAS操作进行状态替换setExclusiveOwnerThread(Thread.currentThread()); //成功后设置当前的所有者线程return true;}return false;}@Overrideprotected boolean tryRelease(int arg) {if(getState() == 0)throw new IllegalMonitorStateException(); //没加锁情况下是不能直接解锁的if(isHeldExclusively()){ //只有持有锁的线程才能解锁setExclusiveOwnerThread(null); //设置所有者线程为nullsetState(0); //状态变为0return true;}return false;}@Overrideprotected boolean isHeldExclusively() {return getExclusiveOwnerThread() == Thread.currentThread();}protected Condition newCondition(){return new ConditionObject(); //直接用现成的}}private final Sync sync = new Sync();@Overridepublic void lock() {sync.acquire(1);}@Overridepublic void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}@Overridepublic boolean tryLock() {return sync.tryAcquire(1);}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}@Overridepublic void unlock() {sync.release(1);}@Overridepublic Condition newCondition() {return sync.newCondition();}}
}
到这里,我们对应队列同步器AQS的讲解就先到此为止了,当然,AQS的全部机制并非仅仅只有我们讲解的内容,大家可以自行探索