0%

Worker-Thread模式:避免重复创建线程

Thread-Per-Message模式,对应到现实世界,就是委托代办。这种分工模式如果用Java Thread实现,频繁的创建、销毁线程非常影响性能,同时无限制的创建线程还可能导致OOM,在Java领域使用场景受限。

Worker Thread模式及其实现

Worker Thread模式可以类比现实世界里车间的工作模式:一条流水线推送任务,有活一起干,没活一起歇着。车间里的工人数量往往是确定的。

实现Worker Thread模式,用阻塞队列做任务池,然后创建固定数量的线程消费阻塞队列中的任务。这个方案就是Java语言提供的线程池。

线程池有许多优点:避免重复创建、销毁线程,同时能够限制创建线程的上限等。
用Java的Thread实现Thread-Per-Message模式难以应对高并发场景,频繁创建、销毁Java线程的成本有点高,而且无限制的创建线程还可能导致应用OOM。线程池则恰好能解决这些问题。
相比于Thread-Per-Message模式的实现,区别在于创建了一个最多线程数为500的线程池es,然后通过es.execute()方法将请求处理的任务提交给线程池处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
ExecutorService es = Executors.newFixedThreadPool(500);
final ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080));
//处理请求
try {
while(true) {
//接收请求
SocketChannel sc = ssc.accept();
//将请求处理任务提交给线程池
es.execute(()->{
try {
//读socket
ByteBuffer rb = ByteBuffer.allocateDirect(1024);
sc.read(rb);
//模拟处理请求
Thread.sleep(2000);
//写socket
ByteBuffer wb = (ByteBuffer) rb.flip();
sc.write(wb);
//关闭Socket
sc.close();
} catch(Exception e) {
throw new UncheckedIOException(e);
}
});
}
} finally {
ssc.close();
es.shutdown();
}

正确的创建线程池

Java的线程池既能够避免无限制的创建线程导致OOM,也能避免无限制的接收任务导致OOM。建议用创建有界的队列来接收任务

为了便于调试和诊断问题,**建议给线程赋予一个业务相关的名字

当请求量大于有界队列的容量时,就需要合理的拒绝请求。建议在创建线程池时,清晰的指明拒绝策略
**。

1
2
3
4
5
6
7
8
9
10
11
ExecutorService es = new ThreadPoolExecutor(
50,
500,
60L,
TimeUnt.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
r->{
return new Thread(r, "echo-"+r.hashCode());
},
new ThreadPoolExecutor.CallerRunsPolicy()
);

避免线程死锁

使用线程池过程中,要注意线程死锁的场景。如果提交到相同线程池的任务不是相互独立的,而是有依赖关系的,那么就有可能导致线程死锁。
应用每运行一段时间偶尔就会处于无响应的状态,监控数据看上去一切都正常,但是实际上已经不能正常工作了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//L1、L2阶段共用的线程池
ExecutorService es = Executors.newFixedThreadPool(2);
//L1阶段的闭锁
CountDownLatch l1 = new CountDownLatch(2);
for(int i=0; i<2; i++) {
System.out.println("L1");
//执行L1阶段任务
es.excute(()->{
//L2阶段的闭锁
CountDownLatch l2 = new CountDownLatch(2);
//执行L2阶段子任务
for(int j=0; j<2; j++) {
es.execute(()->{
System.out.println("L2");
l2.countDown();
});
}
//等待L2阶段任务执行完
l2.await();
l1.countDown();
});
}
//等待L1阶段任务执行完
l1.await();
System.out.println("end");

上述代码,将一个大型的计算任务分成两个阶段,第一个阶段的任务会等待第二阶段的字任务完成。每一阶段都使用了线程池,而且两个阶段使用了同一个线程池。
代码永远执行不到最后一行,执行过程中没有任何异常,但是应用已经停止响应了。

查看示例代码停止响应后的线程栈,发现线程池中的两个线程全部都阻塞在l2.await();这行代码上。说明线程池里所有的线程都在等待L2阶段的任务执行完,但是因为线程池里的线程都阻塞了,没有空闲线程执行L2阶段的任务了。

最简单粗暴的办法是将线程池的最大线程数调大,如果能够确定任务的数量不是非常多的话,这个办法可行,否则行不通。
这种问题通用的解决方案是为不同的任务创建不同的线程池。L1阶段的任务和L2阶段的任务如果各自都有自己的线程池,就不会出现这种问题。

提交到相同线程池的任务一定是相互独立的,否则就一定要慎重。

总结

Thread-Per-Message模式,类似于现实世界的委托他人办理;Worker Thread模式类似于车间里工人的工作模式。

区别:委托代办人做事,往往是和代办人直接沟通的;对应到编程领域,其实现也是主线程直接创建了一个子线程,主子线程之间是可以直接通信的。而车间工人的工作方式则是完全围绕任务展开的,一个具体的任务被哪个工人执行,预先是无法知道的;对应到编程领域,则是主线程提交任务到线程池,但主线程并不关心任务被哪个线程执行。

Worker Thread模式能够避免线程频繁创建、销毁的问题,而且能够限制线程的最大数量。Java语言里可以直接使用线程池来实现Worker Thread模式,线程池是一个非常基础和优秀的工具类。但使用线程池需要格外谨慎,正确的创建线程池、避免线程死锁、注意ThreadLocal内存泄漏、对于提交到线程池的任务做好异常处理。