public
List<InputSplit> getSplits(JobContext job)
throws
IOException {
long
minSize =
Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long
maxSize =
getMaxSplitSize(job);
List splits
=
new
ArrayList();
List files
=
listStatus(job);
for
(FileStatus file : files) {
Path path
=
file.getPath();
long
length =
file.getLen();
if
(length != 0L
) {
FileSystem fs
=
path.getFileSystem(job.getConfiguration());
BlockLocation[] blkLocations
=
fs.getFileBlockLocations(file,
0L
, length);
if
(isSplitable(job, path)) {
long
blockSize =
file.getBlockSize();
long
splitSize =
computeSplitSize(blockSize, minSize,
maxSize);
long
bytesRemaining =
length;
while
(bytesRemaining / splitSize > 1.1D
) {
int
blkIndex =
getBlockIndex(blkLocations, length
-
bytesRemaining);
splits.add(makeSplit(path, length
-
bytesRemaining,
splitSize, blkLocations[blkIndex].getHosts()));
bytesRemaining
-=
splitSize;
}
if
(bytesRemaining != 0L
) {
int
blkIndex =
getBlockIndex(blkLocations, length
-
bytesRemaining);
splits.add(makeSplit(path, length
-
bytesRemaining,
bytesRemaining,
blkLocations[blkIndex].getHosts()));
}
}
else
{
splits.add(makeSplit(path,
0L
, length,
blkLocations[
0
].getHosts()));
}
}
else
{
splits.add(makeSplit(path,
0L, length,
new
String[0
]));
}
}
job.getConfiguration().setLong(
"mapreduce.input.fileinputformat.numinputfiles"
, files.size());
LOG.debug(
"Total # of splits: " +
splits.size());
return
splits;
}
Yarn 下好像没了1*下的由用户设置预期的Map数
核心代码
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
getFormatMinSplitSize 默认返回1,getMinSplitSize 为用户设置的最小分片数, 如果用户设置的大于1,则为用户设置的最小分片数
long maxSize = getMaxSplitSize(job);
getMaxSplitSize为用户设置的最大分片数,默认最大为9223372036854775807L
long splitSize = computeSplitSize(blockSize, minSize,
maxSize);
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
测试 文件大小 297M(311349250)
块大小128M
测试代码
测试1
FileInputFormat.setMinInputSplitSize(job, 301349250);
FileInputFormat.setMaxInputSplitSize(job, 10000);
测试后Map个数为1,由上面分片公式算出分片大小为301349250, 比 311349250小, 理论应该为两个map, 再看分片函数
while (bytesRemaining / splitSize > 1.1D) {
int blkIndex = getBlockIndex(blkLocations, length
- bytesRemaining);
splits.add(makeSplit(path, length - bytesRemaining,
splitSize, blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
只要剩余的文件大小不超过分片大小的1.1倍, 则会分到一个分片中,避免开两个MAP, 其中一个运行数据太小,浪费资源。
测试2
FileInputFormat.setMinInputSplitSize(job, 150*1024*1024);
FileInputFormat.setMaxInputSplitSize(job, 10000);
MAP 数为2
测试3
在原有的输入目录下,添加一个很小的文件,几K,测试是否会合并
FileInputFormat.setMinInputSplitSize(job, 150*1024*1024);
FileInputFormat.setMaxInputSplitSize(job, 10000);
Map数变为了3
看源代码
for (FileStatus file : files) {
..
}
原来输入是按照文件名来分片的,这个按照常理也能知道, 不同的文件内容格式不同
总结,分片过程大概为,先遍历目标文件,过滤部分不符合要求的文件, 然后添加到列表,然后按照文件名来切分分片 (大小为前面计算分片大小的公式, 最后有个文件尾可能合并,其实常写网络程序的都知道), 然后添加到分片列表,然后每个分片读取自身对应的部分给MAP处理

