0%

Fork/Join:单机版的MapReduce

对于简单的并行任务,可以通过”线程池+Future”的方案来解决;
如果任务之间有聚合关系,无论是AND聚合还是OR聚合,都可以通过CompletetableFuture来解决;
而批量的并行任务,则可以通过CompletionService来解决。

并发编程可以分为三个层面的问题,分别是分工、协作、互斥。
关注任务,从并发编程的细节中挑出来,从任务的视角看,线程池、Future、CompletiontableFuture、CompletionService可以列到分工中。

日常工作中的并发场景,简单并行、聚合、批量并行、分治。
分治-分而治之,是一种解决复杂问题的思维方法和模式;指的是把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解。理论上,解决每一个问题都对应着一个任务,对于问题的分治,实际上就是对于任务的分治。

归并排序、快速排序、二分查找等都属于分治算法;大数据计算框架MapReduce背后的思想也是分治。
数据结构与算法-分治算法

Java并发包中的并行计算框架Fork/Join支持分治这种任务模型。

分治任务模型

分治任务模型可分为两个阶段:一个阶段是任务分解,也就是将任务迭代的分解为子任务,直至子任务可以直接计算出结果;另一个阶段是结果合并,即逐层合并子任务的执行结果,直至获得最终结果。

在分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,往往都采用递归算法。

Fork/Join的使用

Fork/Join是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的Fork对应的是分治任务模型里的任务分解,Join对应的是结果合并。Fork/Join计算框架主要包含两部分,一部分是分治任务的线程池ForkJoinPool,另一部分是分治任务ForkJoinTask。这两部分的关系类似于ThreadPoolExecutor和Runnable的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型ForkJoinTask。

ForkJoinTask是一个抽象类,它的方法有很多,最核心的是fork()方法和join()方法,其中fork()方法会异步的执行一个子任务,而join()方法则会阻塞当前线程来等待子任务的执行结果。ForkJoinTask有两个子类:RecursiveActionRecursiveTask,它们都是用递归的方式来处理分治任务的。这两个子类都定义了抽象方法compute(),区别是RecursiveAction定义的compute()没有返回值,而RecursiveTask定义的compute()方法是有返回值的。这两个子类也是抽象类,在使用的时候,需要定义子类去扩展。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//Fork/Join并行计算框架计算斐波那契数列
static void main(String[] args) {
//创建分治任务线程池
ForkJoinPool fjp = new ForkJoinPool(4);
//创建分治任务
Fibonacci fib = new Fibonacci(30);
//启动分治任务
Integer result = fjp.invoke(fib);
//输出结果
System.out.println(result);
}

//递归任务
static class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) {
this.n = n;
}
protected Integer compute() {
if(n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
//创建子任务
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
//等待子任务结果,并合并结果
return f2.compute() + f1.join();
}
}

首先需要创建一个分治任务线程池以及计算斐波那契数列的分治任务,之后通过调用分治任务线程池的invoke()方法来启动分治任务。由于计算斐波那契数列需要有返回值,所以Fibonacci继承自RecursiveTask。分治任务Fibonacci需要实现compute()方法,这个方法里面的逻辑和普通计算斐波那契数列非常类似,区别之处在于计算Fibonacci(n-1)使用了异步子任务,这是通过f1.fork()这条语句实现的。

ForkJoinPool工作原理

Fork/Join并行计算的核心组件是ForkJoinPool。

ThreadPoolExecutor本质上是一个生产者-消费者模式的实现,内部有一个任务队列,内部有一个任务队列,这个任务队列是生产者和消费者通信的媒介;ThreadPoolExecutor可以有多个工作线程,但是这些工作线程都共享一个任务队列。

ForkJoinPool本质上也是一个生产者-消费者的实现,但是更加智能。ThreadPoolExecutor内部只有一个任务队列,而ForkJoinPool内部有多个任务队列,当通过ForkJoinPool的invoke()或者submit()方法提交任务时,ForkJoinPool根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。

如果工作线程对应的任务队列空了,ForkJoinPool支持一种叫做“任务窃取”的机制,如果工作线程空闲了,那它可以“窃取”其它工作任务队列里的任务,如此一来,所有的工作线程都不会闲下来。

ForkJoinPool中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争。

模拟MapReduce统计单词数量

统计一个文件里面每个单词的数量。用Fork/Join并行计算框架来实现。先用二分法递归的将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
static void main(String[] args) {
//字符串数组来模拟文件内容
//fc里面的元素与文件里面的行数据一一对应
String[] fc = {
"Hello world",
"hellp me",
"hello fork",
"hello join",
"fork join in world"
};
//创建ForkJoin线程池
ForkJoinPool fjp = new ForkJoinPool(3);
//创建任务
MR mr = new MR(fc, 0, fc.length);
//启动任务
Map<String, Long> result = fjp.invoke(mr);
//输出结果
result.forEach((k, v)->
System.out.println(k + ":" + v);
);
}

//MR模拟类
static class MR extends RecursiveTask<Map<String, Long>> {
private String[] fc;
private int start, end;
//构造函数
MR(String[] fc, int fr, int to) {
this.fc = fc;
this.start = fr;
this.end = to;
}
//递归。
//前半部分数据fork一个递归任务去处理(mr1.fork())
//后半部分数据在当前任务中递归处理(mr2.compute())
@Override
protected Map<String, Long> compute() {
if(end - start == 1) {
return calc(fc[start]);
} else {
int mid = (start+end)/2;
MR mr1 = new MR(fc, start, mid);
mr1.fork();
MR mr2 = new MR(fc, mid, end);
//计算子任务,并返回合并的结果
return merge(mr2.compute(), mr1.join());
}
}
//合并结果
private Map<String, Long> merge(Map<String, Long> r1, Map<String, Long> r2) {
Map<String, Long> result = new HashMap<>();
result.putAll(r1);
//合并结果
r2.forEach((k, v)->{
Long c = result.get(k);
if(c != null) {
result.put(k, C+v);
} else {
result.put(k, v);
}
});
return result;
}
//统计单词数量
private Map<String, Long> calc(String line) {
Map<String, Long> result = new HashMap<>{};
//分割单词
String[] words = line.split("\\s+");
//统计单词数量
for(String w : words) {
Long v = result.get(w);
if(v != null) {
result.put(w, v+1);
} else {
result.put(w, 1L);
}
}
return result;
}
}

总结

Fork/Join并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的MapReduce,可以把Fork/Join看做单机版的MapReduce。

Fork/Join并行计算框架的核心组件是ForkJoinPool。ForkJoinPool支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。
Java1.8提供的Stream API里面并行流也是以ForkJoinPool为基础的。默认情况下所有的并行流计算都共享一个ForkJoinPool,这个共享的ForkJoinPool默认的线程数是CPU的核数;如果所有的并行流计算都是CPU密集型计算的话,完全没有问题,但是如果存在I/O密集型的并行流计算,那么很可能会因为一个很慢的I/O计算而拖慢整个系统的性能。建议用不同的ForkJoinPool执行不同类型的计算任务。