`
fredric0611
  • 浏览: 28885 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
最近访客 更多访客>>
社区版块
存档分类
最新评论

基于MapReduce作业的MapReduce数据流优化

阅读更多
    在编写MapReduce应用程序时,除了最基本的Map模块、Reduce模块和驱动方法之外,用户还可以通过一些技巧优化作业以提高其性能。对用户来说,合理地在MapReduce作业中对程序进行优化,可以极大地提高作业的性能,减少作业执行时间。我们从以下几个方法分析MapReduce作业的优化方法。
1    选择Mapper的数量
    Hadoop平台在处理大量小文件时性能比较逊色,主要由于生成的每个分片都是一整个文件,Map操作时只会处理很少的输入数据,但是会产生很多Map任务,每个Map任务的运行都包括产生、调度和结束时间,大量的Map任务会造成一定的性能损失。可以通过任务Java虚拟机(JVM)重用来解决这个问题,默认每JVM只运行一个任务,使用JVM重用后一个JVM可以顺序执行多个任务,减少了启动时间。控制JVM的属性是mapred.job.reuse.jvm.num.tasks,它指定作业每个JVM运行的任务的最大数量,默认为1。可以通过JonConf的setNumTasksToExecutePerJvm()方法设置,若设置为-1则说明统一作业中共享一个JVM任务的数量不受限制。
    如果输入的文件过大,还可以通过将HDFS上的块大小增大,比如增加到256M或512M,以减少Mapper数量,可以通过运行参数(-Ddfs.block.size = $[256*1024*1024])将块大小增大到256M。
2    选择Reducer的数量
    在Hadoop中默认是运行一个Reducer,所有的Reduce任务都会放到单一的Reducer去执行,效率非常低下。为了提高性能,可以适当增大Reducer的数量。
    最优的Reducer数量取决于集群中可用的Reducer任务槽的数目。Reducer任务槽的数目是集群中节点个数与mapred.tasktracker.reduce.tasks.maximum(默认为2)的乘积,也可以通过MapReduce的用户界面获得。
    一个普遍的做法是将Reducer数量设置为比Reducer任务槽数目稍微小一些,这会给Reducer任务留有余地,同时将使得Reducer能够在同一波中完成任务,并在Reducer阶段充分使用集群。
    Reducer的数量由mapred.reduce.tasks属性设置,通常在MapReduce作业的驱动方法中通过setNumReduceTasks(n)调用方法动态设置Reducer的数目为n。
3    使用Combiner函数
    Combiner过程是一个可选的优化过程,如果这个过程适合你的作业,Combiner实例会在每个运行Map任务的节点上运行,它会接收本节点上Mapper实例的输出作为输入,然后Combiner的输出会被发送到Reducer,而不是发送Mapper的输出。
    Combiner是一个“迷你Reduce”过程,它是用Reducer接口来定义的,只对本节点生成的数据进行规约。为了直观理解Combiner的作用,使用WordCount程序进行说明。在该程序中Map任务会生成很多(“word”,1)对,如果同一分片中“Cat”出现了5次,则会生成5个(“Cat”,1)对被发送至Reducer。通过使用Combiner,这5个key/value对会被规约为一个(“Cat”,5)发送至Reducer。Combiner过程会针对输入反复运行,但不会影响最终结果。运行Combiner的意义在于使Map输出更紧凑,从而大大减少了Shuffle过程所需的带宽,并加速了作业的执行。
    通过setCombinerClass(Class<? extends Reducer> theClass)方法使用Combiner过程,括号中指定了Combiner所使用的类,如果用户的Reduce函数可交换并可组合(比如WordCount的Reduce函数),则可以直接在驱动方法中添加如下代码:conf.setCombinerClass(Reduce.class);否则用户必须编写一个第三方类作为作业的Combiner。
4    压缩Map的输出
    在Map任务完成后对将要溢写入磁盘的数据进行压缩是一种很好的优化方法,它能够使数据写入磁盘的速度更快,节省磁盘空间,减少需要传送到Reducer的数据量,以达到减少MapReduce作业执行时间的目的。Hadoop支持的压缩格式及相关信息如表1所示。

                   表1  Hadoop支持的压缩格式
压缩格式  工具 算法    文件扩展名 Hadoop压缩编码/解码器
DEFLATE 无 DEFLATE    .deflate Org.apache.hadoop.io.compress.DefaultCodec
Gzip gzip DEFLATE    .gz         Org.apache.hadoop.io.compress.GzipCodec
bzip2 bzip2 bzip2    .bz2         Org.apache.hadoop.io.compress.BZip2Codec
LZO lzop LZO    .lzo         Com.hadoop.compression.lzo.LzopCodec

    在使用Map输出压缩时需要考虑压缩格式的速度最优与空间最优的协调。通常来说,Gzip压缩在空间/时间处理上相对平衡;bzip2压缩比gzip更有效,但速度较慢;LZO压缩[19]使用速度最优算法,但压缩效率稍低。我们需要根据MapReduce作业以及输入数据的不同进行选择。
    MapReduce应用程序的驱动方法中加入如下所示代码便可以启用gzip格式来压缩Map的输出结果。
    conf.setCompressMapOutput(true);
    conf.setMapOutputCompressorClass(GzipCodec.class);
5    选择合适的序列化格式
    序列化指的是将结构化对象转为字节流以便于通过网络进行传输或写入持久存储的过程。反序列化指的是将字节流转为一系列结构化对象的过程。
    在Hadoop中,节点之间的进程间通信是通过远程过程调用(RPC)实现的。RPC协议使用序列化将消息编码为二进制流后发送至远程节点,然后二进制流被反序列化为原始信息。Hadoop使用自己的序列化格式Writables,MapReduce程序使用它来序列化key/ value对,它是整个Hadoop的核心。
    Writeable接口定义了两个方法:一个用于将其状态写入二进制格式的DataOutput,另一个用于从二进制格式的DataInput流读取其状态。具体如下所示。
public interface Writable{
    void  write(DataOutput  out)  throws IOException
    void  readFields(DataInput  in)  throws IOException
}
    Hadoop将许多Writable类归入包org.apache.hadoop.io中,其封装了Java的基本类,此外还有short和char类型,具体如表2所示。

          表2  Writable的Java基本类封装
Java基本类型 Writable使用 序列化大小(字节)
布尔型         BooleanWritable 1
字节型         ByteWritable 1
整型         IntWritable 4
整型         VIntWritable 1-5
浮点型         FloatWritable 4
长整型         LongWritable 8
长整型         VLongWritable 1-9
双精度浮点型 DoubleWritable 8

    正确选择合适的Writable类型,能够减少CPU占用率和存储空间,提高作业性能。
    Hadoop自带一系列Writable实现已经能够满足大多数用途。但我们也可以自定义Writable来控制二进制表示和排序顺序以应对更复杂的结构,编写自定义Writable时需要实现RawComparator,通过查看其序列化表示的方式来比较数据。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics