HDFS副本存放策略

系统 2215 0

  在client向DataNode写入block之前,会与NameNode有一次通信,由NameNode来选择指定数目的DataNode来存放副本。具体的副本选择策略在BlockPlacementPolicy接口中,其子类实现是BlockPlacementPolicyDefault。该类中会有多个chooseTarget()方法重载,但最终调用了下面的方法:

      
        
           1
        
        
          /**
        
        
           2
        
        
           * This is not part of the public API but is used by the unit tests. 
        
        
           3
        
        
          */
        
        
           4
        
           DatanodeDescriptor[] chooseTarget(
        
          int
        
        
           numOfReplicas, 
        
        
           5
        
        
           DatanodeDescriptor writer, 
        
        
           6
        
                                             List<DatanodeDescriptor>
        
           chosenNodes, 
        
        
           7
        
                                             HashMap<Node, Node>
        
           excludedNodes, 
        
        
           8
        
        
          long
        
        
           blocksize) { 
        
        
           9
        
        
          //
        
        
          numOfReplicas:要选择的副本个数 
        
        
          10
        
        
          //
        
        
          clusterMap.getNumOfLeaves():整个集群的DN个数
        
        
          11
        
        
          if
        
         (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0
        
          ) { 
        
        
          12
        
        
          return
        
        
          new
        
         DatanodeDescriptor[0
        
          ]; 
        
        
          13
        
        
           } 
        
        
          14
        
        
          15
        
        
          //
        
        
          excludedNodes:排除的DN(因为有些DN已经被选中,所以不再选择他们)
        
        
          16
        
        
          if
        
         (excludedNodes == 
        
          null
        
        
          ) { 
        
        
          17
        
               excludedNodes = 
        
          new
        
         HashMap<Node, Node>
        
          (); 
        
        
          18
        
        
           } 
        
        
          19
        
        
          20
        
        
          int
        
         clusterSize =
        
           clusterMap.getNumOfLeaves(); 
        
        
          21
        
        
          //
        
        
          总的副本个数=已选择的个数 + 指定的副本个数
        
        
          22
        
        
          int
        
         totalNumOfReplicas = chosenNodes.size()+
        
          numOfReplicas; 
        
        
          23
        
        
          if
        
         (totalNumOfReplicas > clusterSize) {    
        
          //
        
        
          若总副本个数 > 整个集群的DN个数
        
        
          24
        
               numOfReplicas -= (totalNumOfReplicas-
        
          clusterSize); 
        
        
          25
        
               totalNumOfReplicas =
        
           clusterSize; 
        
        
          26
        
        
           } 
        
        
          27
        
        
          28
        
        
          //
        
        
          计算每个一个rack能有多少个DN被选中
        
        
          29
        
        
          int
        
         maxNodesPerRack = 


        
          30
        
               (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2
        
          ; 
        
        
          31
        
        
          32
        
             List<DatanodeDescriptor> results = 


        
          33
        
        
          new
        
         ArrayList<DatanodeDescriptor>
        
          (chosenNodes); 
        
        
          34
        
        
          for
        
        
           (DatanodeDescriptor node:chosenNodes) { 
        
        
          35
        
        
          //
        
        
           add localMachine and related nodes to excludedNodes
        
        
          36
        
        
           addToExcludedNodes(node, excludedNodes); 
        
        
          37
        
        
           adjustExcludedNodes(excludedNodes, node); 
        
        
          38
        
        
           } 
        
        
          39
        
        
          40
        
        
          //
        
        
          客户端不是DN
        
        
          41
        
        
          if
        
         (!
        
          clusterMap.contains(writer)) { 
        
        
          42
        
               writer=
        
          null
        
        
          ; 
        
        
          43
        
        
           } 
        
        
          44
        
        
          45
        
        
          boolean
        
         avoidStaleNodes = (stats != 
        
          null
        
         &&
        
           stats 
        
        
          46
        
        
           .shouldAvoidStaleDataNodesForWrite()); 
        
        
          47
        
        
          48
        
        
          //
        
        
          选择numOfReplicas个DN,并返回本地DN
        
        
          49
        
             DatanodeDescriptor localNode =
        
           chooseTarget(numOfReplicas, writer, 
        
        
          50
        
        
           excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes); 
        
        
          51
        
        
          52
        
        
           results.removeAll(chosenNodes); 
        
        
          53
        
        
          54
        
        
          //
        
        
           sorting nodes to form a pipeline 
        
        
          55
        
        
          //
        
        
          将选中的DN(result中的元素)组织成pipe
        
        
          56
        
        
          return
        
         getPipeline((writer==
        
          null
        
        )?
        
          localNode:writer, 
        
        
          57
        
                                results.toArray(
        
          new
        
        
           DatanodeDescriptor[results.size()])); 
        
        
          58
        
           }
      
    

  方法含义大概就如注释中写的,不过要注意其中的变量含义。在第48行,又 调用 chooseTarget()方法来选择指定数目的DN(选中的DN存放在result中),并返回一个DN作为本地DN。下面分析这个方法。  

      
        
           1
        
        
          /*
        
        
           choose <i>numOfReplicas</i> from all data nodes 
        
        
          */
        
        
           2
        
        
          private
        
         DatanodeDescriptor chooseTarget(
        
          int
        
        
           numOfReplicas, 
        
        
           3
        
               DatanodeDescriptor writer, HashMap<Node, Node>
        
           excludedNodes, 
        
        
           4
        
        
          long
        
         blocksize, 
        
          int
        
         maxNodesPerRack, List<DatanodeDescriptor>
        
           results, 
        
        
           5
        
        
          boolean
        
        
           avoidStaleNodes) { 
        
        
           6
        
        
           7
        
        
          if
        
         (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0
        
          ) { 
        
        
           8
        
        
          return
        
        
           writer; 
        
        
           9
        
        
           } 
        
        
          10
        
        
          int
        
         totalReplicasExpected = numOfReplicas +
        
           results.size(); 
        
        
          11
        
        
          12
        
        
          int
        
         numOfResults =
        
           results.size(); 
        
        
          13
        
        
          boolean
        
         newBlock = (numOfResults==0
        
          ); 
        
        
          14
        
        
          if
        
         (writer == 
        
          null
        
         && !
        
          newBlock) { 
        
        
          15
        
               writer = results.get(0
        
          ); 
        
        
          16
        
        
           } 
        
        
          17
        
        
          18
        
        
          //
        
        
           Keep a copy of original excludedNodes
        
        
          19
        
        
          final
        
         HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ? 


        
          20
        
        
          new
        
         HashMap<Node, Node>(excludedNodes) : 
        
          null
        
        
          ; 
        
        
          21
        
        
          22
        
        
          try
        
        
           { 
        
        
          23
        
        
          if
        
         (numOfResults == 0) {    
        
          //
        
        
          选择本地DN
        
        
          24
        
                 writer =
        
           chooseLocalNode(writer, excludedNodes, blocksize, 
        
        
          25
        
        
           maxNodesPerRack, results, avoidStaleNodes); 
        
        
          26
        
        
          if
        
         (--numOfReplicas == 0
        
          ) { 
        
        
          27
        
        
          return
        
        
           writer; 
        
        
          28
        
        
           } 
        
        
          29
        
        
           } 
        
        
          30
        
        
          if
        
         (numOfResults <= 1) {    
        
          //
        
        
          选择远程rack上的DN
        
        
          31
        
                 chooseRemoteRack(1, results.get(0
        
          ), excludedNodes, blocksize, 
        
        
          32
        
        
           maxNodesPerRack, results, avoidStaleNodes); 
        
        
          33
        
        
          if
        
         (--numOfReplicas == 0
        
          ) { 
        
        
          34
        
        
          return
        
        
           writer; 
        
        
          35
        
        
           } 
        
        
          36
        
        
           } 
        
        
          37
        
        
          if
        
         (numOfResults <= 2
        
          ) { 
        
        
          38
        
        
          if
        
         (clusterMap.isOnSameRack(results.get(0), results.get(1))) {  
        
          //
        
        
          若前两个DN在同一个rack上 
        
        
          39
        
        
          //已选择的前两个DN在同一个rack上,则
        
        
          选择与第1个DN不在同一个rack上的DN
        
        
          40
        
                   chooseRemoteRack(1, results.get(0
        
          ), excludedNodes, blocksize, 
        
        
          41
        
        
           maxNodesPerRack, results, avoidStaleNodes); 
        
        
          42
        
                 } 
        
          else
        
        
          if
        
        
           (newBlock){ 
        
        
          43
        
        
          //
        
        
          选择与第2个DN在同一个rack上的DN
        
        
          44
        
                   chooseLocalRack(results.get(1
        
          ), excludedNodes, blocksize, 
        
        
          45
        
        
           maxNodesPerRack, results, avoidStaleNodes); 
        
        
          46
        
                 } 
        
          else
        
        
           { 
        
        
          47
        
        
          //
        
        
          选择与write在同一个rack上的DN
        
        
          48
        
        
           chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack, 
        
        
          49
        
        
           results, avoidStaleNodes); 
        
        
          50
        
        
           } 
        
        
          51
        
        
          if
        
         (--numOfReplicas == 0
        
          ) { 
        
        
          52
        
        
          return
        
        
           writer; 
        
        
          53
        
        
           } 
        
        
          54
        
        
           } 
        
        
          55
        
        
          //
        
        
          在整个集群中随机选择剩余的DN
        
        
          56
        
        
           chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, 
        
        
          57
        
        
           maxNodesPerRack, results, avoidStaleNodes); 
        
        
          58
        
             } 
        
          catch
        
        
           (NotEnoughReplicasException e) { 
        
        
          59
        
               FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "


        
          60
        
                        + (totalReplicasExpected - results.size()) + " to reach "


        
          61
        
                        + totalReplicasExpected + "\n"


        
          62
        
                        +
        
           e.getMessage()); 
        
        
          63
        
        
          if
        
        
           (avoidStaleNodes) { 
        
        
          64
        
        
          //
        
        
           Retry chooseTarget again, this time not avoiding stale nodes. 
        
        
          65
        
        
          66
        
        
          //
        
        
           excludedNodes contains the initial excludedNodes and nodes that were 
        
        
          67
        
        
          //
        
        
           not chosen because they were stale, decommissioned, etc. 
        
        
          68
        
        
          //
        
        
           We need to additionally exclude the nodes that were added to the 
        
        
          69
        
        
          //
        
        
           result list in the successful calls to choose*() above.
        
        
          70
        
        
          for
        
        
           (Node node : results) { 
        
        
          71
        
        
           oldExcludedNodes.put(node, node); 
        
        
          72
        
        
           } 
        
        
          73
        
        
          //
        
        
           Set numOfReplicas, since it can get out of sync with the result list 
        
        
          74
        
        
          //
        
        
           if the NotEnoughReplicasException was thrown in chooseRandom().
        
        
          75
        
                 numOfReplicas = totalReplicasExpected -
        
           results.size(); 
        
        
          76
        
        
          return
        
        
           chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize, 
        
        
          77
        
                     maxNodesPerRack, results, 
        
          false
        
        
          ); 
        
        
          78
        
        
           } 
        
        
          79
        
        
           } 
        
        
          80
        
        
          return
        
        
           writer; 
        
        
          81
        
           }
      
    

    下面依次分析这3个DN的选择过程。

