Fork/Join 框架是jdk7提供的一个用于并行执行任务的框架,是一个把大任务分成若干小任务,最终汇总每个小任务结果,最终得到大任务计算结果的框架。Fork:就是把一个大任务切分成若干小任务,并行执行;Join:就是合并这些小任务的执行结果,最终得到大任务的结果。Fork/Join 根据工作窃取算法进行设计工作窃取算法:
(work-stealing),是指某个线程从其他队列里窃取任务来执行。为什么需要工作窃取算法?假如有些线程一起工作,可能有些线程的工作早早结束,结束的线程与其等着,不如去帮其他线程干活,于是就去其他线程的队列里,窃取一个任务来执行。而在这时,他们会访问同一队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列维护;被窃取任务的线程永远从双端队列头部拿取任务执行;窃取任务线程永远从双端队列尾部拿取任务执行;Fork/Join 框架设计:Fork:分割任务。首先,我们需要一个fork类,来把大任务拆分成子任务,有可能子任务还很大,需要不停的分割,直到分割出子任务足够小。抽象类ForkJoinTask提供了两个子抽象类RecursiveAction:没有返回结果的任务RecursiveTask:有返回结果的任务Join:执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取线程并执行。子任务执行完的结果同一放到一个队列里,启动一个线程从队列里拿数据然后合并这些数据ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行使用场景:可以采用分而治之的算法场景计算密集型的任务示例:
计算 1+2+3+……+100,如果加数之间差值大于等于10,则拆分为子任务。
java">package org.test.mem.thread.pool.forkjoin;import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;/*** Fork/Join 框架是jdk7提供的一个用于并行执行任务的框架,是一个把大任务分成若干小任务,最终汇总每个小任务结果,最终得到大任务计算结果的框架。* <p>* Fork:就是把一个大任务切分成若干小任务,并行执行;* Join:就是合并这些小任务的执行结果,最终得到大任务的结果。* <p>* Fork/Join 根据工作窃取算法进行设计* <p>* 工作窃取算法:(work-stealing),是指某个线程从其他队列里窃取任务来执行。** 为什么需要工作窃取算法?* 假如有些线程一起工作,可能有些线程的工作早早结束,结束的线程与其等着,不如去帮其他线程干活,于是就去其他线程的队列里,窃取一个任务来执行。* 而在这时,他们会访问同一队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列维护;* 被窃取任务的线程永远从双端队列头部拿取任务执行;* 窃取任务线程永远从双端队列尾部拿取任务执行;** Fork/Join 框架设计:* Fork:分割任务。首先,我们需要一个fork类,来把大任务拆分成子任务,有可能子任务还很大,需要不停的分割,直到分割出子任务足够小。* 抽象类ForkJoinTask提供了两个子抽象类* RecursiveAction:没有返回结果的任务* RecursiveTask:有返回结果的任务** Join:执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取线程并执行。子任务执行完的结果同一放到一个队列里,启动一个线程从队列里拿数据* 然后合并这些数据** ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行*** 使用场景:* 可以采用分而治之的算法场景* 计算密集型的任务** 实例:* 计算 1+2+3+……+100,如果加数之间差值大于等于10,则拆分为子任务。** */
public class ForkJoinTest extends RecursiveTask<Integer> {private static final int THRESHOLD = 10;private int start;private int end;public ForkJoinTest(int start, int end) {this.start = start;this.end = end;}public static void main(String[] args) {ForkJoinPool forkJoinPool = new ForkJoinPool();//计算任务ForkJoinTest task = new ForkJoinTest(1, 100);//异步执行任务ForkJoinTask<Integer> result = forkJoinPool.submit(task);try {System.out.println(result.get());} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}}@Overrideprotected Integer compute() {int sum = 0;if (end - start < THRESHOLD) {for (int i = start; i <= end; i++) {sum += i;
// System.out.println(Thread.currentThread().getName() + ":" + i + " sum:" + sum);}System.out.println(Thread.currentThread().getName() +" sum:" +sum +" start:"+start+" end:"+end);return sum;}else{int middle = (end + start) / 2;ForkJoinTest leftForkJoin = new ForkJoinTest(start, middle);ForkJoinTest rightForkJoin = new ForkJoinTest(middle + 1, end);leftForkJoin.fork();rightForkJoin.fork();int leftResult = leftForkJoin.join();int rightResult = rightForkJoin.join();sum = leftResult + rightResult;return sum;}}
}