版权声明:本文为博主原创文章,未经博主允许不得转载。
我们知道hadoop将数据给到map进行处理前会使用InputFormat对数据进行两方面的预处理:
- 对输入数据进行切分,生成一组split,一个split会分发给一个mapper进行处理。
- 针对每个split,再创建一个RecordReader读取Split内的数据,并按照<key,value>的形式组织成一条record传给map函数进行处理。
最常见的FormatInput就是TextInputFormat,在split的读取方面,它是将给到的Split按行读取,以行首字节在文件中的偏移做key,以行数据做value传给map函数处理,这部分的逻辑是由它所创建并使用的RecordReader:LineRecordReader封装和实现的.关于这部分逻辑,在一开始接触hadoop时会有一个常见的疑问:如果一个行被切分到两个split里(这几乎是一定会发生的情况),TextInputFormat是如何处理的?如果是生硬地把一行切割到两个split里,是对数据的一种破坏,可能会影响数据分析的正确性(比如word count就是一个例子).搞清楚这个问题还是需要从源码入手了解TextInputFormat的详细工作方式,这里简单地梳理记录如下(本文参考的是hadoop1.1.2的源码):
1. LineRecordReader会创建一个org.apache.hadoop.util.LineReader实例,并依赖这个LineReader的readLine方法来读取一行记录,具体可参考org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text),Line 176),那么关键的逻辑就在这个readLine方法里了,下面是添加了额外中文注释的该方法源码.这个方法主要的逻辑归纳起来是3点:
- 总是是从buffer里读取数据,如果buffer里的数据读完了,先加载下一批数据到buffer
- 在buffer中查找"行尾",将开始位置至行尾处的数据拷贝给str(也就是最后的Value).如果为遇到"行尾",继续加载新的数据到buffer进行查找.
- 关键点在于:给到buffer的数据是直接从文件中读取的,完全不会考虑是否超过了split的界限,而是一直读取到当前行结束为止
- /**
- * Read one line from the InputStream into the given Text. A line
- * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
- * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
- * line.
- *
- * @param str the object to store the given line (without newline)
- * @param maxLineLength the maximum number of bytes to store into str;
- * the rest of the line is silently discarded.
- * @param maxBytesToConsume the maximum number of bytes to consume
- * in this call. This is only a hint, because if the line cross
- * this threshold, we allow it to happen. It can overshoot
- * potentially by as much as one buffer length.
- *
- * @return the number of bytes read including the (longest) newline
- * found.
- *
- * @throws IOException if the underlying stream throws
- */
- public int readLine(Text str, int maxLineLength,
- int maxBytesToConsume) throws IOException {
- /* We're reading data from in, but the head of the stream may be
- * already buffered in buffer, so we have several cases:
- * 1. No newline characters are in the buffer, so we need to copy
- * everything and read another buffer from the stream.
- * 2. An unambiguously terminated line is in buffer, so we just
- * copy to str.
- * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
- * in CR. In this case we copy everything up to CR to str, but
- * we also need to see what follows CR: if it's LF, then we
- * need consume LF as well, so next call to readLine will read
- * from after that.
- * We use a flag prevCharCR to signal if previous character was CR
- * and, if it happens to be at the end of the buffer, delay
- * consuming it until we have a chance to look at the char that
- * follows.
- */
- str.clear();
- int txtLength = 0; //tracks str.getLength(), as an optimization
- int newlineLength = 0; //length of terminating newline
- boolean prevCharCR = false; //true of prev char was CR
- long bytesConsumed = 0;
- do {
- int startPosn = bufferPosn; //starting from where we left off the last time
- //如果buffer中的数据读完了,先加载一批数据到buffer里
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
- if (prevCharCR)
- ++bytesConsumed; //account for CR from previous read
- bufferLength = in.read(buffer);
- if (bufferLength <= 0)
- break; // EOF
- }
- //注意:这里的逻辑有点tricky,由于不同操作系统对“行结束符“的定义不同:
- //UNIX: '\n' (LF)
- //Mac: '\r' (CR)
- //Windows: '\r\n' (CR)(LF)
- //为了准确判断一行的结尾,程序的判定逻辑是:
- //1.如果当前符号是LF,可以确定一定是到了行尾,但是需要参考一下前一个
- //字符,因为如果前一个字符是CR,那就是windows文件,“行结束符的长度”
- //(即变量:newlineLength,这个变量名起的有点糟糕)应该是2,否则就是UNIX文件,“行结束符的长度”为1。
- //2.如果当前符号不是LF,看一下前一个符号是不是CR,如果是也可以确定一定上个字符就是行尾了,这是一个mac文件。
- //3.如果当前符号是CR的话,还需要根据下一个字符是不是LF判断“行结束符的长度”,所以只是标记一下prevCharCR=true,供读取下个字符时参考。
- for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
- if (buffer[bufferPosn] == LF) {
- newlineLength = (prevCharCR) ? 2 : 1;
- ++bufferPosn; // at next invocation proceed from following byte
- break;
- }
- if (prevCharCR) { //CR + notLF, we are at notLF
- newlineLength = 1;
- break;
- }
- prevCharCR = (buffer[bufferPosn] == CR);
- }
- int readLength = bufferPosn - startPosn;
- if (prevCharCR && newlineLength == 0)
- --readLength; //CR at the end of the buffer
- bytesConsumed += readLength;
- int appendLength = readLength - newlineLength;
- if (appendLength > maxLineLength - txtLength) {
- appendLength = maxLineLength - txtLength;
- }
- if (appendLength > 0) {
- str.append(buffer, startPosn, appendLength);
- txtLength += appendLength;
- }//newlineLength == 0 就意味着始终没有读到行尾,程序会继续通过文件输入流继续从文件里读取数据。
- //这里有一个非常重要的地方:in的实例创建自构造函数:org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit)
- //第86行:FSDataInputStream fileIn = fs.open(split.getPath()); 我们看以看到:
- //对于LineRecordReader:当它对取“一行”时,一定是读取到完整的行,不会受filesplit的任何影响,因为它读取是filesplit所在的文件,而不是限定在filesplit的界限范围内。
- //所以不会出现“断行”的问题!
- } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
- if (bytesConsumed > (long)Integer.MAX_VALUE)
- throw new IOException("Too many bytes before newline: " + bytesConsumed);
- return (int)bytesConsumed;
- }
2. 按照readLine的上述行为,在遇到跨split的行时,会到下一个split继续读取数据直至行尾,那么下一个split怎么判定开头的一行有没有被上一个split的LineRecordReader读取过从而避免漏读或重复读取开头一行呢?这方面LineRecordReader使用了一个简单而巧妙的方法:既然无法断定每一个split开始的一行是独立的一行还是被切断的一行的一部分,那就跳过每个split的开始一行(当然要除第一个split之外),从第二行开始读取,然后在到达split的结尾端时总是再多读一行,这样数据既能接续起来又避开了断行带来的麻烦.以下是相关的源码:
在LineRecordReader的构造函数org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit) 108到113行确定start位置时,明确注明::会特别地忽略掉第一行!
- // If this is not the first split, we always throw away first record
- // because we always (except the last split) read one extra line in
- // next() method.
- if (start != 0) {
- start += in.readLine(new Text(), 0, maxBytesToConsume(start));
- }
相应地,在LineRecordReader判断是否还有下一行的方法:org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text) 170到173行中,while使用的判定条件是:当前位置小于或等于split的结尾位置,也就说:当当前以处于split的结尾位置上时,while依然会执行一次,这一次读到显然已经是下一个split的开始行了!
- // We always read one extra line, which lies outside the upper
- // split limit i.e. (end - 1)
- while (getFilePosition() <= end) {
小结:
至此,跨split的行读取的逻辑就完备了.如果引申地来看,这是map-reduce前期数据切分的一个普遍性问题,即不管我们用什么方式切分和读取一份大数据中的小部分,包括我们在实现自己的InputFormat时,都会面临在切分处数据时的连续性解析问题. 对此我们应该深刻地认识到:split最直接的现实作用是取出大数据中的一小部分给mapper处理,但这只是一种"逻辑"上的,"宏观"上的切分,在"微观"上,在split的首尾切分处,为了确保数据连续性,跨越split接续并拼接数据也是完全正当和合理的.
相关推荐
结合Hadoop源码,详细讲解了MapReduce开发中的InputFormat,很详细。
Hadoop 代码使用方式 ...hadoop jar hadoop-mapreduce-custom-inputformat-1.0-SNAPSHOT.jar org.apache.hadoop.mapreduce.sample.SmallFileWordCount -Dmapreduce.input.fileinputformat.split.maxsize=10
自定义MapReduce的InputFormat,实现提取指定开始与结束限定符的内容。
ExcelRecordReaderMapReducehadoop mapreduce的MapReduce输入格式以读取Microsoft Excel电子表格执照Apache许可。用法1.下载并运行ant。 2.在您的环境中包括ExcelRecordReaderMapReduce-0.0.1-SNAPSHOT.jar 3.使用...
MapReduce编程模型3.1 MapReduce编程模型概述3.1.1 MapReduce编程接口体系结构3.1.2 新旧MapReduce API比较3.2 MapReduce API基本概念3.2.1 序列化3.2.2 Reporter参数3.2.3 回调机制3.3 Java API解析3.3.1 ...
3.3 读和写 3.3.1 InputFormat 3.3.2 OutputFormat 3.4 小结第二部分 实战 第4章 编写MapReduce基础程序 4.1 获得专利数据集 4.1.1 专利引用数据 4.1.2 专利描述数据 4.2 构建MapReduce 程序的基础模板 4.3...
30第3章 Hadoop组件 313.1 HDFS文件操作 313.1.1 基本文件命令 323.1.2 编程读写HDFS 353.2 剖析MapReduce程序 373.2.1 Hadoop数据类型 393.2.2 Mapper 403.2.3 Reducer 413.2.4 Partitioner:...
Hadoop组件3.1 HDFS 文件操作3.1.1 基本文件命令3.1.2 编程读写HDFS3.2 剖析MapReduce 程序3.2.1 Hadoop数据类型3.2.2 Mapper3.2.3 Reducer3.2.4 Partitioner:重定向Mapper输出3.2.5 Combiner:本地reduce3.2.6 ...
FocusBigData :elephant:Hadoop分布存储框架 Hadoop篇 HDFS篇 ...MapReduce之InputFormat数据输入 MapReduce之OutputFormat数据输出 MapReduce之Shuffle机制 MapReduce之MapJoin和ReduceJoin MapReduce之
MapReduce自定义InputFormat和RecordReader实现 MapReduce自定义OutputFormat和RecordWriter实现 Pig自定义LoadFunc加载和解析Apache HTTP日志事件 Pig的自定义EvalFunc使用MaxMind GEO API将IP地址转换为位置 另外...
第一部分 Hadoop——一种分布式编程框架第1章 Hadoop简介 21.1 为什么写《Hadoop 实战》 31.2 什么是Hadoop 31.3 了解分布式系统和Hadoop 41.4 比较SQL数据库和Hadoop 51.5 理解MapReduce 61.5.1 动手扩展一个简单...
hadoop中的mapreduce框架、spark。 hadoop中的mapreduce框架: 对编程模型阶段1实现就是:map task 对编程模型阶段2的实现就是reduce task。 map task: 读数据:InputFormat–>TextInputFormat
映射文件输入格式MapFiles 的 Hadoop InputFormat,它在将任何内容传递给映射器之前过滤不相关的 FileSplits。目的假设您的文件系统中有一些带有排序键的非常大的文件,并且键已排序。 在编写 MapReduce 作业时,您...
TensorFlow生态系统 该存储库包含将TensorFlow与其他开源框架集成的示例。... -Hadoop MapReduce和Spark的TFRecord文件InputFormat / OutputFormat。 -Spark TensorFlow连接器 -Python软件包,可帮助用户使用TensorF
它可以通过YARN或Spark的Standalone在Hadoop集群中运行,并且可以处理HDFS、Hbase、Cassandra、Hive和任何Hadoop InputFormat中的数据。它旨在执行批处理(类似于MapReduce)和提供新的工作特性,例如流计算,SparkSQL...
您可以在自己的Hadoop MapReduce作业中使用的 ,可从轻松使用这些InputFormat和OutputFormat类 从 LOAD和STORE到ElasticSearch的 一些用于与ElasticSearch进行交互的 ... < groupId>...