Java 并行框架 Fork Join
- 一.Fork Join 简介
- 1.框架说明
- 2.任务说明
- 二.应用示例
- 1.RecursiveTask
- 分组示例
- 分组求和
- 2.RecursiveAction
- 3.CountedCompleter
- 三.ForkJoin 实践
- 代码
- 测试
- 1.测试用 Excel 文件
- 2.读取结果
一.Fork Join 简介
1.框架说明
ForkJoinPool 继承自 AbstractExecutorService , AbstractExecutorService 实现了 ExecutorService 接口
ForkJoin 是 JDK 自带的并行框架;任务能否真正并行(而非仅并发交替)执行,取决于机器是否为多核配置
与 ThreadPoolExecutor 相比,ForkJoinPool 为每个工作线程维护一个双端工作队列,用于缓存父子任务,同时引入工作窃取算法:
某个线程处理完自己队列中的全部任务后,会从其他线程的队列中窃取任务来处理(窃取端与队列所属线程取任务的一端相反,以减少竞争),
从而充分利用线程资源
2.任务说明
ForkJoin 基于分治思想,将大任务递归拆分为若干小任务分别处理,再汇总各子任务的结果
任务抽象类 ForkJoinTask 预置了三个抽象子类:
RecursiveTask(有返回值)、RecursiveAction(无返回值)、CountedCompleter(通过挂起计数在子任务全部完成后触发完成动作)
二.应用示例
1.RecursiveTask
分组示例
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;/*** @author * @date 2023-06-08 19:29* @since 1.8*/
/**
 * Fork/join task that recursively halves a list until every piece holds at most
 * {@code batch} elements, copies each piece, and concatenates the copies back
 * together in the original order.
 *
 * @param <T> element type of the list being processed
 */
public class ForkRecursiveTask<T> extends RecursiveTask<List<T>> {

    /** Largest list size that is handled directly instead of being split again. */
    private final int batch;

    /** Data handled by this task; sub-tasks receive {@code subList} views of it. */
    private final List<T> list;

    /**
     * Creates a task over the given list.
     *
     * @param batch maximum number of elements a leaf task handles; must be at least 1
     * @param list  data to process; must not be {@code null}
     * @throws IllegalArgumentException if {@code batch} is smaller than 1
     * @throws NullPointerException     if {@code list} is {@code null}
     */
    public ForkRecursiveTask(int batch, List<T> list) {
        if (batch < 1) {
            // A one-element list can never satisfy a non-positive threshold,
            // so halving it would recurse without end.
            throw new IllegalArgumentException("batch must be >= 1, but was " + batch);
        }
        if (list == null) {
            throw new NullPointerException("list");
        }
        this.batch = batch;
        this.list = list;
    }

    /**
     * Splits the list in half while it is larger than {@code batch}; otherwise
     * processes it directly.
     *
     * @return a new mutable list holding the processed elements in source order
     */
    @Override
    public List<T> compute() {
        int size = list.size();
        if (size <= batch) {
            return handler(list);
        }

        int mid = size / 2;
        ForkRecursiveTask<T> leftTask = new ForkRecursiveTask<>(batch, list.subList(0, mid));
        ForkRecursiveTask<T> rightTask = new ForkRecursiveTask<>(batch, list.subList(mid, size));

        // Schedule the right half asynchronously and work on the left half in
        // the current thread, so this worker is never idle while waiting.
        rightTask.fork();
        List<T> merged = leftTask.compute();

        // join() blocks until the right half is done; appending it after the
        // left half keeps the original element order.
        merged.addAll(rightTask.join());
        return merged;
    }

