九:深入理解 CountDownLatch —— 闭锁/倒计时锁

devtools/2024/9/19 18:38:57/ 标签: 面试, 职场和发展

目录

  • 1、背景
  • 2、CountDownLatch 入门
    • 2.1、概念
    • 2.2、案例
  • 3、CountDownLatch 源码分析
    • 3.1、类结构
    • 3.2、`await()` 方法 —— CountDownLatch
      • 3.2.1、`acquireSharedInterruptibly()` 方法 —— AQS
        • 3.2.1.1、`tryAcquireShared()` 方法 —— CountDownLatch.Sync
        • 3.2.1.2、`doAcquireSharedInterruptibly()` 方法 —— AQS
          • 3.2.1.2.1、`setHeadAndPropagate()` 方法 —— AQS
            • 3.2.1.2.1.1、`doReleaseShared()` 方法 —— AQS
    • 3.3、`countDown()` 方法 —— CountDownLatch
      • 3.3.1、`releaseShared()` 方法 —— AQS
        • 3.3.1.1、`tryReleaseShared()` 方法 —— CountDownLatch.Sync
      • 3.3.2、`doReleaseShared()` 方法 —— AQS
  • 4、应用案例
  • 5、总结

1、背景

先看一个常见的面试题:

如何实现让主线程等所有子线程执行完了后,主线程再继续执行?即:如何实现一个线程等其他线程执行完了后再继续执行?

这里我们可以使用 Thread#join() 方法实现。

Thread#join() 方法的实现原理:join() 方法内部,通常有一个循环结构,循环条件为 targetThread.isAlive(),即:目标线程是否仍然存活。当目标线程尚未结束时,当前线程会进入循环体内部调用 wait() 方法进行等待(释放锁);当目标线程在其 run() 方法执行完毕后,其生命周期状态变为已终止(TERMINATED),并自动调用 notifyAll() 方法【JVM 底层】,会唤醒所有因为调用 wait() 而在目标线程对象上等待的线程,包括通过 join() 方法暂停的当前线程

public static void main(String[] args) throws InterruptedException {Runnable task = () -> {Random random = new Random();try {Thread.sleep(random.nextInt(10000) + 1000);} catch (InterruptedException e) {e.printStackTrace();}};Thread thread1 = new Thread(task, "线程1");Thread thread2 = new Thread(task, "线程2");Thread thread3 = new Thread(task, "线程3");thread1.start();thread2.start();thread3.start();// 启动了3个线程,然后让四个线程一直检测自己是否已经结束thread1.join();thread2.join();thread3.join();System.out.println("主线程继续执行...");
}

这种方式虽然能够解决问题,但是有些不尽人意的地方:每个线程都得调用 join() 方法。有没有更好的方法呢?

这个时候并发工具类 CountDownLatch 来了

2、CountDownLatch 入门

2.1、概念

CountDownLatch:JDK1.5 提供的一个同步工具,基于 AQS 构建同步器【共享模式】。它可以让一个或多个线程等待,一直等到其他线程中执行完成一组操作;适用于在多线程的场景需要等待所有子线程全部执行完毕之后再做操作的场景

CountDownLatch 可以理解为并发计数器:当一个任务被拆分成多个子任务时,需要等待子任务全部完成后再操作,不然会阻塞线程(当前线程),每完成一个任务计数器会 -1,直到没有。

【注意】:一般用作多线程倒计时计数器,强制它们等待其他一组任务,计数器的减法是一个不可逆的过程。即:计数器值递减到 0 的时候,不能再复原。

接下来用 CountDownLatch 完成上述案例

2.2、案例

