Adhesive框架系列文章--分布式组件客户端模块实

系统 2088 0

Adhesive框架中是分布式组件客户端首先实现的是基于Json序列化+二进制协议的Memcached客户端。在本文中会介绍其中的实现细节。

我们先来看一下项目结构:

image

从这个结构大致可以看出:

1)Memcached只是其中的一个具体实现,这个组件期望提供一个ClientSocket-ClientNode-ClientCluster的基础实现,以后可以有各种客户端基于这种结构来实现

2)对于Memcached的实现,其中把协议部分放在的Protocol文件夹中,并且根据协议为每一个请求和响应封装类型,也就是使用面向对象的方式而不是拼数据包的方式来封装协议

那么现在首先来介绍基础结构。从最底部的层次开始,最底部应该是对Socket进行一个封装,在这里我们实现了一个ClientSocket,主要完成下面功能:

1)封装Read、Write、Connect、Reset(因为我们实现的是Socket池,所以在Socket使用之后,归还池之前需要重置)操作

2)封装Socket基本状态,包括创建时间、忙碌时间、闲置时间、发生错误时的回调方法

 

在ClientSocket之上的一层是ClientNode,也就是一个节点的客户端,很明显,这里需要做的是Socket连接池,具体完成的工作有:

1)进行连接池的维护,包括移除空闲超时的Socket、强制结束忙碌时间过长的Socket、补充新的Socket到连接池的下限

2)初始化池、结束池、从池获取Socket、把使用后的Socket返回池、创建非池Socket

在正常使用的时候,所有Socket都从池中获取,如果整个Node不可用,那么我们定时创建非池Socket来测试Node是否恢复

 

在ClientNode之上的是ClientCluster,也就是集群,对于需要客户端进行一致性哈希分发节点的分布式组件来说,这层就很必要了,完成的功能主要有:

1)初始化集群、使用一致性哈希从集群获得节点、直接获得ClientSocket

2)在节点出错的时候进行重新节点分配、尝试恢复出错的节点

 

ClientCluster是使用ClientNodeLocator来分配节点的,其中的算法也就是一致性哈希算法。

image

之前说过节点有权重的概念,在这里也就是通过虚拟节点的数量来设置节点权重,权重越高分配到Key的数量也就会越多。

 

在ClientCluster之上还封装了一层AbstractClient,也就是直接面向用户的API入口。

    
      public
    
    
      abstract
    
    
      class
    
     AbstractClient<T> 
    
      where
    
     T : AbstractClient<T>, 
    
      new
    
    ()
  

完成的功能有:

1)保存所有的Cluster,初始化Cluster

2)获取具体的XXXClient的实现,比如MemcachedClient

 

很明显,我们的第一个实现MemcachedClient是继承了AbstractClient:

    
      public
    
    
      partial
    
    
      class
    
     MemcachedClient : AbstractClient<MemcachedClient>
  

在这里使用了部分类,内部的实现都放在了MemcachedClient_Internal.cs中,而对外的API都放在了MemcachedClient.cs中。

 

对于Memcached的二进制协议,我们首先是实现一个头的格式包:

        [StructLayout(LayoutKind.Sequential, Pack = 1)]

    
    
      internal
    
    
      struct
    
     Header

    {

        
    
      internal
    
    
      byte
    
     Magic;



        
    
      internal
    
    
      byte
    
     Opcode;



        
    
      internal
    
    
      ushort
    
     KeyLength;



        
    
      internal
    
    
      byte
    
     ExtraLength;



        
    
      internal
    
    
      byte
    
     DataType;



        
    
      internal
    
    
      ushort
    
     Reserved;



        
    
      internal
    
    
      uint
    
     TotalBodyLength;



        
    
      internal
    
    
      uint
    
     Opaque;



        
    
      internal
    
    
      ulong
    
     Version;

    }
  

