最近有个需求,需要把一个用户的应用使用情况组装成一个GSON字符串,通过UDAF实现了这一功能。具体来说:一张表如下:
meid | app | usecnt | usetime |
meid1 | com.yulong.x | 1 | 2 |
meid1 | com.baidu.x | 2 | 5 |
meid2 | com.tencent.x | 3 | 8 |
最终要把同一个用户的应用使用情况做成json串,比如结果中的一条数据如下:
{"AppUsageStat": [ { "apName": "cn.kuwo.player", "frequency": 9, "duration": 312237 }, { "apName": "com.android.gallery3d", "frequency": 3, "duration": 70737 } ] }
具体代码如下:
package com.yulong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Text;
public class MakeCountAndTimeGsonResolver extends AbstractGenericUDAFResolver{
static final Log LOG = LogFactory.getLog( MakeCountAndTimeGsonResolver.class.getName() );
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] params) throws SemanticException {
if( params.length != 3 ){
throw new UDFArgumentTypeException(params.length-1,"Exactly 3 args required,supplied " + params.length );
}
for( int i = 0 ; i<3 ; i ++ ){
if( params[i].getCategory() != ObjectInspector.Category.PRIMITIVE ){
throw new UDFArgumentTypeException(i,"Only primitive type arguments are accepted but "
+ params[i].getTypeName() + " is passed.");
}
}
return new MakeCountAndTimeGsonEvaluator();
}
public static class MakeCountAndTimeGsonEvaluator extends GenericUDAFEvaluator{
private Text gsonRs;
//3个原始数据行中传入的参数对应ObjectInspector
private PrimitiveObjectInspector inputOI1;
private PrimitiveObjectInspector inputOI2;
private PrimitiveObjectInspector inputOI3;
//combiner或reducer输入的部分结果对应的ObjectInspector,在Merge方法中用到
private PrimitiveObjectInspector outputOI;
//存放结果的类,实现AggregationBuffer接口
public static class GsonAgg implements AggregationBuffer{
String rs;
boolean empty;
}
void resetAgg(GsonAgg result){
result.empty = true;
result.rs = "\"AppUsageStat\":[]";
}
//这个方法返回了UDAF的返回类型,这里确定了MakeCountAndTimeGsonEvaluator自定义函数的返回类型是String类型
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters)
throws HiveException {
assert ( parameters.length == 3 ) ;
super.init(m, parameters);
gsonRs = new Text( "{\"AppUsageStat:\"[]}" );
//每个阶段都会执行init,不同阶段对应的parameters是不一样的,在map阶段parameters代表的是sql语句中每个udaf对应参数的ObjectInspector,而在combiner或者reducer中parameters代表部分聚合结果对应的ObjectInspector。所以要区分对待。从iterate和merge的参数类型(一个数组类型,一个是object)就能看出来。因此在iterate和merge中分别使用inputOI1/2/3和outputOI 提取对应数据
if( m == Mode.PARTIAL1 || m == Mode.COMPLETE ){
inputOI1 = ( PrimitiveObjectInspector ) parameters[0];
inputOI2 = ( PrimitiveObjectInspector ) parameters[1];
inputOI3 = ( PrimitiveObjectInspector ) parameters[2];
}else if( m == Mode.PARTIAL2 || m == Mode.FINAL ){
outputOI = ( PrimitiveObjectInspector ) parameters[0];
}
return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}
//创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的结果
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
GsonAgg result = new GsonAgg();
reset(result);
return result;
}
//map阶段调用,只要把保存当前的对象agg,再加上输入的参数,就可以了。
@Override
public void iterate(AggregationBuffer result, Object[] parts)
throws HiveException {
if( parts != null ){
GsonAgg tmpResult = (GsonAgg)result;
String pkg = PrimitiveObjectInspectorUtils.getString( parts[0] ,inputOI1 );
Long count = PrimitiveObjectInspectorUtils.getLong( parts[1] , inputOI2 );
Long time = PrimitiveObjectInspectorUtils.getLong( parts[2] , inputOI3 );
String partialGson = "{\"apName\":\""+pkg+"\",\"frequency\":"+count+",\"duration\":"+time+"}";
int len = tmpResult.rs.length();
if( tmpResult.empty ){
tmpResult.empty = false;
tmpResult.rs = tmpResult.rs.substring(0,len-1 )+partialGson+"]";
}else{
tmpResult.rs = tmpResult.rs.substring(0,len-1 )+","+partialGson+"]";
}
}
}
//combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。
@Override
public void merge(AggregationBuffer result, Object partial)
throws HiveException {
String partialGson = PrimitiveObjectInspectorUtils.getString( partial , outputOI );
String partialUsage = partialGson.substring(16 , partialGson.length()-1 );
GsonAgg tmpResult = (GsonAgg)result;
int len = tmpResult.rs.length();
if( tmpResult.empty ){
tmpResult.empty = false;
tmpResult.rs = tmpResult.rs.substring(0,len-2 )+partialUsage+"]";
}else{
tmpResult.rs = tmpResult.rs.substring(0,len-2 )+","+partialUsage+"]";
}
}
@Override
public void reset(AggregationBuffer arg0) throws HiveException {
GsonAgg result = (GsonAgg)arg0;
result.empty = true;
result.rs = "\"AppUsageStat\":[]";
}
//reducer返回结果,或者是只有mapper,没有reducer时,在mapper端返回结果。
@Override
public Object terminate(AggregationBuffer result) throws HiveException {
if( result != null ){
gsonRs.set( ((GsonAgg)result).rs );
return gsonRs;
}
return null;
}
//mapper结束要返回的结果,还有combiner结束返回的结果
@Override
public Object terminatePartial(AggregationBuffer result)
throws HiveException {
terminate(result);
return null;
}
}
}
相关推荐
hive udaf 实现按位取与或 hive udaf 实现按位取与或 hive udaf 实现按位取与或
A custom UDAF to group oncatenates all arguments from different rows into a single string.
06.hive中的json解析函数--json-tuple.mp4
部分普通sql查询在hive中的实现方式详细说明;
hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema最新源代码hive-json-schema...
ascii码 与 字符串 相互转化 ascii码 与 字符串 相互转化 ascii码 与 字符串 相互转化
aaa,bbb,ccc n=2时 截取结果 bbb 很明白了吧 哈
hive解析json时所需jar包。具体使用: add jar ../../../target/json-serde-1.3-jar-with-dependencies.jar; CREATE TABLE json_nested_test ( country string, languages array, religions map,array<int>>) ...
判断字符串是否包含emoji表情
impala的substr()和substring()函数是不支持中文的,创建一个udf解决impala sql中substr()函数截取中文字符串乱码的问题
如有需要SQL Server数据库驱动及连接字符串的哥们儿,请不要错过,绝对无误!!!
标题:按某字段合并字符串之一(简单合并) 描述:将如下形式的数据按id字段合并value字段。...1、sql2000中只能用自定义的函数解决 create table tb(id int, value varchar(10)) insert into tb values(1,
hive-json-serde-0.2.jar
亲测可用,mysql字符串相似度匹配函数。下载后直接在mysql中可以测试运行。
个人 Hive UDAF 有一堆 Hive UDAF(用户定义的聚合函数)不在标准 Hive 分布中,因为它们可能会导致大型数据集的 OOM。 要使用它们,您需要加载 jar 文件,然后为每个要使用的函数创建一个临时函数: ADD JAR target...
Hive原理与实现 详细介绍了hive的原理
根据指定字节数截取字符串,当指定处为中文第一个字节时少截取一个字符,当长度大于指定截取长度时,截取后在字符串末尾追加指定字符串 * 这里添加了字符串的编码,因为页面的编码不同,字符串所占字节
实现16进制串转换为字符串形式功能.欢迎指导
自定义 hive udf udaf 有url解析,获取网站主域名,根据ip获取区域码,有rownum,列聚合以及一些业务实现udf。
strstr()函数搜索一个字符串在另一个字符串中的第一次出现。 该函数返回字符串的其余部分(从匹配点)。如果未找到所搜索的字符串,则返回 false。 语法复制代码 代码如下:strstr(string,search)输出结果”@exe.com...