public
class
WordCountApp {
//
可以指定目录,目录下如果有二级目录的话,是不会执行的,只会执行一级目录.
private
static
final
String INPUT_PATH = "hdfs://hadoop1:9000/abd";
//
输入路径
private
static
final
String OUT_PATH = "hdfs://hadoop1:9000/out";
//
输出路径,reduce作业输出的结果是一个目录
//
_SUCCESS:在linux中,带下划线的这些文件一般都是被忽略不去处理的.表示作业执行成功.
//
_logs:产生的日志文件.
//
part-r-00000:产生的是我们的输出的文件.开始以part开始.r:reduce输出的结果,map输出的结果是m,00000是序号
public
static
void
main(String[] args) {
Configuration conf
=
new
Configuration();
//
配置对象
try
{
FileSystem fileSystem
= FileSystem.get(
new
URI(OUT_PATH), conf);
fileSystem.delete(
new
Path(OUT_PATH),
true
);
Job job
=
new
Job(conf, WordCountApp.
class
.getSimpleName());
//
jobName:作业名称
job.setJarByClass(WordCountApp.
class
);
FileInputFormat.setInputPaths(job, INPUT_PATH);
//
指定数据的输入
job.setMapperClass(MyMapper.
class
);
//
指定自定义map类
job.setMapOutputKeyClass(Text.
class
);
//
指定map输出key的类型
job.setMapOutputValueClass(LongWritable.
class
);
//
指定map输出value的类型
job.setReducerClass(MyReducer.
class
);
//
指定自定义Reduce类
job.setOutputKeyClass(Text.
class
);
//
设置Reduce输出key的类型
job.setOutputValueClass(LongWritable.
class
);
//
设置Reduce输出的value类型
FileOutputFormat.setOutputPath(job,
new
Path(OUT_PATH));
//
Reduce输出完之后,就会产生一个最终的输出,指定最终输出的位置
job.waitForCompletion(
true
);
//
提交给jobTracker并等待结束
}
catch
(Exception e) {
e.printStackTrace();
}
}
/**
* 输入的key标示偏移量:这一行开始的字节. 输入的value:当前的行文本的内容. MapReduce执行过程:
* 在这里边,我们的数据输入来自于原始文件,数据输出写出到hdfs, 中间的一堆都是map输出产生的临时结果.存放在map运行的linux磁盘上的,
* 当经过shuffle时,reduce就会通过http把map端的对应数据给取过来.
* mapred-default.xml中mapredcue.jobtracker
* .root.dir,mapred.tmp.dir存储map产生的结果. 作业运行时产生这个目录,作业运行完之后它会删除目录.
*/
public
static
class
MyMapper
extends
Mapper
<LongWritable, Text, Text, LongWritable>
{
//
源文件有两行记录,解析源文件会产生两个键值对.分别是<0,hello you>,<10,hello me>,所以map函数会被调用两次.
//
在计算机存储的时候,是一维的结构.
@Override
protected
void
map(LongWritable key, Text value, Context context)
throws
IOException, InterruptedException {
//
为什么要把hadoop类型转换为java类型?
String line =
value.toString();
String[] splited
= line.split("\t"
);
//
使用hashMap写出去的优势:减少键值对出现的个数.
Map<String, Integer> hashMap =
new
HashMap<String, Integer>
();
for
(String word : splited) {
//
在for循环体内,临时变量word出现的此时是常量1
context.write(
new
Text(word),
new
LongWritable(1));
//
把每个单词出现的次数1写出去.
}
}
}
//
map函数执行结束后,map输出的<k,v>一共有4个.<hello,1>,<you,1>,<hello,1>,<me,1>
//
map把数据处理完之后,就会进入reduce.
//
在进入shuffle之前,数据需要先进行分区.默认只有一个区.
//
对每个不同分区中的数据进行排序,分组.
//
排序后的结果:<hello,1>,<hello,1>,<me,1>,<you,1>
//
分组后的结果(相同key的value放在一个集合中):<hello,{1,1}>,<me,{1}>,<you,{1}>
//
规约(可选)
//
map中的数据分发到reduce的过程称作shuffle
public
static
class
MyReducer
extends
Reducer
<Text, LongWritable, Text, LongWritable>
{
//
每一组调用一次reduce函数,一共调用了三次
@Override
protected
void
reduce(Text key, Iterable<LongWritable>
values,
Context context)
throws
IOException, InterruptedException {
//
count标示单词key在整个文件出现的次数
//
分组的数量与reduce函数调用次数是相等的.
//
reduce函数调用次数与产生的<k,v>的数量抛开业务,没有任何关系!
long
count = 0L
;
for
(LongWritable times : values) {
count
+=
times.get();
}
context.write(key,
new
LongWritable(count));
}
}
}

