package com.csm.data.udf.hive;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Text;
/**
* 根据
* copr_idAcont_idA
* copr_idAcont_idB
* copr_idAcont_idC
*
* 得到
* copr_idAcont_idB
*
* @author hadoop_szty
*
*/
public class UDAFGetCont extends AbstractGenericUDAFResolver {
/**
* 验证参数是否正确
*/
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
//只允许一个参数传入
if (parameters.length != 1) {
throw new UDFArgumentTypeException(parameters.length - 1, "Exactly one argument is expected.");
}
//这里只能传入原始的数据类型,其他的列表,数组不能处理
if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted.");
}
return new Evaluator();
}
public static class Evaluator extends GenericUDAFEvaluator {
//最终结果变量
private Text resContId;
private PrimitiveObjectInspector inputOI;
public Evaluator() {
super();
}
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
// TODO Auto-generated method stub
super.init(m, parameters);
resContId = new Text();
inputOI = (PrimitiveObjectInspector) parameters[0];
return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}
static class ContAgg implements AggregationBuffer {
boolean empty;
String resCont;
}
// 返回存储临时聚合结果的AggregationBuffer对象。
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
ContAgg result = new ContAgg();
reset(result);
return result;
}
// 重置聚合结果对象,以支持mapper和reducer的重用。
@Override
public void reset(AggregationBuffer agg) throws HiveException {
ContAgg myagg = (ContAgg) agg;
myagg.empty = true;
myagg.resCont = "";
}
// 迭代处理原始数据parameters并保存到agg中。
@Override
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
// TODO Auto-generated method stub
merge(agg, parameters[0]);
}
// 以持久化的方式返回agg表示的部分聚合结果,
// 这里的持久化意味着返回值只能Java基础类型、数组、基础类型包装器、Hadoop的Writables、Lists和Maps。
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
return terminate(agg);
}
// 合并由partial表示的部分聚合结果到agg中。
@Override
public void merge(AggregationBuffer agg, Object partial) throws HiveException {
if (partial != null) {
ContAgg myagg = (ContAgg) agg;
String contIdStr = PrimitiveObjectInspectorUtils.getString(partial, inputOI);
if (!contIdStr.equals("")) {
if (contIdStr.length() == 18) {
String flag = contIdStr.toString().substring(8, 10);
if (flag.equals("02") || flag.equals("03")) {
myagg.resCont = contIdStr;
myagg.empty = false;
}
} else if (contIdStr.length() == 22 && myagg.resCont.length() != 18) {
myagg.resCont = contIdStr;
myagg.empty = false;
}
}
}
}
// 返回最终结果。
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
ContAgg myagg = (ContAgg) agg;
if (myagg.empty) {
return null;
}
resContId.set(myagg.resCont);
return resContId;
}
}
}
相关推荐
hive udaf 实现按位取与或 hive udaf 实现按位取与或 hive udaf 实现按位取与或
A custom UDAF to group oncatenates all arguments from different rows into a single string.
用户Java对于hive的实例操作,帮助更好地加深对hive语句的理解
Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。
个人 Hive UDAF 有一堆 Hive UDAF(用户定义的聚合函数)不在标准 Hive 分布中,因为它们可能会导致大型数据集的 OOM。 要使用它们,您需要加载 jar 文件,然后为每个要使用的函数创建一个临时函数: ADD JAR target...
java操作Hive源码之HiveJDBC实例(mysql数据库),附带所需jar包,欢迎下载学习。
大数据的hive资源的详细代码设计以及分享,望博友相互交流
JDBC连接hive,用JD连接hive。
HIVE-分桶表的详解和创建实例.docx
hive影评案例.zip
自定义 hive udf udaf 有url解析,获取网站主域名,根据ip获取区域码,有rownum,列聚合以及一些业务实现udf。
hive JDBC连接实例 maven工程
使用hive3.1.2和spark3.0.0配置hive on spark的时候,发现官方下载的hive3.1.2和spark3.0.0不兼容,hive3.1.2对应的版本是spark2.3.0,而spark3.0.0对应的hadoop版本是hadoop2.6或hadoop2.7。 所以,如果想要使用高...
windows系统下eclipse集成hadoop,spark,hive开发环境
这是一些有用的 Hive UDF 和 UDAF 的集合。 提供的功能 UDAF Mode ( de.frosner.hive.udaf.Mode ) - 计算组列的统计模式 从源头构建 git clone https://github.com/FRosner/mustached-hive-udfs.git cd mustached...
《Hive编程指南》是一本Apache Hive的编程指南 旨在介绍如何使用Hive的SQL方法 HiveQL来汇总 查询和分析存储在Hadoop分布式文件系统上的大数据集合 全书通过大量的实例 首先介绍如何在用户环境下安装和配置Hive 并对...
hive hive hive hive hive hive hive hive hive hive hive hive
Hive表生成工具,Hive表生成工具Hive表生成工具
hive inputformat实例代码,按照空格对日志文件进行拆分
《Hive数据仓库案例教程》教学课件 第5章 Hive数据操作.pdf《Hive数据仓库案例教程》教学课件 第5章 Hive数据操作.pdf《Hive数据仓库案例教程》教学课件 第5章 Hive数据操作.pdf《Hive数据仓库案例教程》教学课件 第...