`
chenzehe
  • 浏览: 532429 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Java NIO 使用实例

 
阅读更多

        在JDK1.4之前,Java OutputStream的write方法、InputStream的Read方法和ServerSocket的accept()方法都是阻塞方法,JDK1.4之前Java引入了新的输入输出系统(New Input/Out,NIO),非阻塞是Java NIO实现的重要功能之一 。

 1、Buffer

缓冲区,传输数据使用,本质是一个数组,Channel中读数据和写数据都只能通过Buffer传输。

2、Channel

通道,所有的IO流在NIO中都是从Channel开始的,数据可以从Channel读到Buffer中,也可以从Buffer写到Channel中,是Buffer对象的唯一接口。

3、Selector

选择器,它能检测一个或多个通道 (channel) 上的事件,并将事件分发出去。

使用一个 select 线程就能监听多个通道上的事件,并基于事件驱动触发相应的响应。而不需要为每个 channel 去分配一个线程。

4、SelectionKey

包含了事件的状态信息和时间对应的通道的绑定。

 

使用NIO的步骤:

1\创建一个Selector实例

2\将该实例注册到各种通道,指定每个通道上感兴趣的IO操作

3\重复执行,选择器循环:

    31\调用一种Select()方法

    32\获取已选键值

    33\对于已选中键集中的每一个键:

        331\将已选键从键集中移除

        332\获取信道,并从键中获取附件

        333\确定准备就绪的操作并执行;对于accept操作获得的SocketChannel对象,需将信道设为非阻塞模式,并将其注册到选择器中

        334\根据需要,修改键的兴趣操作集

 

如下代码:

 

NIOServer.java

package org.hadoopinternal.nio;

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NIOServer {
	private static final int TIMEOUT  = 300;
	private static final int PORT     = 12112;

	public static void main(String[] args) {		
		try {		    
			Selector selector = Selector.open();
			
			ServerSocketChannel listenChannel = ServerSocketChannel.open();
			listenChannel.configureBlocking(false);
			listenChannel.socket().bind(new InetSocketAddress(PORT));
			listenChannel.register(selector, SelectionKey.OP_ACCEPT);
			
			while(true) {
				if(selector.select(TIMEOUT)==0) {
					System.out.print(".");
					continue;
				}
				
				Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
				while( iter.hasNext() ) {
					SelectionKey key = iter.next();
					iter.remove();
					
					//Server socket channel has pending connection request?
					if( key.isAcceptable() ) {
						SocketChannel channel=listenChannel.accept();
						channel.configureBlocking(false);
						SelectionKey connkey=channel.register(selector, SelectionKey.OP_READ );
						NIOServerConnection conn=new NIOServerConnection(connkey);
						connkey.attach(conn);
					}
					
					if( key.isReadable() ) {
						NIOServerConnection conn=(NIOServerConnection) key.attachment();
						conn.handleRead();
					}
					
					if( key.isValid() && key.isWritable() ) {
						NIOServerConnection conn=(NIOServerConnection) key.attachment();
						conn.handleWrite();
					}
				}				
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}


NIOServerConnection.java:

package org.hadoopinternal.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

public class NIOServerConnection {
	private static final int BUFFSIZE = 1024;
	
	SelectionKey key;
	SocketChannel channel;
	ByteBuffer buffer;
	
	public NIOServerConnection(SelectionKey key) {
		this.key=key;
		this.channel=(SocketChannel) key.channel();
		buffer=ByteBuffer.allocate(BUFFSIZE);
	}
	
	public void handleRead() throws IOException {
		long bytesRead=channel.read(buffer);
		
		if(bytesRead==-1) {
			channel.close();
		} else {
			key.interestOps( SelectionKey.OP_READ | SelectionKey.OP_WRITE );
		}
	}

    public void handleWrite() throws IOException {
    	buffer.flip();
    	channel.write(buffer);
    	
    	if(!buffer.hasRemaining()) {
    		key.interestOps( SelectionKey.OP_READ );
    	} 
    	
    	buffer.compact();
	}
}
 

 

如下代码为hadoop IPC Server 中的Listener是一个标准的NIO应用:

 

/** Listens on the socket. Creates jobs for the handler threads*/
  private class Listener extends Thread {
    
    private ServerSocketChannel acceptChannel = null; //the accept channel
    private Selector selector = null; //the selector that we use for the server
    private Reader[] readers = null;
    private int currentReader = 0;
    private InetSocketAddress address; //the address we bind at
    private Random rand = new Random();
    private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
                                         //-tion (for idle connections) ran
    private long cleanupInterval = 10000; //the minimum interval between 
                                          //two cleanup runs
    private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
    private ExecutorService readPool; 
   
    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      readPool = Executors.newFixedThreadPool(readThreads);
      for (int i = 0; i < readThreads; i++) {
        Selector readSelector = Selector.open();
        Reader reader = new Reader(readSelector);
        readers[i] = reader;
        readPool.execute(reader);
      }

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }
    
    private class Reader implements Runnable {
      private volatile boolean adding = false;
      private Selector readSelector = null;

      Reader(Selector readSelector) {
        this.readSelector = readSelector;
      }
      public void run() {
        LOG.info("Starting SocketReader");
        synchronized (this) {
          while (running) {
            SelectionKey key = null;
            try {
              readSelector.select();
              while (adding) {
                this.wait(1000);
              }              

              Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
              while (iter.hasNext()) {
                key = iter.next();
                iter.remove();
                if (key.isValid()) {
                  if (key.isReadable()) {
                    doRead(key);
                  }
                }
                key = null;
              }
            } catch (InterruptedException e) {
              if (running) {                      // unexpected -- log it
                LOG.info(getName() + " caught: " +
                         StringUtils.stringifyException(e));
              }
            } catch (IOException ex) {
              LOG.error("Error in Reader", ex);
            }
          }
        }
      }

      /**
       * This gets reader into the state that waits for the new channel
       * to be registered with readSelector. If it was waiting in select()
       * the thread will be woken up, otherwise whenever select() is called
       * it will return even if there is nothing to read and wait
       * in while(adding) for finishAdd call
       */
      public void startAdd() {
        adding = true;
        readSelector.wakeup();
      }
      
      public synchronized SelectionKey registerChannel(SocketChannel channel)
                                                          throws IOException {
          return channel.register(readSelector, SelectionKey.OP_READ);
      }

      public synchronized void finishAdd() {
        adding = false;
        this.notify();        
      }
    }

    /** cleanup connections from connectionList. Choose a random range
     * to scan and also have a limit on the number of the connections
     * that will be cleanedup per run. The criteria for cleanup is the time
     * for which the connection was idle. If 'force' is true then all 
     * connections will be looked at for the cleanup.
     */
    private void cleanupConnections(boolean force) {
      if (force || numConnections > thresholdIdleConnections) {
        long currentTime = System.currentTimeMillis();
        if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
          return;
        }
        int start = 0;
        int end = numConnections - 1;
        if (!force) {
          start = rand.nextInt() % numConnections;
          end = rand.nextInt() % numConnections;
          int temp;
          if (end < start) {
            temp = start;
            start = end;
            end = temp;
          }
        }
        int i = start;
        int numNuked = 0;
        while (i <= end) {
          Connection c;
          synchronized (connectionList) {
            try {
              c = connectionList.get(i);
            } catch (Exception e) {return;}
          }
          if (c.timedOut(currentTime)) {
            if (LOG.isDebugEnabled())
              LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
            closeConnection(c);
            numNuked++;
            end--;
            c = null;
            if (!force && numNuked == maxConnectionsToNuke) break;
          }
          else i++;
        }
        lastCleanupRunTime = System.currentTimeMillis();
      }
    }

    @Override
    public void run() {
      LOG.info(getName() + ": starting");
      SERVER.set(Server.this);
      while (running) {
        SelectionKey key = null;
        try {
          selector.select();
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);
              }
            } catch (IOException e) {
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give 
          // some thread(s) a chance to finish
          LOG.warn("Out of Memory in server select", e);
          closeCurrentConnection(key, e);
          cleanupConnections(true);
          try { Thread.sleep(60000); } catch (Exception ie) {}
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
        cleanupConnections(false);
      }
      LOG.info("Stopping " + this.getName());

      synchronized (this) {
        try {
          acceptChannel.close();
          selector.close();
        } catch (IOException e) { }

        selector= null;
        acceptChannel= null;
        
        // clean up all connections
        while (!connectionList.isEmpty()) {
          closeConnection(connectionList.remove(0));
        }
      }
    }

    private void closeCurrentConnection(SelectionKey key, Throwable e) {
      if (key != null) {
        Connection c = (Connection)key.attachment();
        if (c != null) {
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
          closeConnection(c);
          c = null;
        }
      }
    }

    InetSocketAddress getAddress() {
      return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
    }
    
    void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
      Connection c = null;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        Reader reader = getReader();
        try {
          reader.startAdd();
          SelectionKey readKey = reader.registerChannel(channel);
          c = new Connection(readKey, channel, System.currentTimeMillis());
          readKey.attach(c);
          synchronized (connectionList) {
            connectionList.add(numConnections, c);
            numConnections++;
          }
          if (LOG.isDebugEnabled())
            LOG.debug("Server connection from " + c.toString() +
                "; # active connections: " + numConnections +
                "; # queued calls: " + callQueue.size());          
        } finally {
          reader.finishAdd(); 
        }

      }
    }

    void doRead(SelectionKey key) throws InterruptedException {
      int count = 0;
      Connection c = (Connection)key.attachment();
      if (c == null) {
        return;  
      }
      c.setLastContact(System.currentTimeMillis());
      
      try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
        throw ieo;
      } catch (Exception e) {
        LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
        count = -1; //so that the (count < 0) block is executed
      }
      if (count < 0) {
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + ": disconnecting client " + 
                    c + ". Number of active connections: "+
                    numConnections);
        closeConnection(c);
        c = null;
      }
      else {
        c.setLastContact(System.currentTimeMillis());
      }
    }   

    synchronized void doStop() {
      if (selector != null) {
        selector.wakeup();
        Thread.yield();
      }
      if (acceptChannel != null) {
        try {
          acceptChannel.socket().close();
        } catch (IOException e) {
          LOG.info(getName() + ":Exception in closing listener socket. " + e);
        }
      }
      readPool.shutdown();
    }

    // The method that will return the next reader to work with
    // Simplistic implementation of round robin for now
    Reader getReader() {
      currentReader = (currentReader + 1) % readers.length;
      return readers[currentReader];
    }

  }

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    一个java NIO的例子

    一个java NIO的例子 有很详细的每一步的描述,很好去理解

    Java NIO 中文 Java NIO 中文 Java NIO 中文文档

    Java NIO 深入探讨了 1.4 版的 I/O 新特性,并告诉您如何使用这些特性来极大地提升您所写的 Java 代码的执行效率。这本小册子就程序员所面临的有代表性的 I/O 问题作了详尽阐述,并讲解了 如何才能充分利用新的 I/O ...

    java nio socket 例子

    本例包含服务器端和客户端,多线程,每线程多次发送,Eclipse工程,启动服务器使用 nu.javafaq.server.NioServer,启动客户端使用 nu.javafaq.client.NioClient。另本例取自javafaq.nv上的程序修改而成

    Java Nio selector例程

    我研究并实现的Java Nio selector例子。java侧起server(NioUdpServer1.java),基于Java Nio的selector 阻塞等候,一个android app(NioUdpClient1文件夹)和一个java程序(UI.java)作为两个client分别向该server...

    java NIO推送实例

    java NIO 消息推送实例代码,解压Tmp.zip Desk为桌面程序,DeskAppServer为服务端程序,江巅

    java NIO原理和使用

    java nio 附带例子 以及原理 java nio 附带例子 以及原理 java nio 附带例子 以及原理 java nio 附带例子 以及原理

    JavaNIO服务器实例Java开发Java经验技巧共6页

    JavaNIO服务器实例Java开发Java经验技巧共6页.pdf.zip

    java NIO实例

    实例介绍了一个简单的nio实例,适合刚接触nio的童鞋们....

    JAVA nio的一个简单的例子

    一个非常简单的java nio通信的例子,服务器接收到来自客户端的字符串后,计算该字符串的哈希值,然后返回给客户端

    javaNIO实例

    该资源包含了一个用javaNIO实现的读写文件以及复制文件的简单的demo,程序注释清晰,简单易懂,喜欢的下载!!!

    实现java网络与nio例子

    实现java nio的标准例子,包括server端和client端。

    nio.rar_NIO_NIO-socket_java nio_java 实例_java.nio

    java nio 编程一个实例子.服务端程序

    java nio im(server+client)

    java基于nio的通信实例,带UML结构图及server、client源码。

    java nio 通信服务器、客户端完整例子

    用java编写的nio通信的例子,nio是io编程的新版本,比io较流行。同时本例子是适用socket通信的。可以在此基础上,添加您的个人应用。本例子适用于:java通信的学习者,android平台通信的学习者。

    Java NIO实例

    nio代码实例,Java NIO 系列教程,买不了吃亏,买不了上当

    java NIO socket聊天室

    使用NIO socket不需要多线程来处理多个连接的请求,效率非常高 可以作为NIO socket入门的例子,Reactor模式,重点理解key.attach, jar文件里包含了源代码 1,运行server.bat启动服务器,可以打开编辑,修改端口号 ...

    java NIO参考文档

    举个例子吧 你服务器做一个聊天室 按照以前的阻塞式IO 你必须为每个连接创建一个线程 因为当你调用如 in read buf 时 线程会阻塞在这里 而采用nio 只要注册了事件 它内部采用反应模式 当有IO事件发生时 再调度它 而...

    Java NIO 中文全书签

    这本书是介绍java nio的基础书籍,原理讲的还是挺明白的,比较好懂,就是例子比较少。nio对于java程序员来说可能不是很好理解,但是对于C程序员来说,就是epoll的一个封装。 我本人是C程序员,对java比较感兴趣,...

    Java NIO原理和使用

    Java NIO非堵塞应用通常适用用在I/O读写等方面,我们知道,系统运行的性能瓶颈通常在I/O读写,包括对端口和文件的操作上,过去,在打开一个I/O通道后,read()将一直等待在端口一边读取字节内容,如果没有内容进来,...

    自己写的Java NIO 同步不阻塞IO操作

    用nio想的一个不阻塞NIOSocket例子.。。希望对阁下有用

Global site tag (gtag.js) - Google Analytics