ThreadPoolExecutor+Future方案优化询价应用:用三个线程异步执行询价,通过三次调用Future的get()方法获取询价结果,之后将询价结果保存在数据库中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| ExecutorService executor = Executors.newFixedThreadPool(3);
Future<Integer> f1 = executor.submit(()->getPriceByS1());
Future<Integer> f2 = executor.submit(()->getPriceByS2());
Future<Integer> f3 = executor.submit(()->getPriceByS3());
r = f1.get(); executor.execute(()->save(r));
r = f2.get(); executor.execute(()->save(r));
r = f3.get(); executor.execute(()->save(r));
|
问题:如果获取电商S1报价的耗时很长,那么即便获取电商S2报价的耗时很短,也无法让保存S2报价的操作先执行,因为这个主线程都阻塞在了f1.get()操作上。
解决方案:增加一个阻塞队列,获取到S1、S2、S3的报价都进入阻塞队列,然后在主线程中消费阻塞队列,这样就能保证先获取到的报价先保存到数据库。
1 2 3 4 5 6 7 8 9 10 11 12 13
| BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
executor.execute(()->bq.put(f1.get()));
executor.execute(()->bq.put(f2.get()));
executor.execute(()->bq.put(f3.get()));
for(int i=0; i<3; i++) { Integer r = bq.take(); executor.execute(()->save(r)); }
|
利用CompletionService实现询价系统
CompletionService的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是CompletionService是把任务执行结果的Future对象加入到阻塞队列中,而上面的示例代码是把任务最终的执行结果放入了阻塞队列中。
如何创建CompletionService
CompletionService接口的实现类是ExecutorCompletionService,这个实现类的构造方法有两个:
- ExecutorCompletionService(Executor executor);
- ExecutorCompletionService(Executor executor, BlockingQueue<Future> completionQueue);
这两个构造方法都需要传入一个线程池,如果不指定completionQueue,那么默认会使用无界的LinkedBlockingQueue。任务执行结果的Future对象就是加入到completionQueue中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
cs.submit(()->getPriceByS1());
cs.submit(()->getPriceByS2());
cs.submit(()->getPriceByS3());
for(int i=0; i<3; i++) { Integer r = cs.take().get(); executor.execute(()->save(r)); }
|
没有指定completionQueue,默认使用无界的LinkedBlockingQueue。之后通过CompletionService接口提供的submit()方法提交了三个询价操作,这三个询价操作将会被CompletionService异步执行。最后通过CompletionService接口提供的take()方法获取一个Future对象,调用Future对象的get()方法就能返回询价操作的执行结果。
CompletionService接口说明
CompletionService接口听过了5个方法:
1 2 3 4 5
| Future<V> submit(Callable<V> task); Future<V> submit(Runnable task, V result); Future<V> take() throws InterruptedException; Future<V> poll(); Future<V> poll(long timeout, TimeUtil unit) throws InterruptedException;
|
submit()相关的方法有两个,一个方法参数是Callable task;(询价系统示例代码,提交任务使用)。另外一个方法有两个参数,分别是Runnable task和V result,这个方法类似于ThreadPoolExecutor的 Future submit(Runnable task, T result)。
其余三个方法都是和阻塞队列相关的,take()、poll()都是从阻塞队列中获取并移除一个元素;它们的区别在于如果阻塞队列是空的,那么调用take()方法的线程会被阻塞,而poll()方法会返回null值。poll(long timeout, TimeUnit unit)方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了timeout unit时间,阻塞队列还是空的,那么该方法会返回null值。
利用CompletionService实现Dubbo中的Forking Cluster
Dubbo中有一种叫做Forking的集群模式,这种集群模式下,支持并行的调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。
为了保证地址转坐标服务的高可用和性能,并行的调用3个地图服务商的API,
然后只要有1个正确返回了结果r,那么地址转坐标这个服务就可以直接返回r了。
这种集群模式可以容忍2个地图服务商服务异常,但缺点是消耗的资源偏多。
1 2 3 4 5 6 7 8 9
| geocoder(addr) { r1=geocoderByS1(addr); r2=geocoderByS2(addr); r3=geocoderByS3(addr); return r1/r2/r3; }
|
利用CompletionService可以快速实现Forking这种集群模式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
List<Future<Integer>> futures = new ArrayList<>(3);
futures.add(cs.submit(()->geocoderByS1())); futures.add(cs.submit(()->geocoderByS2())); futures.add(cs.submit(()->geocoderByS3()));
Integer r = 0; try { for(int i=0; i<3; ++i) { r = cs.take().get(); if(r!=null) { break; } } } finally { for(Future<Integer> f : futures) f.cancel(ture); }
return r;
|
首先创建一个线程池executor、一个CompletionService对象cs和一个Future类型的列表futures,每次通过调用CompletionService的submit()方法提交一个异步任务,会返回一个Future对象,把这些Future对象保存在列表futures中。通过调用cs.take().get(),能够拿到最快返回的任务执行结果,只要拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果。
总结
当需要批量提交异步任务的时候建议使用CompletionService。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。
除此之外,CompletionService能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如ForKing Cluster这样的需求。
CompletionService的实现类ExecutorCompletionService,需要自行创建线程池,可以让多个ExecutorCompletionService的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。