Java ForkJoin 简介和应用

news/2024/11/28 17:40:05/

Java 并行框架 Fork Join

  • 一.Fork Join 简介
    • 1.框架说明
    • 2.任务说明
  • 二.应用示例
    • 1.RecursiveTask
      • 分组示例
      • 分组求和
    • 2.RecursiveAction
    • 3.CountedCompleter
  • 三.ForkJoin 实践
    • 代码
    • 测试
      • 1.测试用 Excel 文件
      • 2.读取结果

一.Fork Join 简介

1.框架说明

ForkJoinPool 继承自 AbstractExecutorService , AbstractExecutorService 实现了 ExecutorService 接口
ForkJoin 是 Java 自带的一个并行框架,关于并行和并发的差异则看机器是否为多核配置
ForkJoinPool 之于 ThreadPoolExecutor 的差异即封装了一个双端工作队列,用于缓存父子任务,同时引入工
作窃取算法,如果某个队列任务全部处理完成,则该任务队列的线程从其他队列头取任务进行处理,充分利
用子线程资源

2.任务说明

ForkJoin 基于分治思想将大的工作任务,分解为小的处理
任务抽象类 ForkJoinTask , 预置了三个默认实现 
RecursiveTask、RecursiveAction、CountedCompleter

二.应用示例

1.RecursiveTask

分组示例


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;/*** @author * @date 2023-06-08 19:29* @since 1.8*/
public class ForkRecursiveTask<T> extends RecursiveTask<List<T>> {/*** 任务拆分参数*/private int left;private int mid;private int right;private int batch;/*** 原始数据*/private List<T> list;public ForkRecursiveTask(int batch, List<T> list){this.batch = batch;this.list = list;}@Overridepublic List<T> compute() {left = 0;right = list.size();//判断是否开始处理if (right > batch){//ArrayList 二分拆分mid = (left + right)/2;List<T> tempLeft = list.subList(left,mid);List<T> tempRight = list.subList(mid,right);//创建子任务ForkRecursiveTask<T> l = new ForkRecursiveTask(batch,tempLeft);ForkRecursiveTask<T> r = new ForkRecursiveTask(batch,tempRight);//调用子任务,调用 5,6,7,8,9 (调用右侧,执行左侧,将右侧结果拼接到左侧)r.fork();//递归执行,执行 0,1,2,3,4List<T> tempL = l.compute();//阻塞取子任务结果List<T> tempR = r.join();//聚合:将右侧合并到左侧后面//tempR.addAll(tempL);tempL.addAll(tempR);return tempL;} else {return handler(list);}}/*** 处理方法* @param temp* @return*/private List<T> handler(List<T> temp){return new ArrayList<>(temp);}
}

测试类


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;/*** @author* @date 2023-06-08 19:59* @since 1.8*/
public class ForkJoin {public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool(3);List<Task> list = new ArrayList<>(10);for (int i=0;i<10;i++){list.add(new Task(i));}ForkRecursiveTask<Task> taskTaskFork = new ForkRecursiveTask<>(3,list);System.out.println(pool.invoke(taskTaskFork));}
}

分组求和

