- 浏览: 38714 次
文章分类
最新评论
mapreduce中控制mapper的数量
很多文档中描述,Mapper的数量在默认情况下不可直接控制干预,因为Mapper的数量由输入的大小和个数决定。在默认情况下,最终input占据了多少block,就应该启动多少个Mapper。如果输入的文件数量巨大,但是每个文件的size都小于HDFS的blockSize,那么会造成启动的Mapper等于文件的数量(即每个文件都占据了一个block),那么很可能造成启动的Mapper数量超出限制而导致崩溃。这些逻辑确实是正确的,但都是在默认情况下的逻辑。其实如果进行一些客户化的设置,就可以控制了。
<wbr><wbr><wbr><wbr>在Hadoop中,设置Map task的数量不像设置Reduce task数量那样直接,即:不能够通过API直接精确的告诉Hadoop应该启动多少个Map task。</wbr></wbr></wbr></wbr>
<wbr><wbr><wbr><wbr>你也许奇怪了,在API中不是提供了接口org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)吗?这个值难道不可以设置Map task的数量吗?这个API的确没错,在文档上解释”Note: This is only a hint to the framework.“,即这个值对Hadoop的框架来说仅仅是个提示,不起决定性的作用。也就是说,即便你设置了,也不一定得到你想要的效果。</wbr></wbr></wbr></wbr>
1. InputFormat介绍
在具体设置Map task数量之前,非常有必要了解一下与Map-Reduce输入相关的基础知识。
这个接口(org.apache.hadoop.mapred.InputFormat)描述了Map-Reduce job的输入规格说明(input-specification),它将所有的输入文件分割成逻辑上的InputSplit,每一个InputSplit将会分给一个单独的mapper;它还提供RecordReader的具体实现,这个Reader从逻辑的InputSplit上获取input records并传给Mapper处理。
InputFormat有多种具体实现,诸如FileInputFormat(处理基于文件的输入的基础抽象类),<wbr><strong>DBInputFormat</strong>(处理基于数据库的输入,数据来自于一个能用SQL查询的表),<strong>KeyValueTextInputFormat</strong>(特殊的FineInputFormat,处理Plain Text File,文件由回车或者回车换行符分割成行,每一行由key.value.separator.in.input.line分割成Key和Value),CompositeInputFormat,DelegatingInputFormat等。在绝大多数应用场景中都会使用FileInputFormat及其子类型。</wbr>
通过以上的简单介绍,我们知道InputFormat决定着InputSplit,每个InputSplit会分配给一个单独的Mapper,因此InputFormat决定了具体的Map task数量。
2. FileInputFormat中影响Map数量的因素
在日常使用中,FileInputFormat是最常用的InputFormat,它有很多具体的实现。以下分析的影响Map数量的因素仅对FileInputFormat及其子类有效,其他非FileInputFormat可以去查看相应的<wbr>getSplits(JobConf job, int numSplits) 具体实现即可。</wbr>
请看如下代码段(摘抄自org.apache.hadoop.mapred.FileInputFormat.getSplits,hadoop-0.20.205.0源代码):
<wbr></wbr>
- long<wbr>goalSize<wbr>=<wbr>totalSize<wbr>/<wbr>(numSplits<wbr>==<wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr>0<wbr>?<wbr></wbr></wbr>1<wbr>:<wbr>numSplits);<wbr><wbr></wbr></wbr></wbr></wbr>
- long<wbr>minSize<wbr>=<wbr>Math.max(job.getLong(</wbr></wbr></wbr>"mapred.min.split.size",<wbr></wbr>1),<wbr>minSplitSize);<wbr><wbr></wbr></wbr></wbr>
- <wbr><wbr></wbr></wbr>
- for<wbr>(FileStatus<wbr>file:<wbr>files)<wbr>{<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr>
- <wbr><wbr>Path<wbr>path<wbr>=<wbr>file.getPath();<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr>
- <wbr><wbr>FileSystem<wbr>fs<wbr>=<wbr>path.getFileSystem(job);<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr>
- <wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>if</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>((length<wbr>!=<wbr></wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">0</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">)<wbr>&&<wbr>isSplitable(fs,<wbr>path))<wbr>{<wbr><wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></span></wbr></wbr>
- <wbr><wbr><wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>long</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>blockSize<wbr>=<wbr>file.getBlockSize();<wbr><wbr></wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>long</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>splitSize<wbr>=<wbr>computeSplitSize(goalSize,<wbr>minSize,<wbr>blockSize);<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr><wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>long</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>bytesRemaining<wbr>=<wbr>length;<wbr><wbr></wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>while</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>(((</wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>double</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">)<wbr>bytesRemaining)/splitSize<wbr>><wbr>SPLIT_SLOP)<wbr>{<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr><wbr><wbr>String[]<wbr>splitHosts<wbr>=<wbr>getSplitHosts(blkLocations,length-bytesRemaining,<wbr>splitSize,<wbr>clusterMap);<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr><wbr><wbr>splits.add(<span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>new</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>FileSplit(path,<wbr>length-bytesRemaining,<wbr>splitSize,<wbr>splitHosts));<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr><wbr><wbr>bytesRemaining<wbr>-=<wbr>splitSize;<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr>}<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr>
- <wbr><wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>if</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>(bytesRemaining<wbr>!=<wbr></wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">0</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">)<wbr>{<wbr><wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr><wbr><wbr>splits.add(<span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>new</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>FileSplit(path,<wbr>length-bytesRemaining,<wbr>bytesRemaining,<wbr>blkLocations[blkLocations.length-</wbr></wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">1</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">].getHosts()));<wbr><wbr></wbr></wbr></span></wbr></wbr></wbr></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr>}<wbr><wbr></wbr></wbr></wbr></wbr></wbr></wbr>
- <wbr><wbr>}<wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>else</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>if</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>(length<wbr>!=<wbr></wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">0</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">)<wbr>{<wbr><wbr></wbr></wbr></wbr></span></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr>String[]<wbr>splitHosts<wbr>=<wbr>getSplitHosts(blkLocations,<span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">0</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">,length,clusterMap);<wbr><wbr></wbr></wbr></span></wbr></wbr></wbr></wbr></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr>splits.add(<span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>new</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>FileSplit(path,<wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">0</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">,<wbr>length,<wbr>splitHosts));<wbr><wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
- <wbr><wbr>}<wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>else</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>{<wbr><wbr><wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,130,0)">//Create<wbr>empty<wbr>hosts<wbr>array<wbr>for<wbr>zero<wbr>length<wbr>files</wbr></wbr></wbr></wbr></wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr><wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr>splits.add(<span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>new</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>FileSplit(path,<wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">0</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">,<wbr>length,<wbr></wbr></wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>new</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>String[</wbr></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(192,0,0)">0</span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px">]));<wbr><wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
- <wbr><wbr>}<wbr><wbr></wbr></wbr></wbr></wbr>
- }<wbr><wbr></wbr></wbr>
- <wbr><wbr></wbr></wbr>
- return<wbr>splits.toArray(</wbr>new<wbr>FileSplit[splits.size()]);<wbr><wbr></wbr></wbr></wbr>
- <wbr><wbr></wbr></wbr>
- protected<wbr></wbr>long<wbr>computeSplitSize(</wbr>long<wbr>goalSize,<wbr></wbr></wbr>long<wbr>minSize,<wbr></wbr></wbr>long<wbr>blockSize)<wbr>{<wbr><wbr></wbr></wbr></wbr></wbr>
- <wbr><wbr><wbr><wbr><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px; color:rgb(0,102,153)"><strong>return</strong></span><span style="word-wrap:normal; word-break:normal; border-style:none; padding:0px; margin:0px"><wbr>Math.max(minSize,<wbr>Math.min(goalSize,<wbr>blockSize));<wbr><wbr></wbr></wbr></wbr></wbr></wbr></span></wbr></wbr></wbr></wbr>
- }<wbr><wbr></wbr></wbr>
totalSize:是整个Map-Reduce job所有输入的总大小。
numSplits:来自job.getNumMapTasks(),即在job启动时用org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)设置的值,给M-R框架的Map数量的提示。
goalSize:是输入总大小与提示Map task数量的比值,即期望每个Mapper处理多少的数据,仅仅是期望,具体处理的数据数由下面的computeSplitSize决定。
minSplitSize:默认为1,可由子类复写函数protected void setMinSplitSize(long minSplitSize) 重新设置。一般情况下,都为1,特殊情况除外。
minSize:取的1和mapred.min.split.size中较大的一个。
blockSize:HDFS的块大小,默认为64M,一般大的HDFS都设置成128M。
splitSize:就是最终每个Split的大小,那么Map的数量基本上就是totalSize/splitSize。
接下来看看computeSplitSize的逻辑:首先在goalSize(期望每个Mapper处理的数据量)和HDFS的block size中取较小的,然后与mapred.min.split.size相比取较大的。
3. 如何调整Map的数量
有了2的分析,下面调整Map的数量就很容易了。
3.1 减小Map-Reduce job 启动时创建的Mapper数量
当处理大批量的大数据时,一种常见的情况是job启动的mapper数量太多而超出了系统限制,导致Hadoop抛出异常终止执行。解决这种异常的思路是减少mapper的数量。具体如下:
3.1.1 输入文件size巨大,但不是小文件
这种情况可以通过增大每个mapper的input size,即增大minSize或者增大blockSize来减少所需的mapper的数量。增大blockSize通常不可行,因为当HDFS被hadoop namenode -format之后,blockSize就已经确定了(由格式化时dfs.block.size决定),如果要更改blockSize,需要重新格式化HDFS,这样当然会丢失已有的数据。所以通常情况下只能通过增大minSize,即增大mapred.min.split.size的值。
3.1.2 输入文件数量巨大,且都是小文件
所谓小文件,就是单个文件的size小于blockSize。这种情况通过增大mapred.min.split.size不可行,需要使用FileInputFormat衍生的CombineFileInputFormat将多个input path合并成一个InputSplit送给mapper处理,从而减少mapper的数量。具体细节稍后会更新并展开。
3.2 增加Map-Reduce job 启动时创建的Mapper数量
增加mapper的数量,可以通过减小每个mapper的输入做到,即减小blockSize或者减小mapred.min.split.size的值。
参考资料
http://yaseminavcular.blogspot.com/2011/06/how-to-set-number-of-maps-with-hadoop.html
http://svn.apache.org/repos/asf/hadoop/common/tags/release-0.20.205.0
相关推荐
前四节提供了几个小案例 下面详细介绍MapReduce中Map任务Reduce任务以及MapReduce的执行流程。 Map任务: 读取输入文件内容,解析成key,value对。...注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。
mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce ...
Python 中的 Hadoop Mapreduce 示例 python 中的几个 Mapreduce 示例以及有关运行它们的文档! 运行代码的步骤 文件夹结构 假定文件存储在 Linux 操作系统中的给定位置。 这只是一个示例说明,实际上位置并不重要。 ...
对Google第一版的mapreduce相关文献进行的翻译。结合了的知秋的相关文章翻译的,不收费
MapReduce中英文,MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算,中文和英文,两个文件。Word
MapReduce中Shuffle优化与重构,彭辅权,金苍宏,如今Hadoop已成为目前最主流的云计算平台,在Hadoop分布式计算平台中,如何优化MapReduce计算性能是目前研究的一个热点问题。除了编写高
简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接
mapreduce在hadoop实现词统计和列式统计,mrwordcount工程是统计hadoop文件中的词数,mrflowcount工程是统hadoop文件中的列表
4 分别在自编 MapReduce 程序 WordCount 运行过程中和运行结束后查看 MapReduce Web 界面。 5. 分别在自编 MapReduce 程序 WordCount 运行过程中和运行结束后练习 MapReduce Shell 常用命令。 。。
但是有一些时候,我们需要在MapReduce程序中使用C语言、C++以及其他的语言,比如项目的开发人员更熟悉Java之外的语言,或者项目已经有部分功能用其他语言实现等。针对这些情况,我们需要研究如何在基于Java的...
MPI等并行计算方法缺少高层并行编程模型,为了克服这一缺陷,MapReduce借鉴了Lisp函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型 上升到构架:统一构架,为程序员隐藏系统层细节 MPI等...
对中文进行分词的java代码,分别在map reduce中实现。
MapReduce意味着在计算过程中实际分为两大步,Map过程和Reduce过程。 下面以一个统计单词次数简单案例为例: 数据源 Map类 import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org....
google云计算三大论文之MapReduce
利用MapReduce实现了求学生成绩的最大值,最小值,及成绩分布。结合我的博客“MapReduce之学生平均成绩”看,效果更好。
MapReduce求行平均值--标准差--迭代器处理--MapReduce案例
在整个过程中,Mapreduce库函数 负责原始数据的切割,中间key/value对集的聚合,以及任务的调度,容错、通 信控制等基础工作。而用户定义的map和reduce函数则根据实际问题确定具体操 作。 2. 框架的基本结构和执行...
MapReduce发明人关于MapReduce的介绍
在实际Hadoop系统中,如何使作业完成时间最短成为了一个NP完全问题,导致这个问题的主要原因是MapReduce计算过程中大量的数据从Map节点向Reduce节点进行迁移,容易造成网络拥塞,使得数据迁移时间过长。软件定义网络...
MapReduce求取行平均值 MapReduce小实例 数据有经过处理已经添加行号的 也有未添加的 行平均值的四种求法