`
reb12345reb
  • 浏览: 47645 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Hadoop组件之-MapReduce(InputFormat)

阅读更多
 

Hadoop源码解析之: TextInputFormat如何处理跨split的行

标签: hadoopsplitTextInputFormat跨split
 6402人阅读 评论(1) 收藏 举报

我们知道hadoop将数据给到map进行处理前会使用InputFormat对数据进行两方面的预处理:

  •  对输入数据进行切分,生成一组split,一个split会分发给一个mapper进行处理。
  •  针对每个split,再创建一个RecordReader读取Split内的数据,并按照<key,value>的形式组织成一条record传给map函数进行处理。

最常见的FormatInput就是TextInputFormat,在split的读取方面,它是将给到的Split按行读取,以行首字节在文件中的偏移做key,以行数据做value传给map函数处理,这部分的逻辑是由它所创建并使用的RecordReader:LineRecordReader封装和实现的.关于这部分逻辑,在一开始接触hadoop时会有一个常见的疑问:如果一个行被切分到两个split里(这几乎是一定会发生的情况),TextInputFormat是如何处理的?如果是生硬地把一行切割到两个split里,是对数据的一种破坏,可能会影响数据分析的正确性(比如word count就是一个例子).搞清楚这个问题还是需要从源码入手了解TextInputFormat的详细工作方式,这里简单地梳理记录如下(本文参考的是hadoop1.1.2的源码):

 

1. LineRecordReader会创建一个org.apache.hadoop.util.LineReader实例,并依赖这个LineReader的readLine方法来读取一行记录,具体可参考org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text),Line 176),那么关键的逻辑就在这个readLine方法里了,下面是添加了额外中文注释的该方法源码.这个方法主要的逻辑归纳起来是3点:

  • 总是是从buffer里读取数据,如果buffer里的数据读完了,先加载下一批数据到buffer
  • 在buffer中查找"行尾",将开始位置至行尾处的数据拷贝给str(也就是最后的Value).如果为遇到"行尾",继续加载新的数据到buffer进行查找.
  • 关键点在于:给到buffer的数据是直接从文件中读取的,完全不会考虑是否超过了split的界限,而是一直读取到当前行结束为止

 

[java] view plaincopy
 
  1. /** 
  2.        * Read one line from the InputStream into the given Text.  A line 
  3.        * can be terminated by one of the following: '\n' (LF) , '\r' (CR), 
  4.        * or '\r\n' (CR+LF).  EOF also terminates an otherwise unterminated 
  5.        * line. 
  6.        * 
  7.        * @param str the object to store the given line (without newline) 
  8.        * @param maxLineLength the maximum number of bytes to store into str; 
  9.        *  the rest of the line is silently discarded. 
  10.        * @param maxBytesToConsume the maximum number of bytes to consume 
  11.        *  in this call.  This is only a hint, because if the line cross 
  12.        *  this threshold, we allow it to happen.  It can overshoot 
  13.        *  potentially by as much as one buffer length. 
  14.        * 
  15.        * @return the number of bytes read including the (longest) newline 
  16.        * found. 
  17.        * 
  18.        * @throws IOException if the underlying stream throws 
  19.        */  
  20.       public int readLine(Text str, int maxLineLength,  
  21.                           int maxBytesToConsume) throws IOException {  
  22.         /* We're reading data from in, but the head of the stream may be 
  23.          * already buffered in buffer, so we have several cases: 
  24.          * 1. No newline characters are in the buffer, so we need to copy 
  25.          *    everything and read another buffer from the stream. 
  26.          * 2. An unambiguously terminated line is in buffer, so we just 
  27.          *    copy to str. 
  28.          * 3. Ambiguously terminated line is in buffer, i.e. buffer ends 
  29.          *    in CR.  In this case we copy everything up to CR to str, but 
  30.          *    we also need to see what follows CR: if it's LF, then we 
  31.          *    need consume LF as well, so next call to readLine will read 
  32.          *    from after that. 
  33.          * We use a flag prevCharCR to signal if previous character was CR 
  34.          * and, if it happens to be at the end of the buffer, delay 
  35.          * consuming it until we have a chance to look at the char that 
  36.          * follows. 
  37.          */  
  38.         str.clear();  
  39.         int txtLength = 0//tracks str.getLength(), as an optimization  
  40.         int newlineLength = 0//length of terminating newline  
  41.         boolean prevCharCR = false//true of prev char was CR  
  42.         long bytesConsumed = 0;  
  43.         do {  
  44.           int startPosn = bufferPosn; //starting from where we left off the last time  
  45.           //如果buffer中的数据读完了,先加载一批数据到buffer里  
  46.           if (bufferPosn >= bufferLength) {  
  47.             startPosn = bufferPosn = 0;  
  48.             if (prevCharCR)  
  49.               ++bytesConsumed; //account for CR from previous read  
  50.             bufferLength = in.read(buffer);  
  51.             if (bufferLength <= 0)  
  52.               break// EOF  
  53.           }  
  54.           //注意:这里的逻辑有点tricky,由于不同操作系统对“行结束符“的定义不同:  
  55.           //UNIX: '\n'  (LF)  
  56.           //Mac:  '\r'  (CR)  
  57.           //Windows: '\r\n'  (CR)(LF)  
  58.           //为了准确判断一行的结尾,程序的判定逻辑是:  
  59.           //1.如果当前符号是LF,可以确定一定是到了行尾,但是需要参考一下前一个  
  60.           //字符,因为如果前一个字符是CR,那就是windows文件,“行结束符的长度”  
  61.           //(即变量:newlineLength,这个变量名起的有点糟糕)应该是2,否则就是UNIX文件,“行结束符的长度”为1。  
  62.           //2.如果当前符号不是LF,看一下前一个符号是不是CR,如果是也可以确定一定上个字符就是行尾了,这是一个mac文件。  
  63.           //3.如果当前符号是CR的话,还需要根据下一个字符是不是LF判断“行结束符的长度”,所以只是标记一下prevCharCR=true,供读取下个字符时参考。  
  64.           for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline  
  65.             if (buffer[bufferPosn] == LF) {  
  66.               newlineLength = (prevCharCR) ? 2 : 1;  
  67.               ++bufferPosn; // at next invocation proceed from following byte  
  68.               break;  
  69.             }  
  70.             if (prevCharCR) { //CR + notLF, we are at notLF  
  71.               newlineLength = 1;  
  72.               break;  
  73.             }  
  74.             prevCharCR = (buffer[bufferPosn] == CR);  
  75.           }  
  76.           int readLength = bufferPosn - startPosn;  
  77.           if (prevCharCR && newlineLength == 0)  
  78.             --readLength; //CR at the end of the buffer  
  79.           bytesConsumed += readLength;  
  80.           int appendLength = readLength - newlineLength;  
  81.           if (appendLength > maxLineLength - txtLength) {  
  82.             appendLength = maxLineLength - txtLength;  
  83.           }  
  84.           if (appendLength > 0) {  
  85.             str.append(buffer, startPosn, appendLength);  
  86.             txtLength += appendLength;         
  87.           }//newlineLength == 0 就意味着始终没有读到行尾,程序会继续通过文件输入流继续从文件里读取数据。  
  88.           //这里有一个非常重要的地方:in的实例创建自构造函数:org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit)  
  89.           //第86行:FSDataInputStream fileIn = fs.open(split.getPath()); 我们看以看到:  
  90.           //对于LineRecordReader:当它对取“一行”时,一定是读取到完整的行,不会受filesplit的任何影响,因为它读取是filesplit所在的文件,而不是限定在filesplit的界限范围内。  
  91.           //所以不会出现“断行”的问题!  
  92.         } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);  
  93.       
  94.         if (bytesConsumed > (long)Integer.MAX_VALUE)  
  95.           throw new IOException("Too many bytes before newline: " + bytesConsumed);      
  96.         return (int)bytesConsumed;  
  97.       }  

 

2. 按照readLine的上述行为,在遇到跨split的行时,会到下一个split继续读取数据直至行尾,那么下一个split怎么判定开头的一行有没有被上一个split的LineRecordReader读取过从而避免漏读或重复读取开头一行呢?这方面LineRecordReader使用了一个简单而巧妙的方法:既然无法断定每一个split开始的一行是独立的一行还是被切断的一行的一部分,那就跳过每个split的开始一行(当然要除第一个split之外),从第二行开始读取,然后在到达split的结尾端时总是再多读一行,这样数据既能接续起来又避开了断行带来的麻烦.以下是相关的源码:

在LineRecordReader的构造函数org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit) 108到113行确定start位置时,明确注明::会特别地忽略掉第一行!

 

[java] view plaincopy
 
  1. // If this is not the first split, we always throw away first record  
  2.     // because we always (except the last split) read one extra line in  
  3.     // next() method.  
  4.     if (start != 0) {  
  5.       start += in.readLine(new Text(), 0, maxBytesToConsume(start));  
  6.     }  

相应地,在LineRecordReader判断是否还有下一行的方法:org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text) 170到173行中,while使用的判定条件是:当前位置小于或等于split的结尾位置,也就说:当当前以处于split的结尾位置上时,while依然会执行一次,这一次读到显然已经是下一个split的开始行了!

 

 

[java] view plaincopy
 
  1. // We always read one extra line, which lies outside the upper  
  2. // split limit i.e. (end - 1)  
  3. while (getFilePosition() <= end) {  

小结:

至此,跨split的行读取的逻辑就完备了.如果引申地来看,这是map-reduce前期数据切分的一个普遍性问题,即不管我们用什么方式切分和读取一份大数据中的小部分,包括我们在实现自己的InputFormat时,都会面临在切分处数据时的连续性解析问题. 对此我们应该深刻地认识到:split最直接的现实作用是取出大数据中的一小部分给mapper处理,但这只是一种"逻辑"上的,"宏观"上的切分,在"微观"上,在split的首尾切分处,为了确保数据连续性,跨越split接续并拼接数据也是完全正当和合理的.
分享到:
评论

相关推荐

    Hadoop源码解析---MapReduce之InputFormat

    结合Hadoop源码,详细讲解了MapReduce开发中的InputFormat,很详细。

    CustomInputFormatCollection:Hadoop Mapreduce InputFormat 集合

    Hadoop 代码使用方式 ...hadoop jar hadoop-mapreduce-custom-inputformat-1.0-SNAPSHOT.jar org.apache.hadoop.mapreduce.sample.SmallFileWordCount -Dmapreduce.input.fileinputformat.split.maxsize=10

    自定义MapReduce的InputFormat

    自定义MapReduce的InputFormat,实现提取指定开始与结束限定符的内容。

    ExcelRecordReaderMapReduce:可以读取Excel文件的MapReduce InputFormat

    ExcelRecordReaderMapReducehadoop mapreduce的MapReduce输入格式以读取Microsoft Excel电子表格执照Apache许可。用法1.下载并运行ant。 2.在您的环境中包括ExcelRecordReaderMapReduce-0.0.1-SNAPSHOT.jar 3.使用...

    Hadoop技术内幕:深入解析MapReduce架构设计与实现原理

    MapReduce编程模型3.1 MapReduce编程模型概述3.1.1 MapReduce编程接口体系结构3.1.2 新旧MapReduce API比较3.2 MapReduce API基本概念3.2.1 序列化3.2.2 Reporter参数3.2.3 回调机制3.3 Java API解析3.3.1 ...

    Hadoop实战中文版

    3.3 读和写 3.3.1 InputFormat 3.3.2 OutputFormat 3.4 小结第二部分 实战 第4章 编写MapReduce基础程序 4.1 获得专利数据集 4.1.1 专利引用数据 4.1.2 专利描述数据 4.2 构建MapReduce 程序的基础模板 4.3...

    Hadoop实战中文版.PDF

    30第3章 Hadoop组件 313.1 HDFS文件操作 313.1.1 基本文件命令 323.1.2 编程读写HDFS 353.2 剖析MapReduce程序 373.2.1 Hadoop数据类型 393.2.2 Mapper 403.2.3 Reducer 413.2.4 Partitioner:...

    Hadoop实战(陆嘉恒)译

    Hadoop组件3.1 HDFS 文件操作3.1.1 基本文件命令3.1.2 编程读写HDFS3.2 剖析MapReduce 程序3.2.1 Hadoop数据类型3.2.2 Mapper3.2.3 Reducer3.2.4 Partitioner:重定向Mapper输出3.2.5 Combiner:本地reduce3.2.6 ...

    FocusBigData:【大数据成神之路学习路径+面经+简历】

    FocusBigData :elephant:Hadoop分布存储框架 Hadoop篇 HDFS篇 ...MapReduce之InputFormat数据输入 MapReduce之OutputFormat数据输出 MapReduce之Shuffle机制 MapReduce之MapJoin和ReduceJoin MapReduce之

    mapreduce_training:用于教学目的的MapReduce应用程序集

    MapReduce自定义InputFormat和RecordReader实现 MapReduce自定义OutputFormat和RecordWriter实现 Pig自定义LoadFunc加载和解析Apache HTTP日志事件 Pig的自定义EvalFunc使用MaxMind GEO API将IP地址转换为位置 另外...

    Hadoop实战

    第一部分 Hadoop——一种分布式编程框架第1章 Hadoop简介 21.1 为什么写《Hadoop 实战》 31.2 什么是Hadoop 31.3 了解分布式系统和Hadoop 41.4 比较SQL数据库和Hadoop 51.5 理解MapReduce 61.5.1 动手扩展一个简单...

    大数据学习(九):mapreduce编程模型及具体框架实现

     hadoop中的mapreduce框架、spark。  hadoop中的mapreduce框架:  对编程模型阶段1实现就是:map task  对编程模型阶段2的实现就是reduce task。 map task:  读数据:InputFormat–&gt;TextInputFormat

    mapfileinputformat:MapFiles 的 Hadoop InputFormat,它在将任何内容传递给映射器之前过滤不相关的 FileSplits

    映射文件输入格式MapFiles 的 Hadoop InputFormat,它在将任何内容传递给映射器之前过滤不相关的 FileSplits。目的假设您的文件系统中有一些带有排序键的非常大的文件,并且键已排序。 在编写 MapReduce 作业时,您...

    ecosystem:TensorFlow与其他开源框架的集成

    TensorFlow生态系统 该存储库包含将TensorFlow与其他开源框架集成的示例。... -Hadoop MapReduce和Spark的TFRecord文件InputFormat / OutputFormat。 -Spark TensorFlow连接器 -Python软件包,可帮助用户使用TensorF

    Spark RDD详解

    它可以通过YARN或Spark的Standalone在Hadoop集群中运行,并且可以处理HDFS、Hbase、Cassandra、Hive和任何Hadoop InputFormat中的数据。它旨在执行批处理(类似于MapReduce)和提供新的工作特性,例如流计算,SparkSQL...

    wonderdog:批量加载以进行弹性搜索

    您可以在自己的Hadoop MapReduce作业中使用的 ,可从轻松使用这些InputFormat和OutputFormat类 从 LOAD和STORE到ElasticSearch的 一些用于与ElasticSearch进行交互的 ... &lt; groupId&gt;...

Global site tag (gtag.js) - Google Analytics