目录
背景 CAS CAS 的标准模式 累加示例 写着玩的 RingBuffer 备注
背景 返回目录
大多数企业开发人员都理解数据库乐观并发控制,不过很少有人听说过 CAS(我去年才听说这个概念),CAS 是多线程乐观并发控制策略的一种,一些无锁的支持并发的数据结构都会使用到 CAS,本文对比 CAS 和 数据库乐观并发控制,以此达到强化记忆的目的。
CAS 返回目录
CAS = Compare And Swap
多线程环境下 this.i = this.i + 1 是没有办法保证线程安全的,因此就有了 CAS,CAS 可以保证上面代码的线程安全性,但是 CAS 并不会保证 Swap 的成功,只有 Compare 成功了才会 Swap,即:没有并发发生,即:在我读取和修改之间没有别人修改。另外说一点,如果 i 是局部变量,即:i = i + 1,那么这段代码是线程安全的,因为局部变量是线程独享的。
不明白 CAS 没关系,下面通过 CAS 的标准模式 和 一个简单的示例来理解 CAS。
CAS 的标准模式 返回目录
伪代码
1
var
localValue, currentValue;
2
do
3
{
4
localValue =
this
.
5
6
var
newValue =
执行一些计算;
7
8
currentValue = Interlocked.CompareExchange(
ref
this
.value, newValue, localValue);
9
}
while
(localValue !=
currentValue);
说明
把 this.value 看成是数据库数据,localValue 是某个用户读取的数据,newValue是用户想修改的值,这里有必要解释一下 CompareExchange 和 currentValue,它的内部实现代码是这样的(想想下面代码是线程安全的):
1
var
currentValue =
this
.value
2
if
(currentValue ==
localValue){
3
this
.value =
newValue;
4
}
5
return
currentValue;
CompareExchange 用 sql 来类比就是:update xxx set value = newValue where value = localValue,只不过返回的值不同。通过 CompareExchange 的返回结果我们知道 CAS 是否成功了,即:是否出现并发了,即:是否在我读取和修改之间别人已经修改过了,如果是,可以选择重试。
累加示例 返回目录
CAS 代码
1
using
System;
2
using
System.Collections.Generic;
3
using
System.Linq;
4
using
System.Text;
5
using
System.Threading.Tasks;
6
using
System.Threading;
7
8
namespace
InterlockStudy
9
{
10
class
ConcurrentIncrease
11
{
12
public
static
void
Test()
13
{
14
var
sum =
0
;
15
16
var
tasks =
new
Task[
10
];
17
for
(
var
i =
1
; i <=
10
; i++
)
18
{
19
tasks[i -
1
] = Task.Factory.StartNew((state) =>
20
{
21
int
localSum, currentSum;
22
do
23
{
24
localSum =
sum;
25
26
Thread.Sleep(
10
);
27
var
value = (
int
)state;
28
var
newValue = localSum +
value;
29
30
currentSum = Interlocked.CompareExchange(
ref
sum, newValue, localSum);
31
}
while
(localSum !=
currentSum);
32
}, i);
33
}
34
35
Task.WaitAll(tasks);
36
37
Console.WriteLine(sum);
38
}
39
}
40
}
数据库并发代码
1
public
static
void
Test13()
2
{
3
var
tasks =
new
Task[
10
];
4
for
(
var
i =
1
; i <=
10
; i++
)
5
{
6
tasks[i -
1
] = Task.Factory.StartNew((state) =>
7
{
8
int
localSum, result;
9
do
10
{
11
using
(
var
con =
new
SqlConnection(CONNECTION_STRING))
12
{
13
con.Open();
14
var
cmd =
new
SqlCommand(
"
select sum from Tests where Id = 1
"
, con);
15
var
reader =
cmd.ExecuteReader();
16
reader.Read();
17
localSum = reader.GetInt32(
0
);
18
19
System.Threading.Thread.Sleep(
10
);
20
var
value = (
int
)state;
21
var
newValue = localSum +
value;
22
23
cmd =
new
SqlCommand(
"
update Tests set sum =
"
+ newValue +
"
where sum =
"
+ localSum +
""
, con);
24
result =
cmd.ExecuteNonQuery();
25
}
26
}
while
(result ==
0
);
27
}, i);
28
}
29
30
Task.WaitAll(tasks);
31
}
32
}
说明
我们发现 CAS 版本的代码和数据库版本的代码出奇的相似,数据库的CAS操作是通过 update + where 来完成的。
写着玩的 RingBuffer 返回目录
代码
1
using
System;
2
using
System.Collections.Generic;
3
using
System.Collections.Concurrent;
4
using
System.Linq;
5
using
System.Text;
6
using
System.Threading.Tasks;
7
using
System.Threading;
8
9
namespace
InterlockStudy
10
{
11
internal
class
Node<T>
12
{
13
public
T Data {
get
;
set
; }
14
15
public
bool
HasValue {
get
;
set
; }
16
}
17
18
class
RingBuffer<T>
19
{
20
private
readonly
Node<T>
[] _nodes;
21
private
long
_tailIndex = -
1
;
22
private
long
_headIndex = -
1
;
23
private
AutoResetEvent _readEvent =
new
AutoResetEvent(
false
);
24
private
AutoResetEvent _writeEvent =
new
AutoResetEvent(
false
);
25
26
public
RingBuffer(
int
maxSize)
27
{
28
_nodes =
new
Node<T>
[maxSize];
29
30
for
(
var
i =
0
; i < maxSize; i++
)
31
{
32
_nodes[i] =
new
Node<T>
();
33
}
34
}
35
36
public
void
EnQueue(T data)
37
{
38
while
(
true
)
39
{
40
if
(
this
.TryEnQueue(data))
41
{
42
_readEvent.Set();
43
return
;
44
}
45
46
_writeEvent.WaitOne();
47
}
48
49
}
50
51
public
T DeQueue()
52
{
53
while
(
true
)
54
{
55
T data;
56
if
(
this
.TryDeQueue(
out
data))
57
{
58
_writeEvent.Set();
59
return
data;
60
}
61
62
_readEvent.WaitOne();
63
}
64
65
}
66
67
public
bool
TryEnQueue(T data)
68
{
69
long
localTailIndex, newTailIndex, currentTailIndex;
70
do
71
{
72
localTailIndex =
_tailIndex;
73
74
if
(!
this
.CanWrite(localTailIndex))
75
{
76
return
false
;
77
}
78
79
newTailIndex = localTailIndex +
1
;
80
81
if
(_nodes[newTailIndex %
_nodes.Length].HasValue)
82
{
83
return
false
;
84
}
85
86
currentTailIndex = Interlocked.CompareExchange(
ref
_tailIndex, newTailIndex, localTailIndex);
87
}
88
while
(localTailIndex !=
currentTailIndex);
89
90
_nodes[newTailIndex % _nodes.Length].Data =
data;
91
_nodes[newTailIndex % _nodes.Length].HasValue =
true
;
92
93
return
true
;
94
}
95
96
public
bool
TryDeQueue(
out
T data)
97
{
98
long
localHeadIndex, newHeadIndex, currentHeadIndex;
99
do
100
{
101
localHeadIndex =
_headIndex;
102
103
if
(!
this
.CanRead(localHeadIndex))
104
{
105
data =
default
(T);
106
return
false
;
107
}
108
109
newHeadIndex = localHeadIndex +
1
;
110
if
(_nodes[newHeadIndex % _nodes.Length].HasValue ==
false
)
111
{
112
data =
default
(T);
113
return
false
;
114
}
115
116
currentHeadIndex = Interlocked.CompareExchange(
ref
_headIndex, newHeadIndex, localHeadIndex);
117
}
118
while
(localHeadIndex !=
currentHeadIndex);
119
120
data = _nodes[newHeadIndex %
_nodes.Length].Data;
121
_nodes[newHeadIndex % _nodes.Length].HasValue =
false
;
122
123
return
true
;
124
}
125
126
private
bool
CanWrite(
long
localTailIndex)
127
{
128
return
localTailIndex - _headIndex <
_nodes.Length;
129
}
130
131
private
bool
CanRead(
long
localHeadIndex)
132
{
133
return
_tailIndex - localHeadIndex >
0
;
134
}
135
}
136
}
备注 返回目录
仓促成文,如果有必要可以再写篇文章,希望大家多批评。

