hive> select distinct value from src;
hive> select max(key) from src;
Because there are no grouping keys, there is only one reducer.
2.2 If the query has aggregate functions or a GROUP BY, it is handled as follows:
A select operator that selects all columns is inserted first; the ColumnPruner uses it during the optimization phase.
2.2.1 hive.map.aggr is true (the default), so partial aggregation is done on the map side
2.2.1.1 hive.groupby.skewindata is false (the default), i.e. the GROUP BY data is not skewed.
The generated operators are: GroupByOperator+ReduceSinkOperator+GroupByOperator.
GroupByOperator+ReduceSinkOperator run on the map side; the first GroupByOperator does partial aggregation there. The second GroupByOperator does the GROUP BY on the reduce side.
2.2.1.2 hive.groupby.skewindata is true
The generated operators are: GroupByOperator+ReduceSinkOperator+GroupByOperator+ReduceSinkOperator+GroupByOperator
GroupByOperator+ReduceSinkOperator (map phase of the first MapredTask)
GroupByOperator (reduce phase of the first MapredTask)
ReduceSinkOperator (map phase of the second MapredTask)
GroupByOperator (reduce phase of the second MapredTask)
2.2.2 hive.map.aggr is false
2.2.2.1 hive.groupby.skewindata is true
The generated operators are: ReduceSinkOperator+GroupByOperator+ReduceSinkOperator+GroupByOperator
ReduceSinkOperator (map phase of the first MapredTask)
GroupByOperator (reduce phase of the first MapredTask)
ReduceSinkOperator (map phase of the second MapredTask)
GroupByOperator (reduce phase of the second MapredTask)
2.2.2.2 hive.groupby.skewindata is false
The generated operators are: ReduceSinkOperator (runs in the map phase)+GroupByOperator (runs in the reduce phase)
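The four combinations of the two settings can be summarized as a small lookup. The sketch below is only a restatement of the outline in section 2.2 in Java; the class name, the method name and the "MR1/MR2" labels are made up for illustration and are not part of Hive.

```java
import java.util.Arrays;
import java.util.List;

// Toy restatement of section 2.2; not Hive code.
final class GroupByOperatorChains {
    /** Returns the operators of each MapReduce phase, in execution order. */
    static List<String> phases(boolean mapAggr, boolean skewInData) {
        if (mapAggr && !skewInData) {
            // 2.2.1.1: one job, partial aggregation on the map side
            return Arrays.asList(
                "MR1 map: GroupByOperator+ReduceSinkOperator",
                "MR1 reduce: GroupByOperator");
        }
        if (mapAggr) {
            // 2.2.1.2: two jobs, partial aggregation on the map side
            return Arrays.asList(
                "MR1 map: GroupByOperator+ReduceSinkOperator",
                "MR1 reduce: GroupByOperator",
                "MR2 map: ReduceSinkOperator",
                "MR2 reduce: GroupByOperator");
        }
        if (skewInData) {
            // 2.2.2.1: two jobs, no map-side aggregation
            return Arrays.asList(
                "MR1 map: ReduceSinkOperator",
                "MR1 reduce: GroupByOperator",
                "MR2 map: ReduceSinkOperator",
                "MR2 reduce: GroupByOperator");
        }
        // 2.2.2.2: one job, no map-side aggregation
        return Arrays.asList(
            "MR1 map: ReduceSinkOperator",
            "MR1 reduce: GroupByOperator");
    }
}
```

For example, `GroupByOperatorChains.phases(true, true)` lists four phases, which matches the two-job plan of 2.2.1.2.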
Case 1:
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
SemanticAnalyzer.genGroupByPlan1MR(){
(1) ReduceSinkOperator: It will put all Group By keys and the distinct field (if any) in the map-reduce sort key, and all other fields in the map-reduce value.
(2) GroupByOperator: GroupByDesc.Mode.COMPLETE. Reducer: iterate/merge (mode = COMPLETE). (This is how the source comment reads; COMPLETE mode goes from raw rows straight to the final result, i.e. iterate/terminate.)
}
Case 2:
set hive.map.aggr=true;
set hive.groupby.skewindata=false;
SemanticAnalyzer.genGroupByPlanMapAggr1MR(){
(1) GroupByOperator: GroupByDesc.Mode.HASH. The aggregation evaluation functions are as follows: Mapper: iterate/terminatePartial (mode = HASH)
(2) ReduceSinkOperator: Partitioning Key: grouping key. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT
(3) GroupByOperator: GroupByDesc.Mode.MERGEPARTIAL. Reducer: iterate/terminate if DISTINCT, merge/terminate if NO DISTINCT (mode = MERGEPARTIAL)
}
Case 3:
set hive.map.aggr=false;
set hive.groupby.skewindata=true;
SemanticAnalyzer.genGroupByPlan2MR(){
(1) ReduceSinkOperator: Partitioning Key: random() if no DISTINCT, grouping + distinct key if DISTINCT. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT
(2) GroupByOperator: GroupByDesc.Mode.PARTIAL1. Reducer: iterate/terminatePartial (mode = PARTIAL1)
(3) ReduceSinkOperator: Partitioning Key: grouping key. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT
(4) GroupByOperator: GroupByDesc.Mode.FINAL. Reducer: merge/terminate (mode = FINAL)
}
Case 4:
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
SemanticAnalyzer.genGroupByPlanMapAggr2MR(){
(1) GroupByOperator: GroupByDesc.Mode.HASH. Mapper: iterate/terminatePartial (mode = HASH)
(2) ReduceSinkOperator: Partitioning Key: random() if no DISTINCT, grouping + distinct key if DISTINCT. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT.
(3) GroupByOperator: GroupByDesc.Mode.PARTIALS. Reducer: iterate/terminatePartial if DISTINCT, merge/terminatePartial if NO DISTINCT (the source comment says mode = MERGEPARTIAL, but the EXPLAIN output for this case shows mode: partials)
(4) ReduceSinkOperator: Partitioning Key: grouping key. Sorting Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT
(5) GroupByOperator: GroupByDesc.Mode.FINAL. Reducer: merge/terminate (mode = FINAL)
}
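The "iterate/terminatePartial", "merge/terminate" pairs in the four cases describe which evaluator methods each GroupByDesc mode uses on its input and output side. The sketch below collects them in one place. The enum and method names are illustrative, not Hive's. COMPLETE is modeled as iterate/terminate on the assumption that a single-pass mode has no partial results to merge, although the comment quoted for case 1 writes iterate/merge.

```java
// Illustrative mapping from the GroupByDesc modes named above to evaluator calls.
final class GroupByModeSketch {
    enum GroupByMode { COMPLETE, HASH, MERGEPARTIAL, PARTIAL1, PARTIALS, FINAL }

    // "input call / output call" pairs as written in the four cases.
    enum EvaluatorCalls {
        ITERATE_TERMINATE,
        ITERATE_TERMINATE_PARTIAL,
        MERGE_TERMINATE_PARTIAL,
        MERGE_TERMINATE
    }

    static EvaluatorCalls callsFor(GroupByMode mode, boolean distinct) {
        switch (mode) {
            case COMPLETE:
                // raw rows in, final value out (assumption, see lead-in)
                return EvaluatorCalls.ITERATE_TERMINATE;
            case HASH:
            case PARTIAL1:
                // raw rows in, partial result out
                return EvaluatorCalls.ITERATE_TERMINATE_PARTIAL;
            case PARTIALS:
                // DISTINCT still sees raw values carried in the key; others see partials
                return distinct
                    ? EvaluatorCalls.ITERATE_TERMINATE_PARTIAL
                    : EvaluatorCalls.MERGE_TERMINATE_PARTIAL;
            case MERGEPARTIAL:
                return distinct
                    ? EvaluatorCalls.ITERATE_TERMINATE
                    : EvaluatorCalls.MERGE_TERMINATE;
            case FINAL:
                // partial results in, final value out
                return EvaluatorCalls.MERGE_TERMINATE;
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }
}
```

Read this way, a DISTINCT aggregate is never pre-aggregated before the first reduce phase: its values travel in the sort key, so the reducer still has to iterate over them.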
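ReduceSinkOperator's processOp(Object row, int tag) sets the hash value of the key according to the applicable condition. Take the first ReduceSinkOperator of case 4, with Partitioning Key: random() if no DISTINCT, grouping + distinct key if DISTINCT. If there is no DISTINCT column, the hash of the current key is set to a random number before OutputCollector.collect is called (random = new Random(12345);). If there is a DISTINCT column, the hash of the key depends on the grouping + distinct key.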
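A minimal sketch of that choice follows. It assumes the no-DISTINCT case reaches the operator as an empty list of partition columns, uses `Objects.hashCode` as a stand-in for Hive's object-inspector based hashing, and adds a hypothetical `reducerFor` helper to show how a hash could map to a reducer. Only the seed 12345 is taken from the text.

```java
import java.util.List;
import java.util.Objects;
import java.util.Random;

// Simplified model of the key-hash decision; not Hive's ReduceSinkOperator.
final class ReduceSinkKeyHash {
    // Fixed seed, as quoted in the text.
    private final Random random = new Random(12345);

    /** partitionValues: the evaluated partition columns of one row. */
    int keyHash(List<?> partitionValues) {
        if (partitionValues.isEmpty()) {
            // No partition columns: spread rows uniformly over the reducers.
            return random.nextInt();
        }
        int hash = 0;
        for (Object value : partitionValues) {
            // Combine the column hashes so equal keys land on the same reducer.
            hash = hash * 31 + Objects.hashCode(value);
        }
        return hash;
    }

    /** Hypothetical helper: maps a hash to a reducer index in [0, numReducers). */
    static int reducerFor(int keyHash, int numReducers) {
        return (keyHash & Integer.MAX_VALUE) % numReducers;
    }
}
```

With a random hash, rows of one hot grouping key are scattered across all reducers of the first job, which is why a second job partitioned by the grouping key is needed to bring the partial results together.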
GroupByOperator:
initializeOp(Configuration hconf)
processOp(Object row, int tag)
closeOp(boolean abort)
forward(ArrayList<Object> keys, AggregationBuffer[] aggs)
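How these four methods might fit together in HASH mode can be pictured with a toy count-only operator. This is a simplified model and not Hive's GroupByOperator: the single String key, the `long[]` buffer and the `output()` accessor are assumptions made for illustration, and memory-based flushing is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy map-side partial aggregation: count(*) per key.
final class ToyHashGroupBy {
    private Map<String, long[]> hashAggregations; // key -> one-element count buffer
    private final List<String> forwarded = new ArrayList<>();

    /** Sets up the in-memory hash table before any row arrives. */
    void initializeOp() {
        hashAggregations = new LinkedHashMap<>();
    }

    /** Called once per input row: update the buffer of the row's key. */
    void processOp(String key) {
        hashAggregations.computeIfAbsent(key, k -> new long[1])[0]++;
    }

    /** Called at the end of the task: emit every partial result unless aborted. */
    void closeOp(boolean abort) {
        if (!abort) {
            for (Map.Entry<String, long[]> entry : hashAggregations.entrySet()) {
                forward(entry.getKey(), entry.getValue()[0]);
            }
        }
        hashAggregations.clear();
    }

    /** Hands one (key, partial count) pair to the next operator; here it is just recorded. */
    private void forward(String key, long partialCount) {
        forwarded.add(key + "=" + partialCount);
    }

    List<String> output() {
        return forwarded;
    }
}
```

Feeding the rows a, b, a and then closing the operator forwards one partial count per key instead of three rows, which is the data reduction that map-side aggregation is after.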
groupby10.q groupby11.q
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
EXPLAIN
FROM INPUT
INSERT OVERWRITE TABLE dest1 SELECT INPUT.key, count(substr(INPUT.value,5)), count(distinct substr(INPUT.value,5)) GROUP BY INPUT.key;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator // insertSelectAllPlanForGroupBy
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: key
type: int
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
expr: key
type: int
tag: -1
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: complete
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=true;
set hive.groupby.skewindata=false;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Group By Operator
aggregations:
expr: count(substr(value, 5))
expr: count(DISTINCT substr(value, 5))
bucketGroup: false
keys:
expr: key
type: int
expr: substr(value, 5)
type: string
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
Reduce Output Operator
key expressions:
expr: _col0
type: int
expr: _col1
type: string
sort order: ++
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col2
type: bigint
expr: _col3
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: mergepartial
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=false;
set hive.groupby.skewindata=true;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: key
type: int
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
expr: key
type: int
tag: -1
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: partial1
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-48-26_387_7978992474997402829/-mr-10002
Reduce Output Operator
key expressions:
expr: _col0
type: int
sort order: +
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col1
type: bigint
expr: _col2
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(VALUE._col1)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: final
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Group By Operator
aggregations:
expr: count(substr(value, 5))
expr: count(DISTINCT substr(value, 5))
bucketGroup: false
keys:
expr: key
type: int
expr: substr(value, 5)
type: string
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
Reduce Output Operator
key expressions:
expr: _col0
type: int
expr: _col1
type: string
sort order: ++
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col2
type: bigint
expr: _col3
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: partials
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-49-25_899_4946067838822964610/-mr-10002
Reduce Output Operator
key expressions:
expr: _col0
type: int
sort order: +
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col1
type: bigint
expr: _col2
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(VALUE._col1)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: final
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
EXPLAIN extended
FROM INPUT
INSERT OVERWRITE TABLE dest1 SELECT INPUT.key, count(substr(INPUT.value,5)), count(distinct substr(INPUT.value,5)) GROUP BY INPUT.key;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: key
type: int
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
expr: key
type: int
tag: -1
Needs Tagging: false
Path -> Alias:
hdfs://localhost:54310/user/hive/warehouse/input [input]
Path -> Partition:
hdfs://localhost:54310/user/hive/warehouse/input
Partition
base file name: input
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,value
columns.types int:string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/input
name input
serialization.ddl struct input { i32 key, string value}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523947
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,value
columns.types int:string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/input
name input
serialization.ddl struct input { i32 key, string value}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523947
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: input
name: input
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: complete
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10000
NumFilesPerFileSink: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,val1,val2
columns.types int:int:int
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/dest1
name dest1
serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523946
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
TotalFiles: 1
MultiFileSpray: false
Stage: Stage-0
Move Operator
tables:
replace: true
source: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10000
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,val1,val2
columns.types int:int:int
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/dest1
name dest1
serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523946
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
tmp directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10001
ABSTRACT SYNTAX TREE:
(TOK_QUERY
(TOK_FROM (TOK_TABREF INPUT))
(TOK_INSERT
(TOK_DESTINATION (TOK_TAB dest1))
(TOK_SELECT
(TOK_SELEXPR (. (TOK_TABLE_OR_COL INPUT) key))
(TOK_SELEXPR (TOK_FUNCTION count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
(TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
)
(TOK_GROUPBY (. (TOK_TABLE_OR_COL INPUT) key))
)
)
SemanticAnalyzer.genBodyPlan(QB qb, Operator input){
if (qbp.getAggregationExprsForClause(dest).size() != 0
|| getGroupByForClause(qbp, dest).size() > 0) { // if there are aggregate functions or a GROUP BY, do the following
//multiple distincts is not supported with skew in data
if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
.equalsIgnoreCase("true") &&
qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.
getMsg());
}
// insert a select operator here used by the ColumnPruner to reduce
// the data to shuffle
curr = insertSelectAllPlanForGroupBy(dest, curr); // generates a SelectOperator that selects all columns (selectStar=true)
if (conf.getVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)
.equalsIgnoreCase("true")) {
if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
.equalsIgnoreCase("false")) {
curr = genGroupByPlanMapAggr1MR(dest, qb, curr);
} else {
curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
}
} else if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
.equalsIgnoreCase("true")) {
curr = genGroupByPlan2MR(dest, qb, curr);
} else {
curr = genGroupByPlan1MR(dest, qb, curr);
}
}
}
distinct:
count.q.out
groupby11.q.out
groupby10.q.out
nullgroup4_multi_distinct.q.out
join18.q.out
groupby_bigdata.q.out
join18_multi_distinct.q.out
nullgroup4.q.out
auto_join18_multi_distinct.q.out
auto_join18.q.out
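(1) Partial aggregation on the map side, no data skew, one MR job is generated.
genGroupByPlanMapAggr1MR generates three operators:
(1.1) GroupByOperator: map-side partial aggregation, generated by the genGroupByPlanMapGroupByOperator method:
Process the GROUP BY clause (getGroupByForClause): the group-by columns are added to groupByKeys and outputColumnNames
Process DISTINCT in the select (getDistinctFuncExprsForClause): the distinct columns are added to groupByKeys and outputColumnNames
Process the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated and added to aggregations, and the generated column is added to outputColumnNames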
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.HASH
outputColumnNames: groupby+Distinct+Aggregation
keys: groupby+Distinct
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: defaults to 0.5
memoryThreshold: defaults to 0.9
(1.2) ReduceSinkOperator
Process the GROUP BY clause (getGroupByForClause): the group-by columns are added to reduceKeys and outputKeyColumnNames
Process DISTINCT in the select (getDistinctFuncExprsForClause): the distinct columns are added to reduceKeys and outputKeyColumnNames
Process the aggregate functions (getAggregationExprsForClause): the columns to be aggregated are added to reduceValues and outputValueColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // reduceKeys: groupby+distinct
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; // reduceValues: the aggregate functions
this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // usually -1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
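To make the roles of keyCols, numDistributionKeys, partitionCols and valueCols concrete, the sketch below fills them in for the sample query (GROUP BY key with count and count(distinct) over substr(value, 5)). It is a plain-string model following the comments in the constructor above; the class `ReduceSinkLayout` and its use of strings instead of ExprNodeDesc are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// String-only model of how the ReduceSinkDesc lists are populated; not Hive code.
final class ReduceSinkLayout {
    final List<String> keyCols = new ArrayList<>();       // sort key: groupby + distinct
    final List<String> partitionCols = new ArrayList<>(); // partition key: groupby only
    final List<String> valueCols = new ArrayList<>();     // the aggregations
    final int numDistributionKeys;                        // number of group-by expressions

    ReduceSinkLayout(List<String> groupBy, List<String> distinct, List<String> aggregations) {
        keyCols.addAll(groupBy);
        for (String column : distinct) {
            // A distinct column that is already a grouping key is not repeated.
            if (!keyCols.contains(column)) {
                keyCols.add(column);
            }
        }
        numDistributionKeys = groupBy.size();
        partitionCols.addAll(groupBy);
        valueCols.addAll(aggregations);
    }
}
```

For the sample query this gives two key columns (key, substr(value, 5)), one distribution key and one partition column (key), which lines up with "sort order: ++" and the single partition column in the EXPLAIN output with hive.map.aggr=true.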
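(1.3) GroupByOperator
Process the GROUP BY clause (getGroupByForClause): the group-by columns are added to groupByKeys and outputColumnNames
Process the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated and added to aggregations, and the generated column is added to outputColumnNames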
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.MERGEPARTIAL
outputColumnNames: groupby+Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: defaults to 0.5
memoryThreshold: defaults to 0.9
(2) Partial aggregation on the map side, skewed data, two MR jobs are generated.
genGroupByPlanMapAggr2MR:
(2.1) GroupByOperator: map-side partial aggregation, generated by the genGroupByPlanMapGroupByOperator method:
Process the GROUP BY clause (getGroupByForClause): the group-by columns are added to groupByKeys and outputColumnNames
Process DISTINCT in the select (getDistinctFuncExprsForClause): the distinct columns are added to groupByKeys and outputColumnNames
Process the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated and added to aggregations, and the generated column is added to outputColumnNames
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.HASH
outputColumnNames: groupby+Distinct+Aggregation
keys: groupby+Distinct
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: defaults to 0.5
memoryThreshold: defaults to 0.9
(2.2) ReduceSinkOperator
Process the GROUP BY clause (getGroupByForClause): the group-by columns are added to reduceKeys and outputKeyColumnNames
Process DISTINCT in the select (getDistinctFuncExprsForClause): the distinct columns are added to reduceKeys and outputKeyColumnNames
Process the aggregate functions (getAggregationExprsForClause): the columns to be aggregated are added to reduceValues and outputValueColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // reduceKeys: groupby+distinct
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; // reduceValues: the aggregate functions
this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // usually -1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
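(2.3) GroupByOperator
Process the GROUP BY clause (getGroupByForClause): the group-by columns are added to groupByKeys and outputColumnNames
Process the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated and added to aggregations, and the generated column is added to outputColumnNames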
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.PARTIALS
outputColumnNames: groupby+Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: defaults to 0.5
memoryThreshold: defaults to 0.9
(2.4) ReduceSinkOperator
Process the GROUP BY clause (getGroupByForClause): the group-by columns are added to reduceKeys and outputColumnNames
Process the aggregate functions (getAggregationExprsForClause): the columns to be aggregated are added to reduceValues and outputColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // reduceKeys: groupby
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; // reduceValues: the aggregate functions
this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // usually -1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
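(2.5) GroupByOperator
Process the GROUP BY clause (getGroupByForClause): the group-by columns are added to groupByKeys and outputColumnNames
Process the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated and added to aggregations, and the columns to be aggregated are added to outputColumnNames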
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.FINAL
outputColumnNames: groupby+Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: defaults to 0.5
memoryThreshold: defaults to 0.9
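Why the skewed-data plan still produces correct results with a random first-stage partitioning can be checked with a small simulation. The sketch below models only the no-DISTINCT case with a count aggregate; `TwoStageCount` and its in-memory "reducers" are illustrative stand-ins, not Hive classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Toy simulation of the two-job plan for count(*) GROUP BY key, without DISTINCT.
final class TwoStageCount {
    static Map<String, Long> count(List<String> keys, int numReducers) {
        Random random = new Random(12345);

        // Job 1: each row goes to a random reducer, which keeps partial counts per key.
        List<Map<String, Long>> stage1 = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            stage1.add(new HashMap<>());
        }
        for (String key : keys) {
            int reducer = (random.nextInt() & Integer.MAX_VALUE) % numReducers;
            stage1.get(reducer).merge(key, 1L, Long::sum);
        }

        // Job 2: partial counts of the same grouping key meet again and are summed.
        // A single sorted map stands in for the reducers partitioned by grouping key.
        Map<String, Long> result = new TreeMap<>();
        for (Map<String, Long> partial : stage1) {
            for (Map.Entry<String, Long> entry : partial.entrySet()) {
                result.merge(entry.getKey(), entry.getValue(), Long::sum);
            }
        }
        return result;
    }
}
```

However the rows of a hot key are scattered in the first job, the second job sees at most one partial count per key and first-stage reducer, so no single reducer has to process all raw rows of that key.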
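(3) No partial aggregation on the map side, skewed data, two MR jobs are generated.
genGroupByPlan2MR:
(3.1) ReduceSinkOperator
Process the GROUP BY clause (getGroupByForClause): the group-by columns are added to reduceKeys and outputKeyColumnNames
Process DISTINCT in the select (getDistinctFuncExprsForClause): the distinct columns are added to reduceKeys and outputKeyColumnNames
Process the aggregate functions (getAggregationExprsForClause): the columns to be aggregated are added to reduceValues and outputValueColumnNames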
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // reduceKeys: groupby+distinct
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; // reduceValues: the aggregate functions
this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // usually -1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
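(3.2) GroupByOperator
Process the GROUP BY clause (getGroupByForClause): the group-by columns are added to groupByKeys and outputColumnNames
Process the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated and added to aggregations, and the generated column is added to outputColumnNames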
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.PARTIAL1
outputColumnNames: groupby+Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: defaults to 0.5
memoryThreshold: defaults to 0.9
(3.3) ReduceSinkOperator
Process the GROUP BY clause (getGroupByForClause): the group-by columns are added to reduceKeys and outputColumnNames
Process the aggregate functions (getAggregationExprsForClause): the columns to be aggregated are added to reduceValues and outputColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // reduceKeys: groupby
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; // reduceValues: the aggregate functions
this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // usually -1
this.partitionCols = partitionCols; // groupby
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
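(3.4) GroupByOperator
Process the GROUP BY clause (getGroupByForClause): the group-by columns are added to groupByKeys and outputColumnNames
Process the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated and added to aggregations, and the columns to be aggregated are added to outputColumnNames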
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.FINAL
outputColumnNames: groupby+Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: defaults to 0.5
memoryThreshold: defaults to 0.9
(4) No partial aggregation on the map side, no data skew, one MR job is generated.
genGroupByPlan1MR:
(4.1) ReduceSinkOperator
Process the GROUP BY clause (getGroupByForClause): the group-by columns are added to reduceKeys and outputKeyColumnNames
Process DISTINCT in the select (getDistinctFuncExprsForClause): the distinct columns are added to reduceKeys and outputKeyColumnNames
Process the aggregate functions (getAggregationExprsForClause): the columns to be aggregated are added to reduceValues and outputValueColumnNames
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // reduceKeys: the GROUP BY and DISTINCT columns
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; // reduceValues: the aggregate functions
this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // usually -1
this.partitionCols = partitionCols; // the GROUP BY columns
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
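(4.2) GroupByOperator
Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to groupByKeys and outputColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated for each and added to aggregations, and the columns to be aggregated are added to outputColumnNames.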
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.COMPLETE
outputColumnNames: groupby+Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: 0.5 by default
memoryThreshold: 0.9 by default
SemanticAnalyzer.genBodyPlan
optimizeMultiGroupBy (multi-group by with the same distinct)
groupby10.q groupby11.q
hive> select max(key) from src;
Because there are no grouping keys, there is only one reducer.
2.2 If there is an aggregate function or a GROUP BY, proceed as follows:
Insert a select operator that selects all columns; the ColumnPruner uses it during the optimization phase.
2.2.1 hive.map.aggr is true (the default, i.e. enabled): partial aggregation is done on the map side.
2.2.1.1 hive.groupby.skewindata is false (the default, i.e. disabled): the GROUP BY data is not skewed.
The generated operators are: GroupByOperator+ReduceSinkOperator+GroupByOperator.
GroupByOperator+ReduceSinkOperator run on the map side; the first GroupByOperator does the partial aggregation there. The second GroupByOperator does the GROUP BY on the reduce side.
2.2.1.2 hive.groupby.skewindata is true
The generated operators are: GroupByOperator+ReduceSinkOperator+GroupByOperator+ReduceSinkOperator+GroupByOperator
GroupByOperator+ReduceSinkOperator (map phase of the first MapredTask)
GroupByOperator (reduce phase of the first MapredTask)
ReduceSinkOperator (map phase of the second MapredTask)
GroupByOperator (reduce phase of the second MapredTask)
2.2.2 hive.map.aggr is false
2.2.2.1 hive.groupby.skewindata is true
The generated operators are: ReduceSinkOperator+GroupByOperator+ReduceSinkOperator+GroupByOperator
ReduceSinkOperator (map phase of the first MapredTask)
GroupByOperator (reduce phase of the first MapredTask)
ReduceSinkOperator (map phase of the second MapredTask)
GroupByOperator (reduce phase of the second MapredTask)
2.2.2.2 hive.groupby.skewindata is false
The generated operators are: ReduceSinkOperator (runs in the map phase) + GroupByOperator (runs in the reduce phase)
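The decision table in 2.2 can be condensed into a small lookup. The following is only an illustrative sketch: the class GroupByPlanChooser and the short names GBY and RS are made up for this note and are not Hive classes.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: mirrors the 2.2 decision table, not Hive's actual classes. */
final class GroupByPlanChooser {
    // Short names used here: GBY = GroupByOperator, RS = ReduceSinkOperator.
    static List<String> operators(boolean mapAggr, boolean skewInData) {
        if (mapAggr) {
            return skewInData
                ? Arrays.asList("GBY", "RS", "GBY", "RS", "GBY")
                : Arrays.asList("GBY", "RS", "GBY");
        }
        return skewInData
            ? Arrays.asList("RS", "GBY", "RS", "GBY")
            : Arrays.asList("RS", "GBY");
    }

    // In these four plans every ReduceSinkOperator starts one shuffle,
    // so counting them gives the number of MapReduce jobs.
    static int mapReduceJobs(boolean mapAggr, boolean skewInData) {
        int jobs = 0;
        for (String op : operators(mapAggr, skewInData)) {
            if (op.equals("RS")) {
                jobs++;
            }
        }
        return jobs;
    }

    public static void main(String[] args) {
        boolean[] flags = {true, false};
        for (boolean mapAggr : flags) {
            for (boolean skew : flags) {
                System.out.println("hive.map.aggr=" + mapAggr
                    + " hive.groupby.skewindata=" + skew + " -> "
                    + operators(mapAggr, skew)
                    + ", MR jobs=" + mapReduceJobs(mapAggr, skew));
            }
        }
    }
}
```

Reading the output side by side with 2.2.1.1 through 2.2.2.2 shows that hive.groupby.skewindata decides the number of jobs, while hive.map.aggr only decides whether a GroupByOperator sits in front of the first ReduceSinkOperator.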
Case 1:
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
SemanticAnalyzer.genGroupByPlan1MR(){
(1) ReduceSinkOperator: It will put all Group By keys and the distinct field (if any) in the map-reduce sort key, and all other fields in the map-reduce value.
(2) GroupByOperator: GroupByDesc.Mode.COMPLETE, Reducer: iterate/merge (mode = COMPLETE)
}
Case 2:
set hive.map.aggr=true;
set hive.groupby.skewindata=false;
SemanticAnalyzer.genGroupByPlanMapAggr1MR(){
(1) GroupByOperator: GroupByDesc.Mode.HASH, The aggregation evaluation functions are as follows: Mapper: iterate/terminatePartial (mode = HASH)
(2) ReduceSinkOperator: Partitioning Key: grouping key. Sorting Key: grouping key if no DISTINCT; grouping + distinct key if DISTINCT
(3) GroupByOperator: GroupByDesc.Mode.MERGEPARTIAL, Reducer: iterate/terminate if DISTINCT; merge/terminate if NO DISTINCT (mode = MERGEPARTIAL)
}
Case 3:
set hive.map.aggr=false;
set hive.groupby.skewindata=true;
SemanticAnalyzer.genGroupByPlan2MR(){
(1) ReduceSinkOperator: Partitioning Key: random() if no DISTINCT; grouping + distinct key if DISTINCT. Sorting Key: grouping key if no DISTINCT; grouping + distinct key if DISTINCT
(2) GroupByOperator: GroupByDesc.Mode.PARTIAL1, Reducer: iterate/terminatePartial (mode = PARTIAL1)
(3) ReduceSinkOperator: Partitioning Key: grouping key. Sorting Key: grouping key if no DISTINCT; grouping + distinct key if DISTINCT
(4) GroupByOperator: GroupByDesc.Mode.FINAL, Reducer: merge/terminate (mode = FINAL)
}
Case 4:
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
SemanticAnalyzer.genGroupByPlanMapAggr2MR(){
(1) GroupByOperator: GroupByDesc.Mode.HASH, Mapper: iterate/terminatePartial (mode = HASH)
(2) ReduceSinkOperator: Partitioning Key: random() if no DISTINCT; grouping + distinct key if DISTINCT. Sorting Key: grouping key if no DISTINCT; grouping + distinct key if DISTINCT.
(3) GroupByOperator: GroupByDesc.Mode.PARTIALS, Reducer: iterate/terminatePartial if DISTINCT; merge/terminatePartial if NO DISTINCT (mode = PARTIALS)
(4) ReduceSinkOperator: Partitioning Key: grouping key. Sorting Key: grouping key if no DISTINCT; grouping + distinct key if DISTINCT
(5) GroupByOperator: GroupByDesc.Mode.FINAL, Reducer: merge/terminate (mode = FINAL)
}
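The four cases differ mainly in which evaluator calls (iterate, terminatePartial, merge, terminate) run in each mode. As a rough sketch, a toy COUNT evaluator makes the pairing visible. CountModesSketch and CountBuffer are hypothetical names for this note; the real evaluators in Hive have a richer interface.

```java
import java.util.Arrays;
import java.util.List;

final class CountModesSketch {
    /** Toy COUNT evaluator exposing the four calls named in the mode descriptions. */
    static final class CountBuffer {
        long count;
        void iterate(Object rawValue) { if (rawValue != null) count++; } // consume one raw row
        long terminatePartial() { return count; }                        // emit a partial result
        void merge(long partial) { count += partial; }                   // consume a partial result
        long terminate() { return count; }                               // emit the final result
    }

    // COMPLETE (case 1): a single operator sees raw rows and produces the final value.
    static long complete(List<String> rows) {
        CountBuffer buffer = new CountBuffer();
        for (String row : rows) {
            buffer.iterate(row);
        }
        return buffer.terminate();
    }

    // HASH on every mapper, then MERGEPARTIAL on the reducer (case 2, no DISTINCT).
    static long hashThenMergePartial(List<List<String>> mapperInputs) {
        CountBuffer reducer = new CountBuffer();
        for (List<String> split : mapperInputs) {
            CountBuffer mapper = new CountBuffer();
            for (String row : split) {
                mapper.iterate(row);                  // HASH: iterate
            }
            reducer.merge(mapper.terminatePartial()); // HASH: terminatePartial, MERGEPARTIAL: merge
        }
        return reducer.terminate();                   // MERGEPARTIAL: terminate
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("a", "b", null, "c");
        List<List<String>> splits = Arrays.asList(
            Arrays.asList("a", "b"), Arrays.asList(null, "c"));
        System.out.println(complete(all) + " " + hashThenMergePartial(splits));
    }
}
```

Both paths give the same count; the modes only change where the work happens and what travels through the shuffle (raw rows versus partial results).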
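ReduceSinkOperator's processOp(Object row, int tag) sets the hash value of the key according to the applicable condition. Take the first ReduceSinkOperator of case 4 (Partitioning Key: random() if no DISTINCT; grouping + distinct key if DISTINCT): if there is no DISTINCT column, the hash value of the current key is set to a random number before OutputCollector.collect is called, using random = new Random(12345);. If there is a DISTINCT column, the key's hash value is derived from the grouping + distinct key.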
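To see why a random hash helps with skew, here is a small simulation. It is a sketch under assumptions: SkewPartitionSketch is a made-up class, and the modulo step stands in for whatever partitioner the job actually uses. Only the seed 12345 comes from the text above.

```java
import java.util.Arrays;
import java.util.Random;

final class SkewPartitionSketch {
    // Assumed partitioning rule: non-negative hash modulo the number of reducers.
    static int reducerFor(int hash, int numReducers) {
        return (hash & Integer.MAX_VALUE) % numReducers;
    }

    /** Rows per reducer when every row carries the same (hot) grouping key. */
    static int[] distribute(String hotKey, int rows, int numReducers, boolean randomHash) {
        Random random = new Random(12345); // fixed seed, as quoted in the text
        int[] load = new int[numReducers];
        for (int i = 0; i < rows; i++) {
            // Random hash: ignore the key. Key hash: every row gets the same value.
            int hash = randomHash ? random.nextInt() : hotKey.hashCode();
            load[reducerFor(hash, numReducers)]++;
        }
        return load;
    }

    public static void main(String[] args) {
        System.out.println("key hash:    " + Arrays.toString(distribute("hot", 1000, 4, false)));
        System.out.println("random hash: " + Arrays.toString(distribute("hot", 1000, 4, true)));
    }
}
```

With the key hash all rows of the hot key land on one reducer; with the random hash they spread out, which is why the first job can only produce partial results and a second job is needed to merge them by grouping key.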
GroupByOperator:
initializeOp(Configuration hconf)
processOp(Object row, int tag)
closeOp(boolean abort)
forward(ArrayList<Object> keys, AggregationBuffer[] aggs)
groupby10.q groupby11.q
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
EXPLAIN
FROM INPUT
INSERT OVERWRITE TABLE dest1 SELECT INPUT.key, count(substr(INPUT.value,5)), count(distinct substr(INPUT.value,5)) GROUP BY INPUT.key;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator // insertSelectAllPlanForGroupBy
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: key
type: int
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
expr: key
type: int
tag: -1
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: complete
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
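The plan above sorts on (key, substr(value, 5)) but partitions on key alone, so each reducer receives the rows of a group together and already ordered by the DISTINCT expression. A minimal sketch of how count and count(DISTINCT) can then be computed in one pass follows. SortedDistinctSketch is a hypothetical class, and it assumes non-null values for brevity.

```java
import java.util.ArrayList;
import java.util.List;

final class SortedDistinctSketch {
    /**
     * sortedRows: {groupKey, distinctValue} pairs, already sorted by both fields,
     * as one reducer would receive them. Returns one "key count countDistinct"
     * line per group.
     */
    static List<String> aggregate(String[][] sortedRows) {
        List<String> out = new ArrayList<>();
        String currentKey = null;
        String lastValue = null;
        long count = 0;
        long distinct = 0;
        for (String[] row : sortedRows) {
            String key = row[0];
            String value = row[1];
            if (currentKey != null && !key.equals(currentKey)) {
                // Group boundary: emit the finished group and reset the state.
                out.add(currentKey + " " + count + " " + distinct);
                count = 0;
                distinct = 0;
                lastValue = null;
            }
            currentKey = key;
            count++;
            // Sorted input means a new distinct value always differs from the previous row.
            if (!value.equals(lastValue)) {
                distinct++;
                lastValue = value;
            }
        }
        if (currentKey != null) {
            out.add(currentKey + " " + count + " " + distinct);
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] rows = {{"1", "a"}, {"1", "a"}, {"1", "b"}, {"2", "a"}};
        System.out.println(aggregate(rows));
    }
}
```

Because the input is sorted, no set of seen values has to be kept in memory; comparing with the previous row is enough.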
set hive.map.aggr=true;
set hive.groupby.skewindata=false;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Group By Operator
aggregations:
expr: count(substr(value, 5))
expr: count(DISTINCT substr(value, 5))
bucketGroup: false
keys:
expr: key
type: int
expr: substr(value, 5)
type: string
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
Reduce Output Operator
key expressions:
expr: _col0
type: int
expr: _col1
type: string
sort order: ++
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col2
type: bigint
expr: _col3
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: mergepartial
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=false;
set hive.groupby.skewindata=true;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: key
type: int
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
expr: key
type: int
tag: -1
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: partial1
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-48-26_387_7978992474997402829/-mr-10002
Reduce Output Operator
key expressions:
expr: _col0
type: int
sort order: +
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col1
type: bigint
expr: _col2
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(VALUE._col1)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: final
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
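The two stages of the skew plan can be imitated in a few lines for the case without DISTINCT, where the text says the first shuffle uses random() as the partitioning key. This is a sketch only: TwoStageCountSketch, stageOne and stageTwo are invented names, and in-memory maps stand in for the intermediate files between Stage-1 and Stage-2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

final class TwoStageCountSketch {
    // Stage 1: rows are scattered over the reducers at random;
    // each reducer emits partial counts per key (mode PARTIAL1).
    static List<Map<String, Long>> stageOne(List<String> keys, int numReducers, long seed) {
        Random random = new Random(seed);
        List<Map<String, Long>> partials = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            partials.add(new TreeMap<>());
        }
        for (String key : keys) {
            partials.get(random.nextInt(numReducers)).merge(key, 1L, Long::sum);
        }
        return partials;
    }

    // Stage 2: partials are shuffled by grouping key and merged (mode FINAL).
    static Map<String, Long> stageTwo(List<Map<String, Long>> partials) {
        Map<String, Long> result = new TreeMap<>();
        for (Map<String, Long> partial : partials) {
            for (Map.Entry<String, Long> entry : partial.entrySet()) {
                result.merge(entry.getKey(), entry.getValue(), Long::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("hot", "hot", "hot", "hot", "hot", "cold");
        System.out.println(stageTwo(stageOne(keys, 3, 42L)));
    }
}
```

Whatever the random assignment in stage 1, stage 2 yields the same totals, because merging partial counts is order-independent.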
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Group By Operator
aggregations:
expr: count(substr(value, 5))
expr: count(DISTINCT substr(value, 5))
bucketGroup: false
keys:
expr: key
type: int
expr: substr(value, 5)
type: string
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
Reduce Output Operator
key expressions:
expr: _col0
type: int
expr: _col1
type: string
sort order: ++
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col2
type: bigint
expr: _col3
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: partials
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-49-25_899_4946067838822964610/-mr-10002
Reduce Output Operator
key expressions:
expr: _col0
type: int
sort order: +
Map-reduce partition columns:
expr: _col0
type: int
tag: -1
value expressions:
expr: _col1
type: bigint
expr: _col2
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(VALUE._col1)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: final
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
EXPLAIN extended
FROM INPUT
INSERT OVERWRITE TABLE dest1 SELECT INPUT.key, count(substr(INPUT.value,5)), count(distinct substr(INPUT.value,5)) GROUP BY INPUT.key;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
input
TableScan
alias: input
Select Operator
expressions:
expr: key
type: int
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: key
type: int
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
expr: key
type: int
tag: -1
Needs Tagging: false
Path -> Alias:
hdfs://localhost:54310/user/hive/warehouse/input [input]
Path -> Partition:
hdfs://localhost:54310/user/hive/warehouse/input
Partition
base file name: input
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,value
columns.types int:string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/input
name input
serialization.ddl struct input { i32 key, string value}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523947
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,value
columns.types int:string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/input
name input
serialization.ddl struct input { i32 key, string value}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523947
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: input
name: input
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
keys:
expr: KEY._col0
type: int
mode: complete
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: UDFToInteger(_col1)
type: int
expr: UDFToInteger(_col2)
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10000
NumFilesPerFileSink: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,val1,val2
columns.types int:int:int
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/dest1
name dest1
serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523946
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
TotalFiles: 1
MultiFileSpray: false
Stage: Stage-0
Move Operator
tables:
replace: true
source: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10000
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,val1,val2
columns.types int:int:int
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/dest1
name dest1
serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime 1310523946
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
tmp directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive_2011-07-15_21-50-38_510_6852880850328147221/-ext-10001
ABSTRACT SYNTAX TREE:
(TOK_QUERY
(TOK_FROM (TOK_TABREF INPUT))
(TOK_INSERT
(TOK_DESTINATION (TOK_TAB dest1))
(TOK_SELECT
(TOK_SELEXPR (. (TOK_TABLE_OR_COL INPUT) key))
(TOK_SELEXPR (TOK_FUNCTION count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
(TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
)
(TOK_GROUPBY (. (TOK_TABLE_OR_COL INPUT) key))
)
)
SemanticAnalyzer.genBodyPlan(QB qb, Operator input){
  if (qbp.getAggregationExprsForClause(dest).size() != 0
      || getGroupByForClause(qbp, dest).size() > 0) { // executed when there is an aggregate function or a GROUP BY
    //multiple distincts is not supported with skew in data
    if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
        .equalsIgnoreCase("true") &&
        qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
      throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.
          getMsg());
    }
    // insert a select operator here used by the ColumnPruner to reduce
    // the data to shuffle
    curr = insertSelectAllPlanForGroupBy(dest, curr); // generates a SelectOperator that selects every column (selectStar=true)
    if (conf.getVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)
        .equalsIgnoreCase("true")) {
      if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
          .equalsIgnoreCase("false")) {
        curr = genGroupByPlanMapAggr1MR(dest, qb, curr);
      } else {
        curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
      }
    } else if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
        .equalsIgnoreCase("true")) {
      curr = genGroupByPlan2MR(dest, qb, curr);
    } else {
      curr = genGroupByPlan1MR(dest, qb, curr);
    }
  }
}
distinct:
count.q.out
groupby11.q.out
groupby10.q.out
nullgroup4_multi_distinct.q.out
join18.q.out
groupby_bigdata.q.out
join18_multi_distinct.q.out
nullgroup4.q.out
auto_join18_multi_distinct.q.out
auto_join18.q.out
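(1) Map-side partial aggregation, no data skew: one MR job is generated.
genGroupByPlanMapAggr1MR generates three operators:
(1.1) GroupByOperator: map-side partial aggregation, generated by the genGroupByPlanMapGroupByOperator method:
Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to groupByKeys and outputColumnNames.
Handle DISTINCT in the SELECT list (getDistinctFuncExprsForClause): the DISTINCT columns are added to groupByKeys and outputColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated for each and added to aggregations, and the generated column is added to outputColumnNames.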
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.HASH
outputColumnNames: groupby+Distinct+Aggregation
keys: groupby+Distinct
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: 0.5 by default
memoryThreshold: 0.9 by default
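In HASH mode the map side keeps partial aggregates in an in-memory hash table, and groupByMemoryUsage and memoryThreshold bound how large that table may grow. The sketch below only illustrates the idea of flushing partial results when the table is full. It is an assumption-laden toy: HashAggrFlushSketch is a made-up class, an entry count replaces the memory measurement, and the real operator's flush policy may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class HashAggrFlushSketch {
    private final int maxEntries;                           // stand-in for the memory limit (assumption)
    private final Map<String, Long> table = new HashMap<>();
    private final List<String> emitted = new ArrayList<>(); // "key=partialCount" records sent downstream

    HashAggrFlushSketch(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    // Called once per input row: update the partial count, flush if the table grew too large.
    void process(String key) {
        table.merge(key, 1L, Long::sum);
        if (table.size() > maxEntries) {
            flush();
        }
    }

    // Called at the end of the map task: emit whatever is still buffered.
    void close() {
        flush();
    }

    private void flush() {
        for (Map.Entry<String, Long> entry : table.entrySet()) {
            emitted.add(entry.getKey() + "=" + entry.getValue());
        }
        table.clear();
    }

    List<String> emitted() {
        return emitted;
    }

    // Reduce side: merging the partial records restores the exact counts.
    static Map<String, Long> mergePartials(List<String> partials) {
        Map<String, Long> totals = new TreeMap<>();
        for (String partial : partials) {
            int eq = partial.lastIndexOf('=');
            totals.merge(partial.substring(0, eq),
                Long.parseLong(partial.substring(eq + 1)), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        HashAggrFlushSketch mapper = new HashAggrFlushSketch(2);
        for (String key : new String[] {"a", "b", "a", "c", "a", "d"}) {
            mapper.process(key);
        }
        mapper.close();
        System.out.println(mergePartials(mapper.emitted()));
    }
}
```

An early flush can emit the same key more than once, which is harmless because the reduce-side GroupByOperator merges partial results anyway.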
(1.2) ReduceSinkOperator
Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to reduceKeys and outputKeyColumnNames.
Handle DISTINCT in the SELECT list (getDistinctFuncExprsForClause): the DISTINCT columns are added to reduceKeys and outputKeyColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): the columns to be aggregated are added to reduceValues and outputValueColumnNames.
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // reduceKeys: the GROUP BY and DISTINCT columns
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; // reduceValues: the aggregate functions
this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // usually -1
this.partitionCols = partitionCols; // the GROUP BY columns
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
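(1.3) GroupByOperator
Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to groupByKeys and outputColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated for each and added to aggregations, and the columns to be aggregated are added to outputColumnNames.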
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.MERGEPARTIAL
outputColumnNames: groupby+Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: 0.5 by default
memoryThreshold: 0.9 by default
(2) Map-side partial aggregation, skewed data: two MR jobs are generated.
genGroupByPlanMapAggr2MR:
(2.1) GroupByOperator: map-side partial aggregation, generated by the genGroupByPlanMapGroupByOperator method:
Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to groupByKeys and outputColumnNames.
Handle DISTINCT in the SELECT list (getDistinctFuncExprsForClause): the DISTINCT columns are added to groupByKeys and outputColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated for each and added to aggregations, and the generated column is added to outputColumnNames.
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.HASH
outputColumnNames: groupby+Distinct+Aggregation
keys: groupby+Distinct
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: 0.5 by default
memoryThreshold: 0.9 by default
(2.2) ReduceSinkOperator
Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to reduceKeys and outputKeyColumnNames.
Handle DISTINCT in the SELECT list (getDistinctFuncExprsForClause): the DISTINCT columns are added to reduceKeys and outputKeyColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): the columns to be aggregated are added to reduceValues and outputValueColumnNames.
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // reduceKeys: the GROUP BY and DISTINCT columns
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; // reduceValues: the aggregate functions
this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // usually -1
this.partitionCols = partitionCols; // the GROUP BY columns
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
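(2.3) GroupByOperator
Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to groupByKeys and outputColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated for each and added to aggregations, and the generated column is added to outputColumnNames.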
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.PARTIALS
outputColumnNames: groupby+Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: 0.5 by default
memoryThreshold: 0.9 by default
(2.4) ReduceSinkOperator
Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to reduceKeys and outputColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): the columns to be aggregated are added to reduceValues and outputColumnNames.
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // reduceKeys: the GROUP BY columns
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; // reduceValues: the aggregate functions
this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // usually -1
this.partitionCols = partitionCols; // the GROUP BY columns
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
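(2.5) GroupByOperator
Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to groupByKeys and outputColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated for each and added to aggregations, and the columns to be aggregated are added to outputColumnNames.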
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.FINAL
outputColumnNames: groupby+Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: 0.5 by default
memoryThreshold: 0.9 by default
(3) No map-side partial aggregation, skewed data: two MR jobs are generated.
genGroupByPlan2MR:
(3.1) ReduceSinkOperator
Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to reduceKeys and outputKeyColumnNames.
Handle DISTINCT in the SELECT list (getDistinctFuncExprsForClause): the DISTINCT columns are added to reduceKeys and outputKeyColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): the columns to be aggregated are added to reduceValues and outputValueColumnNames.
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // reduceKeys: the GROUP BY and DISTINCT columns
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; // reduceValues: the aggregate functions
this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // usually -1
this.partitionCols = partitionCols; // the GROUP BY columns
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
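(3.2) GroupByOperator
Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to groupByKeys and outputColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated for each and added to aggregations, and the generated column is added to outputColumnNames.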
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.PARTIAL1
outputColumnNames: groupby+Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: 0.5 by default
memoryThreshold: 0.9 by default
(3.3) ReduceSinkOperator
Handle the GROUP BY clause (getGroupByForClause): the GROUP BY columns are added to reduceKeys and outputColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): the columns to be aggregated are added to reduceValues and outputColumnNames.
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // reduceKeys: the GROUP BY columns
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; // reduceValues: the aggregate functions
this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // usually -1
this.partitionCols = partitionCols; // the GROUP BY columns
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
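(3.4) GroupByOperator
Handle the GROUP BY clause (getGroupByForClause): each group-by column is added to groupByKeys and outputColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated for each one and added to aggregations, and each column to be aggregated is added to outputColumnNames.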
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.FINAL
outputColumnNames: group-by columns + aggregations
keys: group-by columns
aggregators: aggregations
groupKeyNotReductionKey: false
groupByMemoryUsage: 0.5 by default
memoryThreshold: 0.9 by default
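The PARTIAL1 and FINAL modes above can be pictured as a two-step count. What follows is a minimal sketch in plain Java, not Hive code: the class and method names are hypothetical, round-robin assignment stands in for the random distribution of rows in the first job, and only count(*) is modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; none of these names exist in Hive.
final class TwoStageGroupBySketch {

    // First job, reduce side (mode PARTIAL1): iterate over raw rows and
    // emit a partial count per group-by key.
    static Map<String, Long> partial1(List<String> keys) {
        Map<String, Long> partial = new TreeMap<>();
        for (String key : keys) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Second job, reduce side (mode FINAL): merge the partial counts that
    // arrive for the same group-by key.
    static Map<String, Long> finalMerge(List<Map<String, Long>> partials) {
        Map<String, Long> result = new TreeMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    // Spread the rows over numReducers buckets (assumption: round-robin as a
    // stand-in for random distribution), aggregate each bucket, then merge.
    static Map<String, Long> run(List<String> rows, int numReducers) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < rows.size(); i++) {
            buckets.get(i % numReducers).add(rows.get(i));
        }
        List<Map<String, Long>> partials = new ArrayList<>();
        for (List<String> bucket : buckets) {
            partials.add(partial1(bucket));
        }
        return finalMerge(partials);
    }

    public static void main(String[] args) {
        // The hot key "a" is split across both first-stage buckets.
        System.out.println(run(Arrays.asList("a", "a", "a", "a", "b"), 2)); // prints {a=4, b=1}
    }
}
```

Because the hot key is first split across several reducers, no single reducer of the first job has to process all of its rows; the second job only sees one partial result per key per first-stage reducer.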
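(4) No partial aggregation on the map side and no data skew: a single MR job is generated.
genGroupByPlan1MR:
(4.1) ReduceSinkOperator
Handle the GROUP BY clause (getGroupByForClause): each group-by column is added to reduceKeys and outputKeyColumnNames.
Handle DISTINCT in the select list (getDistinctFuncExprsForClause): each distinct column is added to reduceKeys and outputKeyColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): each column to be aggregated is added to reduceValues and outputValueColumnNames.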
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
java.util.ArrayList<ExprNodeDesc> valueCols,
java.util.ArrayList<java.lang.String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols; // reduceKeys: the group-by columns + the distinct columns
this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
this.valueCols = valueCols; // reduceValues: the aggregate function columns
this.outputKeyColumnNames = outputKeyColumnNames; // outputKeyColumnNames
this.outputValueColumnNames = outputValueColumnNames; // outputValueColumnNames
this.tag = tag; // -1
this.numReducers = numReducers; // usually -1
this.partitionCols = partitionCols; // the group-by columns
this.keySerializeInfo = keySerializeInfo;
this.valueSerializeInfo = valueSerializeInfo;
this.distinctColumnIndices = distinctColumnIndices;
}
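The key layout of this ReduceSinkOperator (sort key = group-by columns + distinct columns, partition columns = group-by columns only) can be sketched as follows. This is plain Java for illustration, not Hive code: the class and method names are hypothetical, each row is modelled as a two-element array {groupKey, distinctValue}, and the hash-modulo partitioning is an assumption in the style of Hadoop's default hash partitioner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Hive.
final class ReduceSinkKeySketch {

    // The reducer is chosen from the partition columns only (the group-by key),
    // so every row of one group reaches the same reducer regardless of its
    // distinct value. Assumption: hash modulo the number of reducers.
    static int partition(String groupKey, int numReducers) {
        return (groupKey.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Rows are sorted by the full key: group-by column first, then the
    // distinct column, so equal distinct values of a group become adjacent.
    static List<String[]> sortByFullKey(List<String[]> rows) {
        List<String[]> sorted = new ArrayList<>(rows);
        sorted.sort((x, y) -> {
            int byGroup = x[0].compareTo(y[0]);
            return byGroup != 0 ? byGroup : x[1].compareTo(y[1]);
        });
        return sorted;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"b", "x"},
                new String[] {"a", "y"},
                new String[] {"a", "x"});
        for (String[] row : sortByFullKey(rows)) {
            System.out.println(Arrays.toString(row) + " -> reducer " + partition(row[0], 3));
        }
    }
}
```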
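(4.2) GroupByOperator
Handle the GROUP BY clause (getGroupByForClause): each group-by column is added to groupByKeys and outputColumnNames.
Handle the aggregate functions (getAggregationExprsForClause): an AggregationDesc is generated for each one and added to aggregations, and each column to be aggregated is added to outputColumnNames.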
public GroupByDesc(
final Mode mode,
final java.util.ArrayList<java.lang.String> outputColumnNames,
final java.util.ArrayList<ExprNodeDesc> keys,
final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
final boolean groupKeyNotReductionKey,float groupByMemoryUsage, float memoryThreshold) {
this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
false, groupByMemoryUsage, memoryThreshold);
}
mode: GroupByDesc.Mode.COMPLETE
outputColumnNames: group-by columns + aggregations
keys: group-by columns
aggregators: aggregations
groupKeyNotReductionKey: false
groupByMemoryUsage: 0.5 by default
memoryThreshold: 0.9 by default
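In COMPLETE mode the single reduce-side GroupByOperator does the whole aggregation from raw rows. Below is a minimal sketch of how a count(distinct) can be evaluated in one pass, assuming the reducer receives rows already sorted by (group-by key, distinct column) as arranged in (4.1). It is plain Java, not Hive code; the names are hypothetical and NULL handling is not modelled.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Hive.
final class CompleteModeSketch {

    // Each row is {groupKey, distinctValue}. Because the input is sorted,
    // a distinct value is new exactly when it differs from the previous row
    // of the same group, so no per-group set has to be kept in memory.
    static Map<String, Integer> countDistinct(List<String[]> sortedRows) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        String prevGroup = null;
        String prevValue = null;
        for (String[] row : sortedRows) {
            boolean newGroup = !row[0].equals(prevGroup);
            boolean newValue = !row[1].equals(prevValue);
            if (newGroup || newValue) {
                counts.merge(row[0], 1, Integer::sum);
            }
            prevGroup = row[0];
            prevValue = row[1];
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> sortedRows = Arrays.asList(
                new String[] {"a", "x"},
                new String[] {"a", "x"},
                new String[] {"a", "y"},
                new String[] {"b", "x"});
        System.out.println(countDistinct(sortedRows)); // prints {a=2, b=1}
    }
}
```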
SemanticAnalyzer.genBodyPlan
optimizeMultiGroupBy (multi-group by with the same distinct)
groupby10.q groupby11.q