0%

并发工具类总结

while(true)总不让人省心

隐藏在并发包中的管程通过破坏不可抢占条件来避免死锁问题,但是它的实现中有一个致命的问题:while(true)没有break条件,从而导致了死循环。除此之外,这个实现虽然不存在死锁问题,但还是存在活锁问题,解决活锁问题只需要随机等待一小段时间就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//转账成功之后break
//在while循环体结束前增加Thread.sleep(随机时间)
class Account {
private int balance;
private final Lock lock = new ReentrantLock();
//转账
void transfer(Account tar, int amt) {
while(true) {
if(this.lock.tryLock()) {
try {
if(tar.lock.tryLock()) {
try {
this.balance -= amt;
tar.balance += amt;
//新增:退出循环
break;
} finally {
tar.lock.unlock();
}
}//if
} finally {
this.lock.unlock();
}
}//if
//新增:sleep一个随机时间避免活锁
Thread.sleep(随机时间);
}//while
}//transfer
}

原子类中本质上也是一个while(true),隐藏的比较深。看上去while(!rf.compareAndSet(or, nr))是有终止条件的,而且跑单线程测试一直都没有问题。实际上却存在严重的并发问题,问题就处在对or的赋值在while循环之外,这样每次循环or的值都不会发生变化,所以一旦有一次循环rf.compareAndSet(or, nr)的值等于fasle,那之后无论循环多少次,都会等于false。即在特定场景下,变成了while(true)问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//把对or的赋值移到while循环之内
public class SafeWM {
class WMRange {
final int upper;
final int lower;
WMRange(int upper, int lower) {
//构造函数实现
}
}

final AtomicReference<WMRange> rf = new AtomicReference<>(new WMRange(0, 0));
//设置库存上限
void setUpper(int v) {
WMRange nr;
WMRange or;
//原代码WMRange or = rf.get();
do {
//移动到此处,每个回合都需要重新获取旧值
or = rf.get();
//检查参数合法性
if(v < or.lower) {
throw new IllegalArgumentException();
}
nr = new WMRange(v, or.lower);
} while(!rf.compareAndSet(or, nr));
}
}

signalAll()总让人省心

Dubbo用管程实现异步转同步是关于signal()和signalAll()的。Dubbo已经把signal()改成signalAll()了,更稳妥。

1
2
3
4
5
6
7
8
9
10
//RPC结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
done.signalAll();
} finally {
lock.unlock();
}
}

Semaphore需要锁中锁

Semaphore对象池中Vector不能换成ArrayList。Semaphore可以允许多个线程访问一个临界区,意味着可能存在多个线程同时访问ArrayList,而ArrayList不是线程安全的,所以对象池中是不能将Vector换成ArrayList的。
Semaphore允许多个线程访问一个临界区,这是一把双刃剑,当多个线程进入临界区时,如果需要访问共享变量就会存在并发问题,所以必须加锁,即Semaphore需要锁中锁。

锁的申请和释放要成对出现

StampedLock中没有正确的释放锁。锁的申请和释放要成对出现,最佳实践:try{}finally{},但是try{}finally{}并不能解决所有锁的释放问题。
示例代码中,锁的升级会生成新的stamp,而finally中释放锁用的是锁升级前的stamp,本质上这也属于锁的申请和释放没有成对出现,只是它隐藏的有点深。需要对stamp重新赋值来解决问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private double x, y;
final StampedLock sl = new StampedLock();
//存在问题的方法
void moveIfAtOrigin(double newX, double newY) {
long stamp = sl.readLock();
try {
while(x == 0.0 && y == 0.0) {
long ws = sl.tryConvertToWriteLock(stamp);
if(wl != 0L) {
//问题出在没有对stamp重新赋值。新增
stamp = ws;
x = newX;
y = newY;
break;
} else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
//此处unlock的是stamp
sl.unlock(stamp);
}
}

回调总要关心执行线程是谁

CountDownLatch&CyclicBarrier中CyclicBarrier的回调函数使用了一个固定大小为1的线程池是合理的。

  • 第一个是线程池大小是1,只有一个线程,主要原因是check()方法的耗时比getPOrders()和getDOrders()都要短,所以没必要用多个线程,同时单线程能保证访问的数据不存在并发问题。
  • 第二个是使用了线程池,如果不使用,直接在回调函数里调用check()方法是不可以的。CyclicBarrier是同步调用回调函数之后才唤醒等待的线程,如果在回调函数里直接调用check()方法,那就意味着在执行check()的时候,是不能同时执行getPOrders()和getDOrders()的,这样就起不到提升性能的作用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//CyclicBarrier部分源码
try {
//barrierCommand是回调函数
final Runnable command = barrierCommand;
//调用回调函数
if(command != null) {
command.run();
}
ranAction = true;
//唤醒等待的线程
nextGeneration();
return 0;
} finally {
if(!ranAction) {
breakBarrier();
}
}

所以当遇到回调函数的时候,需要确定:执行回调函数的线程是哪一个?
这个在多线程场景下非常重要。因为不同线程ThreadLocal里的数据是不同的,有些框架如Spring就用ThreadLocal来管理事务,如果不清楚回调函数用的是哪个线程,很可能会导致错误的事务管理,并最终导致数据不一致。

CyclicBarrier执行回调函数的线程是将CyclicBarrier内部计数器减到0的那个线程。
执行check()的时候,是不能同时执行getPOrders()和getDOrders(),因为执行这两个方法的线程一个在等待,一个正在忙着执行check()。

共享线程池:有福同享就要有难同当

CompletetableFuture中,没有异常处理、逻辑不严谨、findRuleByJdbc()这个方法隐藏着一个阻塞式I/O,这意味着会阻塞调用线程。
默认情况下所有的CompletetableFuture共享一个ForkJoinPool,当有阻塞式I/O时,可能导致所有的ForkJoinPool线程都阻塞,进而影响整个系统的性能。

1
2
3
4
5
6
7
8
9
10
11
//采购订单
PurchersOrder po;
CompletableFuture<Boolean> cf = CompletableFuture
.supplyAsync(()->{
//在数据库中查询规则
return findRuleByJdbc();
}).thenApply(r->{
//规则校验
return check(po, r);
});
Boolean isOK = cf.join();

利用共享,往往能快速实现功能,所谓是有福同享,但是代价就是有难要同当。在强调高可用的今天,大多数人更倾向于使用隔离的方案。

线程问题定位的利器:线程栈dump

ReadWriteLock并发容器中,本质上都是定位线上并发问题,方案就是通过查看线程栈来定位问题。重点是查看线程状态,分析线程进入该状态的原因是否合理,可以参考线程的生命周期加深理解。

为了便于分析定位线程问题,需要给线程赋予一个有意义的名字,对于线程池可以通过自定义ThreadFactory来给线程池中的线程赋予有意义的名字,也可以在执行run()方法时通过Thread.currentThread().setName();来给线程赋予一个更贴近业务的名字。