0 引子:
hadoop把处理的文件读取到并解析成键值对,这个操作是通过InputFormat类的子类实现的。
在执行一个Job的时候,Hadoop会将处理的数据(存放在hdfs中)划分成N个Split,
然后启动相应的N个Map程序来分别处理它们,
默认下hdfs中的一个block就会被划分为一个Split,
现在的问题是:
a)被处理的hdfs数据如何被划分
b)Split对应的Map程序应该被分配到哪台TrackTracker机器上
c)划分后数据如何被读取到
具体解释如下:
mr根据传入具体XXInputFormat方式来对hdfs目标文件切分(egTextInputFormat),每个split对应一个
block,并记录block对应文件名,起始位置,文件长度,所在的节点HOSTS
切分好后形成一系列的map reduce任务,taskertracker通过心跳机制和jobtracker定期交互,
告诉jobtracker其资源使用情况,map reduce任务执行情况,后jobtracker根据其调度策略,
比如要分配任务对应spl;it的host位置和tasktracker的位置,以及tasktracker资源,同时优先将
执行失败的map任务提交给非执行失败的节点等统筹优先发送map任务给tasktracker节点。
tasktracker节点通过 getRecorder方法,将split的文件,起始位置,长度等信息最后交给hdfs dfsclient之手,
交给hdfs来执行读取文件操作, dfsclient将数据发送给namenode节点,namenode节点在决定
返回哪个datanode节点,然后和具体datanode节点对接上后执行对应文件内容的读取操作。
为何要切分: 文件切分后,能发挥并行处理优势的目的,
切分后形成的每一个inputsplit都会被分配到独立的mapper
但是mapper接受的是键值对, 那么inputslipt如何转换成键值对的呢??是由recordreader完成
InputFormat 两个函数: 一个切分成split 每一个split交给一个mapper 另一个是读取,
将切分好的split读取到真正的文件内容 并变成键值对给mapper
子类结构为:
1 map-reduce执行流程:
解释如下:
1、运行mapred程序;
2、本次运行将生成一个Job,于是JobClient向JobTracker申请一个JobID以标识这个Job;
3、JobClient将Job所需要的资源提交到HDFS中一个以JobID命名的目录中。这些资源包括JAR包、配置文件、InputSplit、等;
4、JobClient向JobTracker提交这个Job;
5、JobTracker初始化这个Job;
6、JobTracker从HDFS获取这个Job的Split等信息;
7、JobTracker向TaskTracker分配任务;
8、TaskTracker从HDFS获取这个Job的相关资源;
9、TaskTracker开启一个新的JVM;
10、TaskTracker用新的JVM来执行Map或Reduce;
a)被处理的hdfs数据如何被划分
何被划分
1) 数据被划分在JobClient中完成,JobClient通过指定的InputFormat
(在程序员代码中指定eg:job.setInputFormatClass(TextInputFormat.class) )将数据进行划分,
划分结果放在InputSplit中,
2) InputFormat 和 InputSplit 接口简介
InputFormat是一个interface, 旗下方法为:
方法1:
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
参数JobConf job是是任务的配置集合 比如其内包含目标hdfs要操作的文件地址
参数numSplits参数是一个Split数目的建议值,是否考虑这个值,由具体的InputFormat实现类决定。
返回值InputSplit数组,一个InputSplit描述一个Split。
方法2:
RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
==============================================================================
InputSplit也是一个interface,具体返回什么样的implement,是由具体的InputFormat来决定,旗下方法为:
方法1:
long getLength() throws IOException : Split有多长
方法2:
String[] getLocations() throws IOException : 存放这个Split的Location信息(eg:可能有多个replication,存在于多台机器上)
===============================================================================
Split中真正重要的描述信息(eg:Split对应于哪个文件?在文件中的起始和结束位置是什么?)在Split接口中并没有展示出来,InputFormat在需要读取到一个Split时,对应的这个Split传递到InputFormat.getRecordReader方法中,后被用于初始化一个RecordReader,以解析输入数据,刚才说的是执行流程(基本没看懂。。。),
描述Split的重要信息都被隐藏了,只有具体的InputFormat自己知道,
它只需要保证getSplits返回的InputSplit和getRecordReader所关心的InputSplit是同样的
implement就行了,这就给InputFormat的实现提供了巨大的灵活性。
3) InputFormat的常见子类简介如下:
3.1) FileInputFormat
使用FileSplit(implements InputSplit)来描述Split, FileSplit中有以下描述信息 :
private Path file; // Split所在的文件
private long start; // Split的起始位置
private long length; // Split的长度,getLength()会返回它
private String[] hosts; // Split所在的机器名称,getLocations()会返回它
FileInputFormat配套使用的RecordReader将从FileSplit中获取信息,
解析文件名为FileSplit.file的文件中从FileSplit.start到FileSplit.start+FileSplit.length之间的内容,
具体的划分策略: FileInputFormat默认为文件在HDFS上的每一个Block生成一个对应的FileSplit,
基于以上文件划分策略,自然有如下结果:
FileSplit.start就是对应Block在文件中的Offset,
FileSplit.length就是对应Block的Length,
FileSplit.hosts就是对应Block的Location,
可以通过设置“mapred.min.split.size”参数,使得Split的大小大于一个Block,
此候FileInputFormat会将连续的若干个Block分在一个Split中、也可能会将一个Block分别划在不同的Split中,
Split的Start、Length都好说,都是划分前就定好的,
而Split的Location就需要对所有划在其中的Block的Location进行整合,尽量寻找它们共有的Location。
而这些Block很可能并没有共同的Location,那么就需要找一个距离这些Block最近的Location作为Split的Location。
3.2) CombineFileInputFormat
可以将若干个Split打包成一个,目的是避免过多的Map任务(因为Split的数目决定了Map的数目)。
CombineFileInputFormat配套使用的CombineFileSplit将从CombineFileSplit中获取信息,
CombineFileSplit的成员如下:
private Path[] paths; // 每个子Split对应一个文件
private long[] startoffset; // 每个子Split在对应文件中的起始位置
private long[] lengths; // 每个子Split的长度
private String[] locations; // Split所在的机器名称,getLocations()会返回它
private long totLength; // 所有子Split长度之和,getLength()会返回它
前三个数组一定是长度相等并且一一对应的,描述了每一个子Split的信息,
CombineFileInputFormat在打包一组子Split时,会考虑子Split的Location,
尽量将在同一个Location(或者临近位置)出现的Split打包在一起,生成一个CombineFileSplit 。
配套使用的RecordReader将从CombineFileSplit中获取信息,解析每一个文件名为CombineFileSplit.paths[i]的文件中从CombineFileSplit.startoffset[i]到CombineFileSplit.startoffset[i]+CombineFileSplit.lengths[i]之间的内容。
具体划分策略,
CombineFileSplit先将输入文件拆分成若干个子Split,每个子Split对应文件在HDFS的一个Block。
然后按照“mapred.max.split.size”配置,将Length之和不超过这个值的拥有共同Location的几个子Split打包起来,得到一个CombineFileSplit。最后可能会剩下一些子Split,它们不满足拥有共同Location这个条件,那么打包它们的时候就需要找一个距离这些子Split最近的Location作为Split的Location。
==================================================================
如果输入文件是不可以划分的(比如它是一个tar.gz,划分会导致它无法解压),那么在设计InputFormat时,
可以重载FileInputFormat的isSplitable()函数来告知文件不可划分,或者干脆就从头实现自己的InputFormat。
由于InputSplit接口是非常灵活的,还可以设计出千奇百怪的划分方式。
b)Split对应的Map程序应该被分配到哪台TrackTracker机器上
InputSplit接口中有getLocations()中的Locations主要就是用来给Split的调度提供参考.
第6步JobTracker会从HDFS获取Job的Split信息,这将生成一系列待处理的Map和Reduce任务,
JobTracker并不会主动的为每一个TaskTracker划分一个任务子集,
而是直接把所有任务都放在跟Job对应的待处理任务列表中。TaskTracker定期向JobTracker发送心跳,
除了保持活动以外,还会报告TaskTracker当前可以执行的Map和Reduce的剩余配额
TaskTracker总的配额由“mapred.tasktracker.map.tasks.maximun”和“mapred.tasktracker.reduce.tasks.maximun”来配置)。
如果JobTracker有待处理的任务,TaskTracker又有相应的配额,
则JobTracker会在心跳的应答中给JobTracker分配任务(优先分配Map任务),
在分配Map任务时,Split的Location信息就要发挥作用了, !!!!!!!!!!!!!!!!!!!
JobTracker会根据TaskTracker的地址来选择一个Location与之最接近的Split所对应的Map任务(注意一个Split可以有多个Location)
这样一来,输入文件中Block的Location信息经过一系列的整合(by InputFormat)和传递,最终就影响到了Map任务的分配
其结果是Map任务倾向于处理存放在本地的数据,以保证效率。
当然,Location仅仅是JobTracker在分配Map任务时所考虑的因素之一。JobTracker在选择任务之前,需要先选定一个Job(可能正有多个Job等待处理),
这取决于具体TaskScheduler的调度策略。然后,JobTracker又会优先选择因为失败而需要重试的任务,而重试任务又尽量不要分配到它曾经执行失败过的机器上。
JobTracker在分配Reduce任务时并不考虑Location,因为大部分情况下,
Reduce处理的是所有Map的输出,这些Map遍布在Hadoop集群的每一个角落,考虑Location意义不大。
总结下的话就是:
JobClient 将hdfs目标文件分割并计算出每个split的locations
TaskTracker通过心跳机制告诉JobTracker自己的资源,JobTracker会根据Split的Location和
TaskTracker的Lotaciont以及TaskTracker的资源剩余情况,做个算法统筹从而决定将
哪个Map交给哪个TaskTracker节点。
c)划分后数据如何被读取到
总结就是: map-redude经过RecordReader的手最后让读写操作交给DFSClient来完成,
在前面的流程图的第10步,TaskTracker就要启动一个新的JVM来执行Map程序了。
在Map执行的时候,会使用InputFormat.getRecordReader()所返回的RecordReader对象来读取Split中的每一条记录,
getRecordReader函数中会使用InputSplit对RecordReader进行初始化。
RecordReader对一个Path的Open操作由DFSClient来完成,
它会向HDFS的NameNode获取对应文件在对应区间上的Block信息:依次有哪些Block、每个Block各自的Location。
而要读写一个Block的时候,DFSClient总是使用NameNode返回的第一个Location,除非读写失败才会依次选择后面的Location。
而NameNode在处理Open请求时(getBlockLocations),在得到一个Block有哪些Location之后,
会以DFSClient所在的地址为依据,对这些Location进行排序,距离越小的越排在前。
而DFSClient又总是会选择排在前面的Location,所以,最终RecordReader会倾向于读取本地的数据。
DFSClient都会向DataNode建立连接,然后请求数据。并不会因为Block是本地的而直接读磁盘上的文件,
因为这些文件都是由DataNode来管理的,需要通过DataNode来找到Block所对应的物理文件、也需要由DataNode来协调对文件的并发读写。
本地与非本地的差别仅仅在于网络传输上,前者是仅仅在本地网络协议栈上面绕一圈、而后者则是真正的网络通讯。
在Block离得不远、且网络比较畅通的情况下,非Local并不意味着太大的开销。
所以Hadoop优先追求Map的Data-local,也就是输入数据存放在本地。如果不能满足,则退而求其次,追求Rack-local,
也就是输入数据存放在同一机架的其他机器上,这样的话网络开销对性能影响一般不会太大。
=============================================================================
总结:
如何分: 通过JobClient的InputFormat来对hdfs目标文件划分
split如何调度给datanode:
jobtracker 根据split的location 然后心跳得知 tasktracker 资源符合 和location下决定split对应的map发送给哪个tasktracker
划分后split数据如何读取到: RecordReader通过open链接DFSClient 然后交给hdfs来就近读取进行IO操作
相关推荐
自定义MapReduce的InputFormat,实现提取指定开始与结束限定符的内容。
自定义inputFormat&&outputFormat1
hive inputformat实例代码,按照空格对日志文件进行拆分
结合Hadoop源码,详细讲解了MapReduce开发中的InputFormat,很详细。
hadoop-mapreduce-custom-inputformat 1.0-SNAPSHOT 运行命令如下: hadoop jar hadoop-mapreduce-custom-inputformat-1.0-SNAPSHOT.jar org.apache.hadoop.mapreduce.sample.SmallFileWordCount -Dmapreduce....
使用可拆分的多行 JSON 的 InputFormat 动机 目前似乎没有任何可以支持多行 JSON 的 JSON InputFormat 类。 执照 Apache 许可。 用法 要开始,只需: 下载并运行ant 在您的环境中包含dist/lib/json-mapreduce-1.0....
问题背景:框架默认的TextInputformat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量
Apache Hive 的 InputFormat,在查询 SequenceFiles 时将返回 (Text) 键和 (Text) 值。 我需要在不拆分内容的情况下完整解析大量文本文件。 HDFS 在处理大型连续文件时提供最佳吞吐量,因此我使用 Apache Mahout 将...
##Couchbase InputFormat 提供什么? 在与 Couchbase Sqoop 连接器搏斗时,发现了一些错误,使其无法与 CDH3 版本正常工作。 从 Couchbase 中提取键/值的实际 InputFormat 存在于 Sqoop 连接器的基于代码中,但对 ...
使用 XML InputFormat 映射 Reduce。 这是一段代码,用于清理 Wiki XML 数据集并将其转换为带分隔符的文本。 从维基百科档案中提取电影数据进行分析。 提供了 Sample.xml。 如果您的 XML 结构发生变化,请查看 ...
linux和windows安装openOffice java通过jodconverter 将excel、doc文件转成pdf或html,比2.2.1版本相比 提供office 2007版本支持
用法首先,您必须将WikiInputFormat设置为您的作业 InputFormat: job . setInputFormatClass( WikiInputFormat . class); 您的 Mappers 传入 Key 和 Value 需要来自LongWritable和WikiRevisionWritable类型。
映射文件输入格式MapFiles 的 Hadoop InputFormat,它在将任何内容传递给映射器之前过滤不相关的 FileSplits。目的假设您的文件系统中有一些带有排序键的非常大的文件,并且键已排序。 在编写 MapReduce 作业时,您...
ExcelRecordReaderMapReducehadoop mapreduce的MapReduce输入格式以读取Microsoft Excel电子表格执照Apache许可。用法1.下载并运行ant。 2.在您的环境中包括ExcelRecordReaderMapReduce-0.0.1-SNAPSHOT.jar 3....
解决openOffice jodconverter-2.2.1包不能将2007及以上office转为PDF和解决txt乱码问题
3、定义 InputFormat 和 OutputFormat,可选, InputFormat 将每行输入文件的内容转换为 Java 类供 Mapper 函数使用,不定义时默认为 String。 4、定义 main 函数,在里面定义一个 Job 并运行它。 Hadoop 的架构...
本文将介绍Hive多字节分隔符问题的解决方案,包括替换分隔符、RegexSerDe正则加载和自定义InputFormat三种方法。 应用场景 在实际工作中,我们遇到的数据往往不是非常规范化的数据,例如,我们会遇到以下两种情况...
1. 输入格式(InputFormat):InputFormat是MapReduce模型中负责将输入数据分割成小块的接口。它的主要方法有:getSplits(将输入文件分割成逻辑上的多个InputSplit)和getRecordReader(创建记录读取器,读取...
date = datetime(data.Date, 'InputFormat', 'yyyy/MM/dd'); openPrice = data.Open; closePrice = data.Close; 长期和短期价格趋势。 移动平均线、波动率等。 交易量与价格之间的关系。 大额交易对价格的影响。
3.2 InputFormat 数据输入 3.2.1 Job 提交流程和切片源码详解 3.2.2 FileInputFormat 切片机制