1、选择本地DN: chooseLocalNode()  

      
        
           1
        
        
          /*
        
        
           choose <i>localMachine</i> as the target. 
        
        
           2
        
        
           * if <i>localMachine</i> is not available, 
        
        
           3
        
        
           * choose a node on the same rack 
        
        
           4
        
        
           * @return the chosen node 
        
        
           5
        
        
          */
        
        
           6
        
        
          protected
        
        
           DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine, 
        
        
           7
        
               HashMap<Node, Node> excludedNodes, 
        
          long
        
         blocksize, 
        
          int
        
        
           maxNodesPerRack, 
        
        
           8
        
               List<DatanodeDescriptor> results, 
        
          boolean
        
        
           avoidStaleNodes) 
        
        
           9
        
        
          throws
        
        
           NotEnoughReplicasException { 
        
        
          10
        
        
          //
        
        
           if no local machine, randomly choose one node
        
        
          11
        
        
          if
        
         (localMachine == 
        
          null
        
        )    
        
          //
        
        
          client端上没有DN 
        
        
          12
        
        
          //
        
        
          从整个集群中随机选择一个DN作为本地DN
        
        
          13
        
        
          return
        
        
           chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, 
        
        
          14
        
        
           maxNodesPerRack, results, avoidStaleNodes); 
        
        
          15
        
        
          16
        
        
          //
        
        
           otherwise try local machine first
        
        
          17
        
             Node oldNode =
        
           excludedNodes.put(localMachine, localMachine); 
        
        
          18
        
        
          if
        
         (oldNode == 
        
          null
        
        ) { 
        
          //
        
        
           was not in the excluded list 
        
        
          19
        
        
          //
        
        
          该client端的DN还没有被选中时,判断这个DN是否负载过重
        
        
          20
        
        
          if
        
         (isGoodTarget(localMachine, blocksize, maxNodesPerRack, 
        
          false
        
        
          , 
        
        
          21
        
        
           results, avoidStaleNodes)) { 
        
        
          22
        
        
           results.add(localMachine); 
        
        
          23
        
        
          //
        
        
           add localMachine and related nodes to excludedNode
        
        
          24
        
        
           addToExcludedNodes(localMachine, excludedNodes); 
        
        
          25
        
        
          return
        
        
           localMachine; 
        
        
          26
        
        
           } 
        
        
          27
        
        
           } 
        
        
          28
        
        
          29
        
        
          //
        
        
           try a node on local rack 
        
        
          30
        
        
          //
        
        
          选择与该client同rack的DN
        
        
          31
        
        
          return
        
        
           chooseLocalRack(localMachine, excludedNodes, blocksize, 
        
        
          32
        
        
           maxNodesPerRack, results, avoidStaleNodes); 
        
        
          33
        
           }
      
    

    本地DN的选择分三步:

  1.1)如果client上没有DN,则从整个集群中随机选择一个DN(chooseRandom()方法),并判断是否该DN是否负载过重(步骤如1.2);如果负载过重则重新随机选择一个。以此类推.....

  1.2)如果该client有DN,则判断该DN是否负载过重(isGoodTarget()方法),步骤如下:结点是否可用、结点是否在“stale”状态、结点容量是否足够、结点流量情况、该节点所在的机架中存放当前数据的DN是否过多;

  1.3)如果前两个条件都不满足,则选择与client同rack的DN(chooseLocalRack()方法)作为本地结点,步骤如下:

  a )随机选择一个与client同rack的DN(步骤同1.1);

  b)否则从整个集群中随机选择一个DN(步骤同1.1)。

  这两步需要解释一下,他们的步骤与1.1都是相同的,那么怎么会得出不同的结果。原因在于传给chooseRandom()方法的第一个参数。如果参数是“NodeBase.ROOT”,实质上就是"/",表示的是整个集群;如果是“localMachine.getNetworkLocation()”,则表示localMachine所在的rack。这样,通过第一个参数就可以表示要进行选择的范围。在NetworkTopology接口中定义了DN与rack的关系,机架感知也是借此来实现。

 

2、选择远程rack上的DN:chooseRemoteRack()

      
        
           1
        
        
          /*
        
        
           choose <i>numOfReplicas</i> nodes from the racks 
        
        
           2
        
        
           * that <i>localMachine</i> is NOT on. 
        
        
           3
        
        
           * if not enough nodes are available, choose the remaining ones 
        
        
           4
        
        
           * from the local rack 
        
        
           5
        
        
          */
        
        
           6
        
        
          protected
        
        
          void
        
         chooseRemoteRack(
        
          int
        
        
           numOfReplicas, 
        
        
           7
        
        
           DatanodeDescriptor localMachine, 
        
        
           8
        
                                         HashMap<Node, Node>
        
           excludedNodes, 
        
        
           9
        
        
          long
        
        
           blocksize, 
        
        
          10
        
        
          int
        
        
           maxReplicasPerRack, 
        
        
          11
        
                                         List<DatanodeDescriptor>
        
           results, 
        
        
          12
        
        
          boolean
        
        
           avoidStaleNodes) 
        
        
          13
        
        
          throws
        
        
           NotEnoughReplicasException { 
        
        
          14
        
        
          int
        
         oldNumOfReplicas =
        
           results.size(); 
        
        
          15
        
        
          //
        
        
           randomly choose one node from remote racks
        
        
          16
        
        
          try
        
        
           { 
        
        
          17
        
        
          //
        
        
          选择与localMachine不在同一个rack上的DN
        
        
          18
        
               chooseRandom(numOfReplicas, "~" +
        
           localMachine.getNetworkLocation(), 
        
        
          19
        
        
           excludedNodes, blocksize, maxReplicasPerRack, results, 
        
        
          20
        
        
           avoidStaleNodes); 
        
        
          21
        
             } 
        
          catch
        
        
           (NotEnoughReplicasException e) { 
        
        
          22
        
        
          //
        
        
          选择与localMachine在同一个rack上的DN
        
        
          23
        
               chooseRandom(numOfReplicas-(results.size()-
        
          oldNumOfReplicas), 
        
        
          24
        
        
           localMachine.getNetworkLocation(), excludedNodes, blocksize, 
        
        
          25
        
        
           maxReplicasPerRack, results, avoidStaleNodes); 
        
        
          26
        
        
           } 
        
        
          27
        
           }
      
    

  远程DN的选择分两步:

  2.1)从非本地rack上选择一个DN(步骤同1.1);

  2.2)否则从 本地 rack上选择一个DN(步骤同1.1);

  同样,这两步还是复用了chooseRandom()方法。2.1)的参数为"~" + localMachine.getNetworkLocation(),即在集群中除了localMachine所在的rack中选择一个DN(“~”表示排除);2.2)的参数为“localMachine.getNetworkLocation()”,表示从localMachine所在的rack中选择一个DN。这里很重要,可以看到, 选择的第二个DN与第一个DN并不一定就在不同的rack

 