由于我们会直接把结构打包为字节数组,所以这里声明了结构的内存布局。在Protocol.cs中,我们有一些实用的方法,比如结构和字节数组双向转换的实现:

    
      internal
    
    
      static
    
     T BytesToStruct<T>(
    
      this
    
    
      byte
    
    [] rawData)

        {

            T result = 
    
      default
    
    (T);

            RespectEndianness(
    
      typeof
    
    (T), rawData);

            GCHandle handle = GCHandle.Alloc(rawData, GCHandleType.Pinned);

            
    
      try
    
    

            {

                IntPtr rawDataPtr = handle.AddrOfPinnedObject();

                result = (T)Marshal.PtrToStructure(rawDataPtr, 
    
      typeof
    
    (T));

            }

            
    
      finally
    
    

            {

                handle.Free();

            }

            
    
      return
    
     result;

        }



        
    
      internal
    
    
      static
    
    
      byte
    
    [] StructToBytes<T>(
    
      this
    
     T data)

        {

            
    
      byte
    
    [] rawData = 
    
      new
    
    
      byte
    
    [Marshal.SizeOf(data)];

            GCHandle handle = GCHandle.Alloc(rawData, GCHandleType.Pinned);

            
    
      try
    
    

            {

                IntPtr rawDataPtr = handle.AddrOfPinnedObject();

                Marshal.StructureToPtr(data, rawDataPtr, 
    
      false
    
    );

            }

            
    
      finally
    
    

            {

                handle.Free();

            }

            RespectEndianness(
    
      typeof
    
    (T), rawData);

            
    
      return
    
     rawData;

        }



        
    
      private
    
    
      static
    
    
      void
    
     RespectEndianness(Type type, 
    
      byte
    
    [] data)

        {

            var fields = type.GetFields(BindingFlags.NonPublic | BindingFlags.Instance).Select(field => 
    
      new
    
    

            {

                Field = field,

                Offset = Marshal.OffsetOf(type, field.Name).ToInt32()

            }).ToList();



            fields.ForEach(item => Array.Reverse(data, item.Offset, Marshal.SizeOf(item.Field.FieldType)));

        }
  

在定义了头之后,我们就可以封装一个抽象的请求包了:

image

只要实现这个包,然后调用其GetBytes方法就可以直接获得需要发送的请求数据包,它会在内部处理Header和Body数据的打包。

比如,我们来看一个Set操作的包实现:

    
      internal
    
    
      class
    
     SetRequestPackage : AbstractRequestPackage

    {

        
    
      private
    
     TimeSpan expireSpan;

        
    
      private
    
    
      byte
    
    [] valueBytes;

        
    
      private
    
    
      ulong
    
     version;



        
    
      public
    
    
      override
    
     Opcode Opcode

        {

            get { 
    
      return
    
     Opcode.Set; }

        }



        
    
      internal
    
     SetRequestPackage(
    
      string
    
     key, 
    
      byte
    
    [] valueBytes, TimeSpan expireSpan, 
    
      ulong
    
     version)

            : 
    
      base
    
    (key)

        {

            
    
      if
    
     (expireSpan > TimeSpan.FromDays(30))

                
    
      throw
    
    
      new
    
     ArgumentOutOfRangeException(
    
      "过期时间不能超过30天!"
    
    );

            
    
      this
    
    .expireSpan = expireSpan;

            
    
      this
    
    .valueBytes = valueBytes;

            
    
      this
    
    .version = version;

        }



        
    
      internal
    
     SetRequestPackage(
    
      string
    
     key, 
    
      string
    
    
      value
    
    , TimeSpan expireSpan, 
    
      ulong
    
     version)

            : 
    
      this
    
    (key, Encoding.UTF8.GetBytes(
    
      value
    
    ), expireSpan, version)

        {

        }



        
    
      internal
    
     SetRequestPackage(
    
      string
    
     key, 
    
      string
    
    
      value
    
    , 
    
      ulong
    
     version)

            : 
    
      this
    
    (key, Encoding.UTF8.GetBytes(
    
      value
    
    ), TimeSpan.FromDays(30), version)

        {

        }



        
    
      internal
    
     SetRequestPackage(
    
      string
    
     key, 
    
      byte
    
    [] valueBytes, 
    
      ulong
    
     version)

            : 
    
      this
    
    (key, valueBytes, TimeSpan.FromDays(30), version)

        {

        }



        
    
      protected
    
    
      override
    
    
      ulong
    
     GetVersion()

        {

            
    
      return
    
     version;

        }



        
    
      protected
    
    
      override
    
    
      byte
    
    [] GetExtraBytes()

        {

            var extraBytes = 
    
      new
    
     List<
    
      byte
    
    >();

            
    
      uint
    
     flag = 0xdeadbeef;

            extraBytes.AddRange(flag.GetBigEndianBytes());

            
    
      uint
    
     expire = Convert.ToUInt32(expireSpan.TotalSeconds);

            extraBytes.AddRange(expire.GetBigEndianBytes());

            
    
      return
    
     extraBytes.ToArray();

        }



        
    
      protected
    
    
      override
    
    
      byte
    
    [] GetValueBytes()

        {

            
    
      return
    
     valueBytes;

        }

    }
  

