还记得我们在初始介绍线程池的时候提到了Executor框架的体系,到现在为止我们只有一个没有介绍,与ThreadPoolExecutor一样继承与AbstractExecutorService的ForkJoinPool.Fork/Join框架是java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
我们通过表面的意思去理解ForkJoin框架:Fork即把一个大任务切割成若干部分并行执行,join即把这些被切分的任务的执行结果合并一起汇总,我们可以用下图来表示:
Fork / Join的逻辑很简单:
(1)将每个大任务分离(fork)为较小的任务; (2)在单独的线程中处理每个任务(如果必要,将它们分离成更小的任务); (3)加入结果。Fork/Join框架的核心是由下列两个类组成的。
①ForkJoinPool:这个类实现了ExecutorService接口和工作窃取算法(Work-Stealing Algorithm)。它管理工作者线程,并提供任务的状态信息,以及任务的执行信息。
②ForkJoinTask:这个类是一个将在ForkJoinPool中执行的任务的基类。
理解一个概念的最好方法是在实践中体会他,我们先写一个小程序,在此基础上一点一点来分析:
public class ForkJoinPoolTest { public static void main(String[] args) throws InterruptedException { ForkJoinPool pool = new ForkJoinPool(); pool.submit(new PRintTask(1,100)); pool.awaitTermination(2, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束 pool.shutdown(); }}class PrintTask extends RecursiveAction{ private int start; private int end; private int num; final int MAX = 50; public PrintTask(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { if(end - start < 50){ for(int i = start;i <= end; i++){ num += i; } System.out.println("当前任务结果为: "+num); }else{ int mid = (end + start)/2; PrintTask left = new PrintTask(start,mid); PrintTask right = new PrintTask(mid+1,end); left.fork(); right.fork(); } }}结果为:
当前任务结果为: 3775当前任务结果为: 1275Process finished with exit code 0我们通过结果可以看到当前任务被分裂为两个子任务去执行。而执行任务的类继承了RecursiveAction这个类,那他到底在Fork-Join框架中发挥什么作用呢?我们不妨看一下:
首先我们来看一下Fork-Join框架提交任务的方法仍旧还是submit和execute:
void execute(ForkJoinTask<?> task) //安排(异步)执行给定任务void execute(Runnable task) //在未来的某个时候执行给定的命令<T> ForkJoinTask<T> submit(Callable<T> task) //执行一个有返回值得任务,返回一个Future类型的实例代表任务的结果<T> ForkJoinTask<T> submit(ForkJoinTask<T> task) //提交一个ForkJoinTask类型的任务ForkJoinTask<?> submit(Runnable task) //提交一个Runnable类型的任务,返回一个Future类型的实例代表任务结果 <T> ForkJoinTask<T> submit(Runnable task, T result) //提交一个Runnable类型的任务,返回一个Future类型的实例代表任务结果由execute和submit的参数我们可以看到Fork-join框架可以提交ForkJoinTask,Callable和Runnable类型的任务。这个ForkJoinTask我们之前没见过,先来看一下:
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {}我们看到ForkJoinTask实现了Future接口,一个ForkJoinTask是一个轻量级的Future。对ForkJoinTask效率源于一组限制(这只是部分静态强制执行)反映其用途作为计算任务计算纯函数或纯粹孤立的对象操作。主要的协调机制fork(),安排异步执行,而不进行join(),直到任务的结果已经计算。通常我们并不直接继承 ForkJoinTask,它包含了太多的抽象方法。针对特定的问题,我们可以选择 ForkJoinTask 的不同子类来完成任务:
RecursiveAction:用于任务没有返回结果的场景。RecursiveTask:用于任务有返回结果的场景。上面的例子中我们就是继承了RecursiveAction子类用于没有返回结果的场景,下面我们再看一下RecursiveTask用于有返回结果的场景:
public class TestRecursiveTask { public static void main(String[] args) { Integer result = 0; ForkJoinPool pool = new ForkJoinPool(); Future<Integer> future = pool.submit(new SumTask(30)); try { result = future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println(result+"==========================="); }}class SumTask extends RecursiveTask<Integer> { int num; public SumTask(int num) { this.num = num; } @Override protected Integer compute() { if(num <= 20){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("生产完成"+num+"个产品"); return num; }else{ SumTask task1 = new SumTask(20); SumTask task2 = new SumTask(num - 20); task1.fork(); task2.fork(); return task1.join() + task2.join(); } }}结果为:
生产完成20个产品生产完成10个产品30===========================Process finished with exit code 0我们看到继承RecursiveTask类指定了返回值类型为Integer,在compute方法中的返回值类型即为Integer类型。
从以上的例子中可以看到,通过使用 Fork/Join 模式,软件开发人员能够方便地利用多核平台的计算能力。尽管还没有做到对软件开发人员完全透明,Fork/Join 模式已经极大地简化了编写并发程序的琐碎工作。对于符合 Fork/Join 模式的应用,软件开发人员不再需要处理各种并行相关事务,例如同步、通信等,以难以调试而闻名的死锁和 data race 等错误也就不会出现,提升了思考问题的层次。你可以把 Fork/Join 模式看作并行版本的 Divide and Conquer 策略,仅仅关注如何划分任务和组合中间结果,将剩下的事情丢给 Fork/Join 框架。
新闻热点
疑难解答