`
zhan8610189
  • 浏览: 75524 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hadoop Shuffle过程分析

阅读更多

hadoop shuffle 优化

在hadoop中,在map/reduce的shuffle阶段,jetty用于数据传输。提高map/reduce的效率,针对shuffle的优化也是很重要的。它可以在以下几个方面进行优化:

  1. 优化jetty
  2. 减少map输出
  3. 用netty来替换jetty
  4. 压缩传输

Hadoop Map阶段的输出机制

一个作业由Map Task和Reduce Task组成。通常Map Task的输出,当作Reduce Task的输入。如果该作业没有Reduce Task时,那么该Job的输出目的地为HDFS。在MapTask类中有一个run方法。

 @Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) 
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;  // 与tasktracker通信的代理

    // start thread that will handle communication with parent
    //起一个新的线程reporter去调用umbilical与父节点通信
    TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
        jvmContext); 
    reporter.startCommunicationThread();
    //检查是否是新api
    boolean useNewApi = job.getUseNewMapper();
    //初始化map的上下文环境
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    if (useNewApi) {
      //新的api是位于org.apache.hadoop.mapreduce下面的包
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      //新的api是位于org.apache.hadoop.mapred下面的包
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

 在新api中的输出分两种情况:

  1. 如果没有Reduce,那些就通常直接输出到hdfs上,相应地址由mapred.output.dir控制,out为NewDirectOutputCollector; 
  2. 如果有Reduce,out为NewOutputCollector,它有一定大小的缓存区kvbuffer,输出的数据会先写到这个缓存里面。这里面有复杂的内存控制机制,需要相关参数来协调,如:io.sort.mb(默认<4096),io.sort.record.percent(default: 0.05),io.sort.spill.percent(default: 0.8)等。io.sort.mb的值不能大于2^12=4096。kvbuffer大小的计算公式如下:( io.sort.mb * (2^20) ) * ( 1 - io.sort.record.percent ),分布到不同partition的kv都会存到这个kvbuffer里面,同时通过两个数组kvoffsets,kvindices来建立索引。如果kvbuffer快要满时,就会spill和写到本地磁盘,存取的格式是IFile格式:<key-len, value-len, key, value> 。

 Shuffer机制

待续... 

Jetty 6.1.14和6.1.26版本比较

近段时间对jetty 6.1.14和jetty 6.1.26进行了一次简单的性能测试(该测试是通过jetty server下载5G的文件),发现前者的传输性能明显优于后者,是后者的三倍左右。jetty 6.1.14在测试过程往往可以达到网卡的最大速度,而jetty 6.1.26就要打3折。在new一个jetty server时可以配置一个bufferSize,这个bufferSize的配置也对它们的性能有一定的影响,根据业务配置一个合适的值是能够提高一定的性能的。

在Jetty和Hadoop的Jira上相继报道了相关bug, 其中就包括:

上述两个问题很来严重,因此jetty的开发者都建议hadoop 依赖的jetty包升级到7.0以上。

 

PS:
1. 从hadoop日志里经常可以看到以下的输出日志:
2013-05-28 20:07:05,476 INFO org.mortbay.log: org.mortbay.io.nio.SelectorManager$SelectSet@3160e069 JVM BUG(s) - injecting delay35 times
2013-05-28 20:07:05,476 INFO org.mortbay.log: org.mortbay.io.nio.SelectorManager$SelectSet@3160e069 JVM BUG(s) - recreating selector 35 times, canceled keys 561 times

 这是JVM 1.6的一个bug: Selector.select(timeout)时无法block住select时出现句柄存在(FileExistException)的异常,该bug在jetty中已经作了workround (在代码 org.mortbay.io.nio.SelectorManager中可以查看到),同时JVM 1.7也已经把它修复了。

 

2. 关于Java OOM时如何查看Heap相关信息。

  • 可以通过给java加下面这个参数可以在OOM时把Heap信息dump出来:
-XX:+HeapDumpOnOutOfMemoryError
  •  在OOM之前,还可以通过jmap命令导出来:
jmap -dump:format=b,file=<filename.hprof> <pid>

 

 

Netty vs Jetty

待续...

 

 

分享到:
评论

相关推荐

    Hadoop从入门到上手企业开发

    062 MapReduce Shuffle过程讲解和Map Shuffle Phase讲解 063 Reduce Shuffle Phase讲解 064 源代码跟踪查看Map Task和Reduce Task数目的个数 065 回顾MapReduce执行过程以及MapReduce核心 066 Hadoop MapReduce框架...

    Spark的Shuffle总结分析

    在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。而Spark也会有自己的shuffle实现过程。 1.2 Spark中的 shuffle 介绍 在DAG调度的过程中,Stage 阶段的

    Hadoop权威指南 第二版(中文版)

     本书是Hadoop权威参考,程序员可从中探索如何分析海量数据集,管理员可以从中了解如何安装与运行Hadoop集群。 目录 第1章 初识Hadoop  数据!数据!  数据存储与分析  与其他系统相比  关系型数据库管理系统...

    Hadoop权威指南(中文版)2015上传.rar

    使用Hadoop分析数据 map阶段和reduce阶段 横向扩展 合并函数 运行一个分布式的MapReduce作业 Hadoop的Streaming Ruby版本 Python版本 Hadoop Pipes 编译运行 第3章 Hadoop分布式文件系统 HDFS的设计 HDFS的概念 数据...

    Hadoop入门实战手册

    4 .......................................................................................11 Hadoop集群搭建过程手记 4.1 ....................................................................................

    阿里巴巴技术专家杨晓明:基于Hadoop技术进行地理空间分析

    本文将介绍一种通过使用地理网格进行数据关联,并利用Shuffle过程的二次排序实现高效的统计各条道路上位置点分布情况的方法。交通领域正产生着海量的车辆位置点数据。将这些车辆位置信息和道路进行关联的统计操作则...

    Java大数据培训学校全套教程-50)Hadoop与MapReduce最入门

     通过学习Hadoop的安装与配置,hdfs常用命令,WordCount程序详解,Shuffle过程详解,WordCount程序结果分析,Hadoop,HDFS,MapReduce,NameNode和DataNode,yarn,ResourceManager,NodeManager的概念等让大家对Hadoop和...

    hive on spark mr 数据开发常见问题解决

    hive工作常见问题解决收集开发人员在Hive日常开发过程中难免遇到各种各样的hive报错,这些报错信息很多时间并没有形成汇总的知识库,每次遇到问题都会重复查资料,效率非常低 现在总结一些常见的知识库,方便大家...

    Apache Spark的设计与实现 PDF中文版

    不喜欢将该文档称之为“源码分析”,因为本文的主要目的不是去解读实现代码,而是尽量有逻辑地,从设计与实现原理的角度,来理解 job 从产生到执行完成的整个过程,进而去理解整个系统。 讨论系统的设计与实现有很...

    五种大数据架构简介.pdf

    由于批处理在应对⼤量持久数据⽅⾯的表现极为出⾊,因此经常被⽤于对历 史数据进⾏分析。 ⼤量数据的处理需要付出⼤量时间,因此批处理不适合对处理时间要求较⾼的场合。 Apache Hadoop Apache Hadoop是⼀种专⽤于...

    《ApacheSpark设计与实现》.zip

    具体内容如下:Overview 总体介绍Job logical plan 介绍 job 的逻辑执行图(数据依赖图)Job physical plan 介绍 job 的物理执行图Shuffle details 介绍 shuffle 过程Architecture 介绍系统模块如何协调完成整个 job...

    预测算法调研报告.doc

    第二、Hadoop在进行MapReduce计算的过程中,会将map的计算结果写入到本地磁盘或 Hadoop分布式文件系统(Hadoop Distributed File System, HDFS)上,然后再通过shuffle过程将计算结果发送到reduce上进行处理,反复的...

    javashuffle源码-MapReduce-Demo:Hadoop,MapReduce编程学习练手实例

    shuffle源码 前言 之前没怎么在GitHub MD中写过目录索引,下文目录中有的锚点不能跳转(尤其是IE内核的浏览器),我也没办法。这里我将下文放到了我CSDN的博客上,可以跳转目录,看起来也方便美观一点—— 这里放一...

Global site tag (gtag.js) - Google Analytics