CAS

系统 1685 0

目录

背景 CAS CAS 的标准模式 累加示例 写着玩的 RingBuffer 备注

背景 返回目录

大多数企业开发人员都理解数据库乐观并发控制,不过很少有人听说过 CAS(我去年才听说这个概念),CAS 是多线程乐观并发控制策略的一种,一些无锁的支持并发的数据结构都会使用到 CAS,本文对比 CAS 和 数据库乐观并发控制,以此达到强化记忆的目的。

CAS 返回目录

CAS = Compare And Swap

多线程环境下 this.i = this.i + 1 是没有办法保证线程安全的,因此就有了 CAS,CAS 可以保证上面代码的线程安全性,但是 CAS 并不会保证 Swap 的成功,只有 Compare 成功了才会 Swap,即:没有并发发生,即:在我读取和修改之间没有别人修改。另外说一点,如果 i 是局部变量,即:i = i + 1,那么这段代码是线程安全的,因为局部变量是线程独享的。

不明白 CAS 没关系,下面通过 CAS 的标准模式 和 一个简单的示例来理解 CAS。

CAS 的标准模式 返回目录

伪代码

          
             1
          
          
            var
          
          
             localValue, currentValue;


          
          
             2
          
          
            do
          
          
             3
          
          
                                {


          
          
             4
          
                                   localValue = 
          
            this
          
          
            .


          
          
             5
          
          
             6
          
          
            var
          
           newValue =
          
             执行一些计算;


          
          
             7
          
          
             8
          
                                   currentValue = Interlocked.CompareExchange(
          
            ref
          
          
            this
          
          
            .value, newValue, localValue);


          
          
             9
          
                               } 
          
            while
          
           (localValue !=
          
             currentValue);
          
        

说明

把 this.value 看成是数据库数据,localValue 是某个用户读取的数据,newValue是用户想修改的值,这里有必要解释一下 CompareExchange 和 currentValue,它的内部实现代码是这样的(想想下面代码是线程安全的):

          
            1
          
          
            var
          
           currentValue = 
          
            this
          
          
            .value


          
          
            2
          
          
            if
          
          (currentValue ==
          
             localValue){


          
          
            3
          
          
            this
          
          .value =
          
             newValue;


          
          
            4
          
          
            }


          
          
            5
          
          
            return
          
           currentValue;
        

CompareExchange  用 sql 来类比就是:update xxx set value = newValue where value = localValue,只不过返回的值不同。通过 CompareExchange 的返回结果我们知道 CAS 是否成功了,即:是否出现并发了,即:是否在我读取和修改之间别人已经修改过了,如果是,可以选择重试。

累加示例 返回目录

CAS 代码

          
             1
          
          
            using
          
          
             System;


          
          
             2
          
          
            using
          
          
             System.Collections.Generic;


          
          
             3
          
          
            using
          
          
             System.Linq;


          
          
             4
          
          
            using
          
          
             System.Text;


          
          
             5
          
          
            using
          
          
             System.Threading.Tasks;


          
          
             6
          
          
            using
          
          
             System.Threading;


          
          
             7
          
          
             8
          
          
            namespace
          
          
             InterlockStudy


          
          
             9
          
          
            {


          
          
            10
          
          
            class
          
          
             ConcurrentIncrease


          
          
            11
          
          
                {


          
          
            12
          
          
            public
          
          
            static
          
          
            void
          
          
             Test()


          
          
            13
          
          
                    {


          
          
            14
          
          
            var
          
           sum = 
          
            0
          
          
            ;


          
          
            15
          
          
            16
          
          
            var
          
           tasks = 
          
            new
          
           Task[
          
            10
          
          
            ];


          
          
            17
          
          
            for
          
           (
          
            var
          
           i = 
          
            1
          
          ; i <= 
          
            10
          
          ; i++
          
            )


          
          
            18
          
          
                        {


          
          
            19
          
                           tasks[i - 
          
            1
          
          ] = Task.Factory.StartNew((state) =>


          
            20
          
          
                            {


          
          
            21
          
          
            int
          
          
             localSum, currentSum;


          
          
            22
          
          
            do
          
          
            23
          
          
                                {


          
          
            24
          
                                   localSum =
          
             sum;


          
          
            25
          
          
            26
          
                                   Thread.Sleep(
          
            10
          
          
            );


          
          
            27
          
          
            var
          
           value = (
          
            int
          
          
            )state;


          
          
            28
          
          
            var
          
           newValue = localSum +
          
             value;


          
          
            29
          
          
            30
          
                                   currentSum = Interlocked.CompareExchange(
          
            ref
          
          
             sum, newValue, localSum);


          
          
            31
          
                               } 
          
            while
          
           (localSum !=
          
             currentSum);


          
          
            32
          
          
                            }, i);


          
          
            33
          
          
                        }


          
          
            34
          
          
            35
          
          
                        Task.WaitAll(tasks);


          
          
            36
          
          
            37
          
          
                        Console.WriteLine(sum);


          
          
            38
          
          
                    }


          
          
            39
          
          
                }


          
          
            40
          
           }
        

