0%

生产者-消费者模式:用流水线思想提高效率

Worker Thread模式类比的是工厂里车间工人的工作模式。
现实世界中,工厂里还有一种流水线的工作模式,类比到编程领域就是生产者-消费者模式。

Java线程池本质上就是用生产者-消费者模式实现的,每当使用线程池的时候,其实就是在应用生产者-消费者模式。为了提升性能,Log4j2中异步Appender内部也用到了生产者-消费者模式。

生产者-消费者模式的优点

生产者-消费者模式的核心是一个任务队列,生产者线程生产任务,并将任务添加到任务队列中,消费者线程从任务队列中获取任务并执行。

从架构设计的角度来看,生产者-消费者模式有一个很重要的优点-解耦。解耦对于大型系统的设计非常重要,解耦的一个关键就是组件之间的依赖关系和通信方式必须受限。
在生产者-消费者模式中,生产者和消费者没有任何依赖关系,它们彼此之间的通信只能通过任务队列,所以生产者-消费者模式是一个不错的解耦方案。

除了架构设计,生产者-消费者模式还有一个重要的优点-支持异步,并且能够平衡生产者和消费者的速度差异。在生产者-消费者模式中,生产者线程只需要将任务添加到任务队列而无需等待任务被消费者线程执行完,也就是说任务的生产和消费时异步的,这是与传统的方法之间调用的本质区别,传统的方法之间调用是同步的。

异步化处理最简单的方式就是创建一个新的线程去处理,中间增加一个任务队列,主要是用于平衡生产者和消费者的速度差异。假设生产者的速率很慢,而消费者的速率很高,假设1:3,如果生产者有3个线程,采用创建新的线程的方式,会创建3个消费线程;而采用生产者-消费者模式,消费者线程只需要1个就可以。
Java语言里,Java线程和操作系统线程是一一对应的,线程创建的太多,会增加上下文切换的成本,所以Java线程不是越多越好,适量即可。生产者-消费者模式支持采用适量的线程。

支持批量执行以提升性能

Thread-Per-Message模式中,如果使用轻量级线程,就没有必要平衡生产者和消费者的速度差异了,因为轻量级线程本身就是廉价的。但在批量执行任务的并发场景,还是更适宜使用生产者-消费者模式。
在数据库里Insert1000条数据,两种方案:1用1000个线程并发执行,每个线程insert一条数据;2用1个线程,执行一个批量的SQL,一次性把1000条数据insert进去。方案2效率更高。

两阶段终止模式中,监控系统动态采集,最终回传的监控数据要存入数据库。但被监控系统往往有很多,如果每一条回传数据都直接insert到数据库,那么采用生产者-消费者模式批量执行sql是更好的方案。

利用生产者-消费者模式实现批量执行SQL非常简单:将原来直接Insert数据到数据库的线程作为生产者线程,生产者线程只需要将数据添加到任务队列,然后消费者线程负责将任务从任务队列中批量取出并批量执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//创建5个消费者线程负责批量执行sql
//5个消费者线程以while(true){}循环方式批量获取任务并批量执行
//任务队列
BlockingQueue<Task> bq = new LinkedBlockingQueue<>(2000);
//启动5个消费者线程执行批量任务
void start() {
ExecutorService es = executors.newFixedThreadPool(5);
for(int i=0; i<5; i++) {
es.execute(()->{
try {
while(true) {
//批量获取任务
List<Task> ts = pollTasks();
//执行批量任务
execTasks(ts);
}
} catch(Exception e) {
e.printStackTrace();
}
});
}
}
//从任务队列中获取批量任务
List<Task> pollTasks() throws InterruptedException {
List<Task> ts = new LinkedList<>();
//阻塞式获取一条任务
//如果任务队列中没有任务,这样可避免无畏的循环
Task t = bq.take();
while(t != null) {
ts.add(t);
//非阻塞式获取一条任务
t = bq.poll();
}
return ts;
}
//批量执行任务
execTasks(List<Task> ts) {
...
}

支持分阶段提交以提升性能

利用生产者-消费者模式还可以轻松的支持分阶段提交的应用场景。
写文件如果同步刷盘性能会很慢,对于不是很重要的数据,往往采用异步刷盘的方式。如日志组件

  1. error级别的日志需要立即刷盘;
  2. 数据积累到500条需要立即刷盘;
  3. 存在未刷盘数据,且5秒钟内未曾刷盘,需要立即刷盘。

这种日志组件的异步刷盘操作本质上就是一种分阶段提交。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class Logger {
//任务队列
final BlockingQueue<LogMsg> bq = new BlockingQueue<>();
//flush批量
static final int batchSize = 500;
//只需要一个线程写日志
ExecutorService es = Executors.newFixedThreadPool(1);
//启动写日志线程
void start() {
File file = File.createTempFile("foo", ".log");
final FileWriter writer = new FileWriter(file);
this.es.execute(()->{
try {
//未刷盘日志数量
int curIdx = 0;
long preFT = System.currentTimeMillis();
while(true) {
LogMsg log = bq.poll(5, TimeUnit.SECONDS);
//写日志
if(log!=null) {
writer.write(log.toString());
++curIdx;
}
//如果不存在未刷盘数据,则无需刷盘
if(curIdx <= 0) {
continue;
}
//根据规则刷盘
if(log != null
&& log.level == LEVEL.ERROR
|| curIdx == batchSize
|| System.currentTimeMillis() - preFT > 5000) {
writer.flush();
curIdx = 0;
preFT = System.currentTimeMills();
}
}
} catch(Exception e) {
e.printStackTace();
} finally {
try {
writer.flush();
writer.close();
} catch(IOException e) {
e.printStackTrace();
}
}
});
}

//写INFO级别日志
void info(String msg) {
bq.put(new LogMsg(LEVEL.INFO, msg));
}
//写ERROR级别日志
void error(String msg) {
bq.put(new LogMsg(LEVEL.ERROR, msg));
}
}

//日志级别
enum LEVEL {
INFO, ERROR
}
class LogMsg {
LEVEL level;
String msg;
//省略构造函数实现
LogMsg(LEVEL lvl, String msg) {}
//省略toString()实现
String toString() {}
}

毒丸

采用生产者-消费者模式实现分阶段提交。通过调用info()和error()方法写入日志,这两个方法都是创建了一个日志任务LogMsg,并添加到阻塞队列中。
调用info()和error()方法的线程是生产者;真正将日志写入文件的是消费者线程,在Logger这个类中,只创建了1个消费者线程,在这个消费者线程中,会根据刷盘规则执行刷盘操作。

总结

Java语言提供的线程池本身就是一种生产者-消费者模式的实现,但是线程池中的线程每次只能从任务队列中消费一个任务来执行,对于大部分并发场景这种策略都没有问题,但是需要批量执行以及分阶段提交的场景还是需要自己来实现。

生产者-消费者模式在分布式计算中的应用非常广泛。在分布式场景下,可以借助分布式消息队列(MQ)来实现生产者-消费者模式。MQ一般都支持两种消息模型,一种是点对点模型,一种是发布订阅模型。这两种模型的区别:点对点模型里一个消息只会被一个消费者消费,和Java的线程池非常类似;发布订阅模型里一个消息会被多个消费者消费,本质上是一种消息的广播,在多线程编程领域,可以结合观察者模式实现广播功能。