Assembling JSON Strings with a Hive UDAF


 

      I recently had a requirement to assemble each user's app-usage records into a single JSON string, and implemented it with a Hive UDAF. Concretely, given a table like this:

meid   app            usecnt  usetime
meid1  com.yulong.x   1       2
meid1  com.baidu.x    2       5
meid2  com.tencent.x  3       8

the goal is to fold all of one user's rows into a single JSON string, so that one row of the result looks like this (the query that would produce it is sketched after the example):

 

{"AppUsageStat": [
        {
            "apName": "cn.kuwo.player",
            "frequency": 9,
            "duration": 312237
        },
        {
            "apName": "com.android.gallery3d",
            "frequency": 3,
            "duration": 70737
        }
    ]
}
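
Once the UDAF below is registered, one such row per user comes out of an ordinary GROUP BY query. A minimal sketch, assuming the function has been registered as make_usage_json and the source table is named app_usage (both names are placeholders, not from the original post):

-- one JSON string per user; function and table names are assumed
SELECT meid, make_usage_json(app, usecnt, usetime) AS usage_json
FROM app_usage
GROUP BY meid;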

 

 

The full implementation:

package com.yulong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Text;



public class MakeCountAndTimeGsonResolver extends AbstractGenericUDAFResolver{
	
	static final Log LOG = LogFactory.getLog( MakeCountAndTimeGsonResolver.class.getName() );

	@Override
	public GenericUDAFEvaluator getEvaluator(TypeInfo[] params) throws SemanticException {
		
		if( params.length != 3 ){
			throw new UDFArgumentTypeException(params.length - 1,
					"Exactly 3 arguments are required, but " + params.length + " were supplied.");
		}
		
		for( int i = 0 ; i<3 ; i ++  ){
			if( params[i].getCategory() != ObjectInspector.Category.PRIMITIVE ){
				throw new UDFArgumentTypeException(i,"Only primitive type arguments are accepted but "
						+ params[i].getTypeName() + " is passed.");
			}
		}
		return new MakeCountAndTimeGsonEvaluator();
	}


	
	public static class MakeCountAndTimeGsonEvaluator extends GenericUDAFEvaluator{

		private Text gsonRs;
		//ObjectInspectors for the three arguments coming from raw input rows
		private PrimitiveObjectInspector inputOI1;
		private PrimitiveObjectInspector inputOI2;
		private PrimitiveObjectInspector inputOI3;
		//ObjectInspector for the partial result fed into a combiner or reducer; used in merge()
		private PrimitiveObjectInspector outputOI;
		
		//Holds the aggregation state; implements the AggregationBuffer interface
		public static class GsonAgg implements AggregationBuffer{
			String rs;
			boolean empty;
		}
		
		//(Re)initializes the aggregation state. Note that rs carries no outer
		//braces; terminate() adds them, which lets merge() splice partial
		//results together with plain substring surgery.
		void resetAgg(GsonAgg result){
			result.empty = true;
			result.rs = "\"AppUsageStat\":[]";
		}
		
		//init() declares the UDAF's return type; this evaluator always returns
		//a string (a writable Text).
		@Override
		public ObjectInspector init(Mode m, ObjectInspector[] parameters)
				throws HiveException {
			
			super.init(m, parameters);
			gsonRs = new Text();
			
			//init() runs in every stage, and the meaning of parameters differs
			//by mode. On the map side (PARTIAL1/COMPLETE) it holds one
			//ObjectInspector per UDAF argument in the SQL statement; in the
			//combiner/reducer stages (PARTIAL2/FINAL) it holds a single
			//ObjectInspector describing the partial aggregate. This mirrors the
			//signatures of iterate() (an Object[]) and merge() (a single Object),
			//so iterate() reads its inputs through inputOI1/2/3 while merge()
			//reads the partial through outputOI.
			if( m == Mode.PARTIAL1 || m == Mode.COMPLETE ){
				assert ( parameters.length == 3 );
				inputOI1 = ( PrimitiveObjectInspector ) parameters[0];
				inputOI2 = ( PrimitiveObjectInspector ) parameters[1];
				inputOI3 = ( PrimitiveObjectInspector ) parameters[2];
			}else if( m == Mode.PARTIAL2 || m == Mode.FINAL ){
				outputOI = ( PrimitiveObjectInspector ) parameters[0];
			}
			
			return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
		}

		//Allocates fresh per-group state to hold the intermediate results of the mapper, combiner, and reducer
		@Override
		public AggregationBuffer getNewAggregationBuffer() throws HiveException {
			
			GsonAgg result = new GsonAgg();
			reset(result);
			return result;
		}

		//Called on the map side once per input row: format the row as a JSON
		//object and splice it into the accumulated array, just before the
		//closing ']'.
		@Override
		public void iterate(AggregationBuffer result, Object[] parts)
				throws HiveException {
			
			if( parts != null ){
				GsonAgg tmpResult = (GsonAgg)result;
				String pkg = PrimitiveObjectInspectorUtils.getString( parts[0] , inputOI1 );
				long count = PrimitiveObjectInspectorUtils.getLong( parts[1] , inputOI2 );
				long time = PrimitiveObjectInspectorUtils.getLong( parts[2] , inputOI3 );
				String partialGson = "{\"apName\":\""+pkg+"\",\"frequency\":"+count+",\"duration\":"+time+"}";
				int len = tmpResult.rs.length();
				if( tmpResult.empty ){
					tmpResult.empty = false;
					tmpResult.rs = tmpResult.rs.substring(0,len-1 )+partialGson+"]";
				}else{
					tmpResult.rs = tmpResult.rs.substring(0,len-1 )+","+partialGson+"]";
				}
			}
		}

		//Called in the combiner (merging map outputs) and in the reducer
		//(merging map or combiner outputs). The partial is the bare string
		//"AppUsageStat":[...], so substring(16, len-1) drops the fixed
		//16-character prefix ("AppUsageStat":[ , quotes included) and the
		//trailing ']', leaving just the comma-separated entries.
		@Override
		public void merge(AggregationBuffer result, Object partial)
				throws HiveException {
			
			if( partial == null ){
				return;
			}
			String partialGson = PrimitiveObjectInspectorUtils.getString( partial , outputOI );
			String partialUsage = partialGson.substring(16 , partialGson.length()-1 );
			if( partialUsage.isEmpty() ){
				return;		//the other side saw no rows; nothing to splice in
			}
			
			GsonAgg tmpResult = (GsonAgg)result;
			int len = tmpResult.rs.length();
			if( tmpResult.empty ){
				tmpResult.empty = false;
				tmpResult.rs = tmpResult.rs.substring(0,len-1 )+partialUsage+"]";
			}else{
				tmpResult.rs = tmpResult.rs.substring(0,len-1 )+","+partialUsage+"]";
			}
		}

		
		
		@Override
		public void reset(AggregationBuffer arg0) throws HiveException {
			resetAgg( (GsonAgg)arg0 );
		}

		//Produces the final result on the reducer, or on the mapper when there
		//is no reduce phase (COMPLETE mode). The outer braces are added here,
		//so the returned value is the full JSON object shown above.
		@Override
		public Object terminate(AggregationBuffer result) throws HiveException {
			
			if( result != null ){
				gsonRs.set( "{" + ((GsonAgg)result).rs + "}" );
				return gsonRs;
			}
			return null;
		}

		//Emits the partial aggregate at the end of the map stage, and again at
		//the end of the combine stage. It must return the bare (unbraced)
		//string so that merge() can strip the fixed prefix from it.
		@Override
		public Object terminatePartial(AggregationBuffer result)
				throws HiveException {
			if( result != null ){
				gsonRs.set( ((GsonAgg)result).rs );
				return gsonRs;
			}
			return null;
		}
		
	}
	
	
}
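
To run this, package the class into a jar and register it in a Hive session. A minimal sketch; the jar path and the function name make_usage_json are placeholders, not part of the original post:

ADD JAR /path/to/udaf-gson.jar;   -- path is a placeholder
CREATE TEMPORARY FUNCTION make_usage_json AS 'com.yulong.MakeCountAndTimeGsonResolver';

With the function registered, the GROUP BY query sketched earlier returns one JSON string per meid.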

 

 
