0%

CompletableFuture:异步编程没那么难

用多线程优化性能,就是将串行操作变成并行操作。在串行转换成并行的过程中,一定会涉及到异步化。

异步化,是并行方案得以实施的基础,利用多线程优化性能这个核心方案得以实施的基础。Java1.8版本提供了CompletableFuture来支持异步编程。

CompleTableFuture的核心优势

用CompletableFuture重新实现烧水泡茶程序。
分工:任务1负责洗水壶、烧开水,任务2负责洗茶壶、洗茶杯、拿茶叶,任务3负责泡茶。其中任务3需要等待任务1和任务2都完成后才能开始。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//任务1:洗水壶->烧开水
completableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
System.out.println("T1:洗水壶...");
sleep(1, TimeUnit.SECONDS);

System.out.println("T1:烧开水...");
sleep(15, TimeUnit.SECONDS);
});

//任务2:洗茶壶->洗茶杯->拿茶叶
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
System.out.println("T2:洗茶壶...");
sleep(1, TimeUnit.SECONDS);

System.out.println("T2:洗茶杯...");
sleep(2, TimeUnit.SECONDS);

System.out.println("T2:拿茶叶...");
sleep(1, TimeUnit.SECONDS);
return "龙井";
});

//任务3:任务1和任务2完成后执行:泡茶
CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf)->{
System.out.println("T1:拿到茶叶:" + tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
});

//等待任务3执行结果
System.out.println(f3.join());

void sleep(int t, TimeUnit u) {
try {
u.sleep(t);
} catch(InterruptedException 2) {

}
}

//某一次执行结果:
T1:洗水壶...
T2:洗茶壶...
T1:烧开水...
T2:洗茶杯...
T2:拿茶叶...
T1:拿到茶叶:龙井...
T1:泡茶...
上茶:龙井
  • 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要关注;
  • 语义更清晰,如f3=f1.thenCombine(f2,()->{})能够清晰的表述“任务3要等待任务1和任务2都完成后才能开始”;
  • 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。

创建CompletableFuture对象

创建CompletableFuture对象主要靠4个静态方法:

1
2
3
4
5
6
7
//使用默认线程池
static CompletableFuture<Void> runAsync(Runnable runnable)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

//指定线程池
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

Runnable接口的run()方法没有返回值,Supplier接口的get()方法是有返回值的。后两个方法可以指定线程池参数。

默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的核数(可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的I/O操作,就会导致线程池中所有线程都阻塞在I/O操作上,从而造成线程饥饿,进而影响整个系统的性能。建议根据不同的业务类型创建不同的线程池,以避免相互干扰

创建完CompletableFuture对象之后,会自动的异步执行runnable.run()方法或者supplier.get()方法,对于一个异步操作,需要关注两个问题:

  • 一个是异步操作什么时候结束;
  • 另一个是如何获取异步操作的执行结果。

CompletableFuture类实现了Future接口,这两个问题都可以通过Future接口来解决。

CompletionStage接口

CompletableFuture实现了CompletionStage接口(40个方法)。

类比工作流,站在分工角度任务都是有时序关系的:串行关系、并行关系、汇聚关系等。(洗水壶和烧开水是串行关系,洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间是并行关系,烧开水、拿茶叶、泡茶是汇聚关系)。CompletionStage接口可以清晰的描述任务之间的这种时序关系。

描述串行关系

主要是thenApply、thenAccept、thenRun、thenCompose这四个系列的接口。

1
2
3
4
5
6
7
8
9
//Async代表的是异步执行fn、consumer、action
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

thenApply系列函数里参数fn的类型是接口Function<T, R>,这个接口里与CompletionStage相关的方法是R apply(T t),这个方法既能接收参数也支持返回值,所以thenApply系列方法返回的是CompletionStage

thenAccept系列方法里参数consumer的类型是接口Consumer,这个接口里与CompletionStage相关的方法是void accept(T t),这个方法虽然支持参数,但却不支持返回值,所以thenAccept系列方法返回的是CompletionStage

thenRun系列方法里参数action的类型是接口Runnable,所以action既不能接收参数也不支持返回值,所以thenRun系列方法返回的也是CompletionStage

thenCompose系列方法会新创建出一个子流程,最终结果和thenApply系列是相同的。

1
2
3
4
5
6
7
8
//示例
CompletableFuture<String> f0 = CompletableFuture.supplyAsync(() -> "Hello World")
.thenApply(s -> s + "QQ")
.thenApply(String::toUpperCase);

System.out.println(f0.join());
//输出结果
HELLO WORLD QQ

描述AND汇聚关系

主要是thenCombine、thenAcceptBoth、runAfterBoth系列接口。

1
2
3
4
5
6
7
//核心参数fn、consumer、action的区别
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

描述OR汇聚关系

主要是applyToEither、acceptEither、runAfterEither系列接口。

1
2
3
4
5
6
7
//同上
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsynce(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

applyToEither示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});

CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});

CompletableFuture<String> f3 = f1.applyToEither(f2, s->s);

System.out.println(f3.join());

异常处理

fn、consumer、action的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常。
非异步编程里面,可以使用try{}catch{}来捕获并处理异常。
异步编程里面,CompletionStage接口提供了比try{}catch{}简单的方案,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

1
2
3
4
5
CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

exceptionally()示例:

1
2
3
4
5
CompletableFuture<Integer> f0 = CompletableFuture
.supplyAsync(() -> 7/0)
.thenApply(r -> r*10)
.exceptionally(e -> 0);
System.out.println(f0.join());

exceptionally()的使用非常类似于try{}catch{}中的catch{},但是由于支持链式编程方式,相对更简单。
whenComplete()和handle()系列方法就类似于try{}finally{}中的finally{},无论是否发生异常都会执行whenComplete()中的回调函数consumer和handle()中的回调函数fn。whenComplete()和handle()的区别在于whenComplete()不支持返回结果,而handle()是支持返回结果的。

总结

JavaScript里面的异步问题基本上都是靠回调函数来解决的,回调函数在处理异常以及复杂的异步任务关系时往往力不从心-回调地狱(Callback Hell)

ReactiveX的发展(RxJava)完美解决了回调地狱。
Java1.8版本提供了CompletableFuture
Java1.9版本提供了Flow API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//0参0返
@FunctionalInterface
public interface Runnable {
public abstract void run();
}

//1参1返
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
}

//1参0返
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
}

//0参1返
@FunctionalInterface
public interface Supplier<T> {
T get();
}

//2参0返
@FunctionalInterface
public interface BiConsumer<T, U> {
void accept(T t, U u);
}