0%

CountDownLatch和CyclicBarrier:如何让多线程步调一致

对账系统,用户通过在线商城下单,生成电子订单保存在订单库;物流生成派送单给用户发货,派送单保存在派送单库。为防止漏派送或者重复派送,对账系统每天校验是否存在异常订单。
处理逻辑,首先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。(单线程)

1
2
3
4
5
6
7
8
9
10
while(存在未对账订单) {
//查询未对账订单
pos = getPOrders();
//查询派送单
dos = getDOrders();
//执行对账操作
diff = check(pos, dos);
//差异写入差异库
save(diff);
}

并行优化

单线程串行化执行的对账系统,优化性能首先想到的是能否利用多线程并行处理

查询未对账订单getPOrders()和查询派送单getDOrders()这两个操作没有先后顺序的依赖,可以并行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
while(存在未对账订单) {
//查询未对账订单
Thread T1 = new Thread(() -> {
pos = getPOrders();
});
T1.start();
//查询派送单
Thread T2 = new Thread(() -> {
dos = getDOrders();
});
T2.start();

//主线程被阻塞,需要等待T1、T2执行完才能执行后续的对账操作
//(CompletableFuture)
T1.join();
T2.join();
//当T1和T2线程退出时,调用T1.join()和T2.join()的主线程会从阻塞态被唤醒。
//执行对账操作
diff = check(pos, dos);
//差异写入差异库
save(diff);
}

用CountDownLatch实现线程等待

while循环里面每次都会创建新的线程,创建线程是个耗时的操作。用线程池优化,从而循环利用线程。但是在线程池中join()方法失效了,主线程无法知道getPOrders()和getDOrders()这两个操作什么时候执行完。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//创建2个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单) {
//查询未对账订单
executor.execute(() -> {
pos = getPOrders();
});
//查询派送单
executor.execute(() -> {
dos = getDOrders();
});

//todo 实现等待

//执行对账操作
diff = check(pos, dos);
//差异写入差异库
save(diff);
}

最直接的办法是用一个计数器,初始值设置成2,当执行完pos=getPOrders();这个操作之后将计数器减1,执行完dos=getDOrders();之后也将计数器减1,在主线程里,等待计数器等于0(本质是一个条件变量,用管程实现简单);说明两个查询操作执行完了。

Java并发包工具类:CountDownLatch。计数器减1的操作通过latch.countDown();来实现。在主线程中,通过调用latch.await()来实现对计数器等于0的等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//创建2个线程的线程池  
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单) {
//计数器初始化为2
CountDownLatch latch = new CountDownLatch(2);
//查询未对账订单
executor.execute(() -> {
pos = getPOrders();
latch.countDown();
});
//查询派送单
executor.executr(() -> {
dos = getDOrders();
latch.countDown();
});

//等待两个查询操作结束
latch.await();

//执行对账操作
diff = check(pos, dos);
//差异写入差异库
save(diff);
}

完全并行

getPOrders()和getDOrders()两个查询操作并行了,但是两个查询操作和对账操作check()、save()之间还是串行的。在执行对账操作的时候,可以同时去执行下一轮的查询操作。

两次查询操作能够和对账操作并行,对账操作依赖查询操作的结果,类似生产者-消费者模型,两次查询操作是生产者,对账操作是消费者。需要有个队列来保存生产者的数据,消费者从这个队列消费数据。(对账过慢,需要限流查询)

对账系统,需要订单队列和派送单队列两个队列,两个队列的元素之间有对应关系。
订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送单插入派送单队列,这两个队列的元素之间有一一对应的关系。
对账操作可以每次从订单队列出一个元素,从派送单队列出一个元素,然后对这两个元素执行对账操作,这样数据不会乱掉。

最直接的想法是:一个线程T1执行订单的查询工作,一个线程T2执行派送单的查询工作,当线程T1和T2都各自生产完1条数据的时候,通知线程T3执行对账操作。(隐藏条件,线程T1和线程T2要互相等待,工作步调一致,不能一快一慢,要做到各自生产完1条数据的时候,通知线程T3)

用CyclicBarrier实现线程同步

利用计数器解决,计数器初始化为2,线程T1和T2生产完一条数据都将计数器减1,如果计数器大于0则线程T1或者T2等待。如果计数器等于0,则通知T3,并唤醒等待的线程T1或者T2,与此同时,将计数器重置为2,这样线程T1和线程T2生产下一条数据的时候就可以继续使用这个计数器了。

Java并发包工具类:CyclicBarrier。创建一个计数器初始值为2的CyclicBarrier,传入一个回调函数,当计数器减到0的时候,减到0的线程同步调用这个回调函数。

CyclicBarrier的回调函数执行在一个回合里最后执行await()的线程上,而且同步调用回调函数check(),调用完check之后,才会开始第二回合。所以check需要另开一个线程异步执行。
barrierAction是同步执行的,只有执行完才会开启下一轮。新建一个单线程的线程池异步执行就不用等待,既能解决同步的性能问题,也能解决数据的一致性问题。

计数器减1的操作通过barrier.await()来实现,线程调用barrier.await()的同时会阻塞并等待计数器变成0(计数器>0,调用await的线程调用后会阻塞,不会出现计数器被同一线程减到0的情况)。计数器减到0,而且回调函数执行完之后,才会唤醒等待的线程,继续执行下一条语句。
计数器变CyclicBarrier的计数器有自动重置功能,当减到0的时候,会自动重置设置的初始值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//订单队列
Vector<P> pos;
//派送单队列
Vector<D> dos;
//执行回调的线程池。
//check方法是非线程安全的,单线程使得两个队列的出队无须同步。
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
executor.execute(() -> check());
});

void check() {
P p = pos.remove(0);
D d = dos.remove(0);
//执行对账操作
diff = check(p,d);
//差异写入差异库。
//若有异常,catch住所有异常写日志,分析异常原因
save(diff);
}

//不会反复new创建线程,可以不使用线程池
void checkAll() {
//循环查询订单库
Thread T1 = new Thread(() -> {
while(存在未对账订单) {
//查询订单库
pos.add(getPOrders());
//等待
barrier.await();
}
});
T1.start();
//循环查询运单库
Thread T2 = new Thread(() -> {
while(存在未对账订单) {
//查询运单库
dos.add(getDOrders());
//等待
barrier.await();
}
});
T2.start();
}

总结

CountDownLatch主要用来解决一个线程等待多个线程的场景。CyclicBarrier是一组线程之间互相等待。
CountDownLatch的计数器是不能循环利用的,一旦计数器减到0,再有线程调用await(),该线程会直接通过。CyclicBarrier的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到0会自动重置到设置的初始值。
CyclicBarrier还可以设置回调函数。

推荐使用ThreadPoolExecutor实现自定义线程池,不使用Executors提供的线程池。
线程池提供了Future特性,也可以利用Future特性来实现线程之间的等待。

1
2
3
CompletableFuture<List> pOrderFuture = CompletableFuture.supplyAsync(this::getPOrders);
CompletableFuture<List> dOrderFuture = CompletableFuture.supplyAsync(this::getDOrders);
pOrderFuture.thenCombine(dOrderFuture, this::check).thenAccept(this::save);