public static void main(String[] args) throws InterruptedException {int threadCount = 3;CountDownLatch countDownLatch = new CountDownLatch(threadCount);Runnable task = () -> {System.out.println(Thread.currentThread().getName() + " 线程开始");Random random = new Random();try {Thread.sleep(random.nextInt(10000) + 1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println( Thread.currentThread().getName() + " 线程执行完毕");countDownLatch.countDown();};for (int i = 0; i < threadCount; i++) {new Thread(task, "线程" + i).start();}countDownLatch.await();System.out.println("主线程继续执行...");
}

3、CountDownLatch 源码分析

3.1、类结构

public class CountDownLatch {private final Sync sync;public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public void countDown() {sync.releaseShared(1);}// 内部类:使用了 state 计数private static final class Sync extends AbstractQueuedSynchronizer {Sync(int count) {setState(count);}protected boolean tryReleaseShared(int releases) {//...}protected int tryAcquireShared(int acquires) {//...}}
}

3.2、await() 方法 —— CountDownLatch

public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}

3.2.1、acquireSharedInterruptibly() 方法 —— AQS

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted()) {throw new InterruptedException();}// 判断计数器 state 是否等于 0if (tryAcquireShared(arg) < 0) {// 如果 state > 0 ,则添加到同步等待队列中doAcquireSharedInterruptibly(arg);}
}

acquireSharedInterruptibly() 方法:共享模式下可中断地获取锁方法。如果计数器 state 为 0,则跳过逻辑【调用者不用阻塞,可继续往下执行】;否则,将此线程添加到同步等待队列中

3.2.1.1、tryAcquireShared() 方法 —— CountDownLatch.Sync
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}
3.2.1.2、doAcquireSharedInterruptibly() 方法 —— AQS
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {// 【共享模式】节点入同步队列final Node node = addWaiter(Node.SHARED);boolean failed = true;try {// 自旋for (;;) {// 获取 node 的前驱final Node p = node.predecessor();if (p == head) {// 再次获取 state:如果 state == 0,则 r = 1;否则 r = -1int r = tryAcquireShared(arg);if (r >= 0) {// state == 0,将 node 设置为头节点setHeadAndPropagate(node, r);p.next = null;failed = false;return;}}// 自旋两次后,阻塞线程//第一次,waitStatus 默认为 0,shouldParkAfterFailedAcquire() 方法将 waitStatus 赋值为 SIGNAL并返回 false;//第二次 for 循环,shouldParkAfterFailedAcquire() 方法返回 true,通过调用 parkAndCheckInterrupt() 将自己阻塞if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {throw new InterruptedException();           }}} finally {if (failed)cancelAcquire(node);}
}

假如:现在有 3 个线程 A、B、C 调用了 await() 方法,此时 state 的值还不为 0,所以这三个线程都会加入到 AQS 队列中。并且三个线程都处于阻塞状态

如下图:

在这里插入图片描述

线程 A、B、C 自旋两次,通过 shouldParkAfterFailedAcquire() 方法将 waitStatus 由 0 修改为 SIGNAL,并通过 parkAndCheckInterrupt() 方法进行阻塞起来;

在这里插入图片描述

它们现在都不会去调用 setHeadAndPropagate() 方法,只有等到 countdown() 方法使得 state=0 的时候才会被唤醒

3.2.1.2.1、setHeadAndPropagate() 方法 —— AQS

看完下面的 countDown() 方法再来看此方法

private void setHeadAndPropagate(Node node, int propagate) {// 旧 head 节点Node h = head;// 将当前节点设置为 head 节点setHead(node);// propagate 大于 0(一般情况下都会这样)或者 存在可唤醒的线程if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {Node s = node.next;// 只有一个节点或者存在多个节点且是共享模式,则释放所有等待的线程,各自尝试抢占锁if (s == null || s.isShared()) {doReleaseShared();        }}
}
3.2.1.2.1.1、doReleaseShared() 方法 —— AQS
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {continue;}// 唤醒后继节点【一个】unparkSuccessor(h);} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {continue;}}if (h == head) {break;}}
}

循环唤醒后续节点

3.3、countDown() 方法 —— CountDownLatch

