在用Hive进行ETL的时候,对于一些复杂的数据处理逻辑,往往不能用简单的HQL来解决,这个时候就需要使用UDAF了。
对于底层的内容还没有细看,先从应用的角度来说一下吧。
使用UDAF需要实现接口GenericUDAFResolver2,或者继承抽象类AbstractGenericUDAFResolver。
UDAF主要分为2个部分,第一个部分是对传入参数进行校验,数据类型的校验。然后根据传入的数据类型不同调用具体的处理逻辑。
比如说,自己写了一个SUM,SUM对于Long类型和Double类型进行求和,没有问题。
但是,如果传入的参数是一个Array呢?这个时候,就需要在Evaluator方法里面,对参数进行校验了。
- public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
- throws SemanticException {
- if (parameters.length != 1) {
- throw new UDFArgumentTypeException(parameters.length - 1,
- "Exactly one argument is expected.");
- }
- if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
- throw new UDFArgumentTypeException(0,
- "Only primitive type arguments are accepted but "
- + parameters[0].getTypeName() + " is passed.");
- }
- switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- case FLOAT:
- case DOUBLE:
- case STRING:
- case TIMESTAMP:
- return new GenericUDAFAverageEvaluator();
- case BOOLEAN:
- default:
- throw new UDFArgumentTypeException(0,
- "Only numeric or string type arguments are accepted but "
- + parameters[0].getTypeName() + " is passed.");
- }
- }
这个方法只支持Primitive类型,也就是INT,String,Double,Float这些。
UDAF使用一个ObjectInspector来抽象化每一行数据的读取。
上面使用的Primitive类型的数据,所以使用PrimitiveObjectInspector来读取传入的参数。
UDAF会根据不同的计算模型,产生不同的阶段。
如:SUM()聚合函数,接受一个原始类型的整型数值,然后创建一个整型的PARTIAL数据,
返回一个固定的整型结果。
如:median() 中位数
可以接受原始整型输入,然后会产生一个中间的整数PARTIAL数据(排序),
然后再返回一个固定的整型结果。
注意:
- 聚合操作会在reduce的环境下执行,然后由一个Java进程的内存大小限制这个操作。
- 因此像排序大结构体的数据,可能会产生对内存不足的异常。
- 一般情况下可以增加内存来解决这个问题。
- <property>
- <name>mapred.child.java.opts</name>
- <value>-Xmx200m</value>
- </property>
在处理逻辑之前,介绍一下UDAF的Mode。
UDAF的Mode,也就是执行阶段。无论怎样的UDAF,最终都会变成MapReduce Job。
Mode是一UDAF的使用类型,主要有4种形势:
因为MapReduce可能是,Map->Reduce也可能是,Map->Reduce->Reduce
- public static enum Mode {
- /**
- * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
- * 将会调用iterate()和terminatePartial()
- */
- PARTIAL1,
- /**
- * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:
- * 将会调用merge() 和 terminatePartial()
- */
- PARTIAL2,
- /**
- * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合
- * 将会调用merge()和terminate()
- */
- FINAL,
- /**
- * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
- * 将会调用 iterate()和terminate()
- */
- COMPLETE
- };
有的UDAF函数会可以像UDF函数那样使用,有的必须在聚合函数环境下使用,如group by,over(partition by )
而在使用UDAF进行计算的时候,会启用一个init方法。这个init的方法会在买个阶段前面都启动一次。第一次启动的时候,参数指的是读入每一行记录的参数。第二次启动的时候,传入的参数只有1个,指的是中间结果的参数。这里需要特别注意。
- @Override
- public ObjectInspector init(Mode m, ObjectInspector[] parameters)
- throws HiveException {
- super.init(m, parameters);
- //init input
- if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){ //必须得有
- LOG.info(" Mode:"+m.toString()+" result has init");
- inputOI = (PrimitiveObjectInspector) parameters[0];
- inputOI2 = (PrimitiveObjectInspector) parameters[1];
- // result = new DoubleWritable(0);
- }
- //init output
- if (m == Mode.PARTIAL2 || m == Mode.FINAL) {
- outputOI = (PrimitiveObjectInspector) parameters[0];
- result = new DoubleWritable(0);
- return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
- }else{
- result = new DoubleWritable(0);
- return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
- }
- }
所以我们使用枚举方法,根据init启动阶段的不同,接入不同的参数。
实现UDAF的时候,实际就是一个Reducer
对于计算过程中的中间结果,会有一个Buffer对象来进行缓冲。
Buffer对象相当于Reducer里面记录结果集的一个内存对象。
这里面可以大大的发挥想象,作出你想要的各种数据类型。
另外,在UDAF输出的时候,也可以输出Struct,Array类型的数据。
这一部分等到用到再进行研究吧。
最后是完整的UDAF代码。实现一个有条件的SUM,传入2个参数,当第二个参数>1 的时候进行SUM。
- package com.test.udaf;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
- import org.apache.hadoop.hive.ql.metadata.HiveException;
- import org.apache.hadoop.hive.ql.parse.SemanticException;
- import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
- import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
- import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
- import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
- import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
- import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
- import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
- import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
- import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
- import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
- import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
- import org.apache.hadoop.hive.serde2.io.DoubleWritable;
- import org.apache.hadoop.util.StringUtils;
- public class GenericUdafMemberLevel2 extends AbstractGenericUDAFResolver {
- private static final Log LOG = LogFactory
- .getLog(GenericUdafMemberLevel2.class.getName());
- @Override
- public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
- throws SemanticException {
- return new GenericUdafMeberLevelEvaluator();
- }
- public static class GenericUdafMeberLevelEvaluator extends GenericUDAFEvaluator {
- private PrimitiveObjectInspector inputOI;
- private PrimitiveObjectInspector inputOI2;
- private PrimitiveObjectInspector outputOI;
- private DoubleWritable result;
- @Override
- public ObjectInspector init(Mode m, ObjectInspector[] parameters)
- throws HiveException {
- super.init(m, parameters);
- //init input
- if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){ //必须得有
- LOG.info(" Mode:"+m.toString()+" result has init");
- inputOI = (PrimitiveObjectInspector) parameters[0];
- inputOI2 = (PrimitiveObjectInspector) parameters[1];
- // result = new DoubleWritable(0);
- }
- //init output
- if (m == Mode.PARTIAL2 || m == Mode.FINAL) {
- outputOI = (PrimitiveObjectInspector) parameters[0];
- result = new DoubleWritable(0);
- return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
- }else{
- result = new DoubleWritable(0);
- return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
- }
- }
- /** class for storing count value. */
- static class SumAgg implements AggregationBuffer {
- boolean empty;
- double value;
- }
- @Override
- //创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的相加总和。
- //使用buffer对象前,先进行内存的清空——reset
- public AggregationBuffer getNewAggregationBuffer() throws HiveException {
- SumAgg buffer = new SumAgg();
- reset(buffer);
- return buffer;
- }
- @Override
- //重置为0
- //mapreduce支持mapper和reducer的重用,所以为了兼容,也需要做内存的重用。
- public void reset(AggregationBuffer agg) throws HiveException {
- ((SumAgg) agg).value = 0.0;
- ((SumAgg) agg).empty = true;
- }
- private boolean warned = false;
- //迭代
- //只要把保存当前和的对象agg,再加上输入的参数,就可以了。
- @Override
- public void iterate(AggregationBuffer agg, Object[] parameters)
- throws HiveException {
- // parameters == null means the input table/split is empty
- if (parameters == null) {
- return;
- }
- try {
- double flag = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputOI2);
- if(flag > 1.0) //参数条件
- merge(agg, parameters[0]); //这里将迭代数据放入combiner进行合并
- } catch (NumberFormatException e) {
- if (!warned) {
- warned = true;
- LOG.warn(getClass().getSimpleName() + " "
- + StringUtils.stringifyException(e));
- }
- }
- }
- @Override
- //这里的操作就是具体的聚合操作。
- public void merge(AggregationBuffer agg, Object partial) {
- if (partial != null) {
- // 通过ObejctInspector取每一个字段的数据
- if (inputOI != null) {
- double p = PrimitiveObjectInspectorUtils.getDouble(partial,
- inputOI);
- LOG.info("add up 1:" + p);
- ((SumAgg) agg).value += p;
- } else {
- double p = PrimitiveObjectInspectorUtils.getDouble(partial,
- outputOI);
- LOG.info("add up 2:" + p);
- ((SumAgg) agg).value += p;
- }
- }
- }
- @Override
- public Object terminatePartial(AggregationBuffer agg) {
- return terminate(agg);
- }
- @Override
- public Object terminate(AggregationBuffer agg){
- SumAgg myagg = (SumAgg) agg;
- result.set(myagg.value);
- return result;
- }
- }
- }
在使用Hive的UDAF,需要使用ADD JAR语句,将UDAF方程上传到Hadoop Distributed Cache,让每一个DataNode都能共享到这个jar包。
然后才进行调用
- hive> add jar /home/daxingyu930/test_sum.jar;
- hive> drop temporary function sum_test;
- hive> create temporary function sum_test as 'com.test.udaf.GenericUdafMemberLevel';
- hive> create temporary function sum_test as 'com.test.udaf.GenericUdafMemberLevel2';
- hive> select sum_test(height,2.0) from student_height;
附录:关于UDAF流程介绍
init 当实例化UDAF evaluator的时候执行。
getNewAggregationBuffer 返回一个对象用来保存临时的聚合结果集。
iterate 将一条新的数据处理放到聚合内存块中(aggregation buffer)
terminateParital 返回现有的聚合好的一个持久化的路径,相当于数据对象。这些数据可以通过Hive的数据类型可来访问,这个数据对象可以被Java理解,如Integer,String,或者是Array,Map这种。
相当于第二次MapReduce的map阶段。
merge 将partital数据(分区汇总的数据),于terminateParital数据融合在一起
terminate 返回一个最终的数据聚合结果,是一个结果,或者是一个结果集。
在init阶段,hive会自动检测最终生成的object inspector。
并获取使用聚合函数所处的mode。
iterate和 terminalPartial 都是在map阶段
而terminate和merge 都是在reduce阶段。
merge则用来聚合结果集
注意,无论使用UDF和UDAF,尽可能少地使用new关键字,可以使用静态类。
这样可以减少JVM的GC操作,提高效率。
相关推荐
hive udaf 实现按位取与或 hive udaf 实现按位取与或 hive udaf 实现按位取与或
A custom UDAF to group oncatenates all arguments from different rows into a single string.
hive入门级详解,包括数仓与传统数据库的比对,hive的存储结构与存储原理,分区分桶、hql如何转换成mapreduce、UDF自定义函数等
Hive从入门到精通,适合想学hive的小白看
Hive编程指南+HIVE从入门到精通+Hive高级编程+Apache Oozie
Hive入门经典教程 Apache Hive 淘宝团队验证
Hive 入门级编程实例详解。涵盖了各类基础函数使用要点以及 Java 编写 Hive 函数等。
个人 Hive UDAF 有一堆 Hive UDAF(用户定义的聚合函数)不在标准 Hive 分布中,因为它们可能会导致大型数据集的 OOM。 要使用它们,您需要加载 jar 文件,然后为每个要使用的函数创建一个临时函数: ADD JAR target...
Hive入门与实战 PDF
Hive入门与实战
Hive开发规范及要点,hql 开发基础知识,规范
1、MapReduce实现基本SQL操作的原理 1.1 Join的实现原理 1.2 Group By的实现原理 ...2.5 Phase5 OperatorTree生成MapReduce Job的过程 2.5.1 对输出表生成MoveTask .......... 2.5.9 OperatorTree生成MapReduceTask全貌
hive常用的开发规范 hdfs hbase udf函数 hql shell脚本开发等常用规范,仅供参考
Hive从入门到精通需要软件及配置
hive是基于Hadoop的一个数据仓库工具,用来进行数据提取、转化、加载,这是一种可以存储、查询和分析存储在Hadoop中的大规模数据的机制。hive数据仓库工具能将结构化的数据文件映射为一张数据库表,并提供SQL查询...
hive hadoop 开发手册
HIVE安装及详解,及测试,该文档版本是1.2+的。。。。。
hive编程入门课程
hive 下dual表,Lock,explain, 数据类型,开发常见的问题
hive入门,介绍hive相关简单知识和例子。