对账系统,用户通过在线商城下单,生成电子订单保存在订单库;物流生成派送单给用户发货,派送单保存在派送单库。为防止漏派送或者重复派送,对账系统每天校验是否存在异常订单。
处理逻辑,首先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。(单线程)
1 | while(存在未对账订单) { |

并行优化
单线程串行化执行的对账系统,优化性能首先想到的是能否利用多线程并行处理。
查询未对账订单getPOrders()和查询派送单getDOrders()这两个操作没有先后顺序的依赖,可以并行处理。
1 | while(存在未对账订单) { |

用CountDownLatch实现线程等待
while循环里面每次都会创建新的线程,创建线程是个耗时的操作。用线程池优化,从而循环利用线程。但是在线程池中join()方法失效了,主线程无法知道getPOrders()和getDOrders()这两个操作什么时候执行完。
1 | //创建2个线程的线程池 |
最直接的办法是用一个计数器,初始值设置成2,当执行完pos=getPOrders();这个操作之后将计数器减1,执行完dos=getDOrders();之后也将计数器减1,在主线程里,等待计数器等于0(本质是一个条件变量,用管程实现简单);说明两个查询操作执行完了。
Java并发包工具类:CountDownLatch。计数器减1的操作通过latch.countDown();来实现。在主线程中,通过调用latch.await()来实现对计数器等于0的等待。
1 | //创建2个线程的线程池 |
完全并行
getPOrders()和getDOrders()两个查询操作并行了,但是两个查询操作和对账操作check()、save()之间还是串行的。在执行对账操作的时候,可以同时去执行下一轮的查询操作。

两次查询操作能够和对账操作并行,对账操作依赖查询操作的结果,类似生产者-消费者模型,两次查询操作是生产者,对账操作是消费者。需要有个队列来保存生产者的数据,消费者从这个队列消费数据。(对账过慢,需要限流查询)
对账系统,需要订单队列和派送单队列两个队列,两个队列的元素之间有对应关系。
订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送单插入派送单队列,这两个队列的元素之间有一一对应的关系。
对账操作可以每次从订单队列出一个元素,从派送单队列出一个元素,然后对这两个元素执行对账操作,这样数据不会乱掉。
最直接的想法是:一个线程T1执行订单的查询工作,一个线程T2执行派送单的查询工作,当线程T1和T2都各自生产完1条数据的时候,通知线程T3执行对账操作。(隐藏条件,线程T1和线程T2要互相等待,工作步调一致,不能一快一慢,要做到各自生产完1条数据的时候,通知线程T3)
用CyclicBarrier实现线程同步
利用计数器解决,计数器初始化为2,线程T1和T2生产完一条数据都将计数器减1,如果计数器大于0则线程T1或者T2等待。如果计数器等于0,则通知T3,并唤醒等待的线程T1或者T2,与此同时,将计数器重置为2,这样线程T1和线程T2生产下一条数据的时候就可以继续使用这个计数器了。
Java并发包工具类:CyclicBarrier。创建一个计数器初始值为2的CyclicBarrier,传入一个回调函数,当计数器减到0的时候,减到0的线程同步调用这个回调函数。
CyclicBarrier的回调函数执行在一个回合里最后执行await()的线程上,而且同步调用回调函数check(),调用完check之后,才会开始第二回合。所以check需要另开一个线程异步执行。
barrierAction是同步执行的,只有执行完才会开启下一轮。新建一个单线程的线程池异步执行就不用等待,既能解决同步的性能问题,也能解决数据的一致性问题。
计数器减1的操作通过barrier.await()来实现,线程调用barrier.await()的同时会阻塞并等待计数器变成0(计数器>0,调用await的线程调用后会阻塞,不会出现计数器被同一线程减到0的情况)。计数器减到0,而且回调函数执行完之后,才会唤醒等待的线程,继续执行下一条语句。
计数器变CyclicBarrier的计数器有自动重置功能,当减到0的时候,会自动重置设置的初始值。
1 | //订单队列 |
总结
CountDownLatch主要用来解决一个线程等待多个线程的场景。CyclicBarrier是一组线程之间互相等待。
CountDownLatch的计数器是不能循环利用的,一旦计数器减到0,再有线程调用await(),该线程会直接通过。CyclicBarrier的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到0会自动重置到设置的初始值。
CyclicBarrier还可以设置回调函数。
推荐使用ThreadPoolExecutor实现自定义线程池,不使用Executors提供的线程池。
线程池提供了Future特性,也可以利用Future特性来实现线程之间的等待。
1 | CompletableFuture<List> pOrderFuture = CompletableFuture.supplyAsync(this::getPOrders); |