public void countDown() {// 递减锁重入次数,当 state == 0 时,唤醒所有阻塞的线程sync.releaseShared(1);
}

3.3.1、releaseShared() 方法 —— AQS

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}

只有当 state 减为 0 的时候,tryReleaseShared() 才返回 true,继而会调用 doReleaseShared() 方法来唤醒处于 await 状态下的线程;否则,只是简单的 state = state - 1

3.3.1.1、tryReleaseShared() 方法 —— CountDownLatch.Sync
protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0) {return false;}int nextc = c-1;// 共享模式:CAS 操作(存在多个线程)if (compareAndSetState(c, nextc)) {// 只有最后一个计数器减完才为 0,返回truereturn nextc == 0;}}
}

【共享模式】:存在多个线程,所以需要自旋 + CAS 操作

tryReleaseShared() 方法:自旋,对计数器进行 CAS 操作 -1,如果计数器减到 0【需要唤醒阻塞的线程】,返回 true;否则,返回 false

3.3.2、doReleaseShared() 方法 —— AQS

private void doReleaseShared() {// 自旋for (;;) {// 记录旧 head 节点Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;// 前驱节点状态为 SIGNAL,后继节点需要被唤醒if (ws == Node.SIGNAL) {// 将头结点的 waitstatue 设置为0,以后就不会再次唤醒该后继节点了,这一步是为了解决并发问题,保证只 unpark()一次,不成功就继续if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {// 如果 CAS 失败,则继续执行continue;                                    }// 唤醒头节点的一个后继节点unparkSuccessor(h);// ws == 0:head 节点刚入队列,未调用 shouldParkAfterFailedAcquire() 方法【将 waitStatus 由 0 修改为 SIGNAL】// CAS 操作:将 head 节点状态设置为 PROPAGATE,表示要向下传播,依次唤醒// CAS 操作失败场景:// 1.这时,刚好有节点入队列,且已调用了 shouldParkAfterFailedAcquire() 方法,修改为了 SIGNAL 状态// 2.有其它线程尝试将其设置为 PROPAGATE 状态} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {// CAS 操作失败,继续尝试continue;}}// 判断 head 节点是否是原 head 节点// 如果是:说明了之前唤醒的线程还未唤醒 | 就没唤醒过线程【执行 else-if 逻辑】,跳出循环// 如果不是:说明了之前唤醒的线程已唤醒【线程A】,跳过当前循环,继续在 for 循环中执行第二次if (h == head) {break;}}
}

一旦线程 A 被唤醒,代码又会继续回到 doAcquireSharedInterruptibly() 中来执行。如果当前 state 满足 ==0 的条件,则会执行 setHeadAndPropagate() 方法

对于下面这块代码:在 CountDownLatch 的实现中,头节点状态为 PROPAGATE 的情况并不常见

else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {// CAS 操作失败,继续尝试continue;
}

因为 CountDownLatch 的唤醒逻辑并不依赖于节点状态为 PROPAGATE。通常情况下,当计数器归零时,CountDownLatch 会直接一次性唤醒所有等待线程,而不会特别处理节点状态为 PROPAGATE 的情况

对于 CountDownLatch 来说,其核心逻辑相对简单:当计数器递减至 0 时,意味着所有等待的线程已完成其预定任务。此时,doReleaseShared() 方法的主要任务是确保所有等待在 CountDownLatch上的线程都能被唤醒,而不是传播某种释放信号

ROPAGATE 状态主要出现在其他基于 AbstractQueuedSynchronizer(AQS) 构建的同步组件(如SemaphoreReentrantReadWriteLock 的读锁等)中,用于表示释放操作应当继续向下传播,唤醒更多等待的线程。在这些组件中,当某个节点释放资源后,可能需要将释放操作传播到队列中的其他节点,此时会将节点状态设置为 PROPAGATE,以便后续逻辑处理

4、应用案例

等顾客们来齐了,服务员再来上菜,吃饭,人不齐不能动筷子,大家都坐那等着

