秒懂 Java Fork-Join

news/2025/1/24 13:57:45/

fork/join 框架是 Java 7 中引入的 ,它是一个工具,通过 「 分而治之 」 的方法尝试将所有可用的处理器内核使用起来帮助加速并行处理。

在实际使用过程中,这种 「 分而治之 」的方法意味着框架首先要 fork ,递归地将任务分解为较小的独立子任务,直到它们足够简单以便异步执行。然后,join 部分开始工作,将所有子任务的结果递归地连接成单个结果,或者在返回 void 的任务的情况下,程序只是等待每个子任务执行完毕。

为了提供有效的并行执行,fork/join 框架使用了一个名为 ForkJoinPool 的线程池,用于管理 ForkJoinWorkerThread 类型的工作线程。

ForkJoinPool 线程池

ForkJoinPool 是 fork/join 框架的核心,是 ExecutorService 的一个实现,用于管理工作线程,并提供了一些工具来帮助获取有关线程池状态和性能的信息。

工作线程一次只能执行一个任务。

ForkJoinPool 线程池并不会为每个子任务创建一个单独的线程,相反,池中的每个线程都有自己的双端队列用于存储任务 ( double-ended queue )( 或 deque,发音 deck )。

这种架构使用了一种名为工作窃取( work-stealing )算法来平衡线程的工作负载。

工作窃取( work-stealing )算法

要怎么解释 「 工作窃取算法 」 呢 ?

简单来说,就是 空闲的线程试图从繁忙线程的 deques 中 窃取 工作

默认情况下,每个工作线程从其自己的双端队列中获取任务。但如果自己的双端队列中的任务已经执行完毕,双端队列为空时,工作线程就会从另一个忙线程的双端队列尾部或全局入口队列中获取任务,因为这是最大概率可能找到工作的地方。

这种方法最大限度地减少了线程竞争任务的可能性。它还减少了工作线程寻找任务的次数,因为它首先在最大可用的工作块上工作。

ForkJoinPool 线程池的实例化

Java 8

在Java 8 中,创建 ForkJoinPool 实例的最简单的方式就是使用其静态方法 commonPool()。

commonPool() 静态方法,见名思义,就是提供了对公共池的引用,公共池是每个 ForkJoinTask 的默认线程池。

根据Oracle 的官方文档,使用预定义的公共池可以减少资源消耗,因为它会阻止每个任务创建一个单独的线程池。

ForkJoinPool commonPool = ForkJoinPool.commonPool();

Java 7

如果要在 Java 7 中实现相同的行为,则需要通过创建 ForkJoinPool 的实例并将其赋值给实用程序类的公共静态字段。

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

使用构造函数实例化 ForkJoinPool 时,可以创建具有指定级别的并行性,线程工厂和异常处理程序的自定义线程池。在上面的示例中,线程池的并行度级别为 2 ,意味着线程池将使用 2 个处理器核心。

然后就可以通过这个公共静态字段轻松的访问 ForkJoinPool 的实例

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

ForkJoinTask<V>

ForkJoinTask 是 ForkJoinPool 线程之中执行的任务的基本类型。我们日常使用时,一般不直接使用 ForkJoinTask ,而是扩展它的两个子类中的任意一个

1、 任务不返回结果(返回void)的RecursiveAction;
2、 返回值的任务的RecursiveTask<V>

这两个类都有一个抽象方法 compute() ,用于定义任务的逻辑。

我们所要做的,就是继承任意一个类,然后实现 compute() 方法。

RecursiveAction 使用示例

出于演示目的,示例当然是尽可能的简单,因此,我们的示例,执行了一个比较荒谬的任务:将输入转为大写并记录。

所有的代码如下所示