在这里,我们只是实现了抽象方法来为基类提供没有的数据,并不需要关心数据是如何打包的。那么,之后发送Set请求的操作就很简单了:

    
      private
    
    
      bool
    
     InternalSet(
    
      string
    
     key, 
    
      string
    
    
      value
    
    , TimeSpan expire, 
    
      ulong
    
     version)

        {

            
    
      using
    
     (var socket = GetCluster().AcquireSocket(key))

            {

                
    
      if
    
     (socket != 
    
      null
    
    )

                {

                    AbstractRequestPackage requestPackage = expire == TimeSpan.MaxValue ? 
    
      new
    
     SetRequestPackage(key, 
    
      value
    
    , version)

                            : 
    
      new
    
     SetRequestPackage(key, 
    
      value
    
    , expire, version);

                    var requestData = requestPackage.GetBytes();

                    
    
      if
    
     (requestData != 
    
      null
    
    )

                    {

                        socket.Write(requestData);

                        var responsePackage = ResponsePackageCreator.GetPackage(socket);

                        
    
      if
    
     (responsePackage != 
    
      null
    
    )

                        {

                            
    
      if
    
     (responsePackage.ResponseStatus == ResponseStatus.NoError)

                            {

                                
    
      return
    
    
      true
    
    ;

                            }

                            
    
      else
    
    
      if
    
     (responsePackage.ResponseStatus != ResponseStatus.KeyExists

                                    && responsePackage.ResponseStatus != ResponseStatus.KeyNotFound)

                            {

                                LocalLoggingService.Warning(
    
      "在 {0} 上执行操作 {1} 得到了不正确的回复 Key : {2} -> {3}"
    
    ,

                                            socket.Endpoint.ToString(),

                                            requestPackage.Opcode,

                                            key,

                                            responsePackage.ResponseStatus);

                            }

                        }

                        
    
      else
    
    

                        {

                            LocalLoggingService.Error(
    
      "在 {0} 上执行操作 {1} 没有得到回复 Key : {2}"
    
    ,

                                       socket.Endpoint.ToString(),

                                       requestPackage.Opcode,

                                       key);

                        }

                    }

                }

            }

            
    
      return
    
    
      false
    
    ;

        }
  

1)首先是获取到Cluster,再获取到池中的Socket

2)然后初始化一个SetRequestPackage,再通过GetBytes获得数据

3)直接把数据写入Socket

4)通过ResponsePackageCreator来获得返回的数据包

 

很明显,ResponsePackageCreator和AbstractRequestPackage的意图差不多,用来把响应的数据包封装成我们需要的数据,其中有一个:

    
      internal
    
    
      static
    
     GeneralResponsePackage GetPackage(ClientSocket socket)
  

