teasp

A Small Java AIO Example


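    AIO, the asynchronous I/O feature added to NIO in Java 7, has been out for quite a while, but I had never used it, so today I wrote a small example to try it out. AIO feels a bit simpler than synchronous NIO: writing a correct AIO program is easier than writing a correct synchronous NIO one. It is harder to understand, though. That is the nature of asynchronous programming; it does not match human intuition.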
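    To make the "not intuitive" part concrete, here is a minimal sketch, assuming a loopback socket on an ephemeral port. It records the order of events around an asynchronous accept(): the call returns immediately, and the CompletionHandler runs later on a thread owned by the channel group. The class name AsyncOrderExample and the event strings are made up for illustration.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AsyncOrderExample
{
    // Hypothetical demo: records what happens, and in which order, around accept().
    public static List<String> run() throws Exception
    {
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch handled = new CountDownLatch(1);

        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)))
        {
            // accept() only registers the handler; it does not wait for a client.
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>()
            {
                @Override
                public void completed(AsynchronousSocketChannel ch, Void att)
                {
                    events.add("handler ran");
                    try { ch.close(); } catch (IOException ignored) { }
                    handled.countDown();
                }

                @Override
                public void failed(Throwable exc, Void att)
                {
                    events.add("handler failed");
                    handled.countDown();
                }
            });
            events.add("accept() returned");

            // Only now does a client connect, which triggers the handler.
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), port))
            {
                handled.await(5, TimeUnit.SECONDS);
            }
        }
        return events;
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println(run());
    }
}
```

    Because the client connects only after "accept() returned" has been recorded, that entry always comes first. The handler's entry follows once the connection has been accepted.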

 

    Server code:

 

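import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.atomic.AtomicInteger;

public class TestAioServer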
{
    private static AtomicInteger recvNum = new AtomicInteger(0);
    private static AtomicInteger sentNum = new AtomicInteger(0);
    
    private final byte[] resp;
    
    private static final int readLen = 1800;
    
    private ByteBuffer[] genBuf()
    {
        ByteBuffer[] buf = new ByteBuffer[2];
        buf[0] = ByteBuffer.allocate(readLen);
        buf[1] = ByteBuffer.wrap(resp);
        
        return buf;
    }

    public TestAioServer(int port) throws IOException
    {
        StringBuilder sb = new StringBuilder();
        for (int i=0; i<1000; i++)
        {
            sb.append("how are you? ");
        }
        resp = sb.toString().getBytes();
        
        final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(port));  
        serverChannel.accept(genBuf(), new CompletionHandler<AsynchronousSocketChannel, ByteBuffer[]>() {   
            public void completed(AsynchronousSocketChannel ch, ByteBuffer[] att) {
                // accept the next connection
                serverChannel.accept(genBuf(), this);
                // handle the current connection
                handle(ch, att);
            }   
  
            public void failed(Throwable exc, ByteBuffer[] att) {
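                System.out.println("Failed to accept connection");
                exc.printStackTrace();
                // pass fresh buffers: a null attachment would cause a NullPointerException in handle()
                serverChannel.accept(genBuf(), this);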
            }   
        });
          
        System.out.println("Server is listening at : " + port);
    }   
          
    public void handle(final AsynchronousSocketChannel ch, final ByteBuffer[] att) {
        final ByteBuffer dst = att[0];
        ch.read(dst, att, new ReadCompletionHandler(dst, ch));
    }  
    
    private static class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer[]>
    {
        ByteBuffer dst;
        AsynchronousSocketChannel ch;
        private CompletionHandler<Integer, ByteBuffer[]> writeCompletionHandler = getWriteCompletionHandler();
        
        public ReadCompletionHandler(ByteBuffer dst, AsynchronousSocketChannel ch)
        {
            this.dst = dst;
            this.ch = ch;
        }
        
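        @Override
        public void completed(Integer result, ByteBuffer[] attachment)
        {
//            System.out.println("result len = " + result);
            if (result < 0)
            {// the peer closed the connection
                try
                {
                    ch.close();
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
            else if (dst.hasRemaining())
            {// partial read: keep reading until the request buffer is full
                ch.read(dst, attachment, this);
            }
            else
            {// the whole request has been read
                recvNum.incrementAndGet();
                dst.clear();// get ready for the next read
//                System.out.println("read : " + new String(dst.array()));
                ch.write(attachment[1], attachment, writeCompletionHandler);
            }
        }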

        @Override
        public void failed(Throwable exc, ByteBuffer[] attachment)
        {
            System.out.println("read failed." );
            exc.printStackTrace();
        }
        
        public synchronized CompletionHandler<Integer, ByteBuffer[]> getWriteCompletionHandler()
        {
            if (writeCompletionHandler == null)
            {
                writeCompletionHandler = new CompletionHandler<Integer, ByteBuffer[]>()
                    {
                    @Override
                    public void completed(Integer result, ByteBuffer[] attachment)
                    {
//                        System.out.println("written len : " + result);
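                        // not everything has been written yet, so keep writing
                        if (attachment[1].hasRemaining())
                        {
                            ch.write(attachment[1], attachment, this);
                        }
                        else
                        {// the whole response has been written
                            sentNum.incrementAndGet();
                            attachment[1].rewind();// get ready for the next write
                            // reading resumes only after the write completes; don't do this in production, it is only for simplicity here
                            ch.read(dst, attachment, ReadCompletionHandler.this);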
                        }
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer[] attachment)
                    {
                        System.out.println("write failed." );
                        exc.printStackTrace();
                    }
                 };
            }
            
            return writeCompletionHandler;
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException
    {
        int port = 1234;
        new TestAioServer(port);
        
        Thread.sleep(15000);
        recvNum.set(0);
        sentNum.set(0);
        Thread.sleep(30000);
        System.out.println(recvNum.get()+":"+sentNum.get());
        
        Thread.sleep(30*60*1000);
    }
}
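
    The server treats a request as complete once its 1800-byte buffer is full, but a single asynchronous read may deliver fewer bytes than that. The following is a minimal sketch of a "read until full" helper, not part of the original program. It assumes Java 8 or later for CompletableFuture, and the names AsyncReadFullyExample, readFully and demo are hypothetical. The handler re-issues the read with itself as the callback until the buffer has no space left, and reports end-of-stream as a failure.

```java
import java.io.EOFException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class AsyncReadFullyExample
{
    // Hypothetical helper: completes the future once dst has been filled completely.
    public static CompletableFuture<ByteBuffer> readFully(final AsynchronousSocketChannel ch,
                                                          final ByteBuffer dst)
    {
        final CompletableFuture<ByteBuffer> done = new CompletableFuture<>();
        ch.read(dst, null, new CompletionHandler<Integer, Void>()
        {
            @Override
            public void completed(Integer result, Void att)
            {
                if (result < 0)
                {// peer closed the connection before the buffer was full
                    done.completeExceptionally(new EOFException("closed at " + dst.position()));
                }
                else if (dst.hasRemaining())
                {// partial read: issue another read with the same handler
                    ch.read(dst, null, this);
                }
                else
                {// buffer is full: flip it so the caller can consume it
                    dst.flip();
                    done.complete(dst);
                }
            }

            @Override
            public void failed(Throwable exc, Void att)
            {
                done.completeExceptionally(exc);
            }
        });
        return done;
    }

    // Sends message over a loopback connection and returns what readFully collected.
    public static String demo(String message) throws Exception
    {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                 .bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open())
        {
            Future<AsynchronousSocketChannel> accepted = server.accept();
            client.connect(server.getLocalAddress()).get();
            try (AsynchronousSocketChannel peer = accepted.get())
            {
                CompletableFuture<ByteBuffer> full =
                    readFully(peer, ByteBuffer.allocate(payload.length));
                ByteBuffer src = ByteBuffer.wrap(payload);
                while (src.hasRemaining())
                {// a write may also be partial, so loop until everything is sent
                    client.write(src).get();
                }
                ByteBuffer got = full.get(5, TimeUnit.SECONDS);
                return new String(got.array(), 0, got.limit(), StandardCharsets.UTF_8);
            }
        }
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println(demo("This is a request."));
    }
}
```

    Reusing the same handler instance for the follow-up read avoids allocating a new object per partial read, which matters little here but keeps the callback chain easy to follow.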

 

    The client code uses blocking I/O:

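import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

public class TestSocketClient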
{
    private static final byte[] req; 
    
    static
    {
        StringBuilder sb = new StringBuilder();
        for (int i=0; i<100; i++)
        {
            sb.append("This is a request.");
        }
        req = sb.toString().getBytes();
    }
    
    private static final byte[] resp = new byte[13000];

    public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException
    {
        int port = 1234;
        Socket socket = new Socket();
        socket.connect(new InetSocketAddress(InetAddress.getByName("localhost"), port));
        final InputStream is = socket.getInputStream();
        OutputStream os = socket.getOutputStream();
        
        new Thread ()
        {
            public void run()
            {
                long startTime = System.currentTimeMillis();
                int loopTimes = 1;
                for (;; loopTimes++)
                {
                    try
                    {
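                        int len = 0;
                        while (len < resp.length)
                        {// a single read may return fewer bytes than requested
                            int n = is.read(resp, len, resp.length - len);
                            if (n < 0)
                            {
                                throw new IOException("server closed the connection");
                            }
                            len += n;
                        }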
                    }
                    catch (IOException e)
                    {
                        e.printStackTrace();
                        break;
                    }
//                    System.out.println(new String(resp));
                    System.out.println("-" + loopTimes);
                    if (loopTimes == 30000)
                    {
                        System.out.println("time cost : " + (System.currentTimeMillis() - startTime));
                    }
                }
            }
        }.start();
        
        for (int i=0; i<30000; i++)
        {
            os.write(req);
//            os.flush();
            System.out.println("written");
            Thread.sleep(1);
        }
    }
}
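
    The client's read loop exists because InputStream.read may return fewer bytes than the buffer can hold. As a minimal sketch, that loop can be pulled out into a helper that either fills the buffer or fails on end-of-stream. The class and method names here are hypothetical, and a ByteArrayInputStream stands in for the socket stream.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class ReadFullyExample
{
    // Hypothetical helper: blocks until buf is completely filled or the stream ends.
    public static void readFully(InputStream in, byte[] buf) throws IOException
    {
        int len = 0;
        while (len < buf.length)
        {
            // read into the unfilled tail of the buffer, starting at offset len
            int n = in.read(buf, len, buf.length - len);
            if (n < 0)
            {
                throw new EOFException("stream ended after " + len + " bytes");
            }
            len += n;
        }
    }

    public static void main(String[] args) throws IOException
    {
        byte[] data = "how are you? ".getBytes();
        byte[] buf = new byte[data.length];
        readFully(new ByteArrayInputStream(data), buf);
        System.out.println(new String(buf));
    }
}
```

    The standard library offers the same behavior through DataInputStream.readFully(byte[]), which may be preferable to a hand-written loop.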

 

Comments
#3 ggd543 2014-06-10  
In the completed method of ReadCompletionHandler, if the read is not yet complete, it has to keep reading:

        @Override
        public void completed(Integer result, ByteBuffer[] attachment) {
            System.out.println("result len = " + result);
            if (dst.position() == dst.capacity()) {   // finished reading
                recvNum.incrementAndGet();
                dst.clear();  // get ready for the next read
                System.out.println("read : " + new String(dst.array()));
                ch.write(attachment[1], attachment, writeCompletionHandler);
            } else {
                System.out.println(" not finished reading yet, continuing...");
                ch.read(dst, attachment, new ReadCompletionHandler(dst, ch));
            }
        }
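#2 teasp 2013-06-20  
aa1asdasd wrote:
Nice work, thanks for the effort, OP.

Thanks, though this code is still a long way from production use.
#1 aa1asdasd 2013-06-20  
Nice work, thanks for the effort, OP.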
