`
kavy
  • 浏览: 870152 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

hcatalog简介和使用

 
阅读更多

Hcatalog是apache开源的对于表和底层数据管理统一服务平台,目前最新release版本是0.5,不过需要hive 0.10支持,由于我们hive集群版本是0.9.0,所以只能降级使用hcatalog 0.4,由于hcatalog中所有的底层数据信息都是保存在hive metastore里,所以hive版本升级后schema变动或者api变动会对hacatalog产生影响,因此在hive 0.11中已经集成了了hcatalog,以后也会成为hive的一部分,而不是独立的项目。

 

HCatalog底层依赖于Hive Metastore,执行过程中会创建一个HiveMetaStoreClient,通过这个instance提供的api来获取表结构数据,如果是local metastore mode的话,会直接返回一个HiveMetaStore.HMSHandler,如果是remote mode的话(hive.metastore.local设置为false),会依据hive.metastore.uris(比如thrift://10.1.8.42:9083, thrift://10.1.8.51:9083)中设定的一串uri逐一顺序建立连接。只要有一个链接建立就可以了,同时为了避免所有client都和第一个uri建立连接,导致负载过大,我加了点小trick,对这串uris随机shuffle来做load balance

 

由于我们的集群开启了kerberos security,需要获取DelegationToken,但是local mode是不支持的,所以只用能remote mode

HiveMetaStoreClient.Java

 

[java] view plain copy
 
 print?
  1. public String getDelegationToken(String owner, String renewerKerberosPrincipalName) throws  
  2.     MetaException, TException {  
  3.   if (localMetaStore) {  
  4.     throw new UnsupportedOperationException("getDelegationToken() can be " +  
  5.         "called only in thrift (non local) mode");  
  6.   }  
  7.   return client.get_delegation_token(owner, renewerKerberosPrincipalName);  
  8. }  

 

HCatInputFormat和HCatOutputFormat提供一些mapreduce api来读取表和写入表

HCatInputFormat API:

 

[java] view plain copy
 
 print?
  1. public static void setInput(Job job,  
  2.     InputJobInfo inputJobInfo) throws IOException;  

 

先实例化一个InputJobInfo对象,该对象包含三个参数dbname,tablename,filter,然后传给setInput函数,来读取相应的数据

 

 

[java] view plain copy
 
 print?
  1. public static HCatSchema getTableSchema(JobContext context)   
  2.     throws IOException;  

在运行时(比如mapper阶段的setup函数中),可以传进去JobContext,调用静态getTableSchema来获取先前setInput时设置的table schema信息

 

 

HCatOutputFormat API:

 

[java] view plain copy
 
 print?
  1. public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException;  

OutPutJobInfo接受三个参数databaseName, tableName, partitionValues,其中第三个参数类型是Map<String, String>,partition key放在map key里,partition value放在对应map key的value中,该参数可传入null或空map,如果指定的partition存在的话,会抛org.apache.hcatalog.common.HCatException : 2002 : Partition already present with given partition key values

比如要要写入指定的partition(dt='2013-06-13',country='china' ),可以这样写

 

[java] view plain copy
 
 print?
  1. Map<String, String> partitionValues = new HashMap<String, String>();  
  2. partitionValues.put("dt""2013-06-13");  
  3. partitionValues.put("country""china");  
  4. HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);  
  5. HCatOutputFormat.setOutput(job, info);  

 

[java] view plain copy
 
 print?
  1. public static HCatSchema getTableSchema(JobContext context) throws IOException;  

获取之前HCatOutputFormat.setOutput指定的table schema信息

 

 

[java] view plain copy
 
 print?
  1. public static void setSchema(final Job job, final HCatSchema schema) throws IOException;  

设置最终写入数据的schema信息,若不调用这个方法,则默认会使用table schema信息

 

 

下面提供一个完整mapreduce例子计算一天每个guid访问页面次数,map阶段从表中读取guid字段,reduce阶段统计该guid对应pageview的总数,然后写回另外一张带有guid和count字段的表中

 

[java] view plain copy
 
 print?
  1. import java.io.IOException;  
  2. import java.util.Iterator;  
  3.   
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.hadoop.conf.Configured;  
  6. import org.apache.hadoop.io.IntWritable;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.io.WritableComparable;  
  9. import org.apache.hadoop.mapreduce.Job;  
  10. import org.apache.hadoop.mapreduce.Mapper;  
  11. import org.apache.hadoop.mapreduce.Reducer;  
  12. import org.apache.hadoop.util.Tool;  
  13. import org.apache.hadoop.util.ToolRunner;  
  14. import org.apache.hcatalog.data.DefaultHCatRecord;  
  15. import org.apache.hcatalog.data.HCatRecord;  
  16. import org.apache.hcatalog.data.schema.HCatSchema;  
  17. import org.apache.hcatalog.mapreduce.HCatInputFormat;  
  18. import org.apache.hcatalog.mapreduce.HCatOutputFormat;  
  19. import org.apache.hcatalog.mapreduce.InputJobInfo;  
  20. import org.apache.hcatalog.mapreduce.OutputJobInfo;  
  21.   
  22. public class GroupByGuid extends Configured implements Tool {  
  23.   
  24.     @SuppressWarnings("rawtypes")  
  25.     public static class Map extends  
  26.             Mapper<WritableComparable, HCatRecord, Text, IntWritable> {  
  27.         HCatSchema schema;  
  28.         Text guid;  
  29.         IntWritable one;  
  30.   
  31.         @Override  
  32.         protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)  
  33.                 throws IOException, InterruptedException {  
  34.             guid = new Text();  
  35.             one = new IntWritable(1);  
  36.             schema = HCatInputFormat.getTableSchema(context);  
  37.         }  
  38.   
  39.         @Override  
  40.         protected void map(WritableComparable key, HCatRecord value,  
  41.                 Context context) throws IOException, InterruptedException {  
  42.             guid.set(value.getString("guid", schema));  
  43.             context.write(guid, one);  
  44.         }  
  45.     }  
  46.   
  47.     @SuppressWarnings("rawtypes")  
  48.     public static class Reduce extends  
  49.             Reducer<Text, IntWritable, WritableComparable, HCatRecord> {  
  50.         HCatSchema schema;  
  51.   
  52.         @Override  
  53.         protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)  
  54.                 throws IOException, InterruptedException {  
  55.             schema = HCatOutputFormat.getTableSchema(context);  
  56.         }  
  57.   
  58.         @Override  
  59.         protected void reduce(Text key, Iterable<IntWritable> values,  
  60.                 Context context) throws IOException, InterruptedException {  
  61.             int sum = 0;  
  62.             Iterator<IntWritable> iter = values.iterator();  
  63.             while (iter.hasNext()) {  
  64.                 sum++;  
  65.                 iter.next();  
  66.             }  
  67.             HCatRecord record = new DefaultHCatRecord(2);  
  68.             record.setString("guid", schema, key.toString());  
  69.             record.setInteger("count", schema, sum);  
  70.             context.write(null, record);  
  71.         }  
  72.     }  
  73.   
  74.     @Override  
  75.     public int run(String[] args) throws Exception {  
  76.         Configuration conf = getConf();  
  77.   
  78.         String dbname = args[0];  
  79.         String inputTable = args[1];  
  80.         String filter = args[2];  
  81.         String outputTable = args[3];  
  82.         int reduceNum = Integer.parseInt(args[4]);  
  83.   
  84.         Job job = new Job(conf,  
  85.                 "GroupByGuid, Calculating every guid's pageview");  
  86.         HCatInputFormat.setInput(job,  
  87.                 InputJobInfo.create(dbname, inputTable, filter));  
  88.   
  89.         job.setJarByClass(GroupByGuid.class);  
  90.         job.setInputFormatClass(HCatInputFormat.class);  
  91.         job.setMapperClass(Map.class);  
  92.         job.setReducerClass(Reduce.class);  
  93.         job.setMapOutputKeyClass(Text.class);  
  94.         job.setMapOutputValueClass(IntWritable.class);  
  95.         job.setOutputKeyClass(WritableComparable.class);  
  96.         job.setOutputValueClass(DefaultHCatRecord.class);  
  97.         job.setNumReduceTasks(reduceNum);  
  98.   
  99.         HCatOutputFormat.setOutput(job,  
  100.                 OutputJobInfo.create(dbname, outputTable, null));  
  101.         HCatSchema s = HCatOutputFormat.getTableSchema(job);  
  102.         HCatOutputFormat.setSchema(job, s);  
  103.   
  104.         job.setOutputFormatClass(HCatOutputFormat.class);  
  105.   
  106.         return (job.waitForCompletion(true) ? 0 : 1);  
  107.     }  
  108.   
  109.     public static void main(String[] args) throws Exception {  
  110.         int exitCode = ToolRunner.run(new GroupByGuid(), args);  
  111.         System.exit(exitCode);  
  112.     }  
  113. }  

 

其实hcatalog还支持动态分区dynamic partition,我们可以在OutJobInfo中指定部分partition keyvalue pair,在运行时候根据传进来的值设置HCatRecord对应的其他partition keyvalue pair,这样就能在一个job中同时写多个partition了

 

本文链接http://blog.csdn.net/lalaguozhe/article/details/9083905,转载请注明

 

分享到:
评论

相关推荐

    hive-hcatalog-core-1.2.1.jar

    hive-hcatalog-core-1.2.1.jarhive-hcatalog-core-1.2.1.jarhive-hcatalog-core-1.2.1.jar

    introduction of hcatalog

    ntroduction of hcatalog

    Pig.Hive.Hcatalog.分享

    Pig Hive 对比分享, Pig HCatalog 元数据组合使用

    HCatalog安装配置学习笔记

    NULL 博文链接:https://springsfeng.iteye.com/blog/1734557

    hcatalog-examples:使用hcatalog读写表的示例代码

    hcatalog-例子使用hcatalog读写表的示例代码用法见

    Hcatalog Hadoop

    从由 Apache 许可授权的 HCatalog 是面向 Apache Hadoop 框架 ...数据存储的位置、数据所采用的格式以及需要使用何种工具将 数据转换为此格式等等。Hcatalog 使得系统内外的用户都可使 用 Apache Hadoop 框架中的数据。

    hcatalog-0.5.0

    hcatalog is a metadata and table management system for hadoop, it's very useful

    hive Hcatalog streaming API使用

    hive streaming 需要配合hive 事务表使用,表的数据存储格式式必须为 orc 在 hive-site.xml 中设置如下参数以支持hive事务表hive.txn.manager =org.apache.hadoop.hive.ql.lockmgr.DbTxnManager hive.compactor....

    hive-hcatalog-core-1.1.0-cdh5.14.4.jar

    org.apache.hadoop.security.AccessControlException: Permission denied.

    Hadoop安装学习-入门教程

    Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, ...

    Hive-HCatalog-Compatibility

    HCatalog 存储处理程序向后兼容 Hive 0.9.0-0.13.0

    运行Sqoop出现hcatalog does not exist!…accumulo does not exist!解决方案

    Warning: /opt/module/sqoop/bin/…/…/hcatalog does not exist! HCatalog jobs will fail. Please set $HCAT_HOME to the root of your HCatalog installation. Warning: /opt/module/sqoop/bin/…/…/accumulo ...

    Hadoop、HBase、Hive、Pig、Zookeeper资料整理

    分享一下Hadoop、HBase、Hive、Pig、Zookeeper相关资料。

    Hadoop_HBase_Pig

    Hadoop_HBase_Pig

    使用Ambari搭建Hadoop集群

    Apache Ambari是一种基于Web的工具,支持Apache Hadoop集群的供应、管理和监控。Ambari已支持大多数Hadoop组件,包括HDFS、MapReduce、Hive、Pig、 Hbase、Zookeeper、Sqoop和Hcatalog等。

    Hive编程指南中文版

    Hive编程指南中文版 2013年12月 第1版 第1章 基础知识 第2章 基础操作 第3章 数据类型和文件格式 第4章 HiveQL:数据定义 第5章 HiveQL:数据操作 第6章 HiveQL:查询 ...第22章 HCatalog 第23章 案例研究

    hdp-ambari

    Ambari已支持大多数Hadoop组件,包括HDFS、MapReduce、Hive、Pig、 Hbase、Zookeper、Sqoop和Hcatalog等。 Apache Ambari 支持HDFS、MapReduce、Hive、Pig、Hbase、Zookeper、Sqoop和Hcatalog等的集中管理。也是5个...

    hive-3.1.1安装包

    它不仅可以存储大量的数据而且可以对存储的数据进行分析,但它有个缺点就是不能实时的更新数据,无法直接修改和删除数据,如果想要修改数据需要先把数据所在的文件下载下来,修改完之后再上传上去。 Hive也不是...

    hive连接所需jar包

    hive连接所需jar包,连接大数据hive所必须jar包,很全,亲测可用

Global site tag (gtag.js) - Google Analytics