单词计数WordCountApp.class

系统 1610 0
      
        public
      
      
        class
      
      
         WordCountApp {

    
      
      
        //
      
      
         可以指定目录,目录下如果有二级目录的话,是不会执行的,只会执行一级目录.
      
      
        private
      
      
        static
      
      
        final
      
       String INPUT_PATH = "hdfs://hadoop1:9000/abd";
      
        //
      
      
         输入路径
      
      
        private
      
      
        static
      
      
        final
      
       String OUT_PATH = "hdfs://hadoop1:9000/out";
      
        //
      
      
         输出路径,reduce作业输出的结果是一个目录

    
      
      
        //
      
      
         _SUCCESS:在linux中,带下划线的这些文件一般都是被忽略不去处理的.表示作业执行成功.

    
      
      
        //
      
      
         _logs:产生的日志文件.

    
      
      
        //
      
      
         part-r-00000:产生的是我们的输出的文件.开始以part开始.r:reduce输出的结果,map输出的结果是m,00000是序号
      
      
        public
      
      
        static
      
      
        void
      
      
         main(String[] args) {

        Configuration conf 
      
      = 
      
        new
      
       Configuration();
      
        //
      
      
         配置对象
      
      
        try
      
      
         {

            FileSystem fileSystem 
      
      = FileSystem.get(
      
        new
      
      
         URI(OUT_PATH), conf);

            fileSystem.delete(
      
      
        new
      
       Path(OUT_PATH), 
      
        true
      
      
        );

            Job job 
      
      = 
      
        new
      
       Job(conf, WordCountApp.
      
        class
      
      .getSimpleName());
      
        //
      
      
         jobName:作业名称
      
      

            job.setJarByClass(WordCountApp.
      
        class
      
      
        );

            FileInputFormat.setInputPaths(job, INPUT_PATH);
      
      
        //
      
      
         指定数据的输入
      
      

            job.setMapperClass(MyMapper.
      
        class
      
      );
      
        //
      
      
         指定自定义map类
      
      

            job.setMapOutputKeyClass(Text.
      
        class
      
      );
      
        //
      
      
         指定map输出key的类型
      
      

            job.setMapOutputValueClass(LongWritable.
      
        class
      
      );
      
        //
      
      
         指定map输出value的类型
      
      

            job.setReducerClass(MyReducer.
      
        class
      
      );
      
        //
      
      
         指定自定义Reduce类
      
      

            job.setOutputKeyClass(Text.
      
        class
      
      );
      
        //
      
      
         设置Reduce输出key的类型
      
      

            job.setOutputValueClass(LongWritable.
      
        class
      
      );
      
        //
      
      
         设置Reduce输出的value类型
      
      

            FileOutputFormat.setOutputPath(job, 
      
        new
      
       Path(OUT_PATH));
      
        //
      
      
         Reduce输出完之后,就会产生一个最终的输出,指定最终输出的位置
      
      

            job.waitForCompletion(
      
        true
      
      );
      
        //
      
      
         提交给jobTracker并等待结束
      
      

        } 
      
        catch
      
      
         (Exception e) {

            e.printStackTrace();

        }

    }



    
      
      
        /**
      
      
        

     * 输入的key标示偏移量:这一行开始的字节. 输入的value:当前的行文本的内容. MapReduce执行过程:

     * 在这里边,我们的数据输入来自于原始文件,数据输出写出到hdfs, 中间的一堆都是map输出产生的临时结果.存放在map运行的linux磁盘上的,

     * 当经过shuffle时,reduce就会通过http把map端的对应数据给取过来.

     * mapred-default.xml中mapredcue.jobtracker

     * .root.dir,mapred.tmp.dir存储map产生的结果. 作业运行时产生这个目录,作业运行完之后它会删除目录.

     
      
      
        */
      
      
        public
      
      
        static
      
      
        class
      
       MyMapper 
      
        extends
      
      
        

            Mapper
      
      <LongWritable, Text, Text, LongWritable>
      
         {

        
      
      
        //
      
      
         源文件有两行记录,解析源文件会产生两个键值对.分别是<0,hello you>,<10,hello me>,所以map函数会被调用两次.

        
      
      
        //
      
      
         在计算机存储的时候,是一维的结构.
      
      
                @Override

        
      
      
        protected
      
      
        void
      
      
         map(LongWritable key, Text value, Context context)

                
      
      
        throws
      
      
         IOException, InterruptedException {

            
      
      
        //
      
      
         为什么要把hadoop类型转换为java类型?
      
      

            String line =
      
         value.toString();

            String[] splited 
      
      = line.split("\t"
      
        );

            
      
      
        //
      
      
         使用hashMap写出去的优势:减少键值对出现的个数.
      
      

            Map<String, Integer> hashMap = 
      
        new
      
       HashMap<String, Integer>
      
        ();



            
      
      
        for
      
      
         (String word : splited) {

                
      
      
        //
      
      
         在for循环体内,临时变量word出现的此时是常量1
      
      

                context.write(
      
        new
      
       Text(word), 
      
        new
      
       LongWritable(1));
      
        //
      
      
         把每个单词出现的次数1写出去.
      
      
                    }

        }

    }



    
      
      
        //
      
      
         map函数执行结束后,map输出的<k,v>一共有4个.<hello,1>,<you,1>,<hello,1>,<me,1>

    
      
      
        //
      
      
         map把数据处理完之后,就会进入reduce.

    
      
      
        //
      
      
         在进入shuffle之前,数据需要先进行分区.默认只有一个区.

    
      
      
        //
      
      
         对每个不同分区中的数据进行排序,分组.

    
      
      
        //
      
      
         排序后的结果:<hello,1>,<hello,1>,<me,1>,<you,1>

    
      
      
        //
      
      
         分组后的结果(相同key的value放在一个集合中):<hello,{1,1}>,<me,{1}>,<you,{1}>

    
      
      
        //
      
      
         规约(可选)



    
      
      
        //
      
      
         map中的数据分发到reduce的过程称作shuffle
      
      
        public
      
      
        static
      
      
        class
      
       MyReducer 
      
        extends
      
      
        

            Reducer
      
      <Text, LongWritable, Text, LongWritable>
      
         {

        
      
      
        //
      
      
         每一组调用一次reduce函数,一共调用了三次
      
      
                @Override

        
      
      
        protected
      
      
        void
      
       reduce(Text key, Iterable<LongWritable>
      
         values,

                Context context) 
      
      
        throws
      
      
         IOException, InterruptedException {

            
      
      
        //
      
      
         count标示单词key在整个文件出现的次数

            
      
      
        //
      
      
         分组的数量与reduce函数调用次数是相等的.

            
      
      
        //
      
      
         reduce函数调用次数与产生的<k,v>的数量抛开业务,没有任何关系!
      
      
        long
      
       count = 0L
      
        ;

            
      
      
        for
      
      
         (LongWritable times : values) {

                count 
      
      +=
      
         times.get();

            }

            context.write(key, 
      
      
        new
      
      
         LongWritable(count));

        }

    }

}
      
    

 

单词计数WordCountApp.class


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论