数据库并发代码

          
             1
          
          
            public
          
          
            static
          
          
            void
          
          
             Test13()


          
          
             2
          
          
                    {


          
          
             3
          
          
            var
          
           tasks = 
          
            new
          
           Task[
          
            10
          
          
            ];


          
          
             4
          
          
            for
          
           (
          
            var
          
           i = 
          
            1
          
          ; i <= 
          
            10
          
          ; i++
          
            )


          
          
             5
          
          
                        {


          
          
             6
          
                           tasks[i - 
          
            1
          
          ] = Task.Factory.StartNew((state) =>


          
             7
          
          
                            {


          
          
             8
          
          
            int
          
          
             localSum, result;


          
          
             9
          
          
            do
          
          
            10
          
          
                                {


          
          
            11
          
          
            using
          
           (
          
            var
          
           con = 
          
            new
          
          
             SqlConnection(CONNECTION_STRING))


          
          
            12
          
          
                                    {


          
          
            13
          
          
                                        con.Open();


          
          
            14
          
          
            var
          
           cmd = 
          
            new
          
           SqlCommand(
          
            "
          
          
            select sum from Tests where Id = 1
          
          
            "
          
          
            , con);


          
          
            15
          
          
            var
          
           reader =
          
             cmd.ExecuteReader();


          
          
            16
          
          
                                        reader.Read();


          
          
            17
          
                                       localSum = reader.GetInt32(
          
            0
          
          
            );


          
          
            18
          
          
            19
          
                                       System.Threading.Thread.Sleep(
          
            10
          
          
            );


          
          
            20
          
          
            var
          
           value = (
          
            int
          
          
            )state;


          
          
            21
          
          
            var
          
           newValue = localSum +
          
             value;


          
          
            22
          
          
            23
          
                                       cmd = 
          
            new
          
           SqlCommand(
          
            "
          
          
            update Tests set sum = 
          
          
            "
          
           + newValue + 
          
            "
          
          
             where sum = 
          
          
            "
          
           + localSum + 
          
            ""
          
          
            , con);


          
          
            24
          
                                       result =
          
             cmd.ExecuteNonQuery();


          
          
            25
          
          
                                    }


          
          
            26
          
                               } 
          
            while
          
           (result == 
          
            0
          
          
            );


          
          
            27
          
          
                            }, i);


          
          
            28
          
          
                        }


          
          
            29
          
          
            30
          
          
                        Task.WaitAll(tasks);


          
          
            31
          
          
                    }


          
          
            32
          
               }
        

说明

我们发现 CAS 版本的代码和数据库版本的代码出奇的相似,数据库的CAS操作是通过 update + where 来完成的。

写着玩的 RingBuffer 返回目录

