`
miluroe
  • 浏览: 4056 次
  • 性别: Icon_minigender_1
最近访客 更多访客>>
社区版块
存档分类
最新评论

Hadoop源代码分析——io.*(二)

 
阅读更多

        从官方的WordCount中不难看出,Hadoop在读取文本时的至少要有两种分割的功能。即可以将一篇文档按行切割分离,同时可以将一行中的文本依据空格切割出来。这样,类似与Java的I/O操作,只不过是以单词为单位向下迭代。每次迭代时读出一个单词并取出。

 

       Text word = new Text();然后word.set(itr.nextToken());context.write(word, one) (one是数值为1的整数型变量)。将word逐次编号,并将word的value设置为1表示独到的单词已经出现了一次。最后将全部读取结果保存到Context类型的文本上。

       

        因此,从对文件读写要求的角度出发,Hadoop在文件系统包下建立了一些关乎I/O的接口。

        org.apache.hadoop.fs.Seekable

        该接口定义了三种方法。注意:以下方法是对任意文件的操作而非仅仅是针对文本的操作。

        1.void seek(long pos)

        从文件的开始处经过指定的步长pos到达新位置,read()指向新位置。步长不能超出文件。

        2.void getPos(long pos)

        返回当前位置的步长。

 

        3.boolean seekToNewSource(long targPos)

        判断在目标位置是否是当前内容的复本。

 

        与此类似org.apache.hadoop.fs.PositioneReadable定义了一些可读的方法.

        public int read(long position, byte[] buffer, int offset, int length) throws IOException;

        将描述翻译成中文是从文件中的一个指定位置position开始读取直到指定字节串byte[]的位置offset,返回读取到的字节个数length。

       貌似是要实现这样一个过程,仅是个人猜测:eg:I wish the wish you wish.

        int length = 0;

        String s = "you";

        bytes = s.toByteArray();

        read(7, bytes, 2, length);

        从I wish the wish you wish.从t到u一共移动了11个字符。运行完毕后length = 11。

        原文档中提到This dos not  change the current offset of a file, and is thread-safe.估计就是用length的增加取代offset的偏移,实现线程安全。仅是个人猜想read的一种实现及运行,原文档没有对构造传入的变量进行描述。

        public void readFully(long positon, byte[] buffer) throws IOException;

        public void readFully(long position, byte[] buffer, int offset, int length) throws IOException;

        根据文档的描述,其实现的思路和 public int read(long position, byte[] buffer, int offset, int length)是一致的。

 

        实现了以上两个接口的I/O类:

        org.apache.hadoop.io.compress.CompressionInputStream 

        该类在compress包下。在Hadoop集群中,为了保障通讯、存储的效益和质量,压缩操作的读写过程中时时进行,不难猜测,compress包实现了对I/O的压缩与解压功能。

        提到压缩,应从 org.apache.hadoop.io.compress.Compressor 开始分析。

 

        interface Compressor 的方法模仿java.util.zip.Deflater而来。

        通过实例化Deflater对象,可以对数据进行公有zip算法压缩。

        public void justATry() {

            try {

                    String inputString = "blahblahblah??";

                    byte[] input = inputString.getBytes("UTF-8");

                     // Compress the bytes

                    byte[] output = new byte[100];

                    Deflater compresser = new Deflater();

                    compresser.setInput(input);

                    compresser.finish();

                    int compressedDataLength = compresser.deflate(output);

                    System.out.println(output);

 

                     // Decompress the bytes

                    Inflater decompresser = new Inflater();

                    decompresser.setInput(output, 0, compressedDataLength);

                    byte[] result = new byte[100];

                    int resultLength = decompresser.inflate(result);

                    decompresser.end();

                    for (int i = 0; i < result.length; i++) {

                            System.out.println(result[i]);

                        }

                    // Decode the bytes into a String

                    String outputString = new String(result, 0, resultLength, "UTF-8");

                    System.out.println(outputString);

                } catch (java.io.UnsupportedEncodingException ex) {

            } catch (java.util.zip.DataFormatException ex) {

 

        }

        以上方法为Deflater对数据的压缩解压过程。不难分析:interface Compressor  应有类似的public void setInput(byte[] input);public void finish();public int deflate(byte[] output)方法。同理可得:org.apache.hadoop.io.compress.Decompressor 的解压方法。

        因此,可以猜测CompressionInputStream 实现PositionedReadableSeekable 将数据读入。CompressionOnputStream 实现Compressor 对读入的内容进行压缩,输出压缩后的内容。同时可以猜测压缩输出的框架机制。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics