Fork/Join框架是一个java 7提供的用于并行执行任务的框架,是一个把大型任务分割成若干个小任务最终汇总每个小任务结果后得到大任务结果的框架。
工作窃取算法是指某个线程从其他队列里窃取任务来执行。使用工作窃取算法可以充分利用线程进行并行计算,减少了线程间的竞争。但是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗更多的系统资源,比如创建多个线程和双端队列。
1. 分割任务。需要有一个fork类把大任务分割成子任务,如果子任务还是很大,就需要不停地分割,直到分割出的子任务足够小。
2. 执行任务并合并结果。分割的子任务放在双端队列里,然后几个启动线程分别从双端队里里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队里拿数据,然后合并这些数据。
Fork/Join使用两个类来完成以上的两件事情:
(1) ForkJoinTask:使用Fork/Join框架,必须首先创建一个ForkJoin任务。她提供在任务中执行fork()和join()操作的机制。通常情况下,不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了两个子类:
RecursiveAction:用于没有返回结果的任务 RecursiveTask:用于有返回结果的任务
(2)ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行
分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务
ForkJoinTask在执行的时候可能会抛出异常,但是没法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或者已经被取消了,并且可以通过ForkJoinTask的getException方法来获取异常
if(task.isCompletedAbnormally){ System.out.PRintln(task.getException())}ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提供提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
(1)ForkJoinTask的fork()方法的实现原理
当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoiWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。
public final ForkJoinTask<V> fork() { ((ForkJoinWorkerThread) Thread.currentThread()) .pushTask(this); return this; }pushTask方法把当前任务存放在ForkJoinTask数组队列里。然后调用ForkJoinPool的signalWork()方法唤醒或者传一个工作线程来执行。
final void pushTask(ForkJoinTask<?> t) { ForkJoinTask<?>[] q; int s, m; if ((q = queue) != null) { //如果队列删除,则忽略 //queueTop 要推送或弹出的下一个队列插槽的索引(取模queue.length) long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE; UNSAFE.putOrderedObject(q, u, t); queueTop = s + 1; // or use putOrderedInt //最小有效队列槽的索引( queue.length取模),它始终是从非空中偷取的下一个位置。 if ((s -= queueBase) <= 2) pool.signalWork(); else if (s == m) growQueue();//创建或双倍扩展队列数组。 通过从旧数组窃取(deqs)并将最旧的数组放入新数组中来传递元素。 } }(2)ForkJoin的join方法实现原理
Join方法的主要作用是阻塞当前线程并等待获取结果。代码如下:
public final V join() { if (doJoin() != NORMAL) return reportResult();//返回非正常执行的结果 else return getRawResult();//getRawResult就算任务是非正常执行结果,也会返回,或者不知道执行结果时,直接返回null }首先,它调用了doJoin方法,得到当前任务的状态来判断任务的结果,任务状态有4中:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)、出现异常(EXCEPTIONAL)
如果任务状态是已完成,则直接返回任务结果 如果任务状态是被取消,则直接抛出CancellationException 如果任务状态是抛出异常,则直接抛出对应的异常
doJoin方法代码如下:
private int doJoin() { Thread t; ForkJoinWorkerThread w; int s; boolean completed; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { if ((s = status) < 0)//说明任务以执行完成 return s;//直接返回任务状态 if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {//从ForkJoinTask数组取出任务 try { completed = exec();//立即执行任务 } catch (Throwable rex) { return setExceptionalCompletion(rex);//设置任务状态为EXCEPTIONAL } if (completed)//任务已完成 return setCompletion(NORMAL);//设置任务状态为NORMAL } return w.joinTask(this)//可能运行一些任务,直到当前任务完成 } else return externalAwaitDone();//阻塞非工作线程,直到完成 }新闻热点
疑难解答