0%

软件事务内存

数据库在解决并发问题方面的事务机制非常简单易用-软件事务内存(Software Transactional Memory,STM)。

传统的数据库事务,支持4个特性ACID:原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability),STM不涉及到持久性,只支持ACI。

STM实现转账

引发的死锁的产生原因以及解决方案中并发转账,简单的使用synchronized将transfer()方法编程同步方法并不能解决并发问题,因为还存在死锁问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class UnsafeAccount {
//余额
private long balance;
//构造函数
public UnsafeAccount(long balance) {
this.balance = balance;
}
//转账
void transfer(UnsafeAccount target, long amt) {
if(this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}

转账操作使用数据库事务解决。如果所有SQL都正常执行,则通过commit()方法提交事务;如果SQL在执行过程中有异常,则通过rollback()方法回滚事务。数据库保证在并发情况下不会有死锁,而且还能保证ACID。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Connection conn = null;
try{
//获取数据库连接
conn = DriverManager.getConnection();
//设置手动提交事务
conn.setAutoCommit(false);
//执行转账SQL
。。。
//提交事务
conn.commit();
} catch (Exception e) {
//出现异常回滚事务
conn.rollbacl();
}

使用STM实现。Java语言不支持STM,借助第三方类库Multiverse来支持。相较于UNsafeAccount,将余额的类型从long变成了TxnLong,将转账的操作放到了atomic(()->{})中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Account {
//余额
private TxnLong balance;
//构造函数
public Account(long balance) {
this.balance = StmUtils.newTxnLong(balance);
}
//转账
public void transfer(Account to, int amt) {
//原子化操作
atomic(()->{
if(this.balance.get() > amt) {
this.balance.decrement(amt);
to.balance.increment(amt);
}
});
}
}

数据库事务广泛使用的是MVCC(Mutli-Version Concurrency Control),多版本并发控制。
事务隔离隔离事务

MVCC可以简单的理解为数据库事务在开启的时候,会给数据库打一个快照,以后所有的读写都是基于这个快照的。当提交事务的时候,如果所有读写过的数据在该事务执行期间没有发生过变化,那么久可以提交;如果发生了变化,说明事务中有和其他事务读写的数据冲突了,这个时候是不可以提交的。

为了记录数据是否发生了变化,可以给每条数据增加一个版本号,这样每次成功修改数据都会增加版本号的值。MVCC的工作原理和StampedLock中的乐观锁类似。

实现STM

首先让Java中的对象有版本号,VersionedRef这个类的作用就是将对象value包装成带版本号的对象。按照MVCC理论,数据的每一次修改都对应着一个唯一的版本号,不存在仅仅改变value或者version的情况,用不变模式可以很好的解决这个问题,VersionedRef设计成不可变的。

1
2
3
4
5
6
7
8
9
10
//带版本号的对象引用
public final class VersionedRef<T> {
final T value;
final long version;
//构造方法
public VersionedRef(T value, long version) {
this.value = value;
this.version = version;
}
}

所有对数据的读写操作,一定是在一个事务里面,TxnRef这个类负责完成事务内的读写操作,读写操作委托给了接口Txn,Txn代表的是读写操作所在的当前事务,内部持有的curRef代表的是系统中的最新值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//支持事务的引用
public class TxnRef<T> {
//当前数据,带版本号
volatile VersionedRef curRef;
//构造方法
public TxnRef(T value) {
this.curRef = new VersionedRef(value, 0L);
}
//获取当前事务中的数据
public T getValue(Txn txn) {
return txn.get(this);
}
//在当前事务中设置数据
public void setValue(T value, Txn txn) {
txn.set(this, value);
}
}
1
2
3
4
5
//事务接口
public interface Txn {
<T> T get(TxnRef<T> ref);
<T> void set(TxnRef<T> ref, T value);
}

STMTxn是Txn最关键的一个实现类,事务内对于数据的读写,都是通过它来完成的。STMTxn内部有两个Map:

  • inTxnMap-用于保存当前事务中所有读写的数据的快照;
  • writeMap-用于保存当前事务需要写入的数据。

每个事务都有一个唯一的事务ID txnId,全局递增的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//STM事务实现类
public final class STMTxn implements Txn {
//事务ID生成器
private static AtomicLong txnSeq = new AtomicLong(0);
//当前事务所有的相关数据
private Map<TxnRef, VersionedRef> inTxnMap = new HashMap<>();
//当前事务所有需要修改的数据
private Map<TxnRef, Object> writeMap = new HashMap<>();
//当前事务ID
private long txnId;
STMTxn() {
txnId = txnSeq.incrementAndGet();
}

//获取当前事务中的数据
@Override
public <T> T get(TxnRef<T> ref) {
//将需要读取的数据,加入inTxnMap
if(!inTxnMap.containsKey(ref)) {
inTxnMap.put(ref, ref.curRef);
}
return (T) inTxnMap.get(ref).value;
}
//在当前事务中修改数据
@Override
public <T> void set(TxnRef<T> ref, T value) {
//将需要修改的数据。加入inTxnMap
if(!inTxnMap.containsKey(ref)) {
inTxnMap.put(ref, ref.curRef);
}
writeMap.put(ref, value);
}
//提交事务
boolean commit() {
synchronized (STM.commitLock) {
//是否校验通过
boolean isValid = true;
//校验所有读过的数据是否发生过变化
for(Map.Entry<TxnRef, VersionedRef> entry : inTxnMap.entrySet()) {
VersionedRef curRef = entry.getKey().curRef;
VersionedRef readRef = entry.getValue();
//通过版本号来验证数据是否发生过变化
if(curRef.version != readRef.version) {
idValid = false;、
break;
}
}
}
//如果校验通过,则所有更改生效
if(isValid) {
writeMap.forEach((k, v)->{
k.curRef = new VersionedRef(v, txnId);
});
}
return isValid;
}
}

STMTxn有三个核心方法,分别是读数据的get()方法、写数据的set()方法和提交事务的commit()方法。

  • get()方法将要读取数据作为快照放入inTxnMap,同时保证每次读取的数据都是一个版本。
  • set()方法会将要写入的数据放入writeMap,但如果写入的数据没被读取过,也会将其放入inTxnMap。
  • commit()方法,简化实现,使用了互斥锁,所以事务的提交是串行的。首先检查inTxnMap中的数据是否发生过变化,如果没有发生变化,那么就将writeMap中的数据写入(这里的写入是TxnRef内部持有的curRef);如果发生过变化,那么就不能讲writeMap中的数据写入了。

模拟实现Multiverse中的原子化操作atomic()。使用了类似CAS的操作,如果事务提交失败,那么就重新创建一个新的事务,重新执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@FunctionalInterface
public interface TxnRunnable {
void run(Txn txn);
}
//STM
public final class STM {
//私有化构造方法
private STM() {
//提交数据需要用到的全局锁
static final Object commitLock = new Object();
//原子化提交方法
public static void atomic(TxnRunnable action) {
boolean committed = false;
//如果,没有提交成功,则一直重试
while(!committed) {
//创建新的事务
STMTxn txn = new STMTxn();
//执行业务逻辑
action.run(txn);
//提交事务
commited = txn.commit();
}
}
}
}

线程安全的转账操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Account {
//余额
private TxnRef<Integer> balance;
//构造方法
public Account(int balance) {
this.balance = new TxnRef<Integer>(balance);
}
//转账操作
public void transfer(Account target, int amt) {
STM.atomic((txn)->{
Integer from = balance.getValue(txn);
balance.setValue(from-amt, txn);
Integer to = target.balance.getValue(txn);
target.balance.setValue(to+amt, txn);
});
}
}

总结

STM仅适用于存储共享变量,编程语言中各种I/O操作很难支持回滚。支持STM的编程语言主要是函数式语言,函数式语言里的数据天生具备不可变性,利用这种不可变性实现STM更加简单。