public static void main(String[] args) throws InterruptedException {// 5 个顾客final int customerCount = 5;// 7 道菜,需要 7 个服务员final int waitressCount = 7;CountDownLatch customerCountDownLatch = new CountDownLatch(customerCount);CountDownLatch countDownLatch = new CountDownLatch(waitressCount);Runnable customerTask = () -> {try {SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");Random random = new Random();System.out.println(sdf.format(new Date()) + " " + Thread.currentThread().getName() + "出发去饭店");Thread.sleep((long) (random.nextDouble() * 3000) + 1000);System.out.println(sdf.format(new Date()) + " " + Thread.currentThread().getName() + "到了饭店");customerCountDownLatch.countDown();} catch (InterruptedException e) {throw new RuntimeException(e);}};Runnable waitressTask = () -> {try {SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");System.out.println(sdf.format(new Date()) + " " + Thread.currentThread().getName()  + "等待顾客");customerCountDownLatch.await();System.out.println(sdf.format(new Date()) + " " + Thread.currentThread().getName()  + "人齐了,开始上菜");Random random = new Random();Thread.sleep((long) (random.nextDouble() * 3000) + 1000);countDownLatch.countDown();System.out.println(Thread.currentThread().getName() + " 完成上菜,还差 " + countDownLatch.getCount() + " 个菜没上");} catch (Exception e) {e.printStackTrace();}};for (int i = 0; i < customerCount; i++) {new Thread(customerTask, "customer" + i).start();}for (int i = 0; i < waitressCount; i++) {new Thread(waitressTask, "waitress" + i).start();}countDownLatch.await();System.out.println("菜都上完了,可以吃了");
}

运行结果:

14:19:28 customer4出发去饭店
14:19:28 waitress3等待顾客
14:19:28 customer3出发去饭店
14:19:28 waitress6等待顾客
14:19:28 customer1出发去饭店
14:19:28 waitress0等待顾客
14:19:28 waitress2等待顾客
14:19:28 waitress5等待顾客
14:19:28 waitress1等待顾客
14:19:28 customer2出发去饭店
14:19:28 customer0出发去饭店
14:19:28 waitress4等待顾客
14:19:29 customer3到了饭店
14:19:29 customer4到了饭店
14:19:29 customer2到了饭店
14:19:31 customer0到了饭店
14:19:31 customer1到了饭店
14:19:31 waitress3人齐了,开始上菜
14:19:31 waitress2人齐了,开始上菜
14:19:31 waitress5人齐了,开始上菜
14:19:31 waitress0人齐了,开始上菜
14:19:31 waitress6人齐了,开始上菜
14:19:31 waitress1人齐了,开始上菜
14:19:31 waitress4人齐了,开始上菜
waitress3 完成上菜,还差 6 个菜没上
waitress4 完成上菜,还差 5 个菜没上
waitress5 完成上菜,还差 4 个菜没上
waitress1 完成上菜,还差 3 个菜没上
waitress0 完成上菜,还差 2 个菜没上
waitress2 完成上菜,还差 1 个菜没上
waitress6 完成上菜,还差 0 个菜没上
菜都上完了,可以吃了

5、总结

  1. 通过构造方法初始化 CountDownLatch:设置 AQS 中的 state 的值
  2. 调用 countDown() 方法:调用 AQS 的释放同步状态的方法,每调用一次,state 就自减 1,直至为 0
  3. 调用 await() 方法:如果 state 不为 0,则阻塞线程并入队列。当 state 为 0 后,唤醒其它所有阻塞的线程

http://www.ppmy.cn/devtools/13950.html

相关文章

Mysql 锁学习笔记

目录 Innodb锁 共享锁与排它锁 锁兼容级别 意向锁 - 表级锁 代码示例 表级锁类型兼容性 行锁 代码示例 间隙锁 代码示例 临键锁 - 行锁加间隙锁 插入意向锁 自增锁 SELECT的加锁规则 (RR) 查看锁状态命令 3.0 前置条件 3.1 主键检索 3.2 唯一索引检索 3.3 普…

