Map/Reduce中Join查询实现

系统 1947 0

http://www.cnblogs.com/MengYan-LongYou/p/3360613.html

 

在做这个 Join 查询的时候,必然涉及数据,我这里设计了 2 张表,分别较 data.txt info.txt ,字段之间以 /t 划分。

data.txt 内容如下:

201001    1003    abc

201002    1005    def

201003    1006    ghi

201004    1003    jkl

201005    1004    mno

201006    1005    pqr

 

info.txt 内容如下:

 

1003    kaka

1004    da

1005    jue

1006    zhao

 

期望输出结果:

1003    201001    abc    kaka

1003    201004    jkl    kaka

1004    201005    mno    da

1005    201002    def    jue

1005    201006    pqr    jue

1006    201003    ghi    zhao

 

四、 Map 代码

首先是 map 的代码,我贴上,然后简要说说

 

public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {

        @Override

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

 

            // 获取输入文件的全路径和名称

            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();

 

            if (pathName.contains("data.txt")) {

                String values[] = value.toString().split("/t");

                if (values.length < 3) {

                    // data数据格式不规范,字段小于3,抛弃数据

                    return;

                } else {

                    // 数据格式规范,区分标识为1

                    TextPair tp = new TextPair(new Text(values[1]), new Text("1"));

                    context.write(tp, new Text(values[0] + "/t" + values[2]));

                }

            }

            if (pathName.contains("info.txt")) {

                String values[] = value.toString().split("/t");

                if (values.length < 2) {

                    // data数据格式不规范,字段小于2,抛弃数据

                    return;

                } else {

                    // 数据格式规范,区分标识为0

                    TextPair tp = new TextPair(new Text(values[0]), new Text("0"));

                    context.write(tp, new Text(values[1]));

                }

            }

        }

    }

 

这里需要注意以下部分:

