Yarn下Map数控制

系统 1799 0
      
        public
      
       List<InputSplit> getSplits(JobContext job) 
      
        throws
      
      
         IOException {

        
      
      
        long
      
       minSize =
      
         Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

        
      
      
        long
      
       maxSize =
      
         getMaxSplitSize(job);



        List splits 
      
      = 
      
        new
      
      
         ArrayList();

        List files 
      
      =
      
         listStatus(job);

        
      
      
        for
      
      
         (FileStatus file : files) {

            Path path 
      
      =
      
         file.getPath();

            
      
      
        long
      
       length =
      
         file.getLen();

            
      
      
        if
      
       (length != 0L
      
        ) {

                FileSystem fs 
      
      =
      
         path.getFileSystem(job.getConfiguration());

                BlockLocation[] blkLocations 
      
      =
      
         fs.getFileBlockLocations(file,

                        
      
      0L
      
        , length);

                
      
      
        if
      
      
         (isSplitable(job, path)) {

                    
      
      
        long
      
       blockSize =
      
         file.getBlockSize();

                    
      
      
        long
      
       splitSize =
      
         computeSplitSize(blockSize, minSize,

                            maxSize);



                    
      
      
        long
      
       bytesRemaining =
      
         length;

                    
      
      
        while
      
       (bytesRemaining / splitSize > 1.1D
      
        ) {

                        
      
      
        int
      
       blkIndex =
      
         getBlockIndex(blkLocations, length

                                
      
      -
      
         bytesRemaining);

                        splits.add(makeSplit(path, length 
      
      -
      
         bytesRemaining,

                                splitSize, blkLocations[blkIndex].getHosts()));



                        bytesRemaining 
      
      -=
      
         splitSize;

                    }



                    
      
      
        if
      
       (bytesRemaining != 0L
      
        ) {

                        
      
      
        int
      
       blkIndex =
      
         getBlockIndex(blkLocations, length

                                
      
      -
      
         bytesRemaining);

                        splits.add(makeSplit(path, length 
      
      -
      
         bytesRemaining,

                                bytesRemaining,

                                blkLocations[blkIndex].getHosts()));

                    }

                } 
      
      
        else
      
      
         {

                    splits.add(makeSplit(path, 
      
      0L
      
        , length,

                            blkLocations[
      
      0
      
        ].getHosts()));

                }

            } 
      
      
        else
      
      
         {

                splits.add(makeSplit(path, 
      
      0L, length, 
      
        new
      
       String[0
      
        ]));

            }

        }



        job.getConfiguration().setLong(

                
      
      "mapreduce.input.fileinputformat.numinputfiles"
      
        , files.size());

        LOG.debug(
      
      "Total # of splits: " +
      
         splits.size());

        
      
      
        return
      
      
         splits;

    }
      
    

 Yarn 下好像没了1*下的由用户设置预期的Map数

      核心代码



long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));



getFormatMinSplitSize 默认返回1,getMinSplitSize 为用户设置的最小分片数, 如果用户设置的大于1,则为用户设置的最小分片数

long maxSize = getMaxSplitSize(job);



getMaxSplitSize为用户设置的最大分片数,默认最大为9223372036854775807L



long splitSize = computeSplitSize(blockSize, minSize,

                            maxSize);



protected long computeSplitSize(long blockSize, long minSize, long maxSize) {

        return Math.max(minSize, Math.min(maxSize, blockSize));

    }


    

 

测试 文件大小 297M(311349250)

块大小128M

测试代码

测试1

   FileInputFormat.setMinInputSplitSize(job, 301349250);
   FileInputFormat.setMaxInputSplitSize(job, 10000);

测试后Map个数为1,由上面分片公式算出分片大小为301349250, 比 311349250小, 理论应该为两个map,  再看分片函数

while (bytesRemaining / splitSize > 1.1D) {
                        int blkIndex = getBlockIndex(blkLocations, length
                                - bytesRemaining);
                        splits.add(makeSplit(path, length - bytesRemaining,
                                splitSize, blkLocations[blkIndex].getHosts()));

                        bytesRemaining -= splitSize;
                    }

只要剩余的文件大小不超过分片大小的1.1倍, 则会分到一个分片中,避免开两个MAP, 其中一个运行数据太小,浪费资源。

 

测试2

FileInputFormat.setMinInputSplitSize(job, 150*1024*1024);

FileInputFormat.setMaxInputSplitSize(job, 10000);

MAP 数为2

测试3

在原有的输入目录下,添加一个很小的文件,几K,测试是否会合并

FileInputFormat.setMinInputSplitSize(job, 150*1024*1024);
FileInputFormat.setMaxInputSplitSize(job, 10000);

Map数变为了3

看源代码

for (FileStatus file : files) {

..

}

原来输入是按照文件名来分片的,这个按照常理也能知道, 不同的文件内容格式不同

 

总结,分片过程大概为,先遍历目标文件,过滤部分不符合要求的文件, 然后添加到列表,然后按照文件名来切分分片 (大小为前面计算分片大小的公式, 最后有个文件尾可能合并,其实常写网络程序的都知道), 然后添加到分片列表,然后每个分片读取自身对应的部分给MAP处理

 

 

 

 

 

Yarn下Map数控制


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论