Java线程间通信方式(3)

news/2024/11/28 21:46:44/

前文了解了线程通信方式中的CountDownLatch, Condition,ReentrantLock以及CyclicBarrier,接下来我们继续了解其他的线程间通信方式。

Phaser

Phaser是JDK1.7中引入的一种功能上和CycliBarrier和CountDownLatch相似的同步工具,相对这两者而言其用法更加灵活,同时Phaser也支持重用。

在Phaser中将需要协作完成的任务分成多个阶段,每个阶段的参与者可指定,参与者可以随时注册并参与到某个阶段或者取消参与本阶段。以选修课考试为例,说明Phaser的工作逻辑,假设现有选修课3门,政治,历史,地理,各选修人数分别为20,10,10.按Phaser实现考试逻辑如下:

  • 第一阶段考政治,总共应有9名同学参加考试,在考试开始时,8位同学开始答题,另外一位同学未到,考试中途,最后一位同学进入,开始考试,所有同学答题完成后,政治考试结束
  • 第二阶段考历史,总共9名同学参考考试,在考试结束前,3名同学弃考,则实际参与考试有6名同学,所有同学答题完成后,历史考试结束
  • 第三阶段考地理,总共9名同学参与考试,中途无意外,所有同学答题完成后,地理考试结束

至此选修课考试的三个阶段均完成,所以选修课考试这个任务结束,其中第一阶段中晚到参考考试的同学说的就是参与者可以随时注册并参与到某个阶段,第二阶段中弃考的同学说的就是参与者可以随时取消参与本阶段,当所有参与本阶段的参与者均取消,则意味着该阶段完成。

在Phaser中,针对一个阶段而言,每一个参与者都被称为一个party,可以通过构造函数指定参与者数量,也可以通过register使parties(party的总和)自增,当当前阶段的所有参与者等于parties的数量时,此时phase自增1,进入下一个阶段,回调onAdvance方法

Phaser提供的核心函数如下所示:

函数名称描述备注
register()注册一个party,使得parties+1/
bulkRegister(int parties)批量注册party,使得parties变为已有个数与传入参数之和/
arriveAndDeregister()当前任务已完成,使parties计数减1,不会形成阻塞/
arriveAndAwaitAdvance()已达到执行点,线程阻塞,等待下一阶段唤醒继续执行/
awaitAdvance(int phase)参数是一个已完成的阶段编号,通常以已完成任务的arrive或者arriveAndDeregister函数的返回值作为取值,如果传入参数的阶段编号和当前阶段编号相同,则在此处等待,如果不同或者Phaser已经是terminated状态,则立即返回/
arrive()达到当前阶段,不等待其他参与者到达/

arriveAndAwaitAdvance

以上述政治考试为例,学习Phaser基本使用

public static void main(String[] args) {// 创建PhaserPhaser phaser = new Phaser(){@Overrideprotected boolean onAdvance(int phase, int registeredParties) {switch (phase) {case 0:System.out.println("政治考试完成");break;case 1:System.out.println("历史考试完成");break;case 2:System.out.println("地理考试完成");break;}// 如果到达某一阶段,Phaser中参与者为0,则会销毁该Phaserreturn super.onAdvance(phase, registeredParties);}};IntStream.range(1,10).forEach(number->{phaser.register();Thread student= new Thread(()->{System.out.println("学生"+number+"arrive advance");// 等待其他线程,此时blockphaser.arriveAndAwaitAdvance();System.out.println("学生"+number+"政治开始答题");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("学生"+number+"政治交卷");// 考试完成,取消计数,参与者减1phaser.arriveAndDeregister();System.out.println("Phaser is terminated :" +phaser.isTerminated());});student.start();});System.out.println("Phaser is terminated :" +phaser.isTerminated());
}

输出如下:

1-4-5-1

从上面可以看出,Phaser中通过arriveAndAwaitAdvance阻塞当前线程,当所有线程到达阻塞栅栏时,唤醒等待线程继续执行,进而达到线程间同步协作。

awaitAdvance