public class CustomRecursiveAction extends RecursiveAction {private String workload = "";private static final int THRESHOLD = 4;private static Logger logger = Logger.getAnonymousLogger();public CustomRecursiveAction(String workload) {this.workload = workload;}@Overrideprotected void compute() {if (workload.length() > THRESHOLD) {ForkJoinTask.invokeAll(createSubtasks());} else {processing(workload);}}private List<CustomRecursiveAction> createSubtasks() {List<CustomRecursiveAction> subtasks = new ArrayList<>();String partOne = workload.substring(0, workload.length() / 2);String partTwo = workload.substring(workload.length() / 2, workload.length());subtasks.add(new CustomRecursiveAction(partOne));subtasks.add(new CustomRecursiveAction(partTwo));return subtasks;}private void processing(String work) {String result = work.toUpperCase();logger.info("This result - (" + result + ") - was processed by "+ Thread.currentThread().getName());}
}

在这个示例中,我们使用了一个字符串类型 ( String ) 的名为 workload 属性来表示要处理的工作单元。

同时,为了演示 fork/join 框架的 fork 行为,在该示例中,如果 workload.length() 大于指定的阈值,那么就使用 createSubtask() 方法拆分任务。

在createSubtasks() 方法中,输入的字符串被递归地划分为子串,然后创建基于这些子串的 CustomRecursiveTask 实例。

当递归分割字符串完毕时,createSubtasks() 方法返回 List<CustomRecursiveAction> 作为结果。

然后在compute() 方法中使用 invokeAll() 方法将任务列表提交给 ForkJoinPool 线程池。

我们来总结下创建 RecursiveAction 的步骤:

1、 创建一个表示工作总量的对象;
2、 选择合适的阈值;
3、 定义分割工作的方法;
4、 定义执行工作的方法;

类似的,我们可以使用相同的方式开发自己的 RecursiveAction 类。

RecursiveTask<V> 使用示例

对于有返回值的任务,除了将每个子任务的结果在一个结果中合并,其它逻辑和 RecursiveAction 都差不多。

public class CustomRecursiveTask extends RecursiveTask<Integer> {private int[] arr;private static final int THRESHOLD = 20;public CustomRecursiveTask(int[] arr) {this.arr = arr;}@Overrideprotected Integer compute() {if (arr.length > THRESHOLD) {return ForkJoinTask.invokeAll(createSubtasks()).stream().mapToInt(ForkJoinTask::join).sum();} else {return processing(arr);}}private Collection<CustomRecursiveTask> createSubtasks() {List<CustomRecursiveTask> dividedTasks = new ArrayList<>();dividedTasks.add(new CustomRecursiveTask(Arrays.copyOfRange(arr, 0, arr.length / 2)));dividedTasks.add(new CustomRecursiveTask(Arrays.copyOfRange(arr, arr.length / 2, arr.length)));return dividedTasks;}private Integer processing(int[] arr) {return Arrays.stream(arr).filter(a -> a > 10 && a < 27).map(a -> a * 10).sum();}
}

在上面这个示例中,任务由存储在 CustomRecursiveTask 类的 arr 字段中的数组表示。

createSubtask() 方法递归地将任务划分为较小的工作,直到每个部分小于阈值。然后,invokeAll()方法将子任务提交给公共拉取并返回 Future 列表。

要触发执行,需要为每个子任务调用 join() 方法。

上面这个示例中,我们使用了 Java 8 的流 ( Stream ) API , sum() 方法用于将子结果组合到最终结果中。

将任务提交到 ForkJoinPool 线程池中

只要使用很少的方法,就可以把任务提交到 ForkJoinPool 线程池中。

1、 submit()或execute()方法;

这两个方法的调用方式是相同的

forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();

2、 使用invoke()方法fork任务并等待结果,不需要任何手动连接(join);

int result = forkJoinPool.invoke(customRecursiveTask);

3、 invokeAll()方法是将ForkJoinTasks序列提交给ForkJoinPool的最方便的方法它将任务作为参数(两个任务,varargs或集合),fork它们,并按照生成它们的顺序返回Future对象的集合;
4、 或者,我们还可以使用单独的fork()和join()方法;

  • fork() 方法将任务提交给线程池,但不会触发任务的执行。
  • join() 方法则用于触发任务的执行。在 RecursiveAction 的情况下,join() 返回 null,但对于 RecursiveTask<V> ,它返回任务执行的结果。

customRecursiveTaskFirst.fork(); result = customRecursiveTaskLast.join();

