0%

CompletionService:如何批量执行异步任务

ThreadPoolExecutor+Future方案优化询价应用:用三个线程异步执行询价,通过三次调用Future的get()方法获取询价结果,之后将询价结果保存在数据库中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
//异步向电商s1询价
Future<Integer> f1 = executor.submit(()->getPriceByS1());
//异步向电商S2询价
Future<Integer> f2 = executor.submit(()->getPriceByS2());
//异步向电商S3询价
Future<Integer> f3 = executor.submit(()->getPriceByS3());
//获取电商S1报价并保存
r = f1.get();//阻塞当前线程
executor.execute(()->save(r));
//executor.execute(()->save(f1.get()));//f1.get()阻塞(当前)异步线程
//获取电商S2报价并保存
r = f2.get();
executor.execute(()->save(r));
//获取电商S3报价并保存
r = f3.get();
executor.execute(()->save(r));

问题:如果获取电商S1报价的耗时很长,那么即便获取电商S2报价的耗时很短,也无法让保存S2报价的操作先执行,因为这个主线程都阻塞在了f1.get()操作上。

解决方案:增加一个阻塞队列,获取到S1、S2、S3的报价都进入阻塞队列,然后在主线程中消费阻塞队列,这样就能保证先获取到的报价先保存到数据库。

1
2
3
4
5
6
7
8
9
10
11
12
13
//创建阻塞队列
BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
//电商S1报价异步进入阻塞队列
executor.execute(()->bq.put(f1.get()));
//电商S2报价异步进入阻塞队列
executor.execute(()->bq.put(f2.get()));
//电商S3报价异步进入阻塞队列
executor.execute(()->bq.put(f3.get()));
//异步保存所有报价
for(int i=0; i<3; i++) {
Integer r = bq.take();
executor.execute(()->save(r));
}

利用CompletionService实现询价系统

CompletionService的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是CompletionService是把任务执行结果的Future对象加入到阻塞队列中,而上面的示例代码是把任务最终的执行结果放入了阻塞队列中。

如何创建CompletionService

CompletionService接口的实现类是ExecutorCompletionService,这个实现类的构造方法有两个:

  • ExecutorCompletionService(Executor executor);
  • ExecutorCompletionService(Executor executor, BlockingQueue<Future> completionQueue);

这两个构造方法都需要传入一个线程池,如果不指定completionQueue,那么默认会使用无界的LinkedBlockingQueue。任务执行结果的Future对象就是加入到completionQueue中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
//创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
//异步向电商S1询价
cs.submit(()->getPriceByS1());
//异步向电商S2询价
cs.submit(()->getPriceByS2());
//异步向电商S3询价
cs.submit(()->getPriceByS3());
//将询价结果异步保存到数据库
for(int i=0; i<3; i++) {
Integer r = cs.take().get();
executor.execute(()->save(r));
}

没有指定completionQueue,默认使用无界的LinkedBlockingQueue。之后通过CompletionService接口提供的submit()方法提交了三个询价操作,这三个询价操作将会被CompletionService异步执行。最后通过CompletionService接口提供的take()方法获取一个Future对象,调用Future对象的get()方法就能返回询价操作的执行结果。

CompletionService接口说明

CompletionService接口听过了5个方法:

1
2
3
4
5
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUtil unit) throws InterruptedException;

submit()相关的方法有两个,一个方法参数是Callable task;(询价系统示例代码,提交任务使用)。另外一个方法有两个参数,分别是Runnable task和V result,这个方法类似于ThreadPoolExecutor的 Future submit(Runnable task, T result)。

其余三个方法都是和阻塞队列相关的,take()、poll()都是从阻塞队列中获取并移除一个元素;它们的区别在于如果阻塞队列是空的,那么调用take()方法的线程会被阻塞,而poll()方法会返回null值。poll(long timeout, TimeUnit unit)方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了timeout unit时间,阻塞队列还是空的,那么该方法会返回null值。

利用CompletionService实现Dubbo中的Forking Cluster

Dubbo中有一种叫做Forking的集群模式,这种集群模式下,支持并行的调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了

为了保证地址转坐标服务的高可用和性能,并行的调用3个地图服务商的API,
然后只要有1个正确返回了结果r,那么地址转坐标这个服务就可以直接返回r了。
这种集群模式可以容忍2个地图服务商服务异常,但缺点是消耗的资源偏多。

1
2
3
4
5
6
7
8
9
//地址转坐标服务
geocoder(addr) {
//并行执行以下3个查询服务
r1=geocoderByS1(addr);
r2=geocoderByS2(addr);
r3=geocoderByS3(addr);
//只要r1、r2、r3有一个返回则返回
return r1/r2/r3;
}

利用CompletionService可以快速实现Forking这种集群模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
//创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
//保存Future对象的列表
List<Future<Integer>> futures = new ArrayList<>(3);
//提交异步任务,并保存future到futures
futures.add(cs.submit(()->geocoderByS1()));
futures.add(cs.submit(()->geocoderByS2()));
futures.add(cs.submit(()->geocoderByS3()));
//获取最快返回的任务执行结果
Integer r = 0;
try {
//只要有一个成功返回,则break
for(int i=0; i<3; ++i) {
r = cs.take().get();
//简单的通过判空来检查是否成功返回
if(r!=null) {
break;
}
}
} finally {
//取消所有任务
for(Future<Integer> f : futures)
f.cancel(ture);
}
//返回结果
return r;

首先创建一个线程池executor、一个CompletionService对象cs和一个Future类型的列表futures,每次通过调用CompletionService的submit()方法提交一个异步任务,会返回一个Future对象,把这些Future对象保存在列表futures中。通过调用cs.take().get(),能够拿到最快返回的任务执行结果,只要拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果。

总结

当需要批量提交异步任务的时候建议使用CompletionService。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。
除此之外,CompletionService能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如ForKing Cluster这样的需求。

CompletionService的实现类ExecutorCompletionService,需要自行创建线程池,可以让多个ExecutorCompletionService的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。