0%

Lock和Condition(下):Dubbo如何用管程实现异步转同步

Java SDK并发包里的Condition实现了管程模型里面的条件变量。

Java语言内置的管程里只有一个条件变量,而Lock&Condition实现的管程是支持多个条件变量的,这是二者的一个重要区别。

在很多并发场景下,支持过个条件变量能够让并发程序可读性更好,实现起来也更容易。如实现一个阻塞队列,就需要两个条件变量。

利用两个条件变量实现阻塞队列

一个阻塞队列,需要两个条件变量,一个是队列不空(空队列不允许出队),另一个是队列不满(队列已满不允许入队)。

Lock和Condition实现的管程,线程等待和通知需要调用await()、signal()、signalAll(),它们的语义和wait()、notify()、notifyAll()是相同的。不一样的是,Lock&Condition实现的管程里只能使用await()、signal()、signalAll(),wait()、notify()、notifyAll()只有在synchronized实现的管程里才能使用。

Java SDK并发包里的Lock和Condition是管程的一种实现。

同步与异步

平时写的代码,基本都是同步的。
同步与异步的区别:调用发是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步

同步,是Java代码默认的处理方式。如果想让程序支持异步,可以通过两种方式实现:

  1. 调用方创建一个子线程,在子线程中执行方法调用,这种调用称为异步调用;
  2. 方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接return,这种方法称为异步方法。

Dubbo源码分析

在编程领域异步场景:TCP协议本身就是异步的;RPC调用,在TCP协议层面,发送完RPC请求后,线程是不会等待RPC的响应结果的。(框架做了异步转同步的事情)

1
2
3
DemoService service = 初始化;  
String message = service.sayHello("dubbo");
System.out.println(message);

默认情况下sayHello()方法是个同步方法,执行service.sayHello(“dubbo”)的时候,线程会停下来等结果。

此时将调用线程dump出来的话,会发现调用线程阻塞了,线程状态是Timed_Waiting。本来发送请求时一步的,但是调用线程却阻塞了,说明Dubbo做了异步转同步的事情。通过调用栈发现线程是阻塞在DefaultFuture.get()方法上,说明Dubbo异步转同步的功能是通过DefaultFuture这个类实现的。

DefaultFuture.get()之前,DubboInvoker调用了DefaultFuture.get()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class DubboInvoker {
Result doInvoke(Invocation inv) {
return currentClient
.request(inv, timeout)
.get();
}
}

//当RPC返回结果之前,阻塞调用线程,让调用线程等待;
//当RPC返回结果后,唤醒调用线程,让调用线程重新执行。
//等待-通知机制,加锁只是利用管程实现线程的阻塞和唤醒,并没有用到共享的资源
public class DefaultFuture {
//创建锁与条件变量
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();

//调用方通过该方法等待结果
Object get(int timeout) {
long start = System.nanoTime();
lock.lock();
try {
while(!isDone()) {
done.await(timeout);//等待中的线程,被唤醒后,会重新去获取锁,但是获取锁后,会执行wait后的代码。
long cur = System.nanoTime();
if(isDone() || cur - start > timeout) {
break;
}
}
} finally {
lock.unlock();
}
if(!isDone()) {
throw new TimeoutException();
}
return returnFromResponxe();
}
//RPC结果是否已经返回
boolean isDone() {
return response != null;
}
//RPC结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
//if(done != null) {//始终为true
// done.signal();//signal会导致很多请求超时
//}
done.signalAll();
} finally {
lock.unlock();
}
}
}

调用线程通过调用get()方法等待RPC返回结果,这个方法里:调用lock()获取锁,在finally里调用unlock()释放锁;获取锁后,通过经典的在循环中调用await()方法来实现等待。

当RPC结果返回时 ,会调用doReceived()方法,这个方法里面,调用lock()获取锁,在finally里面调用unlock()释放锁,获取锁后通过调用signal()来通知调用线程,结果已经返回,不用继续等待了。


websocket也是一个异步的通信协议,基于这个协议实现一个简单的RPC,也会遇到异步转同步的问题。

公有云的API本身也是异步的,如创建云主机就是一个异步API,调用虽然成功了,但是云主机并没有创建成功,需要调用另外一个API去轮询云主机的状态。
如果需要再项目内部封装创建云主机的API,也会面临异步转同步的问题,同步的API更易用。

总结

Lock&Condition是管程的一种实现,相对于synchronized实现的管程来说更加灵活、功能也更丰富。