用多线程优化性能,就是将串行操作变成并行操作。在串行转换成并行的过程中,一定会涉及到异步化。
异步化,是并行方案得以实施的基础,利用多线程优化性能这个核心方案得以实施的基础。Java1.8版本提供了CompletableFuture来支持异步编程。
CompleTableFuture的核心优势
用CompletableFuture重新实现烧水泡茶程序。
分工:任务1负责洗水壶、烧开水,任务2负责洗茶壶、洗茶杯、拿茶叶,任务3负责泡茶。其中任务3需要等待任务1和任务2都完成后才能开始。
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| completableFuture<Void> f1 = CompletableFuture.runAsync(() -> { System.out.println("T1:洗水壶..."); sleep(1, TimeUnit.SECONDS);
System.out.println("T1:烧开水..."); sleep(15, TimeUnit.SECONDS); });
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { System.out.println("T2:洗茶壶..."); sleep(1, TimeUnit.SECONDS);
System.out.println("T2:洗茶杯..."); sleep(2, TimeUnit.SECONDS);
System.out.println("T2:拿茶叶..."); sleep(1, TimeUnit.SECONDS); return "龙井"; });
CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf)->{ System.out.println("T1:拿到茶叶:" + tf); System.out.println("T1:泡茶..."); return "上茶:" + tf; });
System.out.println(f3.join());
void sleep(int t, TimeUnit u) { try { u.sleep(t); } catch(InterruptedException 2) {
} }
T1:洗水壶... T2:洗茶壶... T1:烧开水... T2:洗茶杯... T2:拿茶叶... T1:拿到茶叶:龙井... T1:泡茶... 上茶:龙井
|
- 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要关注;
- 语义更清晰,如f3=f1.thenCombine(f2,()->{})能够清晰的表述“任务3要等待任务1和任务2都完成后才能开始”;
- 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
创建CompletableFuture对象
创建CompletableFuture对象主要靠4个静态方法:
1 2 3 4 5 6 7
| static CompletableFuture<Void> runAsync(Runnable runnable) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
|
Runnable接口的run()方法没有返回值,Supplier接口的get()方法是有返回值的。后两个方法可以指定线程池参数。
默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的核数(可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的I/O操作,就会导致线程池中所有线程都阻塞在I/O操作上,从而造成线程饥饿,进而影响整个系统的性能。建议根据不同的业务类型创建不同的线程池,以避免相互干扰。
创建完CompletableFuture对象之后,会自动的异步执行runnable.run()方法或者supplier.get()方法,对于一个异步操作,需要关注两个问题:
- 一个是异步操作什么时候结束;
- 另一个是如何获取异步操作的执行结果。
CompletableFuture类实现了Future接口,这两个问题都可以通过Future接口来解决。
CompletionStage接口
CompletableFuture实现了CompletionStage接口(40个方法)。
类比工作流,站在分工角度任务都是有时序关系的:串行关系、并行关系、汇聚关系等。(洗水壶和烧开水是串行关系,洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间是并行关系,烧开水、拿茶叶、泡茶是汇聚关系)。CompletionStage接口可以清晰的描述任务之间的这种时序关系。
描述串行关系
主要是thenApply、thenAccept、thenRun、thenCompose这四个系列的接口。
1 2 3 4 5 6 7 8 9
| CompletionStage<R> thenApply(fn); CompletionStage<R> thenApplyAsync(fn); CompletionStage<Void> thenAccept(consumer); CompletionStage<Void> thenAcceptAsync(consumer); CompletionStage<Void> thenRun(action); CompletionStage<Void> thenRunAsync(action); CompletionStage<R> thenCompose(fn); CompletionStage<R> thenComposeAsync(fn);
|
thenApply系列函数里参数fn的类型是接口Function<T, R>,这个接口里与CompletionStage相关的方法是R apply(T t),这个方法既能接收参数也支持返回值,所以thenApply系列方法返回的是CompletionStage。
thenAccept系列方法里参数consumer的类型是接口Consumer,这个接口里与CompletionStage相关的方法是void accept(T t),这个方法虽然支持参数,但却不支持返回值,所以thenAccept系列方法返回的是CompletionStage。
thenRun系列方法里参数action的类型是接口Runnable,所以action既不能接收参数也不支持返回值,所以thenRun系列方法返回的也是CompletionStage。
thenCompose系列方法会新创建出一个子流程,最终结果和thenApply系列是相同的。
1 2 3 4 5 6 7 8
| CompletableFuture<String> f0 = CompletableFuture.supplyAsync(() -> "Hello World") .thenApply(s -> s + "QQ") .thenApply(String::toUpperCase);
System.out.println(f0.join());
HELLO WORLD QQ
|
描述AND汇聚关系
主要是thenCombine、thenAcceptBoth、runAfterBoth系列接口。
1 2 3 4 5 6 7
| CompletionStage<R> thenCombine(other, fn); CompletionStage<R> thenCombineAsync(other, fn); CompletionStage<Void> thenAcceptBoth(other, consumer); CompletionStage<Void> thenAcceptBothAsync(other, consumer); CompletionStage<Void> runAfterBoth(other, action); CompletionStage<Void> runAfterBothAsync(other, action);
|
描述OR汇聚关系
主要是applyToEither、acceptEither、runAfterEither系列接口。
1 2 3 4 5 6 7
| CompletionStage applyToEither(other, fn); CompletionStage applyToEitherAsynce(other, fn); CompletionStage acceptEither(other, consumer); CompletionStage acceptEitherAsync(other, consumer); CompletionStage runAfterEither(other, action); CompletionStage runAfterEitherAsync(other, action);
|
applyToEither示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { int t = getRandom(5, 10); sleep(t, TimeUnit.SECONDS); return String.valueOf(t); });
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> { int t = getRandom(5, 10); sleep(t, TimeUnit.SECONDS); return String.valueOf(t); });
CompletableFuture<String> f3 = f1.applyToEither(f2, s->s);
System.out.println(f3.join());
|
异常处理
fn、consumer、action的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常。
非异步编程里面,可以使用try{}catch{}来捕获并处理异常。
异步编程里面,CompletionStage接口提供了比try{}catch{}简单的方案,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。
1 2 3 4 5
| CompletionStage exceptionally(fn); CompletionStage<R> whenComplete(consumer); CompletionStage<R> whenCompleteAsync(consumer); CompletionStage<R> handle(fn); CompletionStage<R> handleAsync(fn);
|
exceptionally()示例:
1 2 3 4 5
| CompletableFuture<Integer> f0 = CompletableFuture .supplyAsync(() -> 7/0) .thenApply(r -> r*10) .exceptionally(e -> 0); System.out.println(f0.join());
|
exceptionally()的使用非常类似于try{}catch{}中的catch{},但是由于支持链式编程方式,相对更简单。
whenComplete()和handle()系列方法就类似于try{}finally{}中的finally{},无论是否发生异常都会执行whenComplete()中的回调函数consumer和handle()中的回调函数fn。whenComplete()和handle()的区别在于whenComplete()不支持返回结果,而handle()是支持返回结果的。
总结
JavaScript里面的异步问题基本上都是靠回调函数来解决的,回调函数在处理异常以及复杂的异步任务关系时往往力不从心-回调地狱(Callback Hell)。
ReactiveX的发展(RxJava)完美解决了回调地狱。
Java1.8版本提供了CompletableFuture
Java1.9版本提供了Flow API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @FunctionalInterface public interface Runnable { public abstract void run(); }
@FunctionalInterface public interface Function<T, R> { R apply(T t); }
@FunctionalInterface public interface Consumer<T> { void accept(T t); }
@FunctionalInterface public interface Supplier<T> { T get(); }
@FunctionalInterface public interface BiConsumer<T, U> { void accept(T t, U u); }
|