MultipleOutputFormat and MultipleOutputs

 

Source: http://www.cnblogs.com/liangzh/archive/2012/05/22/2512264.html

 

 


 

I. Introduction

 

1. The old API provides org.apache.hadoop.mapred.lib.MultipleOutputFormat and org.apache.hadoop.mapred.lib.MultipleOutputs.

 

MultipleOutputFormat allows output data to be written to different output files.

 

MultipleOutputs creates multiple OutputCollectors. Each OutputCollector can have its own OutputFormat and types for the key/value pair. Your MapReduce program will decide what to output to each OutputCollector.

 

2. The new API provides org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.

It combines the functionality of the two old-API classes above; the new API has no MultipleOutputFormat.

 

  The MultipleOutputs class simplifies writing output data to multiple outputs.

  Case one: writing to additional outputs other than the job's default output. Each additional output, or named output, may be configured with its own OutputFormat, its own key class, and its own value class.

  Case two: writing data to different files whose names are provided by the user.

 

The following passage is from Hadoop: The Definitive Guide (3rd edition, Early Release), p. 251:

 

  “In the old MapReduce API there are two classes for producing multiple outputs: MultipleOutputFormat and MultipleOutputs. In a nutshell, MultipleOutputs is more fully featured, but MultipleOutputFormat has more control over the output directory structure and file naming. MultipleOutputs in the new API combines the best features of the two multiple output classes in the old API.”

 

II. Usage

 

 1. Writing to multiple files or multiple directories:

 

  The driver needs no extra changes; just add the following code to the Mapper or Reducer class:

 

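  private MultipleOutputs<Text,IntWritable> mos;

  @Override
  public void setup(Context context) throws IOException,InterruptedException {
    // Create the MultipleOutputs wrapper once per task.
    mos = new MultipleOutputs<Text,IntWritable>(context);
  }

  @Override
  public void cleanup(Context context) throws IOException,InterruptedException {
    // Close every record writer that was opened through mos.
    mos.close();
  }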
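  After that, call mos.write(key, value, baseOutputPath) instead of context.write(key, value).
  This works in either the Mapper or the Reducer class. The default files part-m-00* or part-r-00* are still created, but they are empty (size 0) when nothing is written through context.write. Note that only records written with context.write in the Mapper are passed on to the Reducer; records written through mos go directly to the output files.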
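
The baseOutputPath string is not the final file name. As far as can be told from FileOutputFormat.getUniqueFile, Hadoop appends the task type (m or r) and a five-digit partition number to it, so a base path ending in "/" produces a directory that holds a file named like -m-00000. The plain-Java sketch below only imitates that naming rule for illustration; OutputFileNames and uniqueFile are hypothetical names and not part of Hadoop, and the sketch assumes an output format with no file extension.

```java
import java.util.Locale;

/** Illustrative only: imitates how MultipleOutputs file names are built. Not a Hadoop class. */
final class OutputFileNames {

    /**
     * @param baseOutputPath the string passed to mos.write(...)
     * @param taskType       'm' for a map task, 'r' for a reduce task
     * @param partition      the task's partition number
     * @return the relative file name under the job output directory
     */
    static String uniqueFile(String baseOutputPath, char taskType, int partition) {
        // base path, dash, task type, dash, zero-padded five-digit partition
        return String.format(Locale.ROOT, "%s-%c-%05d", baseOutputPath, taskType, partition);
    }

    public static void main(String[] args) {
        System.out.println(uniqueFile("abc", 'm', 0));       // a file next to part-m-00000
        System.out.println(uniqueFile("abc/", 'm', 0));      // a file inside directory abc
        System.out.println(uniqueFile("abc/part", 'r', 3));  // a more readable per-key name
    }
}
```

Passing something like key + "/part" rather than key + "/" is one way to avoid file names that begin with a dash.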

 

  2. Writing output in multiple formats:

 

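import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestwithMultipleOutputs extends Configured implements Tool {

  public static class MapClass extends Mapper<LongWritable,Text,Text,IntWritable> {

    private MultipleOutputs<Text,IntWritable> mos;

    @Override
    protected void setup(Context context) throws IOException,InterruptedException {
      mos = new MultipleOutputs<Text,IntWritable>(context);
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      // Each input line looks like "abc-1232-hdf": key, number, text.
      String line = value.toString();
      String[] tokens = line.split("-");
      Text outKey = new Text(tokens[0]);

      // (1) Key and integer go to the named output MOSInt.
      mos.write("MOSInt", outKey, new IntWritable(Integer.parseInt(tokens[1])));
      // (2) Key and text go to the named output MOSText.
      mos.write("MOSText", outKey, new Text(tokens[2]));
      // (3) A named output can also be written to a chosen file or directory.
      mos.write("MOSText", outKey, value, tokens[0]+"/");
    }

    @Override
    protected void cleanup(Context context) throws IOException,InterruptedException {
      // Required: closes the record writers opened by mos.
      mos.close();
    }
  }

  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = new Job(conf,"word count with MultipleOutputs");
    job.setJarByClass(TestwithMultipleOutputs.class);

    Path in = new Path(args[0]);
    Path out = new Path(args[1]);
    FileInputFormat.setInputPaths(job, in);
    FileOutputFormat.setOutputPath(job, out);

    job.setMapperClass(MapClass.class);
    job.setNumReduceTasks(0);  // map-only job

    // Each named output declares its own output format, key class, and value class.
    MultipleOutputs.addNamedOutput(job,"MOSInt",TextOutputFormat.class,Text.class,IntWritable.class);
    MultipleOutputs.addNamedOutput(job,"MOSText",TextOutputFormat.class,Text.class,Text.class);

    // Return the status and let main() call System.exit.
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new TestwithMultipleOutputs(), args);
    System.exit(res);
  }
}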

 

Test data:

 

abc-1232-hdf
abc-123-rtd
ioj-234-grjth
ntg-653-sdgfvd
kju-876-btyun
bhm-530-bhyt
hfter-45642-bhgf
bgrfg-8956-fmgh
jnhdf-8734-adfbgf
ntg-68763-nfhsdf
ntg-98634-dehuy
hfter-84567-drhuk
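
To make the routing concrete, the sketch below replays the mapper's three writes on a few of the sample lines in plain Java, without Hadoop. RoutingSketch and route are hypothetical names used only for this illustration. The target file names assume a single map task (partition 0) and the base-path-plus-"-m-00000" naming that Hadoop appears to use, so treat them as an approximation rather than guaranteed output.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: replays the mapper's three mos.write calls without Hadoop. */
final class RoutingSketch {

    /** Returns one "target file TAB key TAB value" entry per write made for an input line. */
    static List<String> route(String line) {
        String[] tokens = line.split("-");
        if (tokens.length < 3) {
            throw new IllegalArgumentException("expected key-number-text, got: " + line);
        }
        List<String> writes = new ArrayList<>();
        // (1) named output MOSInt: key and integer value
        writes.add("MOSInt-m-00000\t" + tokens[0] + "\t" + Integer.parseInt(tokens[1]));
        // (2) named output MOSText with its default base path
        writes.add("MOSText-m-00000\t" + tokens[0] + "\t" + tokens[2]);
        // (3) named output MOSText with base path "<key>/": a per-key directory
        writes.add(tokens[0] + "/-m-00000\t" + tokens[0] + "\t" + line);
        return writes;
    }

    public static void main(String[] args) {
        String[] sample = {"abc-1232-hdf", "ntg-653-sdgfvd", "ntg-68763-nfhsdf"};
        for (String line : sample) {
            route(line).forEach(System.out::println);
        }
    }
}
```

Under these assumptions, both ntg lines end up in the same per-key file, which is the grouping the third write call is meant to achieve.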

 

Results (output written to /test/testMOSout; the original screenshot is not available):

 

A problem encountered:

 

  Without mos.close(), the program throws an exception at run time:

 

  12/05/21 20:12:47 WARN hdfs.DFSClient: DataStreamer Exception:

  org.apache.hadoop.ipc.RemoteException:org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on
   /test/mosreduce/_temporary/_attempt_local_0001_r_000000_0/h-r-00000 File does not exist. [Lease. Holder: DFSClient_-352105532, pendingcreates: 5]
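
A likely explanation is that the record writers opened by MultipleOutputs are still holding their files open when the task's temporary directory is cleaned up, although the post itself does not confirm the cause. The plain-Java sketch below is only a local-file analogy, not Hadoop code: it shows that a buffered writer keeps a small record in memory until close() is called. CloseSketch and sizesAroundClose are hypothetical names.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Illustrative only: a local-file analogy for an unclosed record writer. */
final class CloseSketch {

    /** Writes one small record and returns {file size before close, file size after close}. */
    static long[] sizesAroundClose() {
        try {
            Path file = Files.createTempFile("mos-close-sketch", ".txt");
            try {
                long before;
                BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8);
                try {
                    writer.write("abc\t1232\n");
                    before = Files.size(file);  // the record is still in the writer's buffer
                } finally {
                    writer.close();             // flushes the buffer and releases the file
                }
                return new long[] {before, Files.size(file)};
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] sizes = sizesAroundClose();
        System.out.println("before close: " + sizes[0] + " bytes, after close: " + sizes[1] + " bytes");
    }
}
```

The same discipline applies to mos: calling close() in cleanup() means every writer is flushed and released before the task finishes.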