有时候,当Phaser 在当前阶段结束时,我们需要兜底做一些策略,比如说资源的释放,状态的检查上报等,此时就需要用到awaitAdvance,awaitAdvance接受一个阶段编号,如果当前阶段编号和传入的相等,则会进入等待状态,等到所有参与者都到达该阶段栅栏时,被唤醒。实例代码如下:

public static class ThreadA implements Runnable {private Phaser phaser;public ThreadA(Phaser phaser) {this.phaser = phaser;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " start ");phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName() + " end " );}
}public static class ThreadB implements Runnable {private Phaser phaser;public ThreadB(Phaser phaser) {this.phaser = phaser;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " start " );phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName() + " end ");}
}public static class ThreadC implements Runnable {private Phaser phaser;public ThreadC(Phaser phaser) {this.phaser = phaser;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " start ");System.out.println(Thread.currentThread().getName() + " phaser.getPhase()=" + phaser.getPhase());phaser.awaitAdvance(0);System.out.println(Thread.currentThread().getName() + " end ");}
}public static class ThreadD implements Runnable {private Phaser phaser;public ThreadD(Phaser phaser) {this.phaser = phaser;}@Overridepublic void run() {try {System.out.println(Thread.currentThread().getName() + " begin sleep");Thread.sleep(5000);System.out.println(Thread.currentThread().getName() + " sleep completed ");phaser.arriveAndAwaitAdvance();System.out.println(Thread.currentThread().getName() + " end ");} catch (InterruptedException e) {e.printStackTrace();}}
}public static void main(String[] args) {// 声明PhaserPhaser phaser = new Phaser(3) {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {System.out.println("Phaser arrived at :"+phase);return super.onAdvance(phase, registeredParties);}};Thread t1 = new Thread(new ThreadA(phaser));Thread t2 = new Thread(new ThreadB(phaser));Thread t3 = new Thread(new ThreadC(phaser));Thread t4 = new Thread(new ThreadD(phaser));t1.setName("ThreadA");t2.setName("ThreadB");t3.setName("ThreadC");t4.setName("ThreadD");t1.start();t2.start();t3.start();t4.start();
}

如上代码所示,声明Phaser有三个参与者ThreadA,ThreadB,ThreadD,在三个参与者都执行到arriveAndAwaitAdvance之前,ThreadC 阻塞等待,当三个参与者都执行到arriveAndAwaitAdvance后,回调onAdvance方法,此时被阻塞的参与者被唤醒执行,之后ThreadC被唤醒继续执行,运行结果如下:

1-4-5-2

Exchanger

Exchanger用于两个线程之间的通信,无论哪个线程先调用Exchanger,都会等待另外一个线程调用时进行数据交换,示例代码如下:

private static Exchanger<String> exchanger = new Exchanger<>();public static void main(String[] args) {new Thread(()->{try {System.out.println(Thread.currentThread().getName()+" sleep start");Thread.sleep(10000);System.out.println(Thread.currentThread().getName()+" sleep end");System.out.println(Thread.currentThread().getName()+" send data to Exchanger");String aa = exchanger.exchange("data from Thread1");System.out.println(Thread.currentThread().getName() + "   "+aa);} catch (InterruptedException e) {e.printStackTrace();}}, "Thread1").start();new Thread(()->{try {System.out.println(Thread.currentThread().getName()+" send data to Exchanger");String bb = exchanger.exchange("data from Thread2");System.out.println(Thread.currentThread().getName() + "   "+bb);} catch (InterruptedException e) {e.printStackTrace();}}, "Thread2").start();
}

运行输出如下:

1-4-5-3

总结

结合前文,我们一共学习了种线程间通信方式,主要有:

  1. Object.wait/Object.notify/Object.notifyAll + synchronized
  2. Semaphore(信号量)
  3. CountDownLatch
  4. CyclicBarrier
  5. Condition+ReentrantLock
  6. Phaser
  7. Exchanger

大家日常开发中可灵活使用,针对各通信方式比较见下表:

