`

hive优化

    博客分类:
  • Hive
阅读更多

hive 表优化
一、表设计层面优化
1、利用分区表优化
分区表 是在某一个或者几个维度上对数据进行分类存储,一个分区对应一个目录。如果筛选条件里有分区字段,那么 Hive 只需要遍历对应分区目录下的文件即可,不需要遍历全局数据,使得处理的数据量大大减少,从而提高查询效率。

当一个 Hive 表的查询大多数情况下,会根据某一个字段进行筛选时,那么非常适合创建为分区表。
2、利用桶表优化
指定桶的个数后,存储数据时,根据某一个字段进行哈希后,确定存储在哪个桶里,这样做的目的和分区表类似,也是使得筛选时不用全局遍历所有的数据,只需要遍历所在桶就可以了。
3、选择合适的文件存储格式
TextFile
默认格式,如果建表时不指定默认为此格式。
存储方式:行存储
SequenceFile
一种Hadoop API 提供的二进制文件,使用方便、可分割、个压缩的特点。
支持三种压缩选择:NONE、RECORD、BLOCK。RECORD压缩率低,一般建议使用BLOCK压缩。
RCFile
存储方式:数据按行分块,每块按照列存储 。
首先,将数据按行分块,保证同一个record在一个块上,避免读一个记录需要读取多个block。
其次,块数据列式存储,有利于数据压缩和快速的列存取。

ORC
存储方式:数据按行分块,每块按照列存储
Hive 提供的新格式,属于 RCFile 的升级版,性能有大幅度提升,而且数据可以压缩存储,压缩快,快速列存取。
Parquet
存储方式:列式存储
Parquet 对于大型查询的类型是高效的。对于扫描特定表格中的特定列查询,Parquet特别有用。Parquet一般使用 Snappy、Gzip 压缩。默认 Snappy。
Parquet 支持 Impala 查询引擎。
表的文件存储格式尽量采用 Parquet 或 ORC,不仅降低存储量,还优化了查询,压缩,表关联等性能;
4、选择合适的压缩方式
Hive 语句最终是转化为 MapReduce 程序来执行的,而 MapReduce 的性能瓶颈在与 网络IO 和 磁盘IO,要解决性能瓶颈,最主要的是 减少数据量,对数据进行压缩是个好方式。压缩虽然是减少了数据量,但是压缩过程要消耗CPU,但是在Hadoop中,往往性能瓶颈不在于CPU,CPU压力并不大,所以压缩充分利用了比较空闲的CPU。

二、语法和参数层面优化
1、列裁剪
set hive.optimize.cp = true; -- 列裁剪,取数只取查询中需要用到的列,默认为真
2、分区裁剪
在查询的过程中只选择需要的分区,可以减少读入的分区数目,减少读入的数据量。
set hive.optimize.pruner=true; // 默认为true
3、合并小文件
Map 输入合并
在执行 MapReduce 程序的时候,一般情况是一个文件需要一个 mapper 来处理。但是如果数据源是大量的小文件,这样岂不是会启动大量的 mapper 任务,这样会浪费大量资源。可以将输入的小文件进行合并,从而减少mapper任务数量。
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- Map端输入、合并文件之后按照block的大小分割(默认)
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; -- Map端输入,不合并
4、Map/Reduce输出合并
大量的小文件会给 HDFS 带来压力,影响处理效率。可以通过合并 Map 和 Reduce 的结果文件来消除影响。
set hive.merge.mapfiles=true;  -- 是否合并Map输出文件, 默认值为真
set hive.merge.mapredfiles=true; -- 是否合并Reduce 端输出文件,默认值为假
set hive.merge.size.per.task=25610001000; -- 合并文件的大小,默认值为 256000000
合理控制 map/reduce 任务数量
合理控制 mapper 数量
减少 mapper 数可以通过合并小文件来实现,增加 mapper 数可以通过控制上一个 reduce
5、默认的 mapper 个数计算方式
输入文件总大小:total_size     
hdfs 设置的数据块大小:dfs_block_size      
default_mapper_num = total_size/dfs_block_size
set mapred.map.tasks=10;
直接设置 mapper 个数的样子,但是很遗憾不行,这个参数设置只有在大于default_mapper_num的时候,才会生效。

可以通过mapred.min.split.size设置每个任务处理的文件的大小,这个大小只有在大于dfs_block_size的时候才会生效
总结一下控制 mapper 个数的方法

    如果想增加 mapper 个数,可以设置mapred.map.tasks为一个较大的值
    如果想减少 mapper 个数,可以设置maperd.min.split.size为一个较大的值
    如果输入是大量小文件,想减少 mapper 个数,可以通过设置hive.input.format合并小文件
6、合理控制reducer数量
如果 reducer 数量过多,一个 reducer 会产生一个结数量果文件,这样就会生成很多小文件,那么如果这些结果文件会作为下一个 job 的输入,则会出现小文件需要进行合并的问题,而且启动和初始化 reducer 需要耗费和资源。
如果 reducer 数量过少,这样一个 reducer 就需要处理大量的数据,并且还有可能会出现数据倾斜的问题,使得整个查询耗时长。    
参数1:hive.exec.reducers.bytes.per.reducer(默认1G)
参数2:hive.exec.reducers.max(默认为999)

三、Join优化
1、优先过滤数据
尽量减少每个阶段的数据量,对于分区表能用上分区字段的尽量使用,同时只选择后面需要使用到的列,最大限度的减少参与 join 的数据量。
2、小表 join 大表原则
小表 join 大表的时应遵守小表 join 大表原则,原因是 join 操作的 reduce 阶段,位于 join 左边的表内容会被加载进内存,将条目少的表放在左边,可以有效减少发生内存溢出的几率。join 中执行顺序是从左到右生成 Job,应该保证连续查询中的表的大小从左到右是依次增加的。
3、使用相同的连接键
在 hive 中,当对 3 个或更多张表进行 join 时,如果 on 条件使用相同字段,那么它们会合并为一个 MapReduce  Job,利用这种特性,可以将相同的 join on 的放入一个 job 来节省执行时间
4、启用 mapjoin
mapjoin 是将 join 双方比较小的表直接分发到各个 map 进程的内存中,在 map 进程中进行 join 操作,这样就不用进行 reduce 步骤,从而提高了速度。只有 join 操作才能启用 mapjoin。

set hive.auto.convert.join = true; -- 是否根据输入小表的大小,自动将reduce端的common join 转化为map join,将小表刷入内存中。
set hive.mapjoin.smalltable.filesize = 2500000; -- 刷入内存表的大小(字节)
set hive.mapjoin.maxsize=1000000;  -- Map Join所处理的最大的行数。超过此行数,Map Join进程会异常退出

5、尽量原子操作
尽量避免一个SQL包含复杂的逻辑,可以使用中间表来完成复杂的逻辑。

6、桶表 mapjoin
当两个分桶表 join 时,如果 join on的是分桶字段,小表的分桶数是大表的倍数时,可以启用 mapjoin 来提高效率。
set hive.optimize.bucketmapjoin = true; -- 启用桶表 map join

四、Group By 优化
默认情况下,Map阶段同一个Key的数据会分发到一个Reduce上,当一个Key的数据过大时会产生 数据倾斜。进行group by操作时可以从以下两个方面进行优化:

1. Map端部分聚合
事实上并不是所有的聚合操作都需要在 Reduce 部分进行,很多聚合操作都可以先在 Map 端进行部分聚合,然后在 Reduce 端的得出最终结果。
set hive.map.aggr=true; -- 开启Map端聚合参数设置
set hive.grouby.mapaggr.checkinterval=100000; -- 在Map端进行聚合操作的条目数目
2. 有数据倾斜时进行负载均衡    
set hive.groupby.skewindata = true; -- 有数据倾斜的时候进行负载均衡(默认是false)
当选项设定为 true 时,生成的查询计划有两个 MapReduce 任务。在第一个 MapReduce 任务中,map 的输出结果会随机分布到 reduce 中,每个 reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的group by key有可能分发到不同的 reduce 中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处理的数据结果按照group by key分布到各个 reduce 中,最后完成最终的聚合操作。
五、Order By 优化
order by只能是在一个reduce进程中进行,所以如果对一个大数据集进行order by,会导致一个reduce进程中处理的数据相当大,造成查询执行缓慢。

在最终结果上进行order by,不要在中间的大数据集上进行排序。如果最终结果较少,可以在一个reduce上进行排序时,那么就在最后的结果集上进行order by。
如果是去排序后的前N条数据,可以使用distribute by和sort by在各个reduce上进行排序后前N条,然后再对各个reduce的结果集合合并后在一个reduce中全局排序,再取前N条,因为参与全局排序的order by的数据量最多是reduce个数 * N,所以执行效率很高。
六、COUNT DISTINCT优化.
- 优化前(只有一个reduce,先去重再count负担比较大):
select count(distinct id) from tablename;
-- 优化后(启动两个job,一个job负责子查询(可以有多个reduce),另一个job负责count(1)):
select count(1) from (select distinct id from tablename) tmp;

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics