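Before a client writes a block to a DataNode, it first contacts the NameNode, which selects the requested number of DataNodes to hold the replicas. The replica placement strategy is defined by the abstract class BlockPlacementPolicy, whose default implementation is the subclass BlockPlacementPolicyDefault. That class has several overloaded chooseTarget() methods, but they all end up calling the following one: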
 1  /**
 2   * This is not part of the public API but is used by the unit tests.
 3   */
 4  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
 5                                    DatanodeDescriptor writer,
 6                                    List<DatanodeDescriptor> chosenNodes,
 7                                    HashMap<Node, Node> excludedNodes,
 8                                    long blocksize) {
 9    // numOfReplicas: number of replicas to choose
10    // clusterMap.getNumOfLeaves(): number of DNs in the whole cluster
11    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves() == 0) {
12      return new DatanodeDescriptor[0];
13    }
14
15    // excludedNodes: DNs to exclude (some DNs have already been chosen, so they must not be chosen again)
16    if (excludedNodes == null) {
17      excludedNodes = new HashMap<Node, Node>();
18    }
19
20    int clusterSize = clusterMap.getNumOfLeaves();
21    // total replicas = already chosen + newly requested
22    int totalNumOfReplicas = chosenNodes.size() + numOfReplicas;
23    if (totalNumOfReplicas > clusterSize) { // total replicas exceed the number of DNs in the cluster
24      numOfReplicas -= (totalNumOfReplicas - clusterSize);
25      totalNumOfReplicas = clusterSize;
26    }
27
28    // compute the maximum number of DNs that may be chosen on any one rack
29    int maxNodesPerRack =
30      (totalNumOfReplicas - 1) / clusterMap.getNumOfRacks() + 2;
31
32    List<DatanodeDescriptor> results =
33      new ArrayList<DatanodeDescriptor>(chosenNodes);
34    for (DatanodeDescriptor node : chosenNodes) {
35      // add localMachine and related nodes to excludedNodes
36      addToExcludedNodes(node, excludedNodes);
37      adjustExcludedNodes(excludedNodes, node);
38    }
39
40    // the client is not a DN
41    if (!clusterMap.contains(writer)) {
42      writer = null;
43    }
44
45    boolean avoidStaleNodes = (stats != null && stats
46        .shouldAvoidStaleDataNodesForWrite());
47
48    // choose numOfReplicas DNs and return the local DN
49    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
50        excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
51
52    results.removeAll(chosenNodes);
53
54    // sorting nodes to form a pipeline
55    // arrange the chosen DNs (the elements of results) into a pipeline
56    return getPipeline((writer == null) ? localNode : writer,
57        results.toArray(new DatanodeDescriptor[results.size()]));
58  }
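The method does roughly what its comments say, but pay attention to what each variable means. At lines 49-50 it calls another chooseTarget() overload to choose the requested number of DNs (the chosen DNs are stored in results) and to return one DN as the local DN. That overload is analyzed below.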
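As a quick check of the arithmetic in the first listing, the sketch below reproduces the replica clamping (lines 22-26) and the per-rack cap (lines 29-30) as standalone functions. The class name ReplicaMath and both method names are made up for illustration; only the two formulas come from the listing.

```java
/** Illustrative only: ReplicaMath and its method names are not part of Hadoop. */
final class ReplicaMath {

    /** Mirrors lines 22-26: shrink the request so that chosen + requested fits the cluster. */
    static int clampRequested(int alreadyChosen, int requested, int clusterSize) {
        int total = alreadyChosen + requested;
        if (total > clusterSize) {
            requested -= (total - clusterSize);
        }
        return requested;
    }

    /** Mirrors lines 29-30: integer division, so the cap is never below 2. */
    static int maxNodesPerRack(int totalReplicas, int numRacks) {
        return (totalReplicas - 1) / numRacks + 2;
    }

    public static void main(String[] args) {
        // 3 replicas on a 2-rack cluster: (3 - 1) / 2 + 2
        System.out.println(maxNodesPerRack(3, 2));
        // asking for 3 replicas when the cluster has only 2 DNs
        System.out.println(clampRequested(0, 3, 2));
    }
}
```

With 3 replicas on two racks the cap works out to 3, so the per-rack limit alone does not force replicas onto different racks; that is left to the rack-aware choices made in the second chooseTarget() overload.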
 1  /* choose <i>numOfReplicas</i> from all data nodes */
 2  private DatanodeDescriptor chooseTarget(int numOfReplicas,
 3      DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
 4      long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results,
 5      boolean avoidStaleNodes) {
 6
 7    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves() == 0) {
 8      return writer;
 9    }
10    int totalReplicasExpected = numOfReplicas + results.size();
11
12    int numOfResults = results.size();
13    boolean newBlock = (numOfResults == 0);
14    if (writer == null && !newBlock) {
15      writer = results.get(0);
16    }
17
18    // Keep a copy of original excludedNodes
19    final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ?
20        new HashMap<Node, Node>(excludedNodes) : null;
21
22    try {
23      if (numOfResults == 0) { // choose the local DN
24        writer = chooseLocalNode(writer, excludedNodes, blocksize,
25            maxNodesPerRack, results, avoidStaleNodes);
26        if (--numOfReplicas == 0) {
27          return writer;
28        }
29      }
30      if (numOfResults <= 1) { // choose a DN on a remote rack
31        chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
32            maxNodesPerRack, results, avoidStaleNodes);
33        if (--numOfReplicas == 0) {
34          return writer;
35        }
36      }
37      if (numOfResults <= 2) {
38        if (clusterMap.isOnSameRack(results.get(0), results.get(1))) { // the first two DNs are on the same rack
39          // the first two chosen DNs share a rack, so choose a DN that is not on the first DN's rack
40          chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
41              maxNodesPerRack, results, avoidStaleNodes);
42        } else if (newBlock) {
43          // choose a DN on the same rack as the second DN
44          chooseLocalRack(results.get(1), excludedNodes, blocksize,
45              maxNodesPerRack, results, avoidStaleNodes);
46        } else {
47          // choose a DN on the same rack as writer
48          chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
49              results, avoidStaleNodes);
50        }
51        if (--numOfReplicas == 0) {
52          return writer;
53        }
54      }
55      // choose the remaining DNs at random from the whole cluster
56      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
57          maxNodesPerRack, results, avoidStaleNodes);
58    } catch (NotEnoughReplicasException e) {
59      FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
60          + (totalReplicasExpected - results.size()) + " to reach "
61          + totalReplicasExpected + "\n"
62          + e.getMessage());
63      if (avoidStaleNodes) {
64        // Retry chooseTarget again, this time not avoiding stale nodes.
65
66        // excludedNodes contains the initial excludedNodes and nodes that were
67        // not chosen because they were stale, decommissioned, etc.
68        // We need to additionally exclude the nodes that were added to the
69        // result list in the successful calls to choose*() above.
70        for (Node node : results) {
71          oldExcludedNodes.put(node, node);
72        }
73        // Set numOfReplicas, since it can get out of sync with the result list
74        // if the NotEnoughReplicasException was thrown in chooseRandom().
75        numOfReplicas = totalReplicasExpected - results.size();
76        return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
77            maxNodesPerRack, results, false);
78      }
79    }
80    return writer;
81  }
The following sections analyze, in order, how each of the first three DNs is chosen.
1. Choosing the local DN: chooseLocalNode()
 1  /* choose <i>localMachine</i> as the target.
 2   * if <i>localMachine</i> is not available,
 3   * choose a node on the same rack
 4   * @return the chosen node
 5   */
 6  protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
 7      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
 8      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
 9      throws NotEnoughReplicasException {
10    // if no local machine, randomly choose one node
11    if (localMachine == null) // there is no DN on the client
12      // pick a random DN from the whole cluster as the local DN
13      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
14          maxNodesPerRack, results, avoidStaleNodes);
15
16    // otherwise try local machine first
17    Node oldNode = excludedNodes.put(localMachine, localMachine);
18    if (oldNode == null) { // was not in the excluded list
19      // the client's DN has not been chosen yet; check whether it is a good target
20      if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
21          results, avoidStaleNodes)) {
22        results.add(localMachine);
23        // add localMachine and related nodes to excludedNode
24        addToExcludedNodes(localMachine, excludedNodes);
25        return localMachine;
26      }
27    }
28
29    // try a node on local rack
30    // choose a DN on the same rack as the client
31    return chooseLocalRack(localMachine, excludedNodes, blocksize,
32        maxNodesPerRack, results, avoidStaleNodes);
33  }
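The listing above delegates the suitability decision to isGoodTarget(), whose body this article does not show. The sketch below models the five checks that step 1.2 describes, under stated assumptions: the NodeInfo type, its fields, the avgLoad parameter, and both thresholds (room for five blocks, twice the average load) are illustrative stand-ins rather than values confirmed by the listings here. The fourth argument is modeled as a "consider load" switch, which the call at lines 20-21 appears to pass as false.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model only; NodeInfo is a hypothetical stand-in for DatanodeDescriptor. */
final class GoodTargetSketch {

    static final class NodeInfo {
        final String rack;
        final boolean decommissioned;
        final boolean stale;
        final long remainingBytes;
        final int activeTransfers;

        NodeInfo(String rack, boolean decommissioned, boolean stale,
                 long remainingBytes, int activeTransfers) {
            this.rack = rack;
            this.decommissioned = decommissioned;
            this.stale = stale;
            this.remainingBytes = remainingBytes;
            this.activeTransfers = activeTransfers;
        }
    }

    // Assumed thresholds, chosen for illustration.
    static final int MIN_BLOCKS_FOR_WRITE = 5;
    static final double LOAD_FACTOR = 2.0;

    static boolean isGoodTarget(NodeInfo node, long blockSize, int maxNodesPerRack,
                                boolean considerLoad, double avgLoad,
                                List<NodeInfo> results, boolean avoidStaleNodes) {
        if (node.decommissioned) return false;                       // 1. is the node available?
        if (avoidStaleNodes && node.stale) return false;             // 2. is it stale?
        if (blockSize * MIN_BLOCKS_FOR_WRITE > node.remainingBytes) {
            return false;                                            // 3. enough capacity?
        }
        if (considerLoad && node.activeTransfers > LOAD_FACTOR * avgLoad) {
            return false;                                            // 4. too much traffic?
        }
        int onSameRack = 1;                                          // count the candidate itself
        for (NodeInfo chosen : results) {
            if (chosen.rack.equals(node.rack)) onSameRack++;
        }
        return onSameRack <= maxNodesPerRack;                        // 5. rack not over-used?
    }

    public static void main(String[] args) {
        long blockSize = 64L * 1024 * 1024;
        NodeInfo roomy = new NodeInfo("/rack1", false, false, 10 * blockSize, 3);
        NodeInfo full = new NodeInfo("/rack1", false, false, blockSize, 3);
        List<NodeInfo> none = new ArrayList<>();
        System.out.println(isGoodTarget(roomy, blockSize, 3, false, 1.0, none, true));
        System.out.println(isGoodTarget(full, blockSize, 3, false, 1.0, none, true));
    }
}
```

In this model a node that fails any single check is rejected outright, which is why the callers need a fallback such as the same-rack choice at lines 31-32.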
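Choosing the local DN involves three cases:
1.1) If there is no DN on the client machine, pick a random DN from the whole cluster (the chooseRandom() method) and check whether it is a good target (same checks as in 1.2). If it is not, pick another random DN, and so on.
1.2) If the client machine does host a DN, check whether that DN is a good target (the isGoodTarget() method). The checks are: whether the node is available, whether it is in the "stale" state, whether it has enough remaining capacity, how much traffic it is handling, and whether its rack already holds too many DNs chosen for the current block.
1.3) If the client's own DN cannot be used (it was already excluded, or it failed the checks in 1.2), choose a DN on the same rack as the client (the chooseLocalRack() method) as the local node, as follows:
a) pick a random DN on the same rack as the client (same procedure as 1.1);
b) if that fails, pick a random DN from the whole cluster (same procedure as 1.1).
Steps a) and b) need some explanation: if they follow the same procedure as 1.1, how can they produce different results? The difference lies in the first argument passed to chooseRandom(). If the argument is NodeBase.ROOT, which is simply "/", the scope is the whole cluster; if it is localMachine.getNetworkLocation(), the scope is the rack that localMachine is on. The first argument therefore defines the range to choose from. The relationship between DNs and racks is maintained by the NetworkTopology class, which is also what rack awareness is built on.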
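To make the role of that first argument concrete, here is a minimal sketch of scope-based candidate selection on a toy topology. Everything in it (the ScopeDemo class, the dn1 to dn4 node names, the two rack paths, and the candidates() helper) is hypothetical and is not Hadoop's NetworkTopology; it only mimics the three scope forms that appear in this article: "/", a rack path, and a rack path prefixed with "~" (used later by chooseRemoteRack()).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative only: a toy topology, not Hadoop's NetworkTopology. */
final class ScopeDemo {
    static final String ROOT = "/";

    /** Four hypothetical DNs spread over two hypothetical racks. */
    static Map<String, String> sampleTopology() {
        Map<String, String> nodeToRack = new LinkedHashMap<>();
        nodeToRack.put("dn1", "/rack1");
        nodeToRack.put("dn2", "/rack1");
        nodeToRack.put("dn3", "/rack2");
        nodeToRack.put("dn4", "/rack2");
        return nodeToRack;
    }

    /** Returns the non-excluded nodes that fall inside the given scope. */
    static List<String> candidates(Map<String, String> nodeToRack, String scope,
                                   Set<String> excluded) {
        boolean invert = scope.startsWith("~");          // "~" means "everything except"
        String rack = invert ? scope.substring(1) : scope;
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : nodeToRack.entrySet()) {
            boolean inRack = rack.equals(ROOT) || e.getValue().equals(rack);
            if (inRack != invert && !excluded.contains(e.getKey())) {
                out.add(e.getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> topo = sampleTopology();
        Set<String> none = Collections.emptySet();
        System.out.println(candidates(topo, ROOT, none));      // whole cluster
        System.out.println(candidates(topo, "/rack1", none));  // one rack
        System.out.println(candidates(topo, "~/rack1", none)); // everything but that rack
    }
}
```

A random pick from the returned list then plays the role of chooseRandom(): the selection procedure is identical in every case, and only the candidate pool changes with the scope.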
2. Choosing a DN on a remote rack: chooseRemoteRack()
 1  /* choose <i>numOfReplicas</i> nodes from the racks
 2   * that <i>localMachine</i> is NOT on.
 3   * if not enough nodes are available, choose the remaining ones
 4   * from the local rack
 5   */
 6  protected void chooseRemoteRack(int numOfReplicas,
 7                                  DatanodeDescriptor localMachine,
 8                                  HashMap<Node, Node> excludedNodes,
 9                                  long blocksize,
10                                  int maxReplicasPerRack,
11                                  List<DatanodeDescriptor> results,
12                                  boolean avoidStaleNodes)
13      throws NotEnoughReplicasException {
14    int oldNumOfReplicas = results.size();
15    // randomly choose one node from remote racks
16    try {
17      // choose DNs that are not on the same rack as localMachine
18      chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
19          excludedNodes, blocksize, maxReplicasPerRack, results,
20          avoidStaleNodes);
21    } catch (NotEnoughReplicasException e) {
22      // choose DNs on the same rack as localMachine
23      chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
24          localMachine.getNetworkLocation(), excludedNodes, blocksize,
25          maxReplicasPerRack, results, avoidStaleNodes);
26    }
27  }
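Choosing the remote DN involves two steps:
2.1) choose a DN from a rack other than the local one (same procedure as 1.1);
2.2) if that fails, choose a DN from the local rack (same procedure as 1.1).
Again, both steps reuse chooseRandom(). In 2.1 the argument is "~" + localMachine.getNetworkLocation(), meaning choose a DN from anywhere in the cluster except the rack that localMachine is on ("~" means exclusion). In 2.2 the argument is localMachine.getNetworkLocation(), meaning choose a DN from the rack that localMachine is on. This point is important: the second DN is not guaranteed to be on a different rack from the first.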
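The fallback in steps 2.1 and 2.2 can be sketched on a toy topology as follows. This is a simplified model, not the Hadoop code: RemoteRackDemo, pick(), topology(), and the node and rack names are hypothetical, and the sketch signals "no candidate" with a null return where the real method relies on NotEnoughReplicasException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

/** Illustrative only: models the try-remote-then-local fallback on a toy topology. */
final class RemoteRackDemo {

    /** Builds a node-to-rack map from alternating node, rack arguments. */
    static Map<String, String> topology(String... nodeRackPairs) {
        Map<String, String> nodeToRack = new LinkedHashMap<>();
        for (int i = 0; i + 1 < nodeRackPairs.length; i += 2) {
            nodeToRack.put(nodeRackPairs[i], nodeRackPairs[i + 1]);
        }
        return nodeToRack;
    }

    /** Picks a random non-excluded node on (wantLocal) or off (!wantLocal) localRack, or null. */
    static String pick(Map<String, String> nodeToRack, String localRack, boolean wantLocal,
                       Set<String> excluded, Random rnd) {
        List<String> pool = new ArrayList<>();
        for (Map.Entry<String, String> e : nodeToRack.entrySet()) {
            boolean isLocal = e.getValue().equals(localRack);
            if (isLocal == wantLocal && !excluded.contains(e.getKey())) {
                pool.add(e.getKey());
            }
        }
        return pool.isEmpty() ? null : pool.get(rnd.nextInt(pool.size()));
    }

    static String chooseRemoteRack(Map<String, String> nodeToRack, String localNode,
                                   Set<String> excluded, Random rnd) {
        String localRack = nodeToRack.get(localNode);
        String chosen = pick(nodeToRack, localRack, false, excluded, rnd); // step 2.1
        if (chosen == null) {
            chosen = pick(nodeToRack, localRack, true, excluded, rnd);     // step 2.2
        }
        if (chosen == null) {
            throw new IllegalStateException("not enough replicas");
        }
        excluded.add(chosen); // a chosen node must not be chosen again
        return chosen;
    }

    public static void main(String[] args) {
        Random rnd = new Random(0);
        Map<String, String> twoRacks = topology("dn1", "/rack1", "dn2", "/rack1", "dn3", "/rack2");
        Map<String, String> oneRack = topology("dn1", "/rack1", "dn2", "/rack1");
        // dn1 is the first replica, so it starts out excluded in both runs
        System.out.println(chooseRemoteRack(twoRacks, "dn1", new HashSet<>(Arrays.asList("dn1")), rnd)); // prints dn3
        System.out.println(chooseRemoteRack(oneRack, "dn1", new HashSet<>(Arrays.asList("dn1")), rnd));  // prints dn2
    }
}
```

In the single-rack run the "remote" choice lands on dn2, which shares a rack with dn1. The same thing can happen in a multi-rack cluster whenever every remote candidate is excluded or rejected.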
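3. Choosing the third DN
The code is at lines 37-50 of the second listing above. The steps are:
3.1) if the first two DNs are on the same rack, choose a DN on a different rack, as in step 2;
3.2) otherwise, if newBlock is true, choose a DN on the same rack as the second DN, following the procedure in 1.3;
3.3) otherwise, choose a DN on the same rack as writer (which is set to the first chosen DN when the client is not itself a DN; see lines 14-16), following the procedure in 1.3.
4. Choose DNs for the remaining replicas from the whole cluster, following the procedure in 1.1 (lines 56-57 of the second listing above).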
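The branch that picks the third DN (steps 3.1 to 3.3) can be restated as a small pure function. This is only a sketch of the decision; the class ThirdReplicaDemo, the Scope enum, and its constant names are made up for illustration, and the actual node selection behind each outcome is not modeled.

```java
/** Illustrative only: restates the third-DN branch as a pure function. */
final class ThirdReplicaDemo {

    /** Where the third DN is looked for first. */
    enum Scope { REMOTE_FROM_FIRST, SAME_RACK_AS_SECOND, SAME_RACK_AS_WRITER }

    static Scope decide(String firstRack, String secondRack, boolean newBlock) {
        if (firstRack.equals(secondRack)) {
            return Scope.REMOTE_FROM_FIRST;      // 3.1: first two DNs share a rack
        }
        return newBlock
            ? Scope.SAME_RACK_AS_SECOND          // 3.2: brand-new block
            : Scope.SAME_RACK_AS_WRITER;         // 3.3: adding replicas to an existing block
    }

    public static void main(String[] args) {
        System.out.println(decide("/rack1", "/rack2", true));
        System.out.println(decide("/rack1", "/rack1", true));
        System.out.println(decide("/rack1", "/rack2", false));
    }
}
```

Each outcome is only a first preference: both chooseRemoteRack() and chooseLocalRack() fall back to a wider or different scope when the preferred one has no usable DN.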
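Finally, control returns to the end of the first listing, where the chosen DNs are arranged into a pipeline.
This analysis also settles a common point of confusion. Some people online say the third DN is on the same rack as the second, while others say it is on the same rack as the first. Which claim is right? It depends on how the second DN was chosen: as shown above, the second DN may be on a different rack from the first, but it may also be on the same rack, depending on the state of the cluster at that moment. So neither claim should be treated as an absolute rule.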
This article is based on Hadoop 1.2.1.
Corrections are welcome.
Reference: http://blog.csdn.net/xhh198781/article/details/7109764