目录
背景 CAS CAS 的标准模式 累加示例 写着玩的 RingBuffer 备注
背景 返回目录
大多数企业开发人员都理解数据库乐观并发控制,不过很少有人听说过 CAS(我去年才听说这个概念),CAS 是多线程乐观并发控制策略的一种,一些无锁的支持并发的数据结构都会使用到 CAS,本文对比 CAS 和 数据库乐观并发控制,以此达到强化记忆的目的。
CAS 返回目录
CAS = Compare And Swap
多线程环境下 this.i = this.i + 1 是没有办法保证线程安全的,因此就有了 CAS,CAS 可以保证上面代码的线程安全性,但是 CAS 并不会保证 Swap 的成功,只有 Compare 成功了才会 Swap,即:没有并发发生,即:在我读取和修改之间没有别人修改。另外说一点,如果 i 是局部变量,即:i = i + 1,那么这段代码是线程安全的,因为局部变量是线程独享的。
不明白 CAS 没关系,下面通过 CAS 的标准模式 和 一个简单的示例来理解 CAS。
CAS 的标准模式 返回目录
伪代码
1 var localValue, currentValue; 2 do 3 { 4 localValue = this . 5 6 var newValue = 执行一些计算; 7 8 currentValue = Interlocked.CompareExchange( ref this .value, newValue, localValue); 9 } while (localValue != currentValue);
说明
把 this.value 看成是数据库数据,localValue 是某个用户读取的数据,newValue是用户想修改的值,这里有必要解释一下 CompareExchange 和 currentValue,它的内部实现代码是这样的(想想下面代码是线程安全的):
1 var currentValue = this .value 2 if (currentValue == localValue){ 3 this .value = newValue; 4 } 5 return currentValue;
CompareExchange 用 sql 来类比就是:update xxx set value = newValue where value = localValue,只不过返回的值不同。通过 CompareExchange 的返回结果我们知道 CAS 是否成功了,即:是否出现并发了,即:是否在我读取和修改之间别人已经修改过了,如果是,可以选择重试。
累加示例 返回目录
CAS 代码
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 using System.Threading; 7 8 namespace InterlockStudy 9 { 10 class ConcurrentIncrease 11 { 12 public static void Test() 13 { 14 var sum = 0 ; 15 16 var tasks = new Task[ 10 ]; 17 for ( var i = 1 ; i <= 10 ; i++ ) 18 { 19 tasks[i - 1 ] = Task.Factory.StartNew((state) => 20 { 21 int localSum, currentSum; 22 do 23 { 24 localSum = sum; 25 26 Thread.Sleep( 10 ); 27 var value = ( int )state; 28 var newValue = localSum + value; 29 30 currentSum = Interlocked.CompareExchange( ref sum, newValue, localSum); 31 } while (localSum != currentSum); 32 }, i); 33 } 34 35 Task.WaitAll(tasks); 36 37 Console.WriteLine(sum); 38 } 39 } 40 }
数据库并发代码
1 public static void Test13() 2 { 3 var tasks = new Task[ 10 ]; 4 for ( var i = 1 ; i <= 10 ; i++ ) 5 { 6 tasks[i - 1 ] = Task.Factory.StartNew((state) => 7 { 8 int localSum, result; 9 do 10 { 11 using ( var con = new SqlConnection(CONNECTION_STRING)) 12 { 13 con.Open(); 14 var cmd = new SqlCommand( " select sum from Tests where Id = 1 " , con); 15 var reader = cmd.ExecuteReader(); 16 reader.Read(); 17 localSum = reader.GetInt32( 0 ); 18 19 System.Threading.Thread.Sleep( 10 ); 20 var value = ( int )state; 21 var newValue = localSum + value; 22 23 cmd = new SqlCommand( " update Tests set sum = " + newValue + " where sum = " + localSum + "" , con); 24 result = cmd.ExecuteNonQuery(); 25 } 26 } while (result == 0 ); 27 }, i); 28 } 29 30 Task.WaitAll(tasks); 31 } 32 }
说明
我们发现 CAS 版本的代码和数据库版本的代码出奇的相似,数据库的CAS操作是通过 update + where 来完成的。
写着玩的 RingBuffer 返回目录
代码
1 using System; 2 using System.Collections.Generic; 3 using System.Collections.Concurrent; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7 using System.Threading; 8 9 namespace InterlockStudy 10 { 11 internal class Node<T> 12 { 13 public T Data { get ; set ; } 14 15 public bool HasValue { get ; set ; } 16 } 17 18 class RingBuffer<T> 19 { 20 private readonly Node<T> [] _nodes; 21 private long _tailIndex = - 1 ; 22 private long _headIndex = - 1 ; 23 private AutoResetEvent _readEvent = new AutoResetEvent( false ); 24 private AutoResetEvent _writeEvent = new AutoResetEvent( false ); 25 26 public RingBuffer( int maxSize) 27 { 28 _nodes = new Node<T> [maxSize]; 29 30 for ( var i = 0 ; i < maxSize; i++ ) 31 { 32 _nodes[i] = new Node<T> (); 33 } 34 } 35 36 public void EnQueue(T data) 37 { 38 while ( true ) 39 { 40 if ( this .TryEnQueue(data)) 41 { 42 _readEvent.Set(); 43 return ; 44 } 45 46 _writeEvent.WaitOne(); 47 } 48 49 } 50 51 public T DeQueue() 52 { 53 while ( true ) 54 { 55 T data; 56 if ( this .TryDeQueue( out data)) 57 { 58 _writeEvent.Set(); 59 return data; 60 } 61 62 _readEvent.WaitOne(); 63 } 64 65 } 66 67 public bool TryEnQueue(T data) 68 { 69 long localTailIndex, newTailIndex, currentTailIndex; 70 do 71 { 72 localTailIndex = _tailIndex; 73 74 if (! this .CanWrite(localTailIndex)) 75 { 76 return false ; 77 } 78 79 newTailIndex = localTailIndex + 1 ; 80 81 if (_nodes[newTailIndex % _nodes.Length].HasValue) 82 { 83 return false ; 84 } 85 86 currentTailIndex = Interlocked.CompareExchange( ref _tailIndex, newTailIndex, localTailIndex); 87 } 88 while (localTailIndex != currentTailIndex); 89 90 _nodes[newTailIndex % _nodes.Length].Data = data; 91 _nodes[newTailIndex % _nodes.Length].HasValue = true ; 92 93 return true ; 94 } 95 96 public bool TryDeQueue( out T data) 97 { 98 long localHeadIndex, newHeadIndex, currentHeadIndex; 99 do 100 { 101 localHeadIndex = _headIndex; 102 103 if (! this .CanRead(localHeadIndex)) 104 { 105 data = default (T); 106 return false ; 107 } 108 109 newHeadIndex = localHeadIndex + 1 ; 110 if (_nodes[newHeadIndex % _nodes.Length].HasValue == false ) 111 { 112 data = default (T); 113 return false ; 114 } 115 116 currentHeadIndex = Interlocked.CompareExchange( ref _headIndex, newHeadIndex, localHeadIndex); 117 } 118 while (localHeadIndex != currentHeadIndex); 119 120 data = _nodes[newHeadIndex % _nodes.Length].Data; 121 _nodes[newHeadIndex % _nodes.Length].HasValue = false ; 122 123 return true ; 124 } 125 126 private bool CanWrite( long localTailIndex) 127 { 128 return localTailIndex - _headIndex < _nodes.Length; 129 } 130 131 private bool CanRead( long localHeadIndex) 132 { 133 return _tailIndex - localHeadIndex > 0 ; 134 } 135 } 136 }
备注 返回目录
仓促成文,如果有必要可以再写篇文章,希望大家多批评。