`
BruceXX
  • 浏览: 139019 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

mina的 StreamIoHandler

    博客分类:
  • J2SE
阅读更多

        最近看上了mina,其性能多多,完全不用说了。。,本来是想搞个file+msg传输的,结果。。被这个handler给 block 了,不知道这个东西咋用。。。按其IoHandler interface implement methods+ processStreamIo abstract method   写了一个sample,发现死活发不了包。。。哥怒了。

        没办法,开源的东西还是有好处地,可以深入,下了源码,发现其结构真的很不错,IoService(类似于和customer communication 的一个接口: session )<==IoProcessor(这个好像在code business 的时候没看到,后来才知道是用于处理IoFilter 的)<===IoFilter(*) ,IoHandler(business handler) 。

       原来,StreamIoHandler 的里面wrap 了两个内部stream(out、in),并且对IoHandler 的每个method 都做了实现,谜底马上揭晓了。。。see below:

 

    /**
     * Prepares a newly opened session for stream-style I/O and hands control
     * to {@code processStreamIo}.
     *
     * <p>Applies the handler's write timeout and reader-idle time to the
     * session configuration, wraps the session in an input/output stream
     * pair, stores both streams as session attributes under {@code KEY_IN}
     * and {@code KEY_OUT}, and finally invokes
     * {@code processStreamIo(session, in, out)} from within this callback.
     *
     * @param session the session that has just been opened
     */
    @Override
    public void sessionOpened(IoSession session) {
        // Set timeouts
        session.getConfig().setWriteTimeout(writeTimeout);
        session.getConfig().setIdleTime(IdleStatus.READER_IDLE, readTimeout);

        // Create streams
        InputStream in = new IoSessionInputStream();
        OutputStream out = new IoSessionOutputStream(session);
        // Keep the streams on the session so the other handler callbacks can reach them.
        session.setAttribute(KEY_IN, in);
        session.setAttribute(KEY_OUT, out);
        // Template-method hook: the subclass supplies the actual stream processing.
        processStreamIo(session, in, out);
    }

 

划克,把processStreamIo  template  method 了。。(PS:曾经偶在写portal 里用到过这种pattern)

然后其他的iohandler 的method 里面也全写满了io   logic....最后session closed 才把in ,out stream close .

我想,这样一来应该每传一个文件就会connect 一次了。。

 

废话说完了。。code ==>

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package minastudy.t2;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.demux.DemuxingIoHandler;
import org.apache.mina.handler.stream.StreamIoHandler;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

/**
 * Minimal MINA stream server: binds a NIO socket acceptor to port 8889 and,
 * for every session, passes the session's streams to a {@link Work} task
 * that runs on a cached thread pool.
 *
 * @author zx04741
 */
public class Server {

    /** TCP port the acceptor listens on. */
    private static final int PORT = 8889;

    /**
     * Starts the acceptor with a stream handler installed and binds it.
     *
     * @param args ignored
     * @throws IOException if the acceptor cannot bind to the port
     */
    public static void main(String args[]) throws IOException {
        NioSocketAcceptor acceptor = new NioSocketAcceptor();
        acceptor.setHandler(new ReceivingHandler());
        acceptor.bind(new InetSocketAddress(PORT));
    }

    /**
     * Stream handler that moves the blocking stream processing of each
     * session onto a worker thread instead of running it in the callback.
     */
    private static class ReceivingHandler extends StreamIoHandler {

        /** Executes one {@link Work} task per processed session. */
        private ExecutorService workers = Executors.newCachedThreadPool();

        @Override
        protected void processStreamIo(IoSession session, InputStream in, OutputStream out) {
            System.out.println("process stream...");
            Work task = new Work(session, in, out);
            workers.execute(task);
        }
    }
}

/**
 * Server-side task that copies everything readable from a session's input
 * stream into the local file {@code c:/jre(1).z01}.
 *
 * <p>It is submitted to an executor as a {@link Runnable}; the class still
 * extends {@link Thread} so that existing callers keep compiling.
 */
class Work extends Thread {

    private final InputStream in;
    private final OutputStream out;
    private final IoSession session;

    /**
     * @param session the session the streams belong to (stored, not used by {@link #run()})
     * @param in      stream delivering the bytes received on the session
     * @param out     stream for writing back to the peer (stored, not used by {@link #run()})
     */
    public Work(IoSession session, InputStream in, OutputStream out) {
        this.in = in;
        this.out = out;
        this.session = session;
    }

    /**
     * Reads {@code in} until end of stream, printing the size of every chunk
     * and appending it to the target file. I/O failures are logged, not
     * rethrown.
     */
    @Override
    public void run() {
        try {
            FileOutputStream fos = new FileOutputStream(new File("c:/jre(1).z01"));
            try {
                byte[] buf = new byte[2048];
                int read;
                while ((read = in.read(buf)) != -1) {
                    System.out.println(read);
                    // FileOutputStream is unbuffered, so no per-chunk flush() is needed.
                    fos.write(buf, 0, read);
                }
            } finally {
                // Close on every path; previously a failed read/write leaked the file handle.
                fos.close();
            }
            System.out.println("session over..");
        } catch (IOException ex) {
            Logger.getLogger(Server.class.getName()).log(Level.SEVERE, null, ex);
        }

    }
}


/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package minastudy.t2;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.stream.StreamIoHandler;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

/**
 * First revision of the file-sending client: connects to port 8889 and
 * streams a local file to the server directly inside
 * {@code processStreamIo}.
 *
 * <p>{@code out.flush()} is deliberately not called in this revision; the
 * accompanying article reports that flushing from this callback blocks.
 *
 * @author zx04741
 */
public class Client {

    /**
     * Installs the sending handler, connects, and reports the outcome.
     *
     * @param args ignored
     * @throws FileNotFoundException never thrown by this body; kept so the
     *         signature stays unchanged for existing callers
     */
    public static void main(String args[]) throws FileNotFoundException {

        NioSocketConnector connector = new NioSocketConnector();
        connector.setHandler(new StreamIoHandler() {

            @Override
            protected void processStreamIo(IoSession session, InputStream in, OutputStream out) {
                System.out.println("client in process stream..");
                try {
                    FileInputStream fis = new FileInputStream(new File("C:/jdk15910/jre(1).z01"));
                    try {
                        byte[] buf = new byte[2048];
                        int read;
                        while ((read = fis.read(buf)) != -1) {
                            System.out.println(read);
                            out.write(buf, 0, read);
                        }
                    } finally {
                        // Close on every path; a failed write used to leak the file handle.
                        fis.close();
                    }
                    // close(false) rather than close(true): per the IoSession docs the
                    // session is closed only after the queued writes have been flushed.
                    session.close(false);
                    System.out.println("over..");
                } catch (IOException ex) {
                    Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
                    // The transfer failed: drop the connection instead of leaving it open.
                    session.close(true);
                }
            }
        });
        ConnectFuture future1 = connector.connect(new InetSocketAddress(8889));
        future1.awaitUninterruptibly();
        if (!future1.isConnected()) {
            System.out.println("no connect...");
            // There is no session to use; release the connector and stop here
            // instead of going on to ask the failed future for its session.
            connector.dispose();
            return;
        }
        System.out.println("session==>start");

    }
}


 

 有很多忌讳的地方,client里面的out 不能flush ,否则会block住。。。有点怪异,我看了下IoSessionOutputStream

    /**
     * Waits until the most recent write issued through this stream has
     * completed.
     *
     * <p>Returns immediately when no write future has been recorded yet.
     * Otherwise the calling thread waits, uninterruptibly, on
     * {@code lastWriteFuture}.
     *
     * @throws IOException if the awaited write did not complete successfully
     */
    @Override
    public synchronized void flush() throws IOException {
        // Nothing has been written yet, so there is nothing to wait for.
        if (lastWriteFuture == null) {
            return;
        }

        // Wait for the pending write to finish.
        lastWriteFuture.awaitUninterruptibly();
        if (!lastWriteFuture.isWritten()) {
            throw new IOException(
                    "The bytes could not be written to the session");
        }
    }

 

 重写的flush 方法。。synchronized...要等到上次写完成后才可以写下次。。但是也不可能block 住所有的啊。。难道有相关的锁没有释放??先到这,回家再研究。。

 

  problem traces 到 AbstractIoSession

 

  原来真的是的,在AbstractIoSession里面有一个WriteFuture 的object锁,会将client当前的线程读写锁定在当前buffer之中,OK,这样就真相大白了.现在要做的就是把client改成多线程来处理这件事情了和Server一样的,确实如此,另外,在写的过程要保持Thread.sleep,这样CPU就不会达到很高...

 

  code==>

  public class Client {

    public static void main(String args[]) throws FileNotFoundException {

        final NioSocketConnector connector = new NioSocketConnector();
        DefaultIoFilterChainBuilder chain = connector.getFilterChain();
        connector.setHandler(new StreamIoHandler() {

            private ExecutorService pool = Executors.newCachedThreadPool();

            public void exceptionCaught(IoSession session, Throwable cause) {
                super.exceptionCaught(session, cause);
                System.out.println(cause);
            }

            public void sessionClosed(IoSession session) throws Exception {
                super.sessionClosed(session);
                System.out.println(" session close.. ");

            }

            public void sessionOpened(IoSession session) {
                super.sessionOpened(session);
                System.out.println(" session open....");
            }

            @Override
            protected void processStreamIo(IoSession session, InputStream in, OutputStream out) {
                pool.execute(new Worker2(session, in, out));

            }
        });
        ConnectFuture future1 = connector.connect(new InetSocketAddress(8889));
        future1.awaitUninterruptibly();
        if (!future1.isConnected()) {
            System.out.println("no connect...");
        } else {
            System.out.println("connected session==>start");
        }
//        IoSession session = future1.getSession();


    }
}

/**
 * Client-side task that streams a local file to the peer through a session's
 * output stream, flushing after every chunk, and then closes the session.
 *
 * <p>It is submitted to an executor as a {@link Runnable}; the class still
 * extends {@link Thread} so that existing callers keep compiling.
 */
class Worker2 extends Thread {

    private static final Logger LOG = Logger.getLogger(Worker2.class.getName());

    /** File that is sent to the server. */
    private static final String FILE_NAME = "F:\\IDE\\netbeans-6.7.1-ml-windows.exe";

    private final InputStream in;
    private final OutputStream out;
    private final IoSession session;

    /**
     * @param session the session to close once the transfer is finished
     * @param in      stream for data received from the peer (stored, not used by {@link #run()})
     * @param out     stream the file content is written to
     */
    public Worker2(IoSession session, InputStream in, OutputStream out) {
        this.in = in;
        this.out = out;
        this.session = session;
    }

    /**
     * Sends the file, then closes the session and waits for the close to
     * complete. Failures are logged; in that case the session is closed
     * immediately so the connection is not left open.
     */
    @Override
    public void run() {
        System.out.println("client in process stream..");
        try {
            sendFile(new File(FILE_NAME));
            session.close(false).awaitUninterruptibly();
            System.out.println("over..");
        } catch (InterruptedException ex) {
            // Restore the flag so the owning executor can observe the interruption.
            Thread.currentThread().interrupt();
            LOG.log(Level.SEVERE, null, ex);
            session.close(true);
        } catch (IOException ex) {
            LOG.log(Level.SEVERE, null, ex);
            session.close(true);
        }
    }

    /**
     * Copies {@code file} to {@link #out} in 2048-byte chunks, flushing after
     * each one and printing a progress line on every 1000th chunk.
     */
    private void sendFile(File file) throws IOException, InterruptedException {
        FileInputStream fis = new FileInputStream(file);
        try {
            byte[] buf = new byte[2048];
            int chunks = 0;
            while (true) {
                // Short pause per chunk; the article keeps it to hold CPU usage down.
                Thread.sleep(4);
                int read = fis.read(buf);
                if (read == -1) {
                    break;
                }
                chunks++;
                // Checked after the EOF test so the progress line never reports -1.
                if (chunks % 1000 == 0) {
                    System.out.println(read + "," + System.currentTimeMillis());
                }
                out.write(buf, 0, read);
                out.flush();
            }
        } finally {
            // Close on every path; a failed write or an interrupt used to leak the file handle.
            fis.close();
        }
    }
}
 

 

 

 

分享到:
评论
1 楼 noddle0592 2011-04-21  
研究不错,感谢分享

相关推荐

Global site tag (gtag.js) - Google Analytics