    /**
     * Leaf-level processing: copies the piece into a fresh mutable list so the
     * merge step never writes through a {@code subList} view of the source.
     *
     * @param temp the piece to process
     * @return a new {@link ArrayList} containing the elements of {@code temp}
     */
    private List<T> handler(List<T> temp) {
        return new ArrayList<>(temp);
    }
}
测试类
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;/*** @author* @date 2023-06-08 19:59* @since 1.8*/
/**
 * Demo entry point for {@code ForkRecursiveTask}: builds ten {@code Task}
 * items, runs the split/merge task on a three-thread pool and prints the result.
 * {@code Task} is declared elsewhere in the project.
 */
public class ForkJoin {

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(3);
        try {
            List<Task> list = new ArrayList<>(10);
            for (int i = 0; i < 10; i++) {
                list.add(new Task(i));
            }

            ForkRecursiveTask<Task> taskTaskFork = new ForkRecursiveTask<>(3, list);
            // invoke() blocks until the whole task tree has finished.
            System.out.println(pool.invoke(taskTaskFork));
        } finally {
            // Release the worker threads even if the task fails.
            pool.shutdown();
        }
    }
}
分组求和
分组求和,打印分组过程
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;/*** @author * @date 2023-06-09 20:59* @since 1.8*/
/**
 * Fork/join task that sums a list of integers. Lists larger than {@code batch}
 * are cut into at most {@code step} pieces that are summed by sub-tasks; smaller
 * lists are summed directly and the piece plus its sum is printed.
 */
public class ForkRecursiveTaskM extends RecursiveTask<Integer> {

    /** Largest list size that is summed directly. */
    private final int batch;

    /** Maximum number of pieces a list is cut into per split. */
    private final int step;

    /** Numbers handled by this task. */
    private final List<Integer> list;

    /**
     * Creates a summing task.
     *
     * @param batch maximum size of a list that is summed without splitting; at least 1
     * @param step  maximum number of pieces per split; at least 2, because a
     *              single piece would be the whole list again and the task
     *              would recurse forever
     * @param list  numbers to sum
     * @throws IllegalArgumentException if {@code batch < 1} or {@code step < 2}
     */
    public ForkRecursiveTaskM(int batch, int step, List<Integer> list) {
        if (batch < 1) {
            throw new IllegalArgumentException("batch must be >= 1, but was " + batch);
        }
        if (step < 2) {
            throw new IllegalArgumentException("step must be >= 2, but was " + step);
        }
        this.batch = batch;
        this.step = step;
        this.list = list;
    }

    /**
     * Sums the list, delegating to sub-tasks while it is larger than {@code batch}.
     *
     * @return the sum of all elements of the list
     */
    @Override
    protected Integer compute() {
        if (list.size() > batch) {
            List<List<Integer>> parts = split(list, batch, step);
            List<ForkRecursiveTaskM> tasks = new ArrayList<>(parts.size());
            for (List<Integer> part : parts) {
                tasks.add(new ForkRecursiveTaskM(batch, step, part));
            }

            // invokeAll forks the sub-tasks and returns once all of them are
            // done, rethrowing any exception a sub-task raised.
            invokeAll(tasks);

            int result = 0;
            for (ForkRecursiveTaskM task : tasks) {
                result += task.join();
            }
            return result;
        }

        int result = 0;
        for (Integer value : list) {
            result += value;
        }
        System.out.println(list + "---" + result);
        return result;
    }

