本文发表于本人 博客 。
前面几次讲了关于Hadoop的环境搭建、HDFS操作,今天接着继续。本来Hadoop源码中就有一个例子WordCount,但是今天我们来自己实现一个加深对这个Mapper、Reducer的理解,如有不对欢迎指正。
我们先来梳理一下思路,对于自定义Mapper以及Reducer,我们先要覆盖其map以及reduce函数,然后按照相关步骤比如设置输入文件目录、输入文件格式化类、设置自定义Mapper、分区、排序、分组、规约、设置自定义Reducer等等。这里我们把输入文件的使用空格分割(也可以用制表符来),下面是自定义Mapper类MyMapper:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; public class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] splied = value.toString().split( " " ); for ( int i = 0 ; i < splied.length; i++) { String lineWord = splied[i]; context.write( new Text(lineWord), new LongWritable( 1 )); } } } |
这里我选择的是新的API,相关库基本是在org.apache.hadoop.mapreduce下,旧API是在org.apache.hadoop.mapred下,包括一些引用库也是这样。自定义MyMapper是泛型继承Mapper,其中参数 key\value 是Hadoop内部类型,它不支持java的基本类型这里我们需要注意下为什么不选择java的基本类型呢,原因是不需要其它额外是操作,而且本身需要序列化反序列化并提升其性能所以加入了hadoop的类型放弃java的基本类型。关于hadoop key\value 跟java基本类型相互转换的问题也很简单,从java基本类型转换至hadoop的 key\value 的话直接new带参就可以了,从hadoop的key\value类型转换至java的基本类型使用get方法就可以了!如:
1 2 3 | LongWritable lw = new LongWritable(1L); long temp = lw.get(); |
接下来继续看自定义Reducer类MyReduce:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; public class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count = 0L; for (LongWritable value: values) { count += value.get(); } context.write(key, new LongWritable(count)); } } |
这个跟上面类似了,再来看看main方法的如何执行的!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; import com.sun.org.apache.xpath.internal.axes.HasPositionalPredChecker; public class Test { static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/mapreduce/output/" ; static final String INPUT_DIR = "hdfs://hadoop-master:9000/mapreduce/input/test.txt" ; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, Test. class .getSimpleName()); deleteOutputFile(OUTPUT_DIR); //1设置输入目录 FileInputFormat.setInputPaths(job, INPUT_DIR); //2设置输入格式化类 job.setInputFormatClass(TextInputFormat. class ); //3设置自定义Mapper以及键值类型 job.setMapperClass(MyMapper. class ); job.setMapOutputKeyClass(Text. class ); job.setMapOutputValueClass(LongWritable. class ); //4分区 job.setPartitionerClass(HashPartitioner. class ); job.setNumReduceTasks( 1 ); //5排序分组 //6设置在自定义Reduce以及键值类型 job.setReducerClass(MyReduce. class ); job.setOutputKeyClass(Text. class ); job.setOutputValueClass(LongWritable. class ); //7设置输出目录 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR)); //8提交job job.waitForCompletion( true ); } static void deleteOutputFile(String path) throws Exception{ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get( new URI(INPUT_DIR),conf); if (fs.exists( new Path(path))){ fs.delete( new Path(path)); } } } |
执行的时候先会输出上次执行过的输出目录。然后就按照步骤:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | 1 .设置输入文件目录; 2 .输入文件格式化类; 3 .设置自定义Mapper以及其键值类型; 4 .分区; 5 .排序; 6 .分组; 7 .规约; 8 .设置自定义Reducer以及其键值类型; 9 .设置输出目录; 10 .代码提交至JobTracker。 |
当然这过程中有些是可以省略的比如输出文件格式化类。从这个例子我们可以得出:既然可以设置自定义Mapper以及自定义Reducer,那么也应该可以设置自定义的输入文件格式化类以及分区、排序、分组、规约等等,这个以后会有相关的笔记现在这里只是写个简单的例子。我们编写一个文件如下并把它上传至hdfs://hadoop-master:9000/mapreduce/input/test.txt:
1 2 3 4 5 | luoliang me asura asura.com luoliang me |
然后执行main函数,将会在hdfs://hadoop-master:9000/mapreduce/output/目录下输出一个类似part-*的文件,我们可以使用如下命令查看:
1 | hadoop fs -text /output/part-* |
此时会输出:
1 2 3 4 5 6 7 | asura 1 asura.com 1 luoliang 2 me 2 |
现在文件是输出了也对比下是正确,但是脑子还是一片空白,不知道其怎么做到的,那么这个就是关于mapreduce的原理了,下面我也说说大概其原理:从把代码提交至JobTracker开始,它就会从指定的输入文件路径去获取文件,这里支持多个文件以及二级目录下的多个文件,这里获取就是使用的HDFS api来操作了!把所有文件读取出来之后按照指定的大小进行分割InputSplit,把分割好后的键值FileSplit(比如:<0,"luoliang me">,<13,"asura asura.com luoliang">)再转化为RecordReader(比如<"luoliang",1>,<"luoliang",1>),此时全部转换完毕后会每个都调用map函数,map函数把数据写入到Mapper.Context中,再会对数据进行分区排序分组规约,最后通过shuffle到达reduce端,这其中每个map的输出数量是等于reduce的输入数量。到达reduce端数据已经发生了质变了不在是<"luoliang",1>而是类似变成<"luoliang",{1,1}>这样的键值数据,这是我们需要迭代获取总数量并在写会context中,计算完后输出到指定的目录。在这里由于有重复的单词所以map函数的调用次数跟reduce函数调用次数是不同的。规约这个其实就是自定义reduce,但是这个不是必须有的因为如果是统计关于类似平均数的问题,数据在map端进行规约了,虽然传送时间以及处理时间减少性能提升了但是对于最终结果可能会有影响,所以这个规约要看具体情况才能使用。至于这个 shuffle 一步还不是怎么了解需要多多再看看。
这次先到这里。坚持记录点点滴滴!