public class CountTask extends RecursiveTask<Integer> {PRivate static final long serialVersionUID = -3454816350595604316L;private static final int threshold = 2; // 阈值private int start;private int end;public CountTask(int start, int end) {this.start = start;this.end = end;}public static void main(String[] args) throws InterruptedException, ExecutionException {ForkJoinPool joinPool = new ForkJoinPool();//生成一个计算任务CountTask countTask = new CountTask(2, 266666);//执行任务Future< Integer> future = joinPool.submit(countTask);System.out.println(future.get()); int cc = 2+3+4+5+6+7+8+9+10+11; System.out.println(cc);}@Overrideprotected Integer compute() {int sum = 0;// 如果任务足够小就计算任务boolean cancompute = (end - start) <= threshold;if (cancompute) {for (int i = start; i <= end; i++) {sum = sum + i;}} else {// 如果任务大于阈值 ,就会分解成两个小任务计算int middle = (start + end) / 2;CountTask left = new CountTask(start, middle);CountTask right = new CountTask(middle + 1, end);// 执行子任务left.fork();right.fork();// 等待子任务执行结束,并得到子任务结果int leftresult = left.join();int rightresult = right.join();sum = leftresult + rightresult;}return sum;}}
Fork/Join框架主要有以下两个类组成. * ForkJoinPool
这个类实现了ExecutorService
接口和工作窃取算法(Work-Stealing Algorithm).它管理工作者线程,并提供任务的状态信息,以及任务的执行信息 * ForkJoinTask
这个类是一个将在ForkJoinPool
执行的任务的基类.
Fork/Join框架提供了在一个任务里执行fork()
和join()
操作的机制和控制任务状态的方法.通常,为了实现Fork/Join
任务,需要实现一个以下两个类之一的子类 * RecursiveAction
用于任务没有返回值的场景 * RecursiveTask
用于任务有返回值的场景.
fork-join框架
fork操作的作用是把一个大的问题划分成若干个较小的问题。在这个划分过程一般是递归进行的。直到可以直接进行计算。需要恰当地选取子问题的大小。太大的子问题不利于通过并行方式来提高性能,而太小的子问题则会带来较大的额外开销。每个子问题计算完成后,可以得到关于整个问题的部分解。join操作的作用是把这些分解手机组织起来,得到完整解。
简单的说,ForkJoin其核心思想就是分治。Fork分解任务,Join收集数据。

在fork/join框架中,若某个子问题由于等待另一个子问题的完成而无法继续执行。那么处理该子问题的线程会主动寻找其他尚未运行完成的子问题来执行。这种方式减少了线程的等待时间,提高了性能。子问题中应该避免使用synchronized关键词或其他方式方式的同步。也不应该是一阻塞IO或过多的访问共享变量。在理想情况下,每个子问题的实现中都应该只进行CPU相关的计算,并且只适用每个问题的内部对象。唯一的同步应该只发生在子问题和创建它的父问题之间。

Fork/Join框架的主要类
一个fork/join框架之下的任务由ForkJoinTask类表示。ForkJoinTask实现了Future接口,可以按照Future接口的方式来使用。在ForkJoinTask类中之重要的两个方法fork和join。fork方法用以一部方式启动任务的执行,join方法则等待任务完成并返回指向结果。在创建自己的任务是,最好不要直接继承自ForkJoinTask类,而要继承自ForkJoinTask类的子类RecurisiveTask或RecurisiveAction类。两种的区别在于RecurisiveTask类表示的任务可以返回结果,而RecurisiveAction类不行。
简单总结:
ForkJoin主要提供了两个主要的执行任务的接口。RecurisiveAction与RecurisiveTask 。
RecurisiveAction :没有返回值的接口。RecurisiveTask :带有返回值的接口。
fork/join框架任务的执行由ForkJoinTask类的对象之外,还可以使用一般的Callable和Runnable接口来表示任务。
ForkJoin要利用线程池ForkJoinPool。每个线程池都有一个WorkQueue实例。ForkJoinPool推荐查看JDK8的源码,比JDK7更利于理解。
在ForkJoinPool类的对象中执行的任务大支可以分为两类,一类通过execute、invoke或submit提交的任务;另一类是ForkJoinTask类的对象在执行过程中产生的子任务,并通过fork方法来运行。一般的做法是表示整个问题的ForkJoinTask类的对象用第一类型是提交,而在执行过程中产生的子任务并不需要进行处理,ForkJoinPool类对象会负责子任务的执行。

ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。使用方法与Executor框架类似。ForkJoinPool提供如下两个常用的构造器:
ForkJoinPool(int parallelism) 创建一个包含parallelism个并行线程的ForkJoinPool。
ForkJoinPool() 以Runtime.availableProcessors()方法的返回值作为parallelism参数来创建ForkJoinPool。
ForkJoinPool有如下三个方法启动线程:
使用ForkJoinPool的submit(ForkJoinTask task) 或 invoke(ForkJoinTask task) 方法来执行指定任务。其中ForkJoinTask代表一个可以并行、合并的任务。
| 客户端非fork/join调用 | 内部调用fork/join |
异步执行 | execute(ForkJoinTask) | ForkJoinTask.fork |
等待获取结果 | invoke(ForkJoinTask) | ForkJoinTask.invoke |
执行,获取Future | submit(ForkJoinTask) | ForkJoinTask.fork(ForkJoinTask are Futures) |
ForkJoinTask是分支合并的执行任何,分支合并的业务逻辑使用者可以再继承了这个抽先类之后,在抽象方法exec()中实现。其中exec()的返回结果和ForkJoinPool的执行调用方(execute(...),invoke(...),submit(...)),共同决定着线程是否阻塞,具体请看下面的测试用例。
ForkJoinTask 是一个抽象类,它还有两个抽象子类:RecurisiveTask和RecurisiveAction。
RecurisiveTask代表有返回值的任务。RecursiveTask<T>是泛型类。T是返回值的类型。
RecurisiveAction代表没有返回值的任务。
异常处理
ForkJoinTask在执行的时候可能会抛出异常,但是没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。使用如下代码:
if(task.isCompletedAbnormally()) { System.out.println(task.getException());}getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。
工作窃取原理

例子 先定个小目标,1亿就太多,先赚个一百万吧
现在你是一个深圳片区的某公司高级销售主管.现在定了一个目标,就是要赚个一百,让你一个人去赚,肯定有难度的.好在有一般手下,把目标缩小,让小弟们去赚,我们坐等拿钱.ok,开始编程
首先我们要定义个赚钱任务 MakeMoneyTask
,如果要赚钱的目标小于最小目标,比如十万,那么就自己去完成,否则,就把任务分给小弟们去做.public class MakeMoneyTask extends RecursiveTask<Integer>{ private static final int MIN_GOAL_MONEY = 100000; private int goalMoney; private String name; private static final AtomicLong employeeNo = new AtomicLong(); public MakeMoneyTask(int goalMoney){ this.goalMoney = goalMoney; this.name = "员工" + employeeNo.getAndIncrement() + "号"; } @Override protected Integer compute() { if (this.goalMoney < MIN_GOAL_MONEY){ System.out.println(name + ": 老板交代了,要赚 " + goalMoney + " 元,为了买车买房,加油吧...."); return makeMoney(); }else{ int subThreadCount = ThreadLocalRandom.current().nextInt(10) + 2; System.out.println(name + ": 上级要我赚 " + goalMoney + ", 有点小多,没事让我" + subThreadCount + "个手下去完成吧," + "每人赚个 " + Math.ceil(goalMoney * 1.0 / subThreadCount) + "元应该没问题..."); List<MakeMoneyTask> tasks = new ArrayList<>(); for (int i = 0; i < subThreadCount; i ++){ tasks.add(new MakeMoneyTask(goalMoney / subThreadCount)); } Collection<MakeMoneyTask> makeMoneyTasks = invokeAll(tasks); int sum = 0; for (MakeMoneyTask moneyTask : makeMoneyTasks){ try { sum += moneyTask.get(); } catch (Exception e) { e.printStackTrace(); } } System.out.println(name + ": 嗯,不错,效率还可以,终于赚到 " + sum + "元,赶紧邀功去...."); return sum; } } private Integer makeMoney(){ int sum = 0; int day = 1; try { while (true){ Thread.sleep(ThreadLocalRandom.current().nextInt(500)); int money = ThreadLocalRandom.current().nextInt(MIN_GOAL_MONEY / 3); System.out.println(name + ": 在第 " + (day ++) + " 天赚了" + money); sum += money; if (sum >= goalMoney){ System.out.println(name + ": 终于赚到 " + sum + " 元, 可以交差了..."); break; } } } catch (InterruptedException e) { e.printStackTrace(); } return sum; }}
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657最后我们写一个测试类public class TestMain { public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Integer> task = pool.submit(new MakeMoneyTask(1000000)); do { try { TimeUnit.MILLISECONDS.sleep(5); }catch (InterruptedException e){ e.printStackTrace(); } }while (!task.isDone()); pool.shutdown(); System.out.println(task.get()); }}