    /**
     * Cuts a list into consecutive {@code subList} views.
     * <p>
     * The number of pieces is the smaller of {@code step} and the number of
     * {@code batch}-sized groups needed to cover the list. Every piece except
     * the last has {@code max(size / pieces, batch)} elements; the last piece
     * takes whatever remains.
     *
     * @param list  list to cut; an empty list yields an empty result
     * @param batch preferred minimum piece size; at least 1
     * @param step  maximum number of pieces; at least 1
     * @param <T>   element type
     * @return the pieces in source order, as views backed by {@code list}
     * @throws IllegalArgumentException if {@code batch < 1} or {@code step < 1}
     */
    public <T> List<List<T>> split(List<T> list, int batch, int step) {
        if (batch < 1 || step < 1) {
            throw new IllegalArgumentException(
                    "batch and step must be >= 1, but were " + batch + " and " + step);
        }
        int size = list.size();
        if (size == 0) {
            // Nothing to cut; also avoids dividing by a piece count of zero below.
            return new ArrayList<>(0);
        }

        int groups = size % batch > 0 ? size / batch + 1 : size / batch;
        int pieces = Math.min(groups, step);
        int length = Math.max(size / pieces, batch);

        List<List<T>> parts = new ArrayList<>(pieces);
        for (int i = 0; i < pieces; i++) {
            int from = i * length;
            // The last piece absorbs the remainder.
            int to = (i + 1 == pieces) ? size : (i + 1) * length;
            parts.add(list.subList(from, to));
        }
        return parts;
    }
}
测试类
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;/*** @author* @date 2023-06-08 19:59* @since 1.8*/
/**
 * Demo entry point for {@code ForkRecursiveTaskM}: sums the integers 0..9 on a
 * three-thread pool (batch 3, at most 3 pieces per split) and prints the total.
 */
public class ForkJoin {

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(3);
        try {
            List<Integer> sums = new ArrayList<>(10);
            for (int i = 0; i < 10; i++) {
                sums.add(i);
            }

            ForkRecursiveTaskM taskTaskFork = new ForkRecursiveTaskM(3, 3, sums);
            // invoke() blocks until the whole task tree has finished.
            System.out.println(pool.invoke(taskTaskFork));
        } finally {
            // Release the worker threads even if the task fails.
            pool.shutdown();
        }
    }
}
2.RecursiveAction
集合分组,并打印分组过程
import java.util.List;
import java.util.concurrent.RecursiveAction;/*** @author* @date 2023-06-14 20:09* @since 1.8*/
public class ForkRecursiveAction extends RecursiveAction {/*** 任务拆分参数*/private int left;private int mid;private int right;private int batch;/*** 原始数据*/private List<Task> list;public ForkRecursiveAction(int batch, List<Task> list){this.batch = batch;this.list = list;}@Overrideprotected void compute() {left = 0;right = list.size();//判断是否开始处理if (right > batch){//ArrayList 二分拆分mid = (left + right)/2;List<Task> tempLeft = list.subList(left,mid);List<Task> tempRight = list.subList(mid,right);//创建子任务ForkRecursiveAction l = new ForkRecursiveAction(batch,tempLeft);ForkRecursiveAction r = new ForkRecursiveAction(batch,tempRight);//调用子任务l.compute();r.compute();} else {System.out.println(list);}}
}
测试类
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;/*** @author* @date 2023-06-08 19:59* @since 1.8*/
/**
 * Demo entry point for {@code ForkRecursiveAction}: builds ten {@code Task}
 * items and runs the partition-and-print action on a three-thread pool.
 * {@code Task} is declared elsewhere in the project.
 */
public class ForkJoin {

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(3);
        try {
            List<Task> list = new ArrayList<>(10);
            for (int i = 0; i < 10; i++) {
                list.add(new Task(i));
            }

            ForkRecursiveAction taskTaskFork = new ForkRecursiveAction(3, list);
            // invoke() blocks until the whole action tree has finished.
            pool.invoke(taskTaskFork);
        } finally {
            // Release the worker threads even if the action fails.
            pool.shutdown();
        }
    }
}
3.CountedCompleter
集合拆分与合并
import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.atomic.AtomicReference;/*** @author* @date 2023-06-08 21:16* @since 1.8*/
/**
 * {@link CountedCompleter} that recursively halves a list until every piece
 * holds at most {@code batch} elements and appends each final piece to a shared
 * result list. Pieces are appended as their leaf tasks finish, so the order of
 * the pieces in the result is not deterministic.
 *
 * @param <T> element type of the list being processed
 */
public class ForkCountedCompleter<T> extends CountedCompleter<List<T>> {

    /** Largest list size that is appended directly instead of being split again. */
    private final int batch;

