`

IO输入/输出从PipedInputStream/PipedOutputStream谈起(源码分析)

    博客分类:
  • Java
阅读更多

转载:http://miaoxiaodong78.blog.163.com/blog/static/18765136200702285946971/

PipedInputStream/PipedOutputStream谈起

       江苏 无锡 缪小东

本篇主要从分析PipeInputStremPipedOutputStream谈起。谈及软件设计的变化,以及如何将软件拆分、组合,适配……

1 源代码分析

       下面将详细分析PipedInputStreamPipedOutputStream的源代码。

1.1 PipedInputStream

 

package java.io;

//PipedInputStream必须和PipedOutputStream联合使用。即必须连接输入部分。

//其原理为:PipedInputStream内部有一个Buffer

//PipedInputStream可以使用InputStream的方法读取其Buffer中的字节。

//PipedInputStreamBuffer中的字节是PipedOutputStream调用PipedInputStream的方法放入的。

 

public class PipedInputStream extends InputStream {

    boolean closedByWriter = false;                                                             //标识有读取方或写入方关闭

    volatile boolean closedByReader = false;

    boolean connected = false;                                                                     //是否建立连接

    Thread readSide;                                                                                             //标识哪个线程

    Thread writeSide;

 

    protected static final int PIPE_SIZE = 1024;                         //缓冲区的默认大小

    protected byte buffer[] = new byte[PIPE_SIZE];                  //缓冲区

    protected int in = -1;               //下一个写入字节的位置。0代表空,in==out代表满

    protected int out = 0;               //下一个读取字节的位置

 

    public PipedInputStream(PipedOutputStream src) throws IOException {                //给定源的输入流

                   connect(src);

    }

 

    public PipedInputStream() {    }                                                //默认构造器,下部一定要connect

 

    public void connect(PipedOutputStream src) throws IOException {               //连接输入源

                   src.connect(this);                                                                           //调用源的connect方法连接当前对象

    }

 

    protected synchronized void receive(int b) throws IOException {                   //只被PipedOuputStream调用

        checkStateForReceive();                                                                                 //检查状态,写入

        writeSide = Thread.currentThread();                                                      //永远是PipedOuputStream

        if (in == out)     awaitSpace();                                                           //输入和输出相等,等待空间

         if (in < 0) {

             in = 0;

             out = 0;

         }

         buffer[in++] = (byte)(b & 0xFF);                                                             //放入buffer相应的位置

         if (in >= buffer.length) {      in = 0;         }                                             //in0表示buffer已空

    }

 

    synchronized void receive(byte b[], int off, int len)  throws IOException {

        checkStateForReceive();

        writeSide = Thread.currentThread();                                   //PipedOutputStream可以看出

        int bytesToTransfer = len;

        while (bytesToTransfer > 0) {

            if (in == out)    awaitSpace();                                 //满了,会通知读取的;空会通知写入

            int nextTransferAmount = 0;

            if (out < in) {

                nextTransferAmount = buffer.length - in;

            } else if (in < out) {

                if (in == -1) {

                    in = out = 0;

                    nextTransferAmount = buffer.length - in;

                } else {

                    nextTransferAmount = out - in;

                }

            }

            if (nextTransferAmount > bytesToTransfer)     nextTransferAmount = bytesToTransfer;

            assert(nextTransferAmount > 0);

            System.arraycopy(b, off, buffer, in, nextTransferAmount);

            bytesToTransfer -= nextTransferAmount;

            off += nextTransferAmount;

            in += nextTransferAmount;

            if (in >= buffer.length) {     in = 0;      }

        }

    }

 

    private void checkStateForReceive() throws IOException {                           //检查当前状态,等待输入

        if (!connected) {

            throw new IOException("Pipe not connected");

        } else if (closedByWriter || closedByReader) {

             throw new IOException("Pipe closed");

         } else if (readSide != null && !readSide.isAlive()) {

            throw new IOException("Read end dead");

        }

    }

 

    private void awaitSpace() throws IOException {                                              //Buffer已满,等待一段时间

         while (in == out) {                                                                                             //in==out表示满了,没有空间

             checkStateForReceive();                                                                       //检查接受端的状态

             notifyAll();                                                                                  //通知读取端

             try {

                 wait(1000);

             } catch (InterruptedException ex) {

                   throw new java.io.InterruptedIOException();

             }

         }

    }

 

    synchronized void receivedLast() {                  //通知所有等待的线程()已经接受到最后的字节

         closedByWriter = true;                             //

         notifyAll();

    }

 

    public synchronized int read()  throws IOException {

        if (!connected) {                                                                              //检查一些内部状态

            throw new IOException("Pipe not connected");

        } else if (closedByReader) {

             throw new IOException("Pipe closed");

         } else if (writeSide != null && !writeSide.isAlive()&& !closedByWriter && (in < 0)) {

            throw new IOException("Write end dead");

         }

        readSide = Thread.currentThread();                                            //当前线程读取

         int trials = 2;                                                                                             //重复两次????

         while (in < 0) {

             if (closedByWriter) {              return -1;        }                 //输入断关闭返回-1

             if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {          //状态错误

                   throw new IOException("Pipe broken");

             }

             notifyAll();                                                             // 空了,通知写入端可以写入

             try {

                 wait(1000);

             } catch (InterruptedException ex) {

                   throw new java.io.InterruptedIOException();

             }

        }

         int ret = buffer[out++] & 0xFF;                                                        //

         if (out >= buffer.length) {             out = 0;                }

         if (in == out) {           in = -1;                 }                             //没有任何字节

         return ret;

    }

 

    public synchronized int read(byte b[], int off, int len)  throws IOException {

     if (b == null) {                                                                                 //检查输入参数的正确性

         throw new NullPointerException();

     } else if (off < 0 || len < 0 || len > b.length - off) {

         throw new IndexOutOfBoundsException();

     } else if (len == 0) {

         return 0;

     }

     int c = read();                                                                                 //读取下一个

     if (c < 0) {    return -1;       }                                             //已经到达末尾了,返回-1

     b[off] = (byte) c;                                                                    //放入外部buffer

     int rlen = 1;                                                                            //return-len

     while ((in >= 0) && (--len > 0)) {                                          //下一个in存在,且没有到达len

         b[off + rlen] = buffer[out++];                                         //依次放入外部buffer

         rlen++;

         if (out >= buffer.length) {         out = 0;           }        //读到buffer的末尾,返回头部

         if (in == out) {     in = -1;      }               //读、写位置一致时,表示没有数据

     }

     return rlen;                                                                            //返回填充的长度

    }

 

    public synchronized int available() throws IOException {             //返回还有多少字节可以读取

         if(in < 0)

             return 0;                                                                                         //到达末端,没有字节

         else if(in == out)

             return buffer.length;                                                               //写入的和读出的一致,表示满

         else if (in > out)

             return in - out;                                                                                 //写入的大于读出

         else

             return in + buffer.length - out;                                                //写入的小于读出的

    }

 

    public void close()  throws IOException {                //关闭当前流,同时释放与其相关的资源

         closedByReader = true;                                             //表示由输入流关闭

        synchronized (this) {     in = -1;    }        //同步化当前对象,in-1

    }

}

 

1.2 PipedOutputStream

// PipedOutputStream一般必须和一个PipedInputStream连接。共同构成一个pipe

//它们的职能是:

 

package java.io;

import java.io.*;

 

public class PipedOutputStream extends OutputStream {

    private PipedInputStream sink;                //包含一个PipedInputStream

 

    public PipedOutputStream(PipedInputStream snk)throws IOException {       //带有目的地的构造器

                   connect(snk);

    }

   

    public PipedOutputStream() {  }                      //默认构造器,必须使用下面的connect方法连接

   

    public synchronized void connect(PipedInputStream snk) throws IOException {

        if (snk == null) {                                                                    //检查输入参数的正确性

            throw new NullPointerException();

        } else if (sink != null || snk.connected) {

             throw new IOException("Already connected");

         }

         sink = snk;                                                                           //一系列初始化工作

         snk.in = -1;

         snk.out = 0;

        snk.connected = true;

    }

 

    public void write(int b) throws IOException {                        //向流中写入数据

        if (sink == null) {    throw new IOException("Pipe not connected");      }

         sink.receive(b);            //本质上是,调用PipedInputStreamreceive方法接受此字节

    }

 

    public void write(byte b[], int off, int len) throws IOException {

        if (sink == null) {                                                                   //首先检查输入参数的正确性

            throw new IOException("Pipe not connected");

        } else if (b == null) {

             throw new NullPointerException();

         } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {

             throw new IndexOutOfBoundsException();

         } else if (len == 0) {

             return;

         }

         sink.receive(b, off, len);                                                                 //调用PipedInputStreamreceive方法接受

    }

 

    public synchronized void flush() throws IOException {                 //flush输出流

         if (sink != null) {

            synchronized (sink) {     sink.notifyAll();     } //本质是通知输入流,可以读取

         }

    }

 

    public void close()  throws IOException {                         //关闭流同时释放相关资源

 

分享到:
评论

相关推荐

    PipedInputStream和PipedOutputStream_动力节点Java学院整理

    PipedInputStream和PipedOutputStream_动力节点Java学院整理

    JAVA IO流缓冲字节流缓冲字符流等流经典代码示例加注释总结.rar

    2、常用21个IO流:FileWriter、FileReader、CharArrayReader、CharArrayWriter、CharSequence、OutputStreamWriter、FileOutputStream、InputStreamReader、PrintWriter、BufferedReader、InputStream、...

    详解PipedInputStream和PipedOutputStream_动力节点Java学院整理

    主要为大家详细介绍了管道PipedInputStream和PipedOutputStream,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

    IO体系.java

    |--PipedOutputStream/:可以将管道输出流连接到管道输入流来创建通信管道。 | 用方法connect(PipedInputStream snk) 将此管道输出流连接到接收者。 同样使用多线程技术,避免死锁。 |--ByteArrayOutputStream/:...

    java IO章节的总结

    IO从大的方向上分为字节流和字符流,包括四个抽象类: 1、输入:Reader, InputStream类型的子类(字符,字节) 2、输出:Writer, OutputStream类型的子类(字符,字节) 决定使用哪个类以及它的构造进程的一般...

    Java之IO流学习总结

    PipedInputStream 是从与其它线程共用的管道中读取数据,与Piped 相关的知识后续单独介绍。 ObjectInputStream 和所有FilterInputStream 的子类都是装饰流(装饰器模式的主角)。 2.输出字节流OutputStream IO 中...

    javaIO流原代码

    javaIO流原代码,刚刚开始学习java的同志们可以看看.有问题留言.

    管道流PipedStream应用举例

    针对java中的管道流的应用的解析,包括PipedInputStream和PipedOutputStream。

    教你彻底明白Java的IO系统

    在Java的IO中,所有的stream(包括Input和Out stream)都包括两种类型: 1.1 以字节为导向的stream 以字节为导向的stream,表示以字节为单位从stream中读取或往stream中写入信息。以字节为导向的stream包括下面几种...

    深刻理解java io

    4) PipedInputStream:实现了pipe的概念,主要在线程中使用 5) SequenceInputStream:把多个InputStream合并为一个InputStream 2) Out stream 1) ByteArrayOutputStream:把信息存入内存中的一个缓冲区中 2) ...

    java管道流

    Listing 1:用管道流截取控制台输出】 PipedInputStream pipedIS = new PipedInputStream(); PipedOutputStream pipedOS = new PipedOutputStream(); try { pipedOS.connect(pipedIS); } catch(IOException e) { ...

    JDK_API_1_6

    PipedOutputStream 可以将管道输出流连接到管道输入流来创建通信管道。 PipedReader 传送的字符输入流。 PipedWriter 传送的字符输出流。 PrintStream PrintStream 为其他输出流添加了功能,使它们能够方便地打印...

    举例讲解Java中Piped管道输入输出流的线程通信控制

    Java中的PipedWriter、PipedReader类管道的读写依赖于PipedOutputStream、PipedInputStream两个管道输入输出类,这里我们将来举例讲解Java中Piped管道输入输出流的线程通信控制:

    Java IO 流的操作

    包含了Java里面大部分的 流类的小实例Propertity FileReader FileWriter FileInputStream PipedInputStream..........

    Java程序设计语言期末试题

    3) PipedOutputStream:实现了pipe的概念,主要在线程中使用 4) SequenceOutputStream:把多个OutStream合并为一个OutStream 1.2 以Unicode字符为导向的stream 以Unicode字符为导向的stream,表示以Unicode字符为...

    something:他山之石,可以攻玉

    集合I/O多线程NIOjava基础知识点整理jvmspring 相关MYSQL分布式存储检索java源码学习集合Linkedlist详解Vector详解Stack详解Map构架HashMap详解HashMap put逻辑总结HashMap与HashTable比较HashSet介绍I/...

    java-piped-streams-test

    此存储库用于演示java.io.PipedInputStream (Oracle Java 实现,存在于 1.7 和 1.8)中的错误。 这是一个最小的工作示例,用于证明PipedInputStream包含一个错误,该错误导致相应的PipedOutputStream等待最多...

Global site tag (gtag.js) - Google Analytics