掌握并发框架 Fork/Join
1、什么是Fork/Join
Fork/Join
框架是一个实现了ExecutorService
接口的多线程处理器,它专为那些可以通过递归分解成更细小的任务而设计,最大化的利用多核处理器
来提高应用程序的性能。
与ExecutorService接口其他实现框架相同的是,Fork/Join框架会将任务分配给线程池中的线程;而不同的是,Fork/Join框架在执行任务时使用了工作窃取算法!
Fork/Join
的运行流程大致如下:
需要注意的是,图里的次级子任务可以一直分下去,一直分到子任务足够小为止。伪代码来表示如下:
solve(任务):
if(任务已经划分到足够小):
顺序执行任务
else:
for(划分任务得到子任务)
solve(子任务)
结合所有子任务的结果回到上一层循环
return 最终结合的结果
通过上面伪代码可以看出,我们通过递归嵌套的计算得到最终结果,这里体现了 分而治之(divide and conquer) 的算法思想。
Fork在英文里有分叉的意思,Join在英文里连接、结合的意思;顾名思义,Fork就是要使一个大任务分解成若干个小任务,而Foin就是最后将各个小任务的结果结合起来得到大任务的结果。了解大数据的小伙伴应该知道,这种设计思想类似于大数据库里的MapReduce框架,概念“Map(映射)”和“Reduce(归约)”。
2、什么是工作窃取算法
工作窃取算法:指在多线程在执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行!
工作窃取流程如下所示:
值得注意的是,当一个线程窃取另一个线程的时候,为了减少两个任务线程之间的竞争,我们通常使用双端队列来存储任务。
被窃取任务的线程从双端队列的头部拿任务执行,而窃取任务的线程从双端队列的尾部拿任务执行。
另外,当一个线程在窃取其他线程的任务时要是没有其他可用的任务了,这个线程会进入阻塞状态以等待再次“工作”!
3、Fork/Join底层原理
前面我们说Fork/Join框架简单来讲就是对大任务的分割与子任务的合并,所以要实现这个框架,先得有任务。
在Fork/Join框架里提供了抽象类ForkJoinTask
来实现任务。
3.1 ForkJoinTask
ForkJoinTask
是一个类似普通线程的实体,但是比普通线程轻量得多,它提供两个最重要的方法:fork()、join()。
ForkJoinTask.fork():使用线程池中的空闲线程异步提交任务
// 本文所有代码都引自Java 8
public final ForkJoinTask<V> fork() {
Thread t;
// ForkJoinWorkerThread是执行ForkJoinTask的专有线程,由ForkJoinPool管理
// 先判断当前线程是否是ForkJoin专有线程,如果是,则将任务push到当前线程所负责的队列里去
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
// 如果不是则将线程加入队列
// 没有显式创建ForkJoinPool的时候走这里,提交任务到默认的common线程池中
ForkJoinPool.common.externalPush(this);
return this;
}
其实fork()只做了一件事,那就是把任务塞进当前工作线程的工作队列里!
ForkJoinTask.join():等待处理任务的线程处理完毕,获得返回值
public final V join() {
int s;
// doJoin()方法来获取当前任务的执行状态
if ((s = doJoin() & DONE_MASK) != NORMAL)
// 任务异常,抛出异常
reportException(s);
// 任务正常完成,获取返回值
return getRawResult();
}
/**
* doJoin()方法用来返回当前任务的执行状态
**/
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
// 先判断任务是否执行完毕,执行完毕直接返回结果(执行状态)
return (s = status) < 0 ? s :
// 如果没有执行完毕,先判断是否是ForkJoinWorkThread线程
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
// 如果是,先判断任务是否处于工作队列顶端(意味着下一个就执行它)
// tryUnpush()方法判断任务是否处于当前工作队列顶端,是返回true
// doExec()方法执行任务
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
// 如果是处于顶端并且任务执行完毕,返回结果
tryUnpush(this) && (s = doExec()) < 0 ? s :
// 如果不在顶端或者在顶端却没未执行完毕,那就调用awitJoin()执行任务
// awaitJoin():使用自旋使任务执行完成,返回结果
wt.pool.awaitJoin(w, this, 0L) :
// 如果不是ForkJoinWorkThread线程,执行externalAwaitDone()返回任务结果
externalAwaitDone();
}
我们都知道,Thread.join()会使线程阻塞,而ForkJoin.join()
会使线程免于阻塞。ForkJoin.join()
流程图:
3.2 ForkJoinPool
ForkJoinPool是用于执行ForkJoinTask任务的执行池(线程池)。
ForkJoinPool管理着执行池中的线程和任务队列,此外,执行池是否还接受任务,显然,线程的运行状态也是在这里处理!
我们来大致看下ForkJoinPool的源码:
@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
// 任务队列
volatile WorkQueue[] workQueues;
// 线程的运行状态
volatile int runState;
// 创建ForkJoinWorkerThread的默认工厂,可以通过构造函数重写
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
// 公用的线程池,其运行状态不受shutdown()和shutdownNow()的影响
static final ForkJoinPool common;
// 私有构造方法,没有任何安全检查和参数校验,由makeCommonPool直接调用
// 其他构造方法都是源自于此方法
// parallelism: 并行度,
// 默认调用java.lang.Runtime.availableProcessors() 方法返回可用处理器的数量
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory, // 工作线程工厂
UncaughtExceptionHandler handler, // 拒绝任务的handler
int mode, // 同步模式
String workerNamePrefix) { // 线程名prefix
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
}
WorkQueue: 双端队列,ForkJoinTask存放在这里。当工作线程在处理自己的工作队列时,会从队列首取任务来执行(FIFO);如果是窃取其他队列的任务时,窃取的任务位于所属任务队列的队尾(LIFO)。ForkJoinPool与传统线程池最显著的区别就是它维护了一个工作队列数组(volatile WorkQueue[] workQueues,ForkJoinPool中的每个工作线程都维护着一个工作队列)。
**runState:**ForkJoinPool的运行状态。SHUTDOWN状态用负数表示,其他用2的幂次表示。
4、Fork/Join的陷阱与注意事项
使用Fork/Join框架时,需要注意一些陷阱, 在下面 斐波那契数列
例子中你将看到示例。
4.1 避免不必要的fork()
划分成两个子任务后,不要同时调用两个子任务的fork()方法。
表面上看上去两个子任务都fork(),然后join()两次似乎更自然。但事实证明,直接调用compute()效率更高。因为直接调用子任务的compute()方法实际上就是在当前的工作线程进行了计算(线程重用),这比“将子任务提交到工作队列,线程又从工作队列中拿任务”快得多。
当一个大任务被划分成两个以上的子任务时,尽可能使用前面说到的三个衍生的invokeAll方法,因为使用它们能避免不必要的fork()。
4.2 注意fork()、compute()、join()的顺序
为了两个任务并行,三个方法的调用顺序需要万分注意。
right.fork(); // 计算右边的任务
long leftAns = left.compute(); // 计算左边的任务(同时右边任务也在计算)
long rightAns = right.join(); // 等待右边的结果
return leftAns + rightAns;
如果我们写成:
left.fork(); // 计算完左边的任务
long leftAns = left.join(); // 等待左边的计算结果
long rightAns = right.compute(); // 再计算右边的任务
return leftAns + rightAns;
或者
long rightAns = right.compute(); // 计算完右边的任务
left.fork(); // 再计算左边的任务
long leftAns = left.join(); // 等待左边的计算结果
return leftAns + rightAns;
这两种实际上都没有并行!
4.3 选择合适的子任务粒度
选择划分子任务的粒度(顺序执行的阈值)很重要,因为使用Fork/Join框架并不一定比顺序执行任务的效率高: 如果任务太大,则无法提高并行的吞吐量;如果任务太小,子任务的调度开销可能会大于并行计算的性能提升,我们还要考虑创建子任务、fork()子任务、线程调度以及合并子任务处理结果的耗时以及相应的内存消耗。
官方文档给出的粗略经验是: 任务应该执行100~10000
个基本的计算步骤。决定子任务的粒度的最好办法是实践,通过实际测试结果来确定这个阈值才是“上上策”。
和其他Java代码一样,Fork/Join框架测试时需要“预热”或者说执行几遍才会被JIT(Just-in-time)编译器优化,所以测试性能之前跑几遍程序很重要。
4.4 避免重量级任务划分与结果合并
Fork/Join的很多使用场景都用到数组或者List等数据结构,子任务在某个分区中运行,最典型的例子如并行排序和并行查找。拆分子任务以及合并处理结果的时候,应该尽量避免System.arraycopy这样耗时耗空间的操作,从而最小化任务的处理开销。
4.5 有哪些JDK源码中使用了Fork/Join思想?
我们常用的数组工具类 Arrays 在JDK 8之后新增的并行排序方法(parallelSort)就运用了 ForkJoinPool 的特性,还有 ConcurrentHashMap 在JDK 8之后添加的函数式方法(如forEach等)也有运用。
4.6 关于Fork/Join异常处理
Java的受检异常机制一直饱受诟病,所以在ForkJoinTask的invoke()、join()方法及其衍生方法中都没有像get()方法那样抛出个ExecutionException的受检异常。
所以你可以在ForkJoinTask中看到内部把受检异常转换成了运行时异常。
static void rethrow(Throwable ex) {
if (ex != null)
ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
}
@SuppressWarnings("unchecked")
static <T extends Throwable> void uncheckedThrow(Throwable t) throws T {
throw (T)t; // rely on vacuous cast
}
@pdai: 代码已经复制到剪贴板
关于Java你不知道的10件事中已经指出,JVM实际并不关心这个异常是受检异常还是运行时异常,受检异常这东西完全是给Java编译器用的: 用于警告程序员这里有个异常没有处理。
但不可否认的是invoke、join()仍可能会抛出运行时异常,所以ForkJoinTask还提供了两个不提取结果和异常的方法quietlyInvoke()、quietlyJoin(),这两个方法允许你在所有任务完成后对结果和异常进行处理。
使用quitelyInvoke()和quietlyJoin()时可以配合isCompletedAbnormally()和isCompletedNormally()方法使用!
5、Fork/Join使用举例
RecursiveAction和RecursiveTask
上面我们说ForkJoinPool负责管理线程和任务,ForkJoinTask实现fork和join操作,所以要使用Fork/Join框架就离不开这两个类了,只是在实际开发中我们常用ForkJoinTask的子类 RecursiveTask 和RecursiveAction
来替代ForkJoinTask。
RecursiveAction 和 RecursiveTask 都是ForkJoinTask的子类,前者可以看做是无返回值的ForkJoinTask,后者是有返回值的ForkJoinTask!
此外,两个子类都有执行主要计算的方法compute()
,当然,RecursiveAction的compute()返回void,RecursiveTask的compute()有具体的返回值。
5.1 计算1+2+3+…+10000
public class Test {
static final class SumTask extends RecursiveTask<Integer> {
private static final long serialVersionUID = 1L;
final int start; //开始计算的数
final int end; //最后计算的数
SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
//如果计算量小于1000,那么分配一个线程执行if中的代码块,并返回执行结果
if(end - start < 1000) {
System.out.println(Thread.currentThread().getName() + " 开始执行: " + start + "-" + end);
int sum = 0;
for(int i = start; i <= end; i++)
sum += i;
return sum;
}
//如果计算量大于1000,那么拆分为两个任务
SumTask task1 = new SumTask(start, (start + end) / 2);
SumTask task2 = new SumTask((start + end) / 2 + 1, end);
//执行任务
task1.fork();
task2.fork();
//获取任务执行的结果
return task1.join() + task2.join();
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> task = new SumTask(1, 10000);
pool.submit(task);
System.out.println(task.get());
}
}
输出结果:
ForkJoinPool-1-worker-1 开始执行: 1-625
ForkJoinPool-1-worker-7 开始执行: 6251-6875
ForkJoinPool-1-worker-6 开始执行: 5626-6250
ForkJoinPool-1-worker-10 开始执行: 3751-4375
ForkJoinPool-1-worker-13 开始执行: 2501-3125
ForkJoinPool-1-worker-8 开始执行: 626-1250
ForkJoinPool-1-worker-11 开始执行: 5001-5625
ForkJoinPool-1-worker-3 开始执行: 7501-8125
ForkJoinPool-1-worker-14 开始执行: 1251-1875
ForkJoinPool-1-worker-4 开始执行: 9376-10000
ForkJoinPool-1-worker-8 开始执行: 8126-8750
ForkJoinPool-1-worker-0 开始执行: 1876-2500
ForkJoinPool-1-worker-12 开始执行: 4376-5000
ForkJoinPool-1-worker-5 开始执行: 8751-9375
ForkJoinPool-1-worker-7 开始执行: 6876-7500
ForkJoinPool-1-worker-1 开始执行: 3126-3750
50005000
5.2 斐波那契数列
斐波那契数列:1、1、2、3、5、... 公式 : F(1) = 1,F(2) = 1, F(n) = F(n-1) + F(n-2) (n >= 3,n∈N*)
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 最大并发数4
Fibonacci fibonacci = new Fibonacci(20);
long startTime = System.currentTimeMillis();
Integer result = forkJoinPool.invoke(fibonacci);
long endTime = System.currentTimeMillis();
System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}
//以下为官方API文档示例
static class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
输出结果:
Fork/join sum: 6765 in 10 ms.
当然你也可以两个任务都fork,要注意的是两个任务都fork的情况,必须按照f1.fork(),f2.fork(), f2.join(),f1.join()这样的顺序,不然有性能问题,详见上面注意事项中的说明。
官方API文档是这样写到的,所以平日用invokeAll就好了。invokeAll会把传入的任务的第一个交给当前线程来执行,其他的任务都fork加入工作队列,这样等于利用当前线程也执行任务了。
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 最大并发数4
Fibonacci fibonacci = new Fibonacci(20);
long startTime = System.currentTimeMillis();
Integer result = forkJoinPool.invoke(fibonacci);
long endTime = System.currentTimeMillis();
System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}
//以下为官方API文档示例
static class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
Fibonacci f2 = new Fibonacci(n - 2);
invokeAll(f1,f2);
return f2.join() + f1.join();
}
}
输出结果:
Fork/join sum: 6765 in 12 ms.
6、总结
Fork/Join
框架是一个实现了ExecutorService
接口的多线程处理器,它专为那些可以通过递归分解成更细小的任务而设计,最大化的利用多核处理器
来提高应用程序的性能。
工作窃取算法,即在多线程在执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行!
ForkJoinTask提供了两个最重要的方法: fork()
使用线程池中的空闲线程异步提交任务, join()等待处理任务的线程处理完毕
,获得返回值。
ForkJoinPool是用于执行ForkJoinTask任务的执行池(线程池)。 ForkJoinPool管理着执行池中的线程和任务队列,此外,执行池是否还接受任务,显然,线程的运行状态也是在这里处理!
我们常用的数组工具类 Arrays
在JDK 8之后新增的并行排序方法(parallelSort)就运用了 ForkJoinPool 的特性,还有 ConcurrentHashMap
在JDK 8之后添加的函数式方法(如forEach等)也有运用!