通信方式应用场景是否可重用子任务异常处理备注
Object.wait/Object.notify/Object.notifyAll + synchronized大多数线程通信场景依赖开发者维护,在finally块中完成释放,避免死锁/
Semaphore(信号量)通知唤醒类线程间通信场景依赖开发者维护,在finally块中释放信号量,避免死锁/
CountDownLatch串行多线程运行场景不加处理的话,子任务发生异常导致退出,则所有等待的线程都会一致等待,直到超时时间来临/
CyclicBarrier聚合类线程通信场景不加处理的话,如果在所有线程都到达屏障陷入阻塞前,如果有线程发生异常导致未到达栅栏提前退出,则所有等待在栅栏都会以BrokenBarrierException或InterruptedException异常退出/
Condition+ReentrantLock大多数线程通信场景依赖开发者维护,在finally块中完成释放,避免死锁/
Phaser适用CountDownLatch与CyclicBarrier组合场景依赖开发者维护,在finally块中取消参与者,避免死锁/
Exchanger线程间数据交换场景依赖开发者维护,确保两个线程状态正常,并行运行/

http://www.ppmy.cn/news/52124.html

相关文章

盘点几款还不错的企业网盘产品

企业网盘的出现&#xff0c;为企业提供文件安全管理&#xff0c;团队协作服务&#xff0c;解决了便捷性与安全性等问题&#xff0c;受到了企业的青睐。市面上的企业网盘工具也是五花八门&#xff0c;我们该如何选择适合自己团队的网盘工具呢&#xff1f; 本文盘点了几款还不错的…

三、 oracle 数据库适配记录

oracle 数据库适配记录 说明:由于Oracle数据库本身和MYSQL数据库有一定的语法,建表结构,物理模式等差别,导致在适配过程中,可能会出现各种错误情况,特此从一些数据库知识、适配过程、脚本制作、及问题处理方面做出记录。 一、oracle概述及版本描述 Oracle数据库名(DB_…

【Matlab】基于紧格式动态线性化的无模型自适应控制

例题来源&#xff1a;侯忠生教授的《无模型自适应控制&#xff1a;理论与应用》&#xff08;2013年科学出版社&#xff09;。 对应书本 4.2 单输入单输出系统(SISO)紧格式动态线性化(CFDL)的无模型自适应控制(MFAC) 例题4.1 题目要求 matlab代码 clc; clear all;%% 期望轨迹…

【JAVA】easyExcel导出导入使用

EasyExcel是阿里巴巴开源插件之一&#xff0c;主要解决了poi框架使用复杂&#xff0c;sax解析模式不容易操作&#xff0c;数据量大起来容易OOM&#xff0c;解决了POI并发造成的报错。主要解决方式&#xff1a;通过解压文件的方式加载&#xff0c;一行一行地加载&#xff0c;并且…

OSCP-Twiggy(ZeroMQ、SaltStack)

目录 扫描 ​编辑WEB 扫描 WEB 80端口 运行着一个名为Mezzanine的东西。快速的谷歌搜索显示这是一个内容管理系统,所以让我们看看它是否对任何可以在我们的目标机器上获得shell的东西都是脆弱的: mezzanine版本是4.3.1,并且此漏洞已在4.2.1中修补。 searchsploit没有返回…

国民技术N32G430开发笔记(1)-macos开发环境搭建

macos开发环境搭建 1、安装arm-none-eabi- 工具链 brew tap ArmMbed/homebrew-formulae brew install arm-none-eabi-gcc如果没有安装brew&#xff0c;请先安装brew。 2、安装vscode 3、安装pyocd a、python官网下载最新版的python b、pip3 install -U pyocd 我的开发板为N3…

【C/C++】C++11 无序关联容器的诞生背景

文章目录 背景无序关联容器适用场景有序关联容器适用场景 背景 C11 引入了无序关联容器&#xff08;unordered_map、unordered_set、unordered_multimap 和 unordered_multiset&#xff09;是为了提供一种高效的元素存储和查找方式。相比于有序关联容器&#xff08;map、set、…

5年测试总结,自动化测试DevOps-CICD持续集成流程设计...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 测试进阶&#xf…