上面的RecursiveTask<V> 示例中,我们使用 invokeAll() 方法向线程池提交一系列子任务。同样的工作,也可以使用 fork() 和 join() 来完成,但这可能会对结果的排序产生影响。

为了避免混淆,当涉及到多个任务且要保证任务的顺序时,通常都是使用 ForkJoinPool.invokeAll() 。

结束语

使用fork/join 框架可以加速处理大型任务,但要实现这一结果,应遵循一些指导原则:

  • 使用尽可能少的线程池。绝大多数情况下,最好的决定是每个应用程序或系统只使用一个线程池。 (是线程池而不是线程)。
  • 当不需要任何调整时,使用默认的公共线程池。
  • 使用合理的阈值。将 ForkJoingTask 任务拆分为子任务。
  • 避免在 ForkJoingTasks 中出现任何阻塞

http://www.ppmy.cn/news/11524.html

相关文章

Spring Boot核心原理《一》Spring Boot的启动流程

文章结构1. 启动入口2. 创建 SpringApplication3. 运行 SpringBoot 应用程序prepareContext 方法&#xff08;重点&#xff09;refreshContext 方法&#xff08;重点&#xff09;本文以Spring Boot版本 2.0.2.RELEASE 为例介绍。 1. 启动入口 首先从 SpringBoot 启动流程进去…

目标检测再升级!YOLOv8模型训练和部署

一个不知名大学生&#xff0c;江湖人称菜狗 original author: jacky Li Email : 3435673055qq.com Time of completion&#xff1a;2023.1.12 Last edited: 2023.1.12 目录 目标检测再升级&#xff01;YOLOv8模型训练和部署 简介 YOLOv8创新改进点 区别 1、C2f模块是什么&…

Linux杂谈之java命令

一 java &#xff08;1&#xff09;基本解读 ① JAVA8 官方命令行参数 linux版的java 重点关注&#xff1a; java、javac、jar、keytool 这三个参数学习方式&#xff1a; 通过man java和官方文档快速学习 如何在官网搜索 java的命令行参数用法 ② 语法格式 ③ 描述 1)…

Vue知识点

Vue基础语法 插值操作 Mustache语法 可以直接写变量&#xff0c;也可以写简单的表达式 {{firstName lastName}}’ {{firstName lastName}} {{firstName}} {{lastName}} 其他指令使用 v-noce&#xff1a; <h2 v-once>{{message}}</h2> 某些情况下&#xff…

Web Spider XHR断点 千千XX 歌曲下载(三)

Web Spider XHR断点 千千XX 歌曲下载 首先声明: 此次案例只为学习交流使用&#xff0c;切勿用于其他非法用途 注&#xff1a;网站url、接口url请使用base64.b64decode自行解码 文章目录Web Spider XHR断点 千千XX 歌曲下载前言一、资源推荐二、任务说明三、网站分析四、XHR断点…

网站人工客服咨询系统的优势和实现方式介绍

人工客服系统对网站有许多好处&#xff0c;其中一些主要的有: 增加客户满意度: 客服人员可以直接回答客户的问题&#xff0c;提供有效的帮助&#xff0c;这可以提高客户对网站的满意度。 提高转化率: 通过实时互动和个性化沟通可以提高客户的转化率&#xff0c;增加销售额。 …

MyBatis面试专题及答案【二】

9、MyBatis 与 Hibernate 有哪些不同&#xff1f;答&#xff1a;1&#xff09;Mybatis 和 hibernate 不同&#xff0c;它不完全是一个 ORM 框架&#xff0c;因为 MyBatis 需要程序员自己编写 Sql 语句&#xff0c;不过 mybatis 可以通过 XML 或注解方式灵活配置要运行的 sql 语…

【遥感综述】

遥感综述小集合 Image fusion meets deep learning: A survey and perspective&#xff08;张浩&#xff0c;马佳义&#xff09; Sharpening fusion 多光谱图像锐化和高光谱图像锐化是两种典型的锐化融合任务。 Multi-spectral sharpening多光谱锐化 多光谱锐化是将低空间…