代码

          
              1
          
          
            using
          
          
             System;


          
          
              2
          
          
            using
          
          
             System.Collections.Generic;


          
          
              3
          
          
            using
          
          
             System.Collections.Concurrent;


          
          
              4
          
          
            using
          
          
             System.Linq;


          
          
              5
          
          
            using
          
          
             System.Text;


          
          
              6
          
          
            using
          
          
             System.Threading.Tasks;


          
          
              7
          
          
            using
          
          
             System.Threading;


          
          
              8
          
          
              9
          
          
            namespace
          
          
             InterlockStudy


          
          
             10
          
          
            {


          
          
             11
          
          
            internal
          
          
            class
          
           Node<T>


          
             12
          
          
                {


          
          
             13
          
          
            public
          
           T Data { 
          
            get
          
          ; 
          
            set
          
          
            ; }


          
          
             14
          
          
             15
          
          
            public
          
          
            bool
          
           HasValue { 
          
            get
          
          ; 
          
            set
          
          
            ; }


          
          
             16
          
          
                }


          
          
             17
          
          
             18
          
          
            class
          
           RingBuffer<T>


          
             19
          
          
                {


          
          
             20
          
          
            private
          
          
            readonly
          
           Node<T>
          
            [] _nodes;


          
          
             21
          
          
            private
          
          
            long
          
           _tailIndex = -
          
            1
          
          
            ;


          
          
             22
          
          
            private
          
          
            long
          
           _headIndex = -
          
            1
          
          
            ;


          
          
             23
          
          
            private
          
           AutoResetEvent _readEvent = 
          
            new
          
           AutoResetEvent(
          
            false
          
          
            );


          
          
             24
          
          
            private
          
           AutoResetEvent _writeEvent = 
          
            new
          
           AutoResetEvent(
          
            false
          
          
            );


          
          
             25
          
          
             26
          
          
            public
          
           RingBuffer(
          
            int
          
          
             maxSize)


          
          
             27
          
          
                    {


          
          
             28
          
                       _nodes = 
          
            new
          
           Node<T>
          
            [maxSize];


          
          
             29
          
          
             30
          
          
            for
          
           (
          
            var
          
           i = 
          
            0
          
          ; i < maxSize; i++
          
            )


          
          
             31
          
          
                        {


          
          
             32
          
                           _nodes[i] = 
          
            new
          
           Node<T>
          
            ();


          
          
             33
          
          
                        }


          
          
             34
          
          
                    }


          
          
             35
          
          
             36
          
          
            public
          
          
            void
          
          
             EnQueue(T data)


          
          
             37
          
          
                    {


          
          
             38
          
          
            while
          
           (
          
            true
          
          
            )


          
          
             39
          
          
                        {


          
          
             40
          
          
            if
          
           (
          
            this
          
          
            .TryEnQueue(data))


          
          
             41
          
          
                            {


          
          
             42
          
          
                                _readEvent.Set();


          
          
             43
          
          
            return
          
          
            ;


          
          
             44
          
          
                            }


          
          
             45
          
          
             46
          
          
                            _writeEvent.WaitOne();


          
          
             47
          
          
                        }


          
          
             48
          
          
             49
          
          
                    }


          
          
             50
          
          
             51
          
          
            public
          
          
             T DeQueue()


          
          
             52
          
          
                    {


          
          
             53
          
          
            while
          
           (
          
            true
          
          
            )


          
          
             54
          
          
                        {


          
          
             55
          
          
                            T data;


          
          
             56
          
          
            if
          
           (
          
            this
          
          .TryDeQueue(
          
            out
          
          
             data))


          
          
             57
          
          
                            {


          
          
             58
          
          
                                _writeEvent.Set();


          
          
             59
          
          
            return
          
          
             data;


          
          
             60
          
          
                            }


          
          
             61
          
          
             62
          
          
                            _readEvent.WaitOne();


          
          
             63
          
          
                        }


          
          
             64
          
          
             65
          
          
                    }


          
          
             66
          
          
             67
          
          
            public
          
          
            bool
          
          
             TryEnQueue(T data)


          
          
             68
          
          
                    {


          
          
             69
          
          
            long
          
          
             localTailIndex, newTailIndex, currentTailIndex;


          
          
             70
          
          
            do
          
          
             71
          
          
                        {


          
          
             72
          
                           localTailIndex =
          
             _tailIndex;


          
          
             73
          
          
             74
          
          
            if
          
           (!
          
            this
          
          
            .CanWrite(localTailIndex))


          
          
             75
          
          
                            {


          
          
             76
          
          
            return
          
          
            false
          
          
            ;


          
          
             77
          
          
                            }


          
          
             78
          
          
             79
          
                           newTailIndex = localTailIndex + 
          
            1
          
          
            ;


          
          
             80
          
          
             81
          
          
            if
          
           (_nodes[newTailIndex %
          
             _nodes.Length].HasValue)


          
          
             82
          
          
                            {


          
          
             83
          
          
            return
          
          
            false
          
          
            ;


          
          
             84
          
          
                            }


          
          
             85
          
          
             86
          
                           currentTailIndex = Interlocked.CompareExchange(
          
            ref
          
          
             _tailIndex, newTailIndex, localTailIndex);


          
          
             87
          
          
                        }


          
          
             88
          
          
            while
          
           (localTailIndex !=
          
             currentTailIndex);


          
          
             89
          
          
             90
          
                       _nodes[newTailIndex % _nodes.Length].Data =
          
             data;


          
          
             91
          
                       _nodes[newTailIndex % _nodes.Length].HasValue = 
          
            true
          
          
            ;


          
          
             92
          
          
             93
          
          
            return
          
          
            true
          
          
            ;


          
          
             94
          
          
                    }


          
          
             95
          
          
             96
          
          
            public
          
          
            bool
          
           TryDeQueue(
          
            out
          
          
             T data)


          
          
             97
          
          
                    {


          
          
             98
          
          
            long
          
          
             localHeadIndex, newHeadIndex, currentHeadIndex;


          
          
             99
          
          
            do
          
          
            100
          
          
                        {


          
          
            101
          
                           localHeadIndex =
          
             _headIndex;


          
          
            102
          
          
            103
          
          
            if
          
           (!
          
            this
          
          
            .CanRead(localHeadIndex))


          
          
            104
          
          
                            {


          
          
            105
          
                               data = 
          
            default
          
          
            (T);


          
          
            106
          
          
            return
          
          
            false
          
          
            ;


          
          
            107
          
          
                            }


          
          
            108
          
          
            109
          
                           newHeadIndex = localHeadIndex + 
          
            1
          
          
            ;


          
          
            110
          
          
            if
          
           (_nodes[newHeadIndex % _nodes.Length].HasValue == 
          
            false
          
          
            )


          
          
            111
          
          
                            {


          
          
            112
          
                               data = 
          
            default
          
          
            (T);


          
          
            113
          
          
            return
          
          
            false
          
          
            ;


          
          
            114
          
          
                            }


          
          
            115
          
          
            116
          
                           currentHeadIndex = Interlocked.CompareExchange(
          
            ref
          
          
             _headIndex, newHeadIndex, localHeadIndex);


          
          
            117
          
          
                        }


          
          
            118
          
          
            while
          
           (localHeadIndex !=
          
             currentHeadIndex);


          
          
            119
          
          
            120
          
                       data = _nodes[newHeadIndex %
          
             _nodes.Length].Data;


          
          
            121
          
                       _nodes[newHeadIndex % _nodes.Length].HasValue = 
          
            false
          
          
            ;


          
          
            122
          
          
            123
          
          
            return
          
          
            true
          
          
            ;


          
          
            124
          
          
                    }


          
          
            125
          
          
            126
          
          
            private
          
          
            bool
          
           CanWrite(
          
            long
          
          
             localTailIndex)


          
          
            127
          
          
                    {


          
          
            128
          
          
            return
          
           localTailIndex - _headIndex <
          
             _nodes.Length;


          
          
            129
          
          
                    }


          
          
            130
          
          
            131
          
          
            private
          
          
            bool
          
           CanRead(
          
            long
          
          
             localHeadIndex)


          
          
            132
          
          
                    {


          
          
            133
          
          
            return
          
           _tailIndex - localHeadIndex > 
          
            0
          
          
            ;


          
          
            134
          
          
                    }


          
          
            135
          
          
                }


          
          
            136
          
           }
        

备注 返回目录

仓促成文,如果有必要可以再写篇文章,希望大家多批评。

 

 
分类:  .NET
标签:  并发

CAS


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论