- 浏览: 108656 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
土豆蛋儿:
我想读取一个外部文件,以什么方式好了? 文件内容经常编辑
flume 自定义source -
土豆蛋儿:
大神,您好。
flume 自定义source
我们Hadoop集群中将近百分之80的作业是通过Hive来提交的,由于Hive写起来简单便捷,而且我们又提供了Hive Web Client,所以使用范围很广,包括ba,pm,po,sales都在使用hive进行ad-hoc查询,但是hive在降低用户使用门槛的同时,也使得用户经常写不合理开销很大的语句,生成了很多的mapreduce job,占用了大量slot数,其中最典型的例子就是分区表查询,不指定分区条件,导致hive没有做partition pruner优化,进而读入了所有的表数据,占用大量IO和计算资源。
为了尽可能规避这种情况,我们可以利用了hive的hook机制,在hook中实现一些方法来对语句做预判,第一期先不会直接block住语句,而是记录有问题的语句来公告警示.
具体做法是实现HiveSemanticAnalyzerHook接口,preAnalyze方法和postAnalyze方法会分别在compile函数之前和之后执行,我们只要实现preAnalyze方法,遍历传进来的ASTNode抽象语法树,获取左子树的From表名和右子树的where判断条件key值,如果该From表是分区表的话,会通过metastore client获取它的所有分区key名字,用户指定的where条件中只要出现任何一个分区key,则此语句通过检测,否则会在标准错误中输出一条warning,并且在后台log中记录用户名和执行语句,每隔一段时间会将这些bad case在hive-user组邮箱进行公示,希望能通过这种方式来起到相互警示和学习的效果.
compile函数中根据hiveconf中指定的hive.semantic.analyzer.hook来反射实例化hook类,此处为实现AbstractSemanticAnalyzerHook的DPSemanticAnalyzerHook
package org.apache.hadoop.hive.ql.parse;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.shims.ShimLoader;
public class DPSemanticAnalyzerHook extends AbstractSemanticAnalyzerHook {
private final static String NO_PARTITION_WARNING = "WARNING: HQL is not efficient, Please specify partition condition! HQL:%s ;USERNAME:%s";
private final SessionState ss = SessionState.get();
private final LogHelper console = SessionState.getConsole();
private Hive hive = null;
private String username;
private String currentDatabase = "default";
private String hql;
private String whereHql;
private String tableAlias;
private String tableName;
private String tableDatabaseName;
private Boolean needCheckPartition = false;
@Override
public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
throws SemanticException {
try {
hql = ss.getCmd().toLowerCase();
hql = StringUtils.replaceChars(hql, '\n', ' ');
if (hql.contains("where")) {
whereHql = hql.substring(hql.indexOf("where"));
}
username = ShimLoader.getHadoopShims().getUserName(context.getConf());
if (ast.getToken().getType() == HiveParser.TOK_QUERY) {
try {
hive = context.getHive();
currentDatabase = hive.getCurrentDatabase();
} catch (HiveException e) {
throw new SemanticException(e);
}
extractFromClause((ASTNode) ast.getChild(0));
if (needCheckPartition && !StringUtils.isBlank(tableName)) {
String dbname = StringUtils.isEmpty(tableDatabaseName) ? currentDatabase
: tableDatabaseName;
String tbname = tableName;
String[] parts = tableName.split(".");
if (parts.length == 2) {
dbname = parts[0];
tbname = parts[1];
}
Table t = hive.getTable(dbname, tbname);
if (t.isPartitioned()) {
if (StringUtils.isBlank(whereHql)) {
console.printError(String.format(NO_PARTITION_WARNING, hql, username));
} else {
List<FieldSchema> partitionKeys = t.getPartitionKeys();
List<String> partitionNames = new ArrayList<String>();
for (int i = 0; i < partitionKeys.size(); i++) {
partitionNames.add(partitionKeys.get(i).getName().toLowerCase());
}
if (!containsPartCond(partitionNames, whereHql, tableAlias)) {
console.printError(String.format(NO_PARTITION_WARNING, hql, username));
}
}
}
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
return ast;
}
private boolean containsPartCond(List<String> partitionKeys, String sql, String alias) {
for (String pk : partitionKeys) {
if (sql.contains(pk)) {
return true;
}
if (!StringUtils.isEmpty(alias) && sql.contains(alias + "." + pk)) {
return true;
}
}
return false;
}
private void extractFromClause(ASTNode ast) {
if (HiveParser.TOK_FROM == ast.getToken().getType()) {
ASTNode refNode = (ASTNode) ast.getChild(0);
if (refNode.getToken().getType() == HiveParser.TOK_TABREF && ast.getChildCount() == 1) {
ASTNode tabNameNode = (ASTNode) (refNode.getChild(0));
int refNodeChildCount = refNode.getChildCount();
if (tabNameNode.getToken().getType() == HiveParser.TOK_TABNAME) {
if (tabNameNode.getChildCount() == 2) {
tableDatabaseName = tabNameNode.getChild(0).getText().toLowerCase();
tableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabNameNode.getChild(1))
.toLowerCase();
} else if (tabNameNode.getChildCount() == 1) {
tableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabNameNode.getChild(0))
.toLowerCase();
} else {
return;
}
if (refNodeChildCount == 2) {
tableAlias = BaseSemanticAnalyzer.unescapeIdentifier(refNode.getChild(1).getText())
.toLowerCase();
}
needCheckPartition = true;
}
}
}
}
@Override
public void postAnalyze(HiveSemanticAnalyzerHookContext context,
List<Task<? extends Serializable>> rootTasks) throws SemanticException {
// LogHelper console = SessionState.getConsole();
// Set<ReadEntity> readEntitys = context.getInputs();
// console.printInfo("Total Read Entity Size:" + readEntitys.size());
// for (ReadEntity readEntity : readEntitys) {
// Partition p = readEntity.getPartition();
// Table t = readEntity.getTable();
// }
}
}
为了尽可能规避这种情况,我们可以利用了hive的hook机制,在hook中实现一些方法来对语句做预判,第一期先不会直接block住语句,而是记录有问题的语句来公告警示.
具体做法是实现HiveSemanticAnalyzerHook接口,preAnalyze方法和postAnalyze方法会分别在compile函数之前和之后执行,我们只要实现preAnalyze方法,遍历传进来的ASTNode抽象语法树,获取左子树的From表名和右子树的where判断条件key值,如果该From表是分区表的话,会通过metastore client获取它的所有分区key名字,用户指定的where条件中只要出现任何一个分区key,则此语句通过检测,否则会在标准错误中输出一条warning,并且在后台log中记录用户名和执行语句,每隔一段时间会将这些bad case在hive-user组邮箱进行公示,希望能通过这种方式来起到相互警示和学习的效果.
compile函数中根据hiveconf中指定的hive.semantic.analyzer.hook来反射实例化hook类,此处为实现AbstractSemanticAnalyzerHook的DPSemanticAnalyzerHook
package org.apache.hadoop.hive.ql.parse;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.shims.ShimLoader;
public class DPSemanticAnalyzerHook extends AbstractSemanticAnalyzerHook {
private final static String NO_PARTITION_WARNING = "WARNING: HQL is not efficient, Please specify partition condition! HQL:%s ;USERNAME:%s";
private final SessionState ss = SessionState.get();
private final LogHelper console = SessionState.getConsole();
private Hive hive = null;
private String username;
private String currentDatabase = "default";
private String hql;
private String whereHql;
private String tableAlias;
private String tableName;
private String tableDatabaseName;
private Boolean needCheckPartition = false;
@Override
public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
throws SemanticException {
try {
hql = ss.getCmd().toLowerCase();
hql = StringUtils.replaceChars(hql, '\n', ' ');
if (hql.contains("where")) {
whereHql = hql.substring(hql.indexOf("where"));
}
username = ShimLoader.getHadoopShims().getUserName(context.getConf());
if (ast.getToken().getType() == HiveParser.TOK_QUERY) {
try {
hive = context.getHive();
currentDatabase = hive.getCurrentDatabase();
} catch (HiveException e) {
throw new SemanticException(e);
}
extractFromClause((ASTNode) ast.getChild(0));
if (needCheckPartition && !StringUtils.isBlank(tableName)) {
String dbname = StringUtils.isEmpty(tableDatabaseName) ? currentDatabase
: tableDatabaseName;
String tbname = tableName;
String[] parts = tableName.split(".");
if (parts.length == 2) {
dbname = parts[0];
tbname = parts[1];
}
Table t = hive.getTable(dbname, tbname);
if (t.isPartitioned()) {
if (StringUtils.isBlank(whereHql)) {
console.printError(String.format(NO_PARTITION_WARNING, hql, username));
} else {
List<FieldSchema> partitionKeys = t.getPartitionKeys();
List<String> partitionNames = new ArrayList<String>();
for (int i = 0; i < partitionKeys.size(); i++) {
partitionNames.add(partitionKeys.get(i).getName().toLowerCase());
}
if (!containsPartCond(partitionNames, whereHql, tableAlias)) {
console.printError(String.format(NO_PARTITION_WARNING, hql, username));
}
}
}
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
return ast;
}
private boolean containsPartCond(List<String> partitionKeys, String sql, String alias) {
for (String pk : partitionKeys) {
if (sql.contains(pk)) {
return true;
}
if (!StringUtils.isEmpty(alias) && sql.contains(alias + "." + pk)) {
return true;
}
}
return false;
}
private void extractFromClause(ASTNode ast) {
if (HiveParser.TOK_FROM == ast.getToken().getType()) {
ASTNode refNode = (ASTNode) ast.getChild(0);
if (refNode.getToken().getType() == HiveParser.TOK_TABREF && ast.getChildCount() == 1) {
ASTNode tabNameNode = (ASTNode) (refNode.getChild(0));
int refNodeChildCount = refNode.getChildCount();
if (tabNameNode.getToken().getType() == HiveParser.TOK_TABNAME) {
if (tabNameNode.getChildCount() == 2) {
tableDatabaseName = tabNameNode.getChild(0).getText().toLowerCase();
tableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabNameNode.getChild(1))
.toLowerCase();
} else if (tabNameNode.getChildCount() == 1) {
tableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabNameNode.getChild(0))
.toLowerCase();
} else {
return;
}
if (refNodeChildCount == 2) {
tableAlias = BaseSemanticAnalyzer.unescapeIdentifier(refNode.getChild(1).getText())
.toLowerCase();
}
needCheckPartition = true;
}
}
}
}
@Override
public void postAnalyze(HiveSemanticAnalyzerHookContext context,
List<Task<? extends Serializable>> rootTasks) throws SemanticException {
// LogHelper console = SessionState.getConsole();
// Set<ReadEntity> readEntitys = context.getInputs();
// console.printInfo("Total Read Entity Size:" + readEntitys.size());
// for (ReadEntity readEntity : readEntitys) {
// Partition p = readEntity.getPartition();
// Table t = readEntity.getTable();
// }
}
}
发表评论
-
hive + hbase
2015-01-04 10:42 733环境配置: hadoop-2.0.0-cdh4.3.0 (4 ... -
hive 数据倾斜
2014-08-27 09:03 643链接:http://www.alidata.org/archi ... -
hive 分通总结
2014-08-27 08:42 541总结分析: 1. 定义了桶,但要生成桶的数据,只能是由其他表 ... -
深入了解Hive Index具体实现
2014-08-25 08:51 702索引是标准的数据库技术,hive 0.7版本之后支持索引。hi ... -
explain hive index
2014-08-24 16:44 1119设置索引: 使用聚合索引优化groupby操作 hive> ... -
Hive 中内部表与外部表的区别与创建方法
2014-08-15 17:11 724分类: Hive 2013-12-07 11:56 ... -
hive map和reduce的控制
2014-08-15 16:14 596一、 控制hive任务中的map数: 1. 通 ... -
hive 压缩策略
2014-08-15 15:16 1727Hive使用的是Hadoop的文件 ... -
hive 在mysql中创建备用数据库
2014-08-15 09:21 839修改hive-site.xml <property> ... -
HIVE 窗口及分析函数
2014-08-11 16:21 1152HIVE 窗口及分析函数 使 ... -
hive 内置函数
2014-08-11 09:06 30261.sort_array(): sort_array(arra ... -
hive lateral view
2014-08-09 14:59 1988通过Lateral view可以方便的将UDTF得到的行转列的 ... -
hive数据的导出
2014-07-28 21:53 419在本博客的《Hive几种数据导入方式》文章中,谈到了Hive中 ... -
hive udaf
2014-07-25 16:11 715package com.lwz.udaf; import o ... -
hive自定义InputFormat
2014-07-25 09:13 819自定义分隔符 package com.lwz.inputf; ... -
HiveServer2连接ZooKeeper出现Too many connections问题的解决
2014-07-24 08:49 1690HiveServer2连接ZooKeeper出现Too man ... -
hive 常用命令
2014-07-17 22:22 6391.hive通过外部设置参数传入脚本中: hiv ... -
CouderaHadoop中hive的Hook扩展
2014-07-16 21:18 3261最近在做关于CDH4.3.0的hive封装,其中遇到了很多问题 ... -
hive 的常用命令
2014-07-16 10:07 0设置、查看hive当前的角色: set sys ... -
hive 授权
2014-07-15 10:51 898Hive授权(Security配置) 博客分类: Hive分 ...
相关推荐
Hive表分区,里面有比较详细的Hive表分区方法,希望能够有所帮助。
hive表修改分区数据
01.hive查询语法--基本查询--条件查询--关联查询.mp4
大数据MR原理启动hive设置队列,对已经存在hdfs的有分区有表结构和数据信息的表,进行查询表.查询表分区,查询表结构的完整流程如下.学无长幼.
其实是因为Hive存放的数据是没有索引的,如果没有建立分区直接查询,Hive就会暴力查询,效率很低,所以通过分区能很好提高Hive的查询效率。分区还能够更加方便的管理一些特殊数据,例如一些日志数据,可以是一个天一...
hive双分区外部表复合数据结构博客的数据资料,欢迎下载。
1.通过java查hive hive查询 2.简单查询及jar包
BLOG_如何将一个普通表转换为分区表.pdfBLOG_如何将一个普通表转换为分区表.pdf
目录 1 Hive 概念与连接使用: 2 ...5.1 Hive 添加分区 4 5.2 Hive 删除分区 5 6 SHOW语句 5 7 DESCRIBE语句 5 8 加载数据 5 9表连接 6 10 子查询 6 11 UNION ALL 6 12 Hive使用注意点: 6 13 Hive优化 9
部分普通sql查询在hive中的实现方式详细说明;
利用Hive进行复杂用户行为大数据分析及优化案例(全套视频+课件+代码+讲义+工具软件),具体内容包括: 01_自动批量加载数据到hive 02_Hive表批量加载数据的脚本实现(一) 03_Hive表批量加载数据的脚本实现(二) ...
详细描述了hive分桶表,分区表的创建,附带详细建表语句,包含一级静态分区,二级静态分区,一级动态分区,二级动态分区,分区的查询,删除,添加,数据的导入
hive时间按月份加减UDF,http://blog.csdn.net/xiaowenk/article/details/54290354
hive权限,通过自定义jar对hive的10000端口进行权限管控,直接放入到hive所在的lib环境下,然后对xml文件进行相应的配置
hive数据加载的几种方式、数据的导出、数据简单查询
一、查询语法 查询语句语法: [WITH CommonTable[removed], CommonTableExpression)*] Only available starting with Hive 0.13.0) SELECT [ALL | DISTINCT] select_expr, select_expr, ... FROM table_reference ...
hive2.1.1orc格式读取报数组越界错误,替换jar包。hive-exec-2.1.1-cdh6.3.2.jar、hive-orc-2.1.1-cdh6.3.2.jar。分发各个服务器即可。
hive hive hive hive hive hive hive hive hive hive hive hive
NULL 博文链接:https://jonas-wang.iteye.com/blog/1927709
02.hive查询语法--分组聚合--groupby查询--where过滤和having过滤的区别.mp4