0%

Future:如何用多线程实现最优的“烧水泡茶”程序

ThreadPoolExecutor的void execute(Runnable command)方法可以提交任务,但是没有办法获取任务的执行结果(executor()方法没有返回值)。

获取任务执行结果

Java通过ThreadPoolExecutor提供的3个submit()方法和1个FutureTask工具类来支持获得任务执行结果的需求。

1
2
3
4
5
6
//提交Runnable任务
Future<?> submit(Runnable task);
//提交Callable任务
<T> Future<T> submit(Callable<T> task);
//提交Runnable任务及结果引用
<T> Future<T> submit(Runnable task, T result);

3个submit()方法之间的区别在于方法参数不同

  • 提交Runnable任务submit(Runnable task):这个方法的参数是一个Runnable接口,Runnable接口的Run()方法是没有返回值的,所以submit(Runnable task)这个方法返回的Future尽可以用来断言任务已经结束了,类似于Thread.join()。
  • 提交Callable任务submit(Callable task):这个方法的参数是一个Callable接口,它只有一个call()方法,并且这个方法是有返回值的,所以这个方法返回的Future对象可以通过调用其get()方法来获取任务的执行结果。
  • 提交 Runnable任务及结果引用submit(Runnable task, T result):这个方法若返回的Future对象是f,f.get()的返回值就是传给submit()方法的参数result。

它们的返回值都是Future接口,Future接口有5个方法

  • 取消任务的方法cancel()
  • 判断任务是否已取消的方法isCancelled()
  • 判断任务是否已结束的方法isDone()
  • 获取任务执行结果的方法get()
  • 支持超时获取任务执行结果的方法get(timeout, unit)

    两个get方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用get方法的线程会阻塞,直到任务执行完才会被唤醒。

Future是一个接口,FutureTask是一个工具类。这个工具类有两个构造函数,它们的参数和submit()方法类似。

1
2
FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);

FutureTask实现了Runnable和Future接口。
由于实现了Runnable接口,可以将FutureTask对象作为任务提交给ThreadPoolExecutor去执行,也可以直接被Thread(public class Thread implements Runnable;)执行;
由于实现了Future接口,也能用来获得任务的执行结果。

1
2
3
4
5
6
7
8
//创建FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(() -> 1+2);
//创建线程池
ExecutorService es = Executors.newCachedThreadPool();
//提交FutureTask
es.submit(futureTask);
//获取计算结果
Integer result = futureTask.get();
1
2
3
4
5
6
7
//创建FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(() -> 1+2);
//创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
//获取计算结果
Integer result = futureTask.get();

实现最优的“烧水泡茶”程序

《统筹方法》-华罗庚,烧水泡茶。

并发编程可以总结为三个核心问题:分工、同步、互斥。编写并发程序,首先要做的就是分工,所谓分工指的是如何高效的拆解任务并分配给线程。对于烧水泡茶这个程序,一种最优的分工方案:用两个线程T1和T2来完成烧水泡茶程序,T1负责洗水壶、烧开水、泡茶这三道工序,T2负责洗茶壶、洗茶杯、拿茶叶三道工序,其中T1在执行泡茶这道工序时需要等待T2完成拿茶叶的工序。对于T1的这个等待工作,Thread.join()、CountDownLatch、阻塞队列等方法都可以解决。Future特性也能实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* //首先创建两个FutureTask-ft1和ft2,
* //ft1完成洗水壶、烧开水、泡茶的任务,
* //ft2完成洗茶壶、洗茶杯、拿茶叶的任务;
* //ft1这个任务在执行泡茶任务前,需要等待ft2把茶叶拿来,
* //所以ft1内部需要引用ft2,并在执行泡茶之前,调用ft2的get()方法实现等待。
**/

//创建任务T2的FutureTask
FutureTask<String> ft2 = new FutureTask<>(new T2Task());
//创建任务T1的FutureTask
FutureTask<String> ft1 = new FutureTask<>(new T1Task());
//线程T1执行任务ft1
Thread T1 = new Thread(ft1);
T1.start();
//线程T2执行任务ft2
Thread T2 = new Thread(ft2);
T2.start();
//等待线程T1执行结果
System.out.println(ft1.get());

//T1Task需要执行的任务:洗水壶、烧开水、泡茶
class T1Task implements Callable<String> {
FutureTask<String> ft2;
//T1任务需要T2任务的FutureTask
T1Task(FutureTask<String> ft2) {
this.ft2 = ft2;
}
@Override
String call() throws Exception {
System.out.println("T1:洗水壶...");
TimeUtil.SECONDS.sleep(1);

System.out.println("T1:烧开水...");
TimeUtil.SECONDS.sleep(15);
//获取T2线程的茶叶
String tf = ft2.get();
System.out.println("T1:拿到茶叶...");

System.out.println("T1:泡茶...");
return "上茶:" + tf;
}
}
//T2Task需要执行的任务:洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
@Override
String call() throws Exception {
System.out.println("T2:洗茶壶...");
TimeUtil.SECONDS.sleep(1);

System.out.println("T2:洗茶杯...");
TimeUtil.SECONDS.sleep(2);

System.out.println("T2:拿茶叶...");
TimeUtil.SECONDS.sleep(1);
return "龙井";
}
}

//某一次执行结果:
T1:洗水壶...
T2:洗茶壶...
T1:烧开水...
T2:洗茶杯...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井

总结

利用Java并发包提供的Future可以很容易获得异步任务的执行结果,无论异步任务是通过线程池ThreadPoolExecutor执行的,还是通过手工创建子线程来执行的。
Future可以类比为现实世界里的提货单,如去蛋糕店订生日蛋糕,蛋糕店都是先给一张提货单,拿到提货单后没必要一直在店里等着,可以先去干点其他事,如看电影等。等看完电影后,基本上蛋糕也做好了,然后可以凭提货单领蛋糕了。

利用多线程可以快速将一些串行的任务并行化,从而提高性能;如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,这种问题基本上都可以用Future来解决。在分析这种问题的过程中,可以用有向图描述一下任务之间的依赖关系,同时将线程的分工也做好,对照图来写代码,更形象且不易出错。