233 基于matlab的多通道非负矩阵分解(MNMF)算法

基于matlab的多通道非负矩阵分解&#xff08;MNMF&#xff09;算法。其能够寻找到一个非负矩阵W和一个非负矩阵H&#xff0c;满足条件VW*H,从而将一个非负的矩阵分解为左右两个非负矩阵的乘积。使用EM准则对混合信号进行分解。程序已调通&#xff0c;可直接运行。 233 多通道非…

ruoyi-nbcio-plus基于vue3的flowable的websocket消息组件的升级修改(一)

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 http://122.227.135.243:9666/ 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码&#xff1a…

Java之多态

一、多态前言 1.为什么要使用多态 Java中使用多态的主要目的是提高代码的可重用性和扩展性&#xff0c;使得代码更加灵活和易于维护。通过多态&#xff0c;我们可以将不同的对象看做是同一种类型&#xff0c;从而使得我们可以使用同一种接口来操作这些对象&#xff0c;而不必…

如何使用自定义Promptbooks优化您的安全工作流程

在当今的数字化时代&#xff0c;安全工作流程的优化变得前所未有的重要。安全团队需要快速、有效地响应安全事件&#xff0c;以保护组织的数据和资产。Microsoft Copilot for Security提供了一种强大的工具——自定义Promptbooks&#xff0c;它可以帮助安全专家通过自动化和定制…

springboot项目整合kafka实现消息队列

一、Docker镜像、容器准备&#xff1a; 1.1拉取镜像&#xff1a; 前提是虚拟机安装了docker&#xff0c;可以自行看其他文章进行安装 docker pull ubuntu/kafka docker pull zookeeper1.2运行容器 先启动zookeeper容器&#xff0c;因为kafka依赖于zookeeper docker run -d …

【数据仓库工具箱】DW/BI系统的核心元素和基本要求

核心元素 DW/BI 环境划分为4个不同的&#xff0c;各具特色的组成部分。分别是&#xff1a;操作型源数据&#xff0c;ETL系统&#xff0c;数据展现和商业智能应用。 操作型源数据 记录的是操作型系统&#xff0c;用于获取业务事务。源数据关注的是处理性能和可用性。源系统一般…

七星创客新零售系统:颠覆性商业模式的崛起

大家好&#xff0c;我是微三云周丽&#xff0c;今天给大家分析当下市场比较火爆的商业模式&#xff01; 小编今天跟大伙们分享什么是七星创客新零售系统&#xff1f; 随着经济的快速发展和科技的不断进步&#xff0c;商业模式的革新成为了企业发展的关键。在这个新旧动能转换、…

【AI开发:音频】二、GPT-SoVITS使用方法和过程中出现的问题(GPU版)

1.FileNotFoundError: [Errno 2] No such file or directory: logs/guanshenxxx/2-name2text-0.txt 这个问题中包含了两个&#xff1a; 第一个&#xff1a;No module named pyopenjtalk 我的电脑出现的就是这个 解决&#xff1a;pip install pyopenjtalk 第二个&#xff1a…

docker 安装geoipupdate

前提是docker已安装 一&#xff1a;执行命令&#xff1a; docker run --env-file /usr/local/etc/GeoIP.conf -v /usr/local/GeoIP2:/usr/share/GeoIP ghcr.io/maxmind/geoipupdate /usr/local/etc/GeoIP.conf &#xff1a;本地配置的账号&#xff0c;秘钥 GEOIPUPDATE_AC…

低视力者出行升级:适配服务助力双手解放与环境感知

作为一名资深记者&#xff0c;我有幸深入了解并记录低视力者在日常出行中所面临的挑战与解决方案。近年来&#xff0c;低视力者辅助设备适配服务提供领域的创新成果&#xff0c;尤其是结合手机应用的辅助设备&#xff0c;正在以人性化、智能化的方式&#xff0c;帮助低视力者实…