    /** Data handled by this task; sub-tasks receive {@code subList} views of it. */
    private final List<T> list;

    /** Holder of the result list shared by every task in the tree. */
    private final AtomicReference<List<T>> temp;

    /**
     * Creates a task over the given list.
     *
     * @param parent completer to notify when this task and its sub-tasks are
     *               done, or {@code null} for the root task
     * @param batch  maximum number of elements per leaf; must be at least 1
     * @param list   data to process
     * @param temp   holder of the shared list that collects the results
     * @throws IllegalArgumentException if {@code batch} is smaller than 1
     */
    public ForkCountedCompleter(CountedCompleter<?> parent, int batch, List<T> list,
                                AtomicReference<List<T>> temp) {
        super(parent);
        if (batch < 1) {
            // A one-element list can never satisfy a non-positive threshold,
            // so halving it would recurse without end.
            throw new IllegalArgumentException("batch must be >= 1, but was " + batch);
        }
        this.batch = batch;
        this.list = list;
        this.temp = temp;
    }

    /**
     * Splits the list in half while it is larger than {@code batch}; otherwise
     * appends it to the shared result list. In both cases the task then
     * signals completion towards its parent.
     */
    @Override
    public void compute() {
        int size = list.size();
        if (size > batch) {
            int mid = size / 2;
            ForkCountedCompleter<T> taskLeft =
                    new ForkCountedCompleter<>(this, batch, list.subList(0, mid), temp);
            ForkCountedCompleter<T> taskRight =
                    new ForkCountedCompleter<>(this, batch, list.subList(mid, size), temp);

            // The pending count must be raised BEFORE forking: a child that
            // finishes first would otherwise see a count of zero and complete
            // this task (and its ancestors) too early.
            addToPendingCount(2);
            taskLeft.fork();
            taskRight.fork();
        } else {
            List<T> sink = temp.get();
            // Leaves run on different worker threads and the shared list is
            // not required to be thread-safe, so appends are serialised.
            synchronized (sink) {
                sink.addAll(list);
            }
        }
        // Decrements this task's pending count, or propagates completion to
        // the parent once the count has reached zero.
        propagateCompletion();
    }
}
测试类
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;/*** @author* @date 2023-06-08 19:59* @since 1.8*/
/**
 * Demo entry point for {@code ForkCountedCompleter}: builds ten {@code Task}
 * items, lets the completer tree copy them into a shared list on a
 * three-thread pool and prints the holder of that list.
 * {@code Task} is declared elsewhere in the project.
 */