获得的是一个通用的响应数据包:

    
      internal
    
    
      class
    
     GeneralResponsePackage

    {

        
    
      internal
    
     Opcode Opcode { get; set; }



        
    
      internal
    
     ResponseStatus ResponseStatus { get; set; }



        
    
      internal
    
    
      string
    
     Key { get; set; }



        
    
      internal
    
    
      byte
    
    [] ValueBytes { get; set; }



        
    
      internal
    
    
      ulong
    
     Version { get; set; }



        
    
      internal
    
    
      string
    
     Value

        {

            get

            {

                
    
      if
    
     (ValueBytes != 
    
      null
    
    )

                {

                    
    
      return
    
     Encoding.UTF8.GetString(ValueBytes);

                }

                
    
      else
    
    

                {

                    
    
      return
    
    
      null
    
    ;

                }

            }

        }

    }
  

在这里基本的信息都有了,比如操作代码、响应状态、Key、Value、版本号。正因为Memcached的协议比较简单,所有的响应包都是这么一个格式,所以我们并没有实现特殊的响应包。如果要实现的话,只需要在类头部标记OpCode并且继承GeneralResponsePackage,ResponsePackageCreator会自动返回相应的子类:

        [AttributeUsage(AttributeTargets.Class)]

    
    
      internal
    
    
      class
    
     ResponsePackageAttribute : Attribute

    {

        
    
      internal
    
     Opcode Opcode { get; 
    
      private
    
     set; }



        
    
      internal
    
     ResponsePackageAttribute(Opcode opcode)

        {

            
    
      this
    
    .Opcode = opcode;

        }

    }
  

在获得了响应之后,通过判断ResponseStatus来知道响应是否正确,并且记录相关日志即可。这么一来,数据一去一回以及协议如何实现的整个过程就介绍完了。下面,我们再介绍一下客户端中几个特色功能的实现。

 

1)获取一组Key功能。由于一个集群会有多个节点,所以要获取一组Key,我们首先需要把Key按照节点分类,然后对于不同的节点,采用并行的方式同时获取,这样速度会很快,代码片段如下:

                var nodeCache = 
    
      new
    
     Dictionary<ClientNode, List<
    
      string
    
    >>();

            
    
      foreach
    
     (var key 
    
      in
    
     keys)

            {

                var node = GetCluster().AcquireNode(key);

                
    
      if
    
     (!nodeCache.ContainsKey(node))

                    nodeCache.Add(node, 
    
      new
    
     List<
    
      string
    
    > { key });

                
    
      else
    
    
      if
    
     (!nodeCache[node].Contains(key))

                    nodeCache[node].Add(key);

            }



            var data = 
    
      new
    
     Dictionary<
    
      string
    
    , 
    
      string
    
    >();

            Parallel.ForEach(nodeCache, node =>
  

2)List功能。Memcached只提供了Key、Value的存储,有的时候我们的Value是一个列表,那么我们可以有两种方式完成这个功能。第一种就是直接把列表序列化作为一个Value保存,优点是简单,缺点是如果以后需要修改的话需要整个列表取出,修改后再把整个列表保存进去,并且由于Memcached Value大小的限制,这么做也不能保存大列表;第二种方式是一个Value保存列表中的一个项,再使用一个KeyValue来保存其中每一项的ID,这么优点是修改方便,获取的数据可以是列表中的一部分,缺点是实现麻烦,要考虑并发问题、要维护另外一个KeyValue来保存所有的ID。在这里,我们封装了后一种方式的实现。

3)Locker功能。使用Memcached完成锁的功能其实很简单,我们只需要在获取锁的时候判断Add一个空值是否成功,如果不成功则表示占有,等待一段时间尝试获取,一直到超时,在返回锁的时候删除这个项即可。在这里,我们封装了MemcachedLocker来完成这个功能。

Adhesive框架系列文章--分布式组件客户端模块实现


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论