0%

Guarded-Suspension模式:等待唤醒机制的规范实现

消息队列主要用作流量削峰和系统解耦。以消息队列方式接入的服务,发送消息和消费结果这两个操作之间是异步的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Message {
String id;
String content;
}
//发送消息
void send(Message msg) {
//省略
}
//MQ消息返回后会调用该方法
//该方法的执行线程不同于发送消息的线程
void onMessage(Message msg) {
//省略
}
//处理浏览器发来的请求
Respond handleWebReq() {
//创建一消息
Message msg1 = new Message("1", "{...}");
//发送消息
send(msg1);
//问题:如何等待MQ返回的消息
String result = ...;
}

给MQ发送消息的线程是处理Web请求的线程T1,但消费MQ结果的线程并不是线程T1,问题:线程T1如何等待MQ的返回结果?

类似异步转同步中介绍过的异步转同步问题。

Guarded Suspension模式

所谓Guarded Suspension,直译“保护性的暂停”。

Guarded Suspension模式的结构中,一个对象GuardedObject,内部有一个成员变量-受保护的对象,以及两个成员方法-get(Predicate p)和onChanged(T obj)方法。
类比现实中预定包间,对象GuardedObject就是大堂经理,受保护对象就是餐厅里面的包间;受保护对象的get()方法对应的是就餐,就餐的前提是包间已经收拾好了,参数p就是用来描述这个前提条件的;受保护对象的onChanged()方法对应的就是服务员把包间收拾好了,通过onChanged()方法可以fire一个事件,而这个事件往往能改变前提条件p的计算结果。(左侧的绿色线程就是需要就餐的顾客,右侧的蓝色线程就是收拾包间的服务员)

GuardedObject的内部实现非常简单,是管程的一个经典用法,核心是:get()方法通过条件变量的await()方法实现等待,onChanged()方法通过条件变量的signalAll()方法实现唤醒功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class GuardedObject<T> {
//受保护的对象
T obj;
final Lock lock = new ReentrantLock();
final Condition done = lock.newCondition();
final int timeout = 1;
//获取受保护对象
T get(Predicate<T> p) {
lock.lock();
try {
//MESA管程推荐写法
while(!p.test(obj)) {
done.await(timeout, TimeUnit.SECONDS);
}
} catch(InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
//返回非空的受保护对象
return obj;
}
//事件通知方法
void onChanged(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}

扩展Guarded Suspension模式

Guarded Suspension模式里GuardedObject有两个核心方法,一个是get()方法,一个是onChanged()方法。在处理Web请求的方法handleWebReq()中,可以调用GuardedObject的get()方法来实现等待;在MQ消息的消费方法onMessage()中,可以调用GuardedObject的onChanged()方法来实现唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//处理浏览器发来的请求
Respond handleWebReq() {
//创建一消息
Message msg1 = new Message("1","{...}");
//发送消息
send(msg1);
//利用GuardedObject实现等待
GuardedObject<Message> go = new GuardedObject<>();
Message r = go.get(t-> != null);
}
void onMessage(Message msg) {
//如何找到匹配的go?
GuardedObject<Message> go = ???
go.onChanged(msg);
}

但是在实现的时候会遇到一个问题,handleWebReq()里面创建了GuardedObject对象的实例go,并且调用其get()方法等待结果,那在onMessage()方法中,如何才能够找到匹配的GuardedObject对象呢?
这个过程类似服务员告诉大堂经理某某包间已经收拾好了,大堂经理如何根据包间找到就餐的人。现实世界中,大堂经理的头脑中,有包间和就餐人之间的关系图。大堂经理可以立刻就把就餐人找出来。

扩展Guarded Suspension模式,解决Web程序中,每个发送到MQ的消息,都有一个唯一性的属性id,所以可以维护一个MQ消息id和GuardedObject对象实例的关系,这个关系可以类比大堂经理大脑里维护的包间和就餐人的关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class GuardedObject<T> {
//受保护的对象
T obj;
final Lock lock = new ReentrantLock();
final Condition done = lock.newCondition();
final int timeout = 2;
//保存所有GuardedObject
final static Map<Object, GuardedObject> gos = new ConcurrentHashMap<>();
//静态方法创建GuardedObject
static <K> GuardedObject create(K key) {
GuardedObject go = new GuardedObject();
gos.put(key, go);
return go;
}
static <K, T> void fireEvent(K key, T obj) {
GuardedObject go = gos.remove(key);
if(go != null) {
go.onChanged(obj);
}
}

//受保护对象
T get(Predicate<T> p) {
lock.lock();
try {
//MESA管程推荐写法
while(!p.test(obj)) {
done.await(timeout, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
//返回非空的受保护对象
return obj;
}
//事件通知方法
void onChanged(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}

扩展后的GuardedObject内部维护了一个Map,其Key是MQ消息id,而Value是GuardedObject对象实例,同时增加了静态方法create()和fireEvent();create()方法用来创建一个GuardedObject对象实例,并根据key值将其加入到Map中,而fireEvent()方法则是模拟的大堂经理根据包间找就餐人的逻辑。

利用扩展后的GuardedObject解决Web请求中的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//处理浏览器发来的请求
Respond handleWebReq() {
int id = 序号生成器.get();
//创建一消息
Message msg1 = new Message(id, "{...}");
//创建GuardedObject实例
GuardedObject<Message> go = GuardedObject.create(id);
//发送消息
send(msg1);
//等待MQ消息
Message r = go.get(t->t != null);
}
//每个请求都会创建一个GuardedObject,
//get和onChange不是在一个线程里执行的,也不在一个对象里
void onMessage(Message msg) {
//唤醒等待的线程
GuardedObject.fireEvent(msg.id, msg);
}

总结

Guarded Suspension模式本质上是一种等待唤醒机制的实现,只不过Guarded Suspension模式将其规范化了。规范化的好处是无需重头思考如何实现,也无需担心实现程序的可理解性问题,同时也能避免一不小心写出个Bug。(Future.get()就是Guarded Suspension的应用)
但Guarded Suspension模式在解决实际问题的时候,往往还是需要扩展的,扩展的方式有很多,本文对GuardedObject的功能进行了增强,Dubbo中DefaultFuture这个类也是采用这种方式。

Guarded Suspension模式也被称作Guarded Wait模式、Spin Lock模式(使用while循环去等待)等,非官方名字-多线程版本的if。
单线程场景中,if语句是不需要等待的,因为在只有一个线程的条件下,如果这个线程被阻塞,那就没有其他活动线程了,这意味着if判断条件的结果也不会发生变化了。但是多线程场景中,等待就变得有意义了,这种场景下,if判断条件的结果时可能发生变化的。所以,用多线程版本的if来理解这个模式会更简单。