(十五)ForkJoin框架

news/2024/11/29 6:33:55/

ForkJoinPool

ForkJoinPool是一种“分治算法”的多线程并行计算框架,自Java7引入。它将一个大的任务分为若干个子任务,这些子任务分别计算,然后合并出最终结果。
ForkJoinPool比普通的线程池可以更好地实现计算的负载均衡,提高资源利用率。

创建ForkJoinPool

构造方法

共有三个public的构造方法,最多的有4个参数,分别是并行度、工作线程工厂,线程未捕获异常的处理器、工作队列模式(FIFO或LIFO,默认是LIFO)、工作线程名称前缀。一般在使用无参或一个参数的构造方法即可,(或者使用commonPool),如果需要定制线程继承ForkJoinWorkerThread,则使用4个参数的构造方法。
//ForkJoinPool.commonPool();
public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode) {this(checkParallelism(parallelism),checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE,"ForkJoinPool-" + nextPoolId() + "-worker-");checkPermission();
}private ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,int mode, String workerNamePrefix) {this.workerNamePrefix = workerNamePrefix;this.factory = factory;this.ueh = handler;this.config = (parallelism & SMASK) | mode;long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

RecursiveAction和RecursiveTask

这两个类都是ForkJoinTask子类,用于实现子任务的逻辑。区别是前者没有返回值,后者有返回值。使用时,针对不同类型的任务,可以分别继承这两个类,实现其compute方法。

使用

案例1(RecursiveAction):快速排序

基本思想:
1、利用数组的某个元素(一般取第一个元素)把数组划分成两半,左边子数组里面的元素小于等于该元素,右边子数组里面的元素大于等于该元素。
2、对左右的2个子数组分别排序。
将数组划分为两部分后,对子数组分别排序是独立的子问题,这个过程可以递归分解子问题,所以可以利用多个线程分别为子数组排序。
package com.example.demo;import org.junit.Test;import java.security.SecureRandom;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.StringJoiner;
import java.util.concurrent.*;
import java.util.stream.Stream;public class ForkJoinTest {ForkJoinPool pool = ForkJoinPool.commonPool();SecureRandom random = new SecureRandom();@Testpublic void testSort() throws ExecutionException, InterruptedException {StringJoiner before = new StringJoiner(",","[","]");//20个数的数组int[] arr = getRandomIntArray(20);Arrays.stream(arr).mapToObj(String::valueOf).forEach(before::add);System.out.println(before.toString());Instant start = Instant.now();ForkJoinTask<Void> task = pool.submit(new QuickSortTask(arr));//阻塞直到完成排序task.get();long i = Duration.between(start,Instant.now()).get(ChronoUnit.NANOS);System.out.println("排序时间:" + i + "纳秒");StringJoiner after = new StringJoiner(",","[","]");Arrays.stream(arr).mapToObj(String::valueOf).forEach(after::add);System.out.println(after.toString());}private int[] getRandomIntArray(int count) {int bound = count * 10;int[] array = new int[count];for (int i = 0; i < count; i++){array[i] = random.nextInt(bound);}return array;}public static class QuickSortTask extends RecursiveAction {private int start;private int end;private int[] array;public QuickSortTask(int[] array){this.array = array;this.start = 0;this.end = array.length-1;}public QuickSortTask(int[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected void compute() {int mid = part(array, start, end);//当左边还有元素时if (mid != start) {QuickSortTask task1 = new QuickSortTask(array, start, mid - 1);task1.fork();task1.join();}//当右边还有元素时if (mid != end) {QuickSortTask task2 = new QuickSortTask(array, mid + 1, end);task2.fork();task2.join();}}/*** <p>返回基准值的下标,基准值左的元素都小于等于基准值,基准值右的元素大于等于基准值</p>* @param array* @param start* @param end* @return*/private int part(int[] array, int start, int end) {int i = start, j = end;//基准值的下标int base = start;//左右扫描相遇时结束while (i < j) {//从右向左扫描,如果当前值比基准值小,则置换,已经置换过的元素不再扫描(j的右边)while (i < j && array[j] >= array[base]) {j--;}if (i < j) {swap(array, j, base);base = j;}//从左向右扫描,如果当前值比基准值大,则置换,已经置换过的元素不再扫描(i的左边)while (i < j && array[i] <= array[base]) {i++;}if (i < j) {swap(array,i, base);base = i;}}return base;}private void swap(int[] array, int x, int y) {if (x != y) {int temp = array[x];array[x] = array[y];array[y] = temp;}}}
}

案例2(RecursiveTask):求和

例如从1加到100,如果不用高斯的方法,可以用程序实现累加,将数拆分成小组,每个小组互相独立,每个小组组内分别相加,最后把组的结果相加,这个过程可以使用ForkJoin。
RecursiveTask<T>
package com.example.demo;import org.junit.Test;import java.security.SecureRandom;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.StringJoiner;
import java.util.concurrent.*;
import java.util.stream.Stream;public class ForkJoinTest {ForkJoinPool pool = ForkJoinPool.commonPool();@Testpublic void testSum() throws ExecutionException, InterruptedException {ForkJoinTask<Integer> submit = pool.submit(new SumTask(1, 100));System.out.println(submit.get());}public static class SumTask extends RecursiveTask<Integer>{private int startNum;private int endNum;//决定当子任务处理的元素个数小于此值时不再切分任务,直接进行计算private static final int THRESHOLD = 10;public SumTask(int startNum, int endNum){this.startNum = startNum;this.endNum = endNum;}@Overrideprotected Integer compute() {int sum = 0;if (endNum - startNum + 1 < THRESHOLD){for (int i = startNum; i <= endNum; i++){sum += i;}return sum;}int mid = split(startNum, endNum);SumTask task1 = new SumTask(startNum, mid);SumTask task2 = new SumTask(mid + 1, endNum);ForkJoinTask<Integer> fork1 = task1.fork();ForkJoinTask<Integer> fork2 = task2.fork();sum = fork1.join() + fork2.join();return sum;}private int split(int startNum, int endNum) {return (startNum + endNum)/2;}}
}

关闭

和ThreadPoolExecutor一样,ForkJoinPool使用完也要关闭,依然是使用shutdown和shutdownNow方法,shutdown只拒绝新提交的任务;shutdownNow会取消现有的全局队列和局部队列中的任务,同时唤醒所有空闲的线程,让这些线程自动退出。
public void shutdown();
public List<Runnable> shutdownNow();
ForkJoinPool pl=new ForkJoinPool();
try {boolean flag;do {flag = pl.awaitTermination(500,TimeUnit.MILLISECONDS);} while (!flag);
} catch (Exception e){e.printStackTrace();
}

http://www.ppmy.cn/news/19277.html

相关文章

用户画像增量更新系列二

进行用户日志数据处理 原始日志数据 结果: 思路&#xff1a;按照user_id的行为一条条处理&#xff0c;根据用户的行为类型判别。 由于sqlDF每条数据可能会返回多条结果&#xff0c;我们可以使用rdd.flatMap函数或者yield 格式&#xff1a;["user_id", "action…

Springboot+vue基于java的家教管理平台

系统分为用户和管理员&#xff0c;教师三个角色 用户的主要功能有&#xff1a; 1.用户注册和登陆系统 2.查看系统的公告信息 3.用户查看家教教师简历信息 4.用户查看课程信息 5.用户查看招聘教师信息&#xff0c;在线应聘教师 6.用户个人中心修改个人资料&#xff0c;修改密码…

Linux常用命令——sort命令

在线Linux命令查询工具(http://www.lzltool.com/LinuxCommand) sort 将文件进行排序并输出 补充说明 sort命令是在Linux里非常有用&#xff0c;它将文件进行排序&#xff0c;并将排序结果标准输出。sort命令既可以从特定的文件&#xff0c;也可以从stdin中获取输入。 语法…

基于Springboot vue前后端分离在线培训考试系统源码

# 云帆培训考试系统 管理账号&#xff1a;admin/admin 学员账号&#xff1a;person/person # 介绍 一款多角色在线培训考试系统&#xff0c;系统集成了用户管理、角色管理、部门管理、题库管理、试题管理、试题导入导出、考试管理、在线考试、错题训练等功能&#xff0c;考…

移动web 空间转换 3D

移动web 空间转换 3D空间转换 3D3D位移透视3D旋rotateXrotateY左手法则立体呈现空间转换 3D 3D坐标系 3D 坐标系比2D 多了一个Z轴。 一定要记住3个坐标轴取值的正反&#xff1a; X 轴 往右越大&#xff0c;是正值&#xff0c; 否则反之Y 轴 往下越大&#xff0c;是正值&…

图论(入门版)

目录 1 向、权 2 最小生成树 2.1 Prim算法 2.2 Kruskal算法 3 最大流问题 3.1 Naive算法 3.2 Ford—Fulkerson算法 3.3 Edmonds—Karp算法 3.4 Dinic算法 4 最小割问题 5 二部图 5.1 判断是否是二部图的方法 5.2 匈牙利算法&#xff08;最小匹配问题&a…

Docker基本操作

Docker基本操作一、镜像操作1.镜像名称2.镜像命令&#xff08;1&#xff09;拉取、查看镜像&#xff08;2&#xff09;保存、导入镜像二、容器操作1.容器相关命令2.创建并运行一个容器3.进入容器&#xff0c;修改文件4.小结三、数据卷&#xff08;容器数据管理&#xff09;1.什…

基于自适应适应度-距离平衡的随机分形搜索算法(Matlab代码实现)

&#x1f468;‍&#x1f393;个人主页&#xff1a;研学社的博客 ​​​​​​​ &#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容…