CountDownLatch、Semaphore详解——深入探究CountDownLatch、Semaphore源码

news/2025/3/30 15:32:36/

这篇文章将会详细介绍基于AQS实现的两个并发类CountDownLatch和Semaphore,通过深入底层源代码讲解其具体实现。

目录

CountDownLatch

 countDown()

 await()

Semaphore

 Semaphore类图

 Semaphore的应用场景

 acquire()

 tryAcquire()


CountDownLatch

/*** A synchronization aid that allows one or more threads to wait until* a set of operations being performed in other threads completes.*/

上面是CountDownLatch这个API的前两行注释,一句话已经说明了这个类的作用。

一种同步辅助工具,允许一个或多个线程等待其他线程正在执行的一组操作完成。

CountDownLatch是一个计数器,通过的构造方法指定初始计数器的值,调用CountDownLatch的countDown()方法让计数器的值减少1,当计数器的值变成0时,调用它的await()方法阻塞的当前线程会被释放(其实就是从for循环中结束返回),继续执行剩余的代码。

这个类的结构很简单,就不画类图了,直接贴出代码

package java.util.concurrent;import java.util.concurrent.locks.AbstractQueuedSynchronizer;public class CountDownLatch {private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}private final Sync sync;public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}public void countDown() {sync.releaseShared(1);}public long getCount() {return sync.getCount();}public String toString() {return super.toString() + "[Count = " + sync.getCount() + "]";}}

CountDownLatch是通过AQS的实现类Sync来实现这个计数器功能的,通过AQS的state属性保存计数器的值。

因为这里的计数器值state是共享的,所以重写了AQS的tryAcquireShared()和tryReleaseShared()方法。

为什么要重写这两个方法呢?

因为AQS里的几个重要的方法aquire()和release()会根据当前是独占模式还是共享模式去调用对应的tryAcquire()/tryAcquireShared()、tryRelease()/tryReleaseShared()方法。

而Java已经约定了,继承AQS应该指明state属性的语义:

- 在CountDownLatch中,state值表示计数器的值;

- 在Semaphore中,state值表示许可证的数量;

 

接下来,介绍CountDownLatch中的两个最重要的方法:

 countDown()

让计数器的值减1

    public void countDown() {sync.releaseShared(1);}

 在这个方法里调用了releaseShared()方法,因为静态内部类Sync没有重写这个方法,所以调用的是超类AbstractQueuedSynchronizer的方法。

    public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}

这个方法里先调用tryReleaseShared()方法,因为Sync重写了这个方法,所以调用的是Sync的方法。

        protected boolean tryReleaseShared(int releases) {for (;;) {// 获取state值,如果是0,直接返回falseint c = getState();if (c == 0) {return false;}// 通过Unsafe工具类的CAS方法尝试修改state的值为state - 1int nextc = c - 1;if (compareAndSetState(c, nextc)) {// 修改完成之后,根据修改之前的state值是否为1返回true或falsereturn nextc == 0;}}}

 await()

阻塞当前线程,直到state值变成0才唤醒。

    public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}

在方法内部调用了AQS的acquireSharedInterruptibly()方法,Interruptibly后缀的方法都可以被中断。

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}

在这个方法中,先调用Thread类的静态方法interrupted()判断当前线程是否被中断,如果当前线程中断状态为true,则清除线程的中断标记并返回true。否则返回false。如果当前线程是被中断的状态,则抛出中断异常返回。关于interrupted()方法的详细介绍,请参考文章

interrupt()、interrupted()和isInterruptd()的区别icon-default.png?t=N7T8https://blog.csdn.net/heyl163_/article/details/132138422如果当前线程是正常的状态,调用tryAcquireShared()方法,因为Sync重写了这个方法,所以调用的是Sync的tryAcquireShared()方法。这个方法就是判断当前state属性值是否为0,如果不是0就返回-1,否则返回1

        protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}

继续回到上面的代码,当state不为0时,会执行doAcquireSharedInterruptibly()方法,注意,这里的一个死循环for会让当前线程一直阻塞在这里,直到tryAcquireShared()获取到的返回值大于0,也就是1时才退出循环,而根据上面的方法,此时state值为0。

    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

总结:这里的等待是通过无条件的for循环让当前线程一直等待,直到state的值变为0才退出循环返回。所以,在这里可以替代线程的join()方法使用,这也是CountDownLatch的主要用途。

Semaphore

A counting semaphore.Conceptually, a semaphore maintains a set of permits. Each acquire blocks if necessary until a permit is available, and then takes it. Each release adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.

一个计数器信号量,一个Semaphore包含一组许可证的集合,每个获取器都会在必要时阻塞,直到有许可证可用,然后获取它。每次释放都会增加一个许可,从而有可能释放阻塞的获取者。不过, Semaphore 并不使用实际的许可证对象;它只是对可用数量进行计数,并采取相应的行动。

以上是Semaphore的简单介绍,在源码的注释最开头就能看到。。

 Semaphore类图

 Semaphore的应用场景

接下来通过一个案例来介绍Semaphore适用于什么场景