3、选择第3个DN

  代码在 上面第二段代码分析的第37~50行中,具体步骤如下:

  3.1)如果前两个DN在同一个rack上,则选择一个与他们不在同一个rack上的DN,同步骤2;  

  3.2)否则,如果newBlock为true,则选择与第二个DN同rack的DN,步骤同1.3;  

  3.3)否则,选择与第一个DN同rack的DN,步骤同1.3;

 

4、 从整个集群中选择剩余副本个数的DN,步骤同1.1。(代码在上面第二段代码分析的第56行)

 

  最后返回到上面第一段代码的最后部分,将这些选中的DN组织成pipeline。

 

  通过上面的分析也就明白一个问题:网上经常会看到,有人说第三个DN是与第二个DN是同rack的,也有人说第三个DN是与第一个DN同rack的。那么到底哪个说法对呢?关键就看第二个DN的选择,我在上面写了,第二个DN可能是与第一个DN不在同一个rack,但也可能在同一个rack中,具体要根据当时集群中的情况来分析。所以不能简单的认死理。

 

  本文基于hadoop1.2.1

  如有错误,还请指正

  参考文章: http://blog.csdn.net/xhh198781/article/details/7109764

  转载请注明出处: http://www.cnblogs.com/gwgyk/p/4137060.html

HDFS副本存放策略


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论