[C++][算法基础]求a的b次方模p的值(快速幂)

给定 n 组 &#xff0c;对于每组数据&#xff0c;求出 的值。 输入格式 第一行包含整数 n。 接下来 n 行&#xff0c;每行包含三个整数 。 输出格式 对于每组数据&#xff0c;输出一个结果&#xff0c;表示 的值。 每个结果占一行。 数据范围 1≤n≤100000, 1≤≤2 …

Leetcode算法训练日记 | day35

专题九 贪心算法 一、柠檬水找零 1.题目 Leetcode&#xff1a;第 860 题 在柠檬水摊上&#xff0c;每一杯柠檬水的售价为 5 美元。顾客排队购买你的产品&#xff0c;&#xff08;按账单 bills 支付的顺序&#xff09;一次购买一杯。 每位顾客只买一杯柠檬水&#xff0c;然…

视频评价工具AVQT介绍

AVQT介绍 AVQT(Advanced Video Quality Tool)是一个用于评估压缩视频感知质量的工具。它通过模拟人类如何评价压缩视频的质量来进行工作。AVQT 是是苹果在 WWDC 21 上推出的一款评估视频感知质量的工具。AVQT可以用于计算视频的帧级和片段级得分,其中片段通常持续几秒钟。这…

RabbitMQ spring boot TTL延时消费

关于延时消费主要分为两种实现&#xff0c;一种是rabbitmq的TTL机制&#xff0c;一种是rabbitmq的插件实现。 实现一&#xff1a;TTL 1、设置队列的过期时间 2、设置消息的过期时间 添加相关maven依赖 <dependency><groupId>org.springframework.boot</grou…

如何让AI生成自己喜欢的歌曲-AI音乐创作的正确方式 - 第507篇

历史文章 AI音乐&#xff0c;8大变现方式——Suno&#xff1a;音乐版的ChatGPT - 第505篇 日赚800&#xff0c;利用淘宝/闲鱼进行AI音乐售卖实操 - 第506篇 导读 在使用AI生成音乐&#xff08;AI写歌&#xff09;的时候&#xff0c;你是不是有这样的困惑&#xff1a; &…

黑客零基础入门教程:从零开始学习黑客技术

黑客技术往往被误解&#xff0c;但实际上&#xff0c;学习黑客技术可以帮助我们更好地理解网络安全&#xff0c;保护个人信息免受攻击。本文为零基础的朋友提供了一个黑客技术的学习入门指南&#xff0c;帮助你从基础到实践逐步深入了解和掌握相关技能。 了解基本概念 在开始…

SpringBoot+Vue开发记录(四)

说明&#xff1a; 本篇文章的主要内容是软件架构以及项目的前端Vue创建 一、软件架构 我道听途说的&#xff0c;听说这个东西很关键很重要什么的。 软件架构&#xff08;software architecture&#xff09;是一个系统的草图,是一系列相关的抽象模式&#xff0c;用于指导大型软…

Ajax技术是啥?在web开发中有啥用?

一、Ajax是啥&#xff1f; Ajax技术是一种让网页能在不完全刷新页面的情况下&#xff0c;通过JavaScript与服务器进行异步数据交换&#xff0c;并更新部分网页内容的技术。 简单来说&#xff0c;Ajax的核心原理就是在JavaScript的控制下&#xff0c;网页悄悄地向服务器请求数…

新加坡VPS服务器Linux系统的安全性如何增强

增强新加坡VPS服务器上Linux系统的安全性是至关重要的&#xff0c;以下是一些常见的方法和建议&#xff1a; 更新系统和软件&#xff1a; 定期更新操作系统和安装的软件包&#xff0c;确保系统中的所有组件都是最新版本&#xff0c;以修补已知的漏洞和安全问题。 配置防火墙&am…