A pathName 是文件在 HDFS 中的全路径 ( 例如: hdfs://M1:9000/MengYan/join/data/info.txt) ,可以以 endsWith() 的方法来判断。

B 、资料表,也就是这里的 info.txt 需要放在前面,也就是标识号是 0. 否则无法输出理想结果。

C Map 执行完成之后,输出的中间结果如下:

1003,0    kaka

1004,0    da

1005,0    jue

1006,0    zhao

1003,1    201001    abc

1003,1    201004    jkl

1004,1    201005    mon

1005,1    201002    def

1005,1    201006    pqr

1006,1    201003    ghi

 

五、分区和分组

1 map 之后的输出会进行一些分区的操作,代码贴出来:

public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {

        @Override

        public int getPartition(TextPair key, Text value, int numParititon) {

            return Math.abs(key.getFirst().hashCode() * 127) % numParititon;

        }

    }

分区我在以前的文档中写过,这里不做描述了,就说是按照 map 输出的符合 key 的第一个字段做分区关键字。分区之后,相同 key 会划分到一个 reduce 中去处理(如果 reduce 设置是 1 ,那么就是分区有多个,但是还是在一个 reduce 中处理。但是结果会按照分区的原则排序)。分区后结果大致如下:

 

同一区:

1003,0    kaka

1003,1    201001    abc

1003,1    201004    jkl

 

 

同一区:

1004,0    da

1004,1    201005    mon

 

 

同一区:

1005,0    jue

1005,1    201002    def

1005,1    201006    pqr

 

 

同一区:

1006,0    zhao

1006,1    201003    ghi

 

2 、分组操作,代码如下

 

public static class Example_Join_01_Comparator extends WritableComparator {

 

        public Example_Join_01_Comparator() {

            super(TextPair.class, true);

        }

 

        @SuppressWarnings("unchecked")

        public int compare(WritableComparable a, WritableComparable b) {

            TextPair t1 = (TextPair) a;

            TextPair t2 = (TextPair) b;

            return t1.getFirst().compareTo(t2.getFirst());

        }

    }

 

分组操作就是把在相同分区的数据按照指定的规则进行分组的操作,就以上来看,是按照复合 key 的第一个字段做分组原则,达到忽略复合 key 的第二个字段值的目的,从而让数据能够迭代在一个 reduce 中。输出后结果如下:

 

同一组:

1003,0    kaka

1003,0    201001    abc

1003,0    201004    jkl

 

同一组:

1004,0    da

1004,0    201005    mon

 

同一组:

1005,0    jue

1005,0    201002    def

1005,0    201006    pqr

 

同一组:

1006,0    zhao

1006,0    201003    ghi

 

六、 reduce 操作

贴上代码如下:

public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {

        protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException,

                InterruptedException {

            Text pid = key.getFirst();

            String desc = values.iterator().next().toString();

            while (values.iterator().hasNext()) {

                context.write(pid, new Text(values.iterator().next().toString() + "/t" + desc));

            }

        }

    }

1 、代码比较简单,首先获取关键的 ID 值,就是 key 的第一个字段。

2 、获取公用的字段,通过排组织后可以看到,一些共有字段是在第一位,取出来即可。

3 、遍历余下的结果,输出。

七、其他的支撑代码

1 、首先是 TextPair 代码,没有什么可以细说的,贴出来:

public class TextPair implements WritableComparable<TextPair> {

    private Text first;

    private Text second;

 

    public TextPair() {

        set(new Text(), new Text());

    }

 

    public TextPair(String first, String second) {

        set(new Text(first), new Text(second));

    }

 

    public TextPair(Text first, Text second) {

        set(first, second);

    }

 

    public void set(Text first, Text second) {

        this.first = first;

        this.second = second;

    }

 

    public Text getFirst() {

        return first;

    }

 

    public Text getSecond() {

        return second;

    }

 

    public void write(DataOutput out) throws IOException {

        first.write(out);

        second.write(out);

    }

 

    public void readFields(DataInput in) throws IOException {

        first.readFields(in);

        second.readFields(in);

    }

 

    public int compareTo(TextPair tp) {

        int cmp = first.compareTo(tp.first);

        if (cmp != 0) {

            return cmp;

        }

        return second.compareTo(tp.second);

    }

}

2 Job 的入口函数

public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();

        GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);

        String[] otherArgs = parser.getRemainingArgs();

        if (agrs.length < 3) {

            System.err.println("Usage: Example_Join_01 <in_path_one> <in_path_two> <output>");

            System.exit(2);

        }

 

        //conf.set("hadoop.job.ugi", "root,hadoop");

 

        Job job = new Job(conf, "Example_Join_01");

        // 设置运行的job

        job.setJarByClass(Example_Join_01.class);

        // 设置Map相关内容

        job.setMapperClass(Example_Join_01_Mapper.class);

        // 设置Map的输出

        job.setMapOutputKeyClass(TextPair.class);

        job.setMapOutputValueClass(Text.class);

        // 设置partition

        job.setPartitionerClass(Example_Join_01_Partitioner.class);

        // 在分区之后按照指定的条件分组

        job.setGroupingComparatorClass(Example_Join_01_Comparator.class);

        // 设置reduce

        job.setReducerClass(Example_Join_01_Reduce.class);

        // 设置reduce的输出

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

        // 设置输入和输出的目录

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        // 执行,直到结束就退出

        System.exit(job.waitForCompletion(true) ? 0 : 1);

 

    }

 

八、总结

1 、这是个简单的 join 查询,可以看到,我在处理输入源的时候是在 map 端做来源判断。其实在 0.19 可以用 MultipleInputs.addInputPath() 的方法,但是它用了 JobConf 做参数。这个方法原理是多个数据源就采用多个 map 来处理。方法各有优劣。

2 、对于资源表,如果我们采用 0 1 这样的模式来区分,资源表是需要放在前的。例如本例中 info.txt 就是资源表,所以标识位就是 0. 如果写为 1 的话,可以试下,在分组之后,资源表对应的值放在了迭代器最后一位,无法追加在最后所有的结果集合中。

3 、关于分区,并不是所有的 map 都结束才开始的,一部分数据完成就会开始执行。同样,分组操作在一个分区内执行,如果分区完成,分组将会开始执行,也不是等所有分区完成才开始做分组的操作。

Map/Reduce中Join查询实现


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论