分组求和,打印分组过程

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;/*** @author * @date 2023-06-09 20:59* @since 1.8*/
public class ForkRecursiveTaskM extends RecursiveTask<Integer> {private int batch;private int step = 2;private List<Integer> list;public ForkRecursiveTaskM(int batch,int step, List<Integer> list){this.batch = batch;this.step = step;this.list = list;}@Overrideprotected Integer compute() {int size = list.size();if (size > batch){//拆分任务int length = size/step;List<List<Integer>> temp = split(list,batch,step);List<ForkRecursiveTaskM> tasks = new ArrayList<>(temp.size());for (List<Integer> d :temp){tasks.add(new ForkRecursiveTaskM(batch,step,d));}//调用子任务for (ForkRecursiveTaskM t:tasks){t.fork();}//取结果int result = 0;for (ForkRecursiveTaskM t:tasks){result += t.join();}return result;} else {int result=0;for (Integer i:list){result += i;}System.out.println(list + "---" + result);return result;}}public <T> List<List<T>> split(List<T> list,int batch,int step){int size = list.size();int real = size%batch > 0 ? size/batch + 1 : size/batch;step = Math.min(real,step);int length = Math.max(size / step,batch);List<List<T>> temp = new ArrayList<>(step);for (int i = 0;i < step;i++){int start = i * length;int end = (i + 1) * length;if ((i + 1) == step){end = size;}temp.add(list.subList(start,end));}return temp;}
}

测试类


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;/*** @author* @date 2023-06-08 19:59* @since 1.8*/
public class ForkJoin {public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool(3);List<Task> list = new ArrayList<>(10);for (int i=0;i<10;i++){list.add(new Task(i));}List<Integer> sums = new ArrayList<>(10);for (int i=0;i<10;i++){sums.add(i);}ForkRecursiveTaskM taskTaskFork = new ForkRecursiveTaskM(3,3,sums);System.out.println(pool.invoke(taskTaskFork));}
}

2.RecursiveAction

集合分组,并打印分组过程

import java.util.List;
import java.util.concurrent.RecursiveAction;/*** @author* @date 2023-06-14 20:09* @since 1.8*/
public class ForkRecursiveAction extends RecursiveAction {/*** 任务拆分参数*/private int left;private int mid;private int right;private int batch;/*** 原始数据*/private List<Task> list;public ForkRecursiveAction(int batch, List<Task> list){this.batch = batch;this.list = list;}@Overrideprotected void compute() {left = 0;right = list.size();//判断是否开始处理if (right > batch){//ArrayList 二分拆分mid = (left + right)/2;List<Task> tempLeft = list.subList(left,mid);List<Task> tempRight = list.subList(mid,right);//创建子任务ForkRecursiveAction l = new ForkRecursiveAction(batch,tempLeft);ForkRecursiveAction r = new ForkRecursiveAction(batch,tempRight);//调用子任务l.compute();r.compute();} else {System.out.println(list);}}
}

测试类


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;/*** @author* @date 2023-06-08 19:59* @since 1.8*/
public class ForkJoin {public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool(3);List<Task> list = new ArrayList<>(10);for (int i=0;i<10;i++){list.add(new Task(i));}ForkRecursiveAction taskTaskFork = new ForkRecursiveAction(3,list);pool.invoke(taskTaskFork);}
}

3.CountedCompleter

集合拆分与合并

import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.atomic.AtomicReference;/*** @author* @date 2023-06-08 21:16* @since 1.8*/
public class ForkCountedCompleter<T> extends CountedCompleter<List<T>> {private int left;private int right;private int mid;private int batch;private List<T> list;private AtomicReference<List<T>> temp;public ForkCountedCompleter(CountedCompleter<BigInteger> parent, int batch, List<T> list, AtomicReference<List<T>> temp) {super(parent);this.batch = batch;this.list = list;this.temp = temp;}/*** 执行*/@Overridepublic void compute () {left = 0;right = list.size() ;if (right > batch){mid = (left + right)/2;//ArrayList 二分拆分List<T> tempLeft = list.subList(left,mid);List<T> tempRight = list.subList(mid,right);//创建子任务ForkCountedCompleter taskLeft = new ForkCountedCompleter(this,batch,tempLeft,temp);ForkCountedCompleter taskRight = new ForkCountedCompleter(this,batch,tempRight,temp);taskLeft.fork();taskRight.fork();//计数addToPendingCount(2);} else {//获取子任务结果temp.get().addAll(list);}//判断任务是否完成propagateCompletion();}
}

测试类


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;/*** @author* @date 2023-06-08 19:59* @since 1.8*/
public class ForkJoin {public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool(3);List<Task> list = new ArrayList<>(10);for (int i=0;i<10;i++){list.add(new Task(i));}AtomicReference<List<Task>> temp = new AtomicReference<>(new ArrayList<>());ForkCountedCompleter<Task> taskTaskFork = new ForkCountedCompleter<>(null,3,list,temp);pool.invoke(taskTaskFork);System.out.println(temp);}
}

三.ForkJoin 实践

代码

利用 ForkJoin 实现 Excel 数据导入数据并行处理

Excel 解析依赖包

<dependency><groupId>com.monitorjbl</groupId><artifactId>xlsx-streamer</artifactId><version>2.2.0</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion></exclusions>
</dependency>

Excel ForkJoin 处理类


import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;/*** @author* @date 2023-06-22 13:11* @since 1.8*/
@Slf4j
public class ExcelFork extends RecursiveTask<List<User>> {Logger logger = LoggerFactory.getLogger(ExcelFork.class);private int start;private int end;private int batch;private Sheet sheet;/*** 构造任务参数* @param start* @param end* @param sheet*/public ExcelFork(int start, int end,int batch, Sheet sheet) {this.start = start;this.end = end;this.batch = batch;this.sheet = sheet;}/**** @return*/@Overrideprotected List<User> compute() {//数据异常if (start > end ) {return new ArrayList<>(0);}//最小任务取数据if (end - start <= batch) {return minimumTask(sheet, start, end);} else {/*** 二分拆分,直到每块数据量满足设置的阈值* 如果是读取 Excel 一般从第 1 行开始 除 2 取整左侧可能较小* 例:0,1,2,3,4,5 0 为表头 2个一组则数据行如下:1,2;3,4,5 在分 3,4;5 最终就是 3 个子任务*/int mid = (start + end) / 2;/*** 分别创建子任务*/ExcelFork rightTask = new ExcelFork(start, mid,batch, sheet);ExcelFork leftTask = new ExcelFork(mid + 1, end,batch, sheet);//写法一rightTask.fork();List<User> leftList =  leftTask.compute();List<User> rightList = rightTask.join();//将左边和右边的数据合并leftList.addAll(rightList);return leftList;}}/*** 最小任务单元* @param sheet* @param start* @param end* @return*/private List<User> minimumTask(Sheet sheet, int start, int end) {List<User> mapList = new ArrayList<>(batch);User user;Row row;for (int i = start; i <= end; i++) {try {row = sheet.getRow(i);String code = String.valueOf(row.getCell(0).getNumericCellValue());String name = row.getCell(1).getStringCellValue();user = new User(code,name,0);mapList.add(user);} catch (Exception e) {logger.info("Exception:", e);}}return mapList;}
}

人员类


/*** @author* @date 2023-06-22 13:01* @since 1.8*/
public class User {private String code;private String name;private int age;public User(int age,String name){this.age = age;this.name = name;}public User(String code,String name,int age){this.code = code;this.name = name;this.age = age;}public String getCode() {return code;}public void setCode(String code) {this.code = code;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString(){return "{\"Code\":\"" + code + "\",\"Name\":\"" + name + "\",\"age\":" + age + "}";}
}

测试类


import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;import java.io.*;
import java.util.List;
import java.util.concurrent.ForkJoinPool;/*** @author * @date 2023-06-22 13:22* @since 1.8*/
public class Test {public static void main(String[] args) throws IOException {File file = new File("C:\\Users\\Desktop\\Test ForkJoin.xlsx");InputStream inputStream = new FileInputStream(file);Workbook workbook = new XSSFWorkbook(inputStream);ForkJoinPool forkJoinPool = new ForkJoinPool(4);;Sheet sheet = workbook.getSheetAt(0);//开启任务ExcelFork joinTask = new ExcelFork(1, sheet.getLastRowNum(),2000, sheet);List<User> result = forkJoinPool.invoke(joinTask);System.out.println(result);}
}

测试

1.测试用 Excel 文件

在这里插入图片描述

2.读取结果

在这里插入图片描述


http://www.ppmy.cn/news/516564.html

相关文章

51单片机超声波测距和报警+Proteus仿真

系统描述 基于51单片机超声波测距和报警&#xff0c;采用HC-SR04测距&#xff0c;数码管显示测量距离。按键设置报警上下限&#xff0c;超限报警。 源码下载地址&#xff1a;51单片机超声波测距和报警Proteus仿真 硬件设计 仿真图1&#xff1a; 仿真图2&#xff1a; 程…

双目相机视差测距

文章目录 1.测距公式2.参数解析3.备注信息 1.测距公式 双目测距公式&#xff1a;disf*b/disp 2.参数解析 dis:距离信息&#xff0c;描述相机到目标点深度数据 f:相机焦距&#xff0c;焦距信息为像素距离 b:双目相机之间距离&#xff08;厘米&#xff09; disp:视差矩阵 3.备…

单目测距 视觉测距

文章目录 单目测距在kitti数据集中的测试结果C工程原理代码注释 其他视觉测距算法-基于相似三角形的单目测距算法原理代码 参考资料 单目测距 在kitti数据集中的测试结果 C工程 C工程代码下载地址。 原理 主要的思想就是借鉴3D Bounding Box Estimation Using Deep Learning…

FPGA开源项目:双目测距(一)之双目图像采集显示以及图片保存

1.简述 这个项目是大三下学期暑假(也就是2019年8份&#xff09;完成的&#xff0c;当时的视频效果已发布在bilibili上&#xff0c;这是我们的省级的科研立项&#xff0c;其实就我一个人负责完成。发布bilibili后很多人比较感兴趣&#xff0c;打算年初回学校完成毕设期间开源的…

双目相机测距代码演示

双目测距的操作流程有四步&#xff1a;相机标定——双目校正——双目匹配——计算深度&#xff0c;具体内容参考 &#xff1a; https://blog.csdn.net/qq_38236355/article/details/88933839 其中相机标定通常用matlab现成的工具箱进行标定&#xff0c;具体操作参考&#xff1…

FMCW雷达测距实验

目录 1. FMCW雷达基本原理1.1 基本结构与原理 2. L波段FMCW雷达的基本介绍2.1 参数配置2.2 数据采集板2.3 雷达测距实验2.4 干扰机实验 本文介绍如何设计FMCW雷达测距实验。 1. FMCW雷达基本原理 1.1 基本结构与原理 雷达系统所用信号的频率随时间变化呈线性升高&#xff0c;这…

激光测距仪系统设计 c语言程序),激光测距仪系统设计(机械图,电路图,c语言程序)...

激光测距仪系统设计(机械图,电路图,c语言程序)(毕业论文22000字,cad图纸,答辩ppt) 摘 要 本次激光测距仪系统设计采用的是相位式测距法,相位激光测距又称调幅连续波激光测距通常是基于对目标回波相位的探测,在诸如军事、航空、工业和体育等领域已经取得广泛的应用。相位激光测…

红外测距模块 51单片机_智能激光测距

编者按:本文转载于酷耍(http:/kooshua.com) 一、设计目的 超声波测距和激光测距是现在比较常见的两种测距方式。两种方式相对比而言,激光测距的优点是以极小的一束激光发射出去再返回,精度为毫米级,几乎不受干扰,弥补了超声波测距易受环境干扰、误差大的缺陷。因此,采用激…