介绍
hive的用户自定义聚合函数(UDAF)是一个很好的功能,集成了先进的数据处理。hive有两种UDAF:简单和通用。顾名思义,简单的UDAF,写的相当简单的,但因为使用Java反射导致性能损失,而且有些特性不能使用,如可变长度参数列表。通用UDAF可以使用所有功能,但是UDAF就写的比较复杂,不直观。
本文只介绍通用UDAF。
UDAF是需要在hive的sql语句和group by联合使用,hive的group by对于每个分组,只能返回一条记录,这点和mysql不一样,切记。
UDAF开发概览
开发通用UDAF有两个步骤,第一个是编写resolver类,第二个是编写 evaluator 类。 resolver负责类型检查,操作符重载。 evaluator真正实现UDAF的逻辑。通常来说,顶层UDAF类继承 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2, 里面编写嵌套类 evaluator 实现UDAF的逻辑。
本文以Hive的内置UDAF sum函数的源代码作为示例讲解。
实现 resolver
resolver通常继承 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2 ,但是我们更建议继承 AbstractGenericUDAFResolver,隔离将来hive接口的变化。
GenericUDAFResolver和GenericUDAFResolver2接口的区别是,后面的允许evaluator实现可以访问更多的信息,例如DISTINCT限定符,通配符FUNCTION(*)。
public
class
GenericUDAFSum
extends
AbstractGenericUDAFResolver {
static
final
Log LOG = LogFactory.getLog(GenericUDAFSum.
class
.getName());
@Override
public
GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
throws
SemanticException {
//
Type-checking goes here!
return
new
GenericUDAFSumLong();
}
public
static
class
GenericUDAFSumLong
extends
GenericUDAFEvaluator {
//
UDAF logic goes here!
}
}
这个就是 UDAF的代码骨架,第一行创建LOG对象,用来写入警告和错误到hive的log。 GenericUDAFResolver只需要重写一个方法: getEvaluator, 它根据SQL传入的参数类型,返回正确的evaluator。这里最主要是实现操作符的重载。
getEvaluator的完整代码如下:
public
GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
throws
SemanticException {
if
(parameters.length != 1
) {
throw
new
UDFArgumentTypeException(parameters.length - 1
,
"Exactly one argument is expected."
);
}
if
(parameters[0].getCategory() !=
ObjectInspector.Category.PRIMITIVE) {
throw
new
UDFArgumentTypeException(0
,
"Only primitive type arguments are accepted but "
+ parameters[0].getTypeName() + " is passed."
);
}
switch
(((PrimitiveTypeInfo) parameters[0
]).getPrimitiveCategory()) {
case
BYTE:
case
SHORT:
case
INT:
case
LONG:
case
TIMESTAMP:
return
new
GenericUDAFSumLong();
case
FLOAT:
case
DOUBLE:
case
STRING:
return
new
GenericUDAFSumDouble();
case
BOOLEAN:
default
:
throw
new
UDFArgumentTypeException(0
,
"Only numeric or string type arguments are accepted but "
+ parameters[0].getTypeName() + " is passed."
);
}
这里做了类型检查,如果不是原生类型(即符合类型,array,map此类),则抛出异常,还实现了操作符重载,对于整数类型,使用GenericUDAFSumLong实现UDAF的逻辑,对于浮点类型,使用GenericUDAFSumDouble实现UDAF的逻辑。
实现evaluator
所有 evaluators必须继承抽象类org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator。子类必须实现它的一些抽象方法,实现UDAF的逻辑。
GenericUDAFEvaluator有一个嵌套类Mode,这个类很重要,它表示了udaf在mapreduce的各个阶段,理解Mode的含义,就可以理解了hive的UDAF的运行流程。
public
static
enum
Mode {
/**
* PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
* 将会调用iterate()和terminatePartial()
*/
PARTIAL1,
/**
* PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:
* 将会调用merge() 和 terminatePartial()
*/
PARTIAL2,
/**
* FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合
* 将会调用merge()和terminate()
*/
FINAL,
/**
* COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
* 将会调用 iterate()和terminate()
*/
COMPLETE
};
一般情况下,完整的UDAF逻辑是一个mapreduce过程,如果有mapper和reducer,就会经历PARTIAL1(mapper),FINAL(reducer),如果还有combiner,那就会经历PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。
而有一些情况下的mapreduce,只有mapper,而没有reducer,所以就会只有COMPLETE阶段,这个阶段直接输入原始数据,出结果。
下面以GenericUDAFSumLong的evaluator实现讲解
public
static
class
GenericUDAFSumLong
extends
GenericUDAFEvaluator {
private
PrimitiveObjectInspector inputOI;
private
LongWritable result;
//
这个方法返回了UDAF的返回类型,这里确定了sum自定义函数的返回类型是Long类型
@Override
public
ObjectInspector init(Mode m, ObjectInspector[] parameters)
throws
HiveException {
assert
(parameters.length == 1
);
super
.init(m, parameters);
result
=
new
LongWritable(0
);
inputOI
= (PrimitiveObjectInspector) parameters[0
];
return
PrimitiveObjectInspectorFactory.writableLongObjectInspector;
}
/**
存储sum的值的类
*/
static
class
SumLongAgg
implements
AggregationBuffer {
boolean
empty;
long
sum;
}
//
创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的相加总和。
@Override
public
AggregationBuffer getNewAggregationBuffer()
throws
HiveException {
SumLongAgg result
=
new
SumLongAgg();
reset(result);
return
result;
}
//
mapreduce支持mapper和reducer的重用,所以为了兼容,也需要做内存的重用。
@Override
public
void
reset(AggregationBuffer agg)
throws
HiveException {
SumLongAgg myagg
=
(SumLongAgg) agg;
myagg.empty
=
true
;
myagg.sum
= 0
;
}
private
boolean
warned =
false
;
//
map阶段调用,只要把保存当前和的对象agg,再加上输入的参数,就可以了。
@Override
public
void
iterate(AggregationBuffer agg, Object[] parameters)
throws
HiveException {
assert
(parameters.length == 1
);
try
{
merge(agg, parameters[
0
]);
}
catch
(NumberFormatException e) {
if
(!
warned) {
warned
=
true
;
LOG.warn(getClass().getSimpleName()
+ " "
+
StringUtils.stringifyException(e));
}
}
}
//
mapper结束要返回的结果,还有combiner结束返回的结果
@Override
public
Object terminatePartial(AggregationBuffer agg)
throws
HiveException {
return
terminate(agg);
}
//
combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。
@Override
public
void
merge(AggregationBuffer agg, Object partial)
throws
HiveException {
if
(partial !=
null
) {
SumLongAgg myagg
=
(SumLongAgg) agg;
myagg.sum
+=
PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
myagg.empty
=
false
;
}
}
//
reducer返回结果,或者是只有mapper,没有reducer时,在mapper端返回结果。
@Override
public
Object terminate(AggregationBuffer agg)
throws
HiveException {
SumLongAgg myagg
=
(SumLongAgg) agg;
if
(myagg.empty) {
return
null
;
}
result.set(myagg.sum);
return
result;
}
}
除了GenericUDAFSumLong,还有重载的GenericUDAFSumDouble,以上代码都在hive的源码:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum。
注意
terminate()返回的数据类型要跟输入时的数据类型保持一致,不然会报错!
修改方法注册
修改 ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java文件,加入编写的 UDAF类,并注册名字。
FunctionRegistry类包含了hive的所有内置自定义函数。想要更好学习hive的UDAF,建议多看看里面的UDAF。
总结
本文的目的是为初学者入门学习udaf,所以介绍了udaf的概览,尤其是udaf的运行过程,这对初学者是比较大的槛。
考虑入门,本文简单介绍了sum的UDAF实现,但是如果想要更好理解UDAF的运行过程,建议再看看avg UDAF:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage。avg UDAF对hive的运行流程要控制的更加精细,并判断当前运行的Mode做一定的逻辑处理。
参考 https://cwiki.apache.org/Hive/genericudafcasestudy.html
转自 http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html

