从官方的WordCount中不难看出,Hadoop在读取文本时的至少要有两种分割的功能。即可以将一篇文档按行切割分离,同时可以将一行中的文本依据空格切割出来。这样,类似与Java的I/O操作,只不过是以单词为单位向下迭代。每次迭代时读出一个单词并取出。
Text word = new Text();然后word.set(itr.nextToken());context.write(word, one) (one是数值为1的整数型变量)。将word逐次编号,并将word的value设置为1表示独到的单词已经出现了一次。最后将全部读取结果保存到Context类型的文本上。
因此,从对文件读写要求的角度出发,Hadoop在文件系统包下建立了一些关乎I/O的接口。
org.apache.hadoop.fs.Seekable
该接口定义了三种方法。注意:以下方法是对任意文件的操作而非仅仅是针对文本的操作。
1.void seek(long pos)
从文件的开始处经过指定的步长pos到达新位置,read()指向新位置。步长不能超出文件。
2.void getPos(long pos)
返回当前位置的步长。
3.boolean seekToNewSource(long targPos)
判断在目标位置是否是当前内容的复本。
与此类似org.apache.hadoop.fs.PositioneReadable定义了一些可读的方法.
public int read(long position, byte[] buffer, int offset, int length) throws IOException;
将描述翻译成中文是从文件中的一个指定位置position开始读取直到指定字节串byte[]的位置offset,返回读取到的字节个数length。
貌似是要实现这样一个过程,仅是个人猜测:eg:I wish the wish you wish.
int length = 0;
String s = "you";
bytes = s.toByteArray();
read(7, bytes, 2, length);
从I wish the wish you wish.从t到u一共移动了11个字符。运行完毕后length = 11。
原文档中提到This dos not change the current offset of a file, and is thread-safe.估计就是用length的增加取代offset的偏移,实现线程安全。仅是个人猜想read的一种实现及运行,原文档没有对构造传入的变量进行描述。
public void readFully(long positon, byte[] buffer) throws IOException;
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException;
根据文档的描述,其实现的思路和 public int read(long position, byte[] buffer, int offset, int length)是一致的。
实现了以上两个接口的I/O类:
org.apache.hadoop.io.compress.CompressionInputStream
该类在compress包下。在Hadoop集群中,为了保障通讯、存储的效益和质量,压缩操作的读写过程中时时进行,不难猜测,compress包实现了对I/O的压缩与解压功能。
提到压缩,应从 org.apache.hadoop.io.compress.Compressor 开始分析。
interface Compressor 的方法模仿java.util.zip.Deflater而来。
通过实例化Deflater对象,可以对数据进行公有zip算法压缩。
public void justATry() {
try {
String inputString = "blahblahblah??";
byte[] input = inputString.getBytes("UTF-8");
// Compress the bytes
byte[] output = new byte[100];
Deflater compresser = new Deflater();
compresser.setInput(input);
compresser.finish();
int compressedDataLength = compresser.deflate(output);
System.out.println(output);
// Decompress the bytes
Inflater decompresser = new Inflater();
decompresser.setInput(output, 0, compressedDataLength);
byte[] result = new byte[100];
int resultLength = decompresser.inflate(result);
decompresser.end();
for (int i = 0; i < result.length; i++) {
System.out.println(result[i]);
}
// Decode the bytes into a String
String outputString = new String(result, 0, resultLength, "UTF-8");
System.out.println(outputString);
} catch (java.io.UnsupportedEncodingException ex) {
} catch (java.util.zip.DataFormatException ex) {
}
以上方法为Deflater对数据的压缩解压过程。不难分析:interface Compressor 应有类似的public void setInput(byte[] input);public void finish();public int deflate(byte[] output)方法。同理可得:org.apache.hadoop.io.compress.Decompressor 的解压方法。
因此,可以猜测CompressionInputStream 实现PositionedReadable、Seekable 将数据读入。CompressionOnputStream 实现Compressor 对读入的内容进行压缩,输出压缩后的内容。同时可以猜测压缩输出的框架机制。
相关推荐
Hadoop源代码分析完整版.pdf
org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z Hadoop源码_NativeIO.java
必须将此jar包放在org.apache.hadoop.io包下,否则无法正常覆盖使用
分别取对应的文件夹下解压,之后将这两个文件替换到 hadoop-2.*.*/bin目录下即可 winutils.exe、hadoop.dll
Hadoop源代码eclipse编译指南.zip、Hadoop源代码eclipse编译指南.zipHadoop源代码eclipse编译指南.zipHadoop源代码eclipse编译指南.zipHadoop源代码eclipse编译指南.zip
解决本地调试Hadoop 异常。 org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
Hadoop源代码分析完整版.pdf Hadoop源代码分析 hadoop mapreduce
Hadoop源代码分析完整版
Hadoop源代码分析完整版
mapreduce.lib.partitionreduceoutput类的Hadoop源代码分析
hadoop源代码资源归档Archive.zip
学习hadoop源代码,RPC部分.pdf
在网上下了好多2.6版本的hadoop.dll,但是都不好使,昨天有个好心网友给我发了一份,实际测试通过。开发环境是64位win7+hadoop2.7.1+redhat版本的linux。
包mapreduce.lib.map的Hadoop源代码分析
org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0(Ljava/lang/String;I)V 解决方案:下载本资源解压将hadoop.dll和winutils.exe文件复制到hadoop2.7.3的bin目录下即可解决。
深入云计算:Hadoop源代码分析(修订版)
Hadoop源代码分析(完整版).pdf
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:468) at org.apache.hadoop.util.Shell....