hive UDAF开发入门和运行过程详解

系统 2254 0

介绍

hive的用户自定义聚合函数(UDAF)是一个很好的功能,集成了先进的数据处理。hive有两种UDAF:简单和通用。顾名思义,简单的UDAF,写的相当简单的,但因为使用Java反射导致性能损失,而且有些特性不能使用,如可变长度参数列表。通用UDAF可以使用​​所有功能,但是UDAF就写的比较复杂,不直观。

本文只介绍通用UDAF。

UDAF是需要在hive的sql语句和group by联合使用,hive的group by对于每个分组,只能返回一条记录,这点和mysql不一样,切记。

 

UDAF开发概览

开发通用UDAF有两个步骤,第一个是编写resolver类,第二个是编写 evaluator 类。 resolver负责类型检查,操作符重载。 evaluator真正实现UDAF的逻辑。通常来说,顶层UDAF类继承 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2, 里面编写嵌套类 evaluator  实现UDAF的逻辑。

 本文以Hive的内置UDAF sum函数的源代码作为示例讲解。

 

实现 resolver

resolver通常继承 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2 ,但是我们更建议继承 AbstractGenericUDAFResolver,隔离将来hive接口的变化。

GenericUDAFResolver和GenericUDAFResolver2接口的区别是,后面的允许evaluator实现可以访问更多的信息,例如DISTINCT限定符,通配符FUNCTION(*)。

      
        public
      
      
        class
      
       GenericUDAFSum 
      
        extends
      
      
         AbstractGenericUDAFResolver {

  
      
      
        static
      
      
        final
      
       Log LOG = LogFactory.getLog(GenericUDAFSum.
      
        class
      
      
        .getName());

  @Override
  
      
      
        public
      
      
         GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
    
      
      
        throws
      
      
         SemanticException {

      
      
            //
      
      
         Type-checking goes here!
      
      
            return
      
      
        new
      
      
         GenericUDAFSumLong(); 
        
}
public static class GenericUDAFSumLong extends GenericUDAFEvaluator { // UDAF logic goes here! } }

这个就是 UDAF的代码骨架,第一行创建LOG对象,用来写入警告和错误到hive的log。 GenericUDAFResolver只需要重写一个方法: getEvaluator, 它根据SQL传入的参数类型,返回正确的evaluator。这里最主要是实现操作符的重载。

getEvaluator的完整代码如下:

      
        public
      
      
         GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
    
      
      
        throws
      
      
         SemanticException {
    
      
      
        if
      
       (parameters.length != 1
      
        ) {
      
      
      
        throw
      
      
        new
      
       UDFArgumentTypeException(parameters.length - 1
      
        ,
          
      
      "Exactly one argument is expected."
      
        );
    }

    
      
      
        if
      
       (parameters[0].getCategory() !=
      
         ObjectInspector.Category.PRIMITIVE) {
      
      
      
        throw
      
      
        new
      
       UDFArgumentTypeException(0
      
        ,
          
      
      "Only primitive type arguments are accepted but "
          + parameters[0].getTypeName() + " is passed."
      
        );
    }
    
      
      
        switch
      
       (((PrimitiveTypeInfo) parameters[0
      
        ]).getPrimitiveCategory()) {
    
      
      
        case
      
      
         BYTE:
    
      
      
        case
      
      
         SHORT:
    
      
      
        case
      
      
         INT:
    
      
      
        case
      
      
         LONG:
    
      
      
        case
      
      
         TIMESTAMP:
      
      
      
        return
      
      
        new
      
      
         GenericUDAFSumLong();
    
      
      
        case
      
      
         FLOAT:
    
      
      
        case
      
      
         DOUBLE:
    
      
      
        case
      
      
         STRING:
      
      
      
        return
      
      
        new
      
      
         GenericUDAFSumDouble();
    
      
      
        case
      
      
         BOOLEAN:
    
      
      
        default
      
      
        :
      
      
      
        throw
      
      
        new
      
       UDFArgumentTypeException(0
      
        ,
          
      
      "Only numeric or string type arguments are accepted but "
          + parameters[0].getTypeName() + " is passed."
      
        );
    }
      
    

这里做了类型检查,如果不是原生类型(即符合类型,array,map此类),则抛出异常,还实现了操作符重载,对于整数类型,使用GenericUDAFSumLong实现UDAF的逻辑,对于浮点类型,使用GenericUDAFSumDouble实现UDAF的逻辑。

 

实现evaluator

所有 evaluators必须继承抽象类org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator。子类必须实现它的一些抽象方法,实现UDAF的逻辑。

GenericUDAFEvaluator有一个嵌套类Mode,这个类很重要,它表示了udaf在mapreduce的各个阶段,理解Mode的含义,就可以理解了hive的UDAF的运行流程。

      
        public
      
      
        static
      
      
        enum
      
      
         Mode {
    
      
      
        /**
      
      
        
     * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
     * 将会调用iterate()和terminatePartial()
     
      
      
        */
      
      
        
    PARTIAL1,
        
      
      
        /**
      
      
        
     * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:
     * 将会调用merge() 和 terminatePartial() 
     
      
      
        */
      
      
        
    PARTIAL2,
        
      
      
        /**
      
      
        
     * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合 
     * 将会调用merge()和terminate()
     
      
      
        */
      
      
        
    FINAL,
        
      
      
        /**
      
      
        
     * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
      * 将会调用 iterate()和terminate()
     
      
      
        */
      
      
        
    COMPLETE
  };
      
    

一般情况下,完整的UDAF逻辑是一个mapreduce过程,如果有mapper和reducer,就会经历PARTIAL1(mapper),FINAL(reducer),如果还有combiner,那就会经历PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。

而有一些情况下的mapreduce,只有mapper,而没有reducer,所以就会只有COMPLETE阶段,这个阶段直接输入原始数据,出结果。

下面以GenericUDAFSumLong的evaluator实现讲解

      
        public
      
      
        static
      
      
        class
      
       GenericUDAFSumLong 
      
        extends
      
      
         GenericUDAFEvaluator {


      
      
        private
      
      
         PrimitiveObjectInspector inputOI;
    
      
      
        private
      
      
         LongWritable result;

   
      
      
        //
      
      
        这个方法返回了UDAF的返回类型,这里确定了sum自定义函数的返回类型是Long类型
      
      
            @Override
    
      
      
        public
      
       ObjectInspector init(Mode m, ObjectInspector[] parameters) 
      
        throws
      
      
         HiveException {
      
      
      
        assert
      
       (parameters.length == 1
      
        );
      
      
      
        super
      
      
        .init(m, parameters);
      result 
      
      = 
      
        new
      
       LongWritable(0
      
        );
      inputOI 
      
      = (PrimitiveObjectInspector) parameters[0
      
        ];
      
      
      
        return
      
      
         PrimitiveObjectInspectorFactory.writableLongObjectInspector;
    }

    
      
      
        /**
      
      
         存储sum的值的类 
      
      
        */
      
      
        static
      
      
        class
      
       SumLongAgg 
      
        implements
      
      
         AggregationBuffer {
      
      
      
        boolean
      
      
         empty;
      
      
      
        long
      
      
         sum;
    }

    
      
      
        //
      
      
        创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的相加总和。
      
      
        
    @Override
    
      
      
        public
      
       AggregationBuffer getNewAggregationBuffer() 
      
        throws
      
      
         HiveException {
      SumLongAgg result 
      
      = 
      
        new
      
      
         SumLongAgg();
      reset(result);
      
      
      
        return
      
      
         result;
    }
    
    
      
      
        //
      
      
        mapreduce支持mapper和reducer的重用,所以为了兼容,也需要做内存的重用。
      
      
        
    @Override
    
      
      
        public
      
      
        void
      
       reset(AggregationBuffer agg) 
      
        throws
      
      
         HiveException {
      SumLongAgg myagg 
      
      =
      
         (SumLongAgg) agg;
      myagg.empty 
      
      = 
      
        true
      
      
        ;
      myagg.sum 
      
      = 0
      
        ;
    }

    
      
      
        private
      
      
        boolean
      
       warned = 
      
        false
      
      
        ;
  
    
      
      
        //
      
      
        map阶段调用,只要把保存当前和的对象agg,再加上输入的参数,就可以了。
      
      
            @Override
    
      
      
        public
      
      
        void
      
       iterate(AggregationBuffer agg, Object[] parameters) 
      
        throws
      
      
         HiveException {
      
      
      
        assert
      
       (parameters.length == 1
      
        );
      
      
      
        try
      
      
         {
        merge(agg, parameters[
      
      0
      
        ]);
      } 
      
      
        catch
      
      
         (NumberFormatException e) {
        
      
      
        if
      
       (!
      
        warned) {
          warned 
      
      = 
      
        true
      
      
        ;
          LOG.warn(getClass().getSimpleName() 
      
      + " "
              +
      
         StringUtils.stringifyException(e));
        }
      }
    }
   
      
      
        //
      
      
        mapper结束要返回的结果,还有combiner结束返回的结果
      
      
            @Override
    
      
      
        public
      
       Object terminatePartial(AggregationBuffer agg) 
      
        throws
      
      
         HiveException {
      
      
      
        return
      
      
         terminate(agg);
    }
    
    
      
      
        //
      
      
        combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。
      
      
            @Override
    
      
      
        public
      
      
        void
      
       merge(AggregationBuffer agg, Object partial) 
      
        throws
      
      
         HiveException {
      
      
      
        if
      
       (partial != 
      
        null
      
      
        ) {
        SumLongAgg myagg 
      
      =
      
         (SumLongAgg) agg;
        myagg.sum 
      
      +=
      
         PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
        myagg.empty 
      
      = 
      
        false
      
      
        ;
      }
    }
     
    
      
      
        //
      
      
        reducer返回结果,或者是只有mapper,没有reducer时,在mapper端返回结果。
      
      
            @Override
    
      
      
        public
      
       Object terminate(AggregationBuffer agg) 
      
        throws
      
      
         HiveException {
      SumLongAgg myagg 
      
      =
      
         (SumLongAgg) agg;
      
      
      
        if
      
      
         (myagg.empty) {
        
      
      
        return
      
      
        null
      
      
        ;
      }
      result.set(myagg.sum);
      
      
      
        return
      
      
         result;
    }

  }
      
    

除了GenericUDAFSumLong,还有重载的GenericUDAFSumDouble,以上代码都在hive的源码:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum。

 

注意

terminate()返回的数据类型要跟输入时的数据类型保持一致,不然会报错!

修改方法注册

修改   ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java文件,加入编写的 UDAF类,并注册名字。

FunctionRegistry类包含了hive的所有内置自定义函数。想要更好学习hive的UDAF,建议多看看里面的UDAF。

 

总结

本文的目的是为初学者入门学习udaf,所以介绍了udaf的概览,尤其是udaf的运行过程,这对初学者是比较大的槛。

考虑入门,本文简单介绍了sum的UDAF实现,但是如果想要更好理解UDAF的运行过程,建议再看看avg UDAF:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage。avg UDAF对hive的运行流程要控制的更加精细,并判断当前运行的Mode做一定的逻辑处理。

 

参考  https://cwiki.apache.org/Hive/genericudafcasestudy.html

转自  http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html

hive UDAF开发入门和运行过程详解


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论