在广州过节和朋友去吃火锅就知道,桌子数量是固定的,一个店就那么大,就只有x桌。去到店里基本上是要排队的,要等取号在自己的号数前面的人吃完出来才能空出来桌位,否则就要在那里一直等。

上面的场景,假如店里刚好坐满,这时候突然来了1对情侣,这时候因为店里的人还没吃完,要等最少一桌的人用完餐之后出来才能轮到这队情侣用餐。

其实这里的操作就是acquire(),店里的桌数就是Semaphore的初始许可数。

 acquire()

public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}

方法内部调用了以下方法,当线程被中断时,清除中断标记,抛出中断异常。否则,调用tryAcquireShared()方法尝试得到一个许可证。

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}

这里会根据调用构造方法指定的公平锁还是非公平锁调用FairSync或NoneFairSync的tryAcquireShared()方法。

如果是使用一个参数的构造方法实例化,则使用默认的非公平锁,否则根据参数fair来决定是否公平锁。

public Semaphore(int permits) {sync = new NonfairSync(permits);
}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

以默认的NoneFairSync为例,tryAcquireShared()方法会调用其超类Sync中定义的nonfairTryAcquireShared()方法。

protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining))return remaining;}
}

nonfairTryAcquireShared()方法中会一直尝试去扣减AQS中的state值,也就是信号量中的许可证的数量。

回到acquireSharedInterruptibly()方法,当我们尝试扣减state之后state小于0,也就是许可证数量不够了,就会执行doAcquireSharedInterruptibly()方法,这个方法在CountDownLatch里也讲了,主线程会一直在for循环里出不去,相当于阻塞在这里了,直到申请许可证成功,也就是上面的方法返回了不小于0的数。

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

 tryAcquire()

尝试获取许可证,如果许可证数量足够,则返回true,否则返回false。

public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;
}

好了,文章就分享到这里了,看完不要忘了点赞+收藏哦~


http://www.ppmy.cn/news/1074025.html

相关文章

C++设计模式_02_面向对象设计原则

文章目录 1. 面向对象设计&#xff0c;为什么&#xff1f;2. 重新认识面向对象3. 面向对象设计原则3.1 依赖倒置原则(DIP)3.2 开放封闭原则(OCP )3.3 单一职责原则( SRP )3.4 Liskov 替换原则 ( LSP )3.5 接口隔离原则 ( ISP )3.6 优先使用对象组合&#xff0c;而不是类继承3.7…

QT 常用类与组件

0 思维导图 1 信息调试类&#xff08;QDebug&#xff09; #include "widget.h" #include<iostream> //printf #include<QDebug> //qDebuf using namespace std; //coutWidget::Widget(QWidget *parent): QWidget(parent) {//输出函数//使用…

Linux学习之RAID删除

参考《Linux软件raid删除》 我部署 RAID的步骤在《Linux学习之RAID》 sudo umount /dev/md0先进行卸载。 sudo mdadm -S /dev/md0停止/dev/md0。 sudo mdadm -A -s /dev/md0可以重新开始/dev/md0&#xff0c;这里只是拓展一下。 sudo mdadm -S /dev/md0停止/dev/md0。 s…

【Go 基础篇】Go语言结构体基本使用

在Go语言中&#xff0c;结构体是一种重要的数据类型&#xff0c;用于定义和组织一组不同类型的数据字段。结构体允许开发者创建自定义的复合数据类型&#xff0c;类似于其他编程语言中的类。本文将深入探讨Go语言中结构体的定义、初始化、嵌套、方法以及与其他语言的对比&#…

com.squareup.okhttp3:okhttp 组件安全漏洞及健康度分析

组件简介 维护者square组织许可证类型Apache License 2.0首次发布2016 年 1 月 2 日最新发布时间2023 年 4 月 23 日GitHub Star44403GitHub Fork9197依赖包5,582依赖存储库77,217 com.squareup.okhttp3:okhttp 一个开源的 HTTP 客户端库&#xff0c;可以用于 Android 和 Jav…

Java XPath 使用(2023/08/29)

Java XPath 使用&#xff08;2023/08/29&#xff09; 文章目录 Java XPath 使用&#xff08;2023/08/29&#xff09;1. 前言2. 技术选型3. 技术实现 1. 前言 众所周知&#xff0c;Java 语言适合应用于 Web 开发领域&#xff0c;不擅长用来编写爬虫。但在 Web 开发过程中有时又…

代码随想录28| 122.买卖股票的最佳时机II, 55. 跳跃游戏, 45.跳跃游戏II

122.买卖股票的最佳时机II 链接地址 class Solution { public:int maxProfit(vector<int>& prices) {int result 0;for (int i 1; i < prices.size(); i) {int temp prices[i] - prices[i - 1];if (temp > 0) {result temp; }}return result;} };55. 跳跃游…

【数据结构与算法 三】常见数据结构与算法组合应用方式

一般的数据结构和对应的 很抱歉,作为一个文本AI模型,我无法直接绘制图表,但我可以为您列出常见的算法和数据结构分类,并为每个分类提供简要说明。您可以根据这些信息自行绘制图表。 算法分类: 搜索算法:用于在数据集中查找特定元素的算法,如线性搜索、二分搜索等。 排…