public class ForkJoin {

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(3);
        try {
            List<Task> list = new ArrayList<>(10);
            for (int i = 0; i < 10; i++) {
                list.add(new Task(i));
            }

            // Shared holder that the leaf tasks append their pieces to.
            AtomicReference<List<Task>> temp = new AtomicReference<>(new ArrayList<>());
            ForkCountedCompleter<Task> taskTaskFork = new ForkCountedCompleter<>(null, 3, list, temp);
            // invoke() blocks until the root completer has completed.
            pool.invoke(taskTaskFork);
            System.out.println(temp);
        } finally {
            // Release the worker threads even if the task fails.
            pool.shutdown();
        }
    }
}
三.ForkJoin 实践
代码
利用 ForkJoin 实现 Excel 导入数据的并行处理
Excel 解析依赖包
<!-- xlsx-streamer 2.2.0, with its transitive slf4j-api excluded
     (presumably so the project's own SLF4J version is used; confirm). -->
<dependency>
    <groupId>com.monitorjbl</groupId>
    <artifactId>xlsx-streamer</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Excel ForkJoin 处理类
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;/*** @author* @date 2023-06-22 13:11* @since 1.8*/
@Slf4j
public class ExcelFork extends RecursiveTask<List<User>> {Logger logger = LoggerFactory.getLogger(ExcelFork.class);private int start;private int end;private int batch;private Sheet sheet;/*** 构造任务参数* @param start* @param end* @param sheet*/public ExcelFork(int start, int end,int batch, Sheet sheet) {this.start = start;this.end = end;this.batch = batch;this.sheet = sheet;}/**** @return*/@Overrideprotected List<User> compute() {//数据异常if (start > end ) {return new ArrayList<>(0);}//最小任务取数据if (end - start <= batch) {return minimumTask(sheet, start, end);} else {/*** 二分拆分,直到每块数据量满足设置的阈值* 如果是读取 Excel 一般从第 1 行开始 除 2 取整左侧可能较小* 例:0,1,2,3,4,5 0 为表头 2个一组则数据行如下:1,2;3,4,5 在分 3,4;5 最终就是 3 个子任务*/int mid = (start + end) / 2;/*** 分别创建子任务*/ExcelFork rightTask = new ExcelFork(start, mid,batch, sheet);ExcelFork leftTask = new ExcelFork(mid + 1, end,batch, sheet);//写法一rightTask.fork();List<User> leftList = leftTask.compute();List<User> rightList = rightTask.join();//将左边和右边的数据合并leftList.addAll(rightList);return leftList;}}/*** 最小任务单元* @param sheet* @param start* @param end* @return*/private List<User> minimumTask(Sheet sheet, int start, int end) {List<User> mapList = new ArrayList<>(batch);User user;Row row;for (int i = start; i <= end; i++) {try {row = sheet.getRow(i);String code = String.valueOf(row.getCell(0).getNumericCellValue());String name = row.getCell(1).getStringCellValue();user = new User(code,name,0);mapList.add(user);} catch (Exception e) {logger.info("Exception:", e);}}return mapList;}
}
人员类
/**
 * Person record with a code, a name and an age. Created 2023-06-22.
 *
 * @since 1.8
 */
public class User {

    private String code;
    private String name;
    private int age;

    /**
     * Creates a user without a code; {@code code} stays {@code null}.
     *
     * @param age  age of the user
     * @param name name of the user
     */
    public User(int age, String name) {
        this.age = age;
        this.name = name;
    }

    /**
     * Creates a fully populated user.
     *
     * @param code code of the user
     * @param name name of the user
     * @param age  age of the user
     */
    public User(String code, String name, int age) {
        this.code = code;
        this.name = name;
        this.age = age;
    }

    /** @return the code, or {@code null} when none was set */
    public String getCode() {
        return code;
    }

    /** @param code the new code */
    public void setCode(String code) {
        this.code = code;
    }

    /** @return the name */
    public String getName() {
        return name;
    }

    /** @param name the new name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the age */
    public int getAge() {
        return age;
    }

    /** @param age the new age */
    public void setAge(int age) {
        this.age = age;
    }

    /**
     * Renders the user as a JSON-like string with the keys {@code Code},
     * {@code Name} and {@code age}. Values are inserted without escaping, and
     * a {@code null} code or name is rendered as the quoted text {@code "null"}.
     *
     * @return the textual form of this user
     */
    @Override
    public String toString() {
        return "{\"Code\":\"" + code + "\",\"Name\":\"" + name + "\",\"age\":" + age + "}";
    }
}
测试类
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;import java.io.*;
import java.util.List;
import java.util.concurrent.ForkJoinPool;/*** @author * @date 2023-06-22 13:22* @since 1.8*/
public class Test {public static void main(String[] args) throws IOException {File file = new File("C:\\Users\\Desktop\\Test ForkJoin.xlsx");InputStream inputStream = new FileInputStream(file);Workbook workbook = new XSSFWorkbook(inputStream);ForkJoinPool forkJoinPool = new ForkJoinPool(4);;Sheet sheet = workbook.getSheetAt(0);//开启任务ExcelFork joinTask = new ExcelFork(1, sheet.getLastRowNum(),2000, sheet);List<User> result = forkJoinPool.invoke(joinTask);System.out.println(result);}
}