在JDK1.4之前,Java OutputStream的write方法、InputStream的Read方法和ServerSocket的accept()方法都是阻塞方法,JDK1.4之前Java引入了新的输入输出系统(New Input/Out,NIO),非阻塞是Java NIO实现的重要功能之一 。
1、Buffer
缓冲区,传输数据使用,本质是一个数组,Channel中读数据和写数据都只能通过Buffer传输。
2、Channel
通道,所有的IO流在NIO中都是从Channel开始的,数据可以从Channel读到Buffer中,也可以从Buffer写到Channel中,是Buffer对象的唯一接口。
3、Selector
选择器,它能检测一个或多个通道 (channel) 上的事件,并将事件分发出去。
使用一个 select 线程就能监听多个通道上的事件,并基于事件驱动触发相应的响应。而不需要为每个 channel 去分配一个线程。
4、SelectionKey
包含了事件的状态信息和时间对应的通道的绑定。
使用NIO的步骤:
1\创建一个Selector实例
2\将该实例注册到各种通道,指定每个通道上感兴趣的IO操作
3\重复执行,选择器循环:
31\调用一种Select()方法
32\获取已选键值
33\对于已选中键集中的每一个键:
331\将已选键从键集中移除
332\获取信道,并从键中获取附件
333\确定准备就绪的操作并执行;对于accept操作获得的SocketChannel对象,需将信道设为非阻塞模式,并将其注册到选择器中
334\根据需要,修改键的兴趣操作集
如下代码:
NIOServer.java package org.hadoopinternal.nio; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class NIOServer { private static final int TIMEOUT = 300; private static final int PORT = 12112; public static void main(String[] args) { try { Selector selector = Selector.open(); ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.configureBlocking(false); listenChannel.socket().bind(new InetSocketAddress(PORT)); listenChannel.register(selector, SelectionKey.OP_ACCEPT); while(true) { if(selector.select(TIMEOUT)==0) { System.out.print("."); continue; } Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while( iter.hasNext() ) { SelectionKey key = iter.next(); iter.remove(); //Server socket channel has pending connection request? if( key.isAcceptable() ) { SocketChannel channel=listenChannel.accept(); channel.configureBlocking(false); SelectionKey connkey=channel.register(selector, SelectionKey.OP_READ ); NIOServerConnection conn=new NIOServerConnection(connkey); connkey.attach(conn); } if( key.isReadable() ) { NIOServerConnection conn=(NIOServerConnection) key.attachment(); conn.handleRead(); } if( key.isValid() && key.isWritable() ) { NIOServerConnection conn=(NIOServerConnection) key.attachment(); conn.handleWrite(); } } } } catch (Exception e) { e.printStackTrace(); } } } NIOServerConnection.java: package org.hadoopinternal.nio; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; public class NIOServerConnection { private static final int BUFFSIZE = 1024; SelectionKey key; SocketChannel channel; ByteBuffer buffer; public NIOServerConnection(SelectionKey key) { this.key=key; this.channel=(SocketChannel) key.channel(); buffer=ByteBuffer.allocate(BUFFSIZE); } public void handleRead() throws IOException { long bytesRead=channel.read(buffer); if(bytesRead==-1) { channel.close(); } else { key.interestOps( SelectionKey.OP_READ | SelectionKey.OP_WRITE ); } } public void handleWrite() throws IOException { buffer.flip(); channel.write(buffer); if(!buffer.hasRemaining()) { key.interestOps( SelectionKey.OP_READ ); } buffer.compact(); } }
如下代码为hadoop IPC Server 中的Listener是一个标准的NIO应用:
/** Listens on the socket. Creates jobs for the handler threads*/ private class Listener extends Thread { private ServerSocketChannel acceptChannel = null; //the accept channel private Selector selector = null; //the selector that we use for the server private Reader[] readers = null; private int currentReader = 0; private InetSocketAddress address; //the address we bind at private Random rand = new Random(); private long lastCleanupRunTime = 0; //the last time when a cleanup connec- //-tion (for idle connections) ran private long cleanupInterval = 10000; //the minimum interval between //two cleanup runs private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128); private ExecutorService readPool; public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // Create a new server socket and set to non blocking mode acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; readPool = Executors.newFixedThreadPool(readThreads); for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); Reader reader = new Reader(readSelector); readers[i] = reader; readPool.execute(reader); } // Register accepts on the server socket with the selector. acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); } private class Reader implements Runnable { private volatile boolean adding = false; private Selector readSelector = null; Reader(Selector readSelector) { this.readSelector = readSelector; } public void run() { LOG.info("Starting SocketReader"); synchronized (this) { while (running) { SelectionKey key = null; try { readSelector.select(); while (adding) { this.wait(1000); } Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); if (key.isValid()) { if (key.isReadable()) { doRead(key); } } key = null; } } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(getName() + " caught: " + StringUtils.stringifyException(e)); } } catch (IOException ex) { LOG.error("Error in Reader", ex); } } } } /** * This gets reader into the state that waits for the new channel * to be registered with readSelector. If it was waiting in select() * the thread will be woken up, otherwise whenever select() is called * it will return even if there is nothing to read and wait * in while(adding) for finishAdd call */ public void startAdd() { adding = true; readSelector.wakeup(); } public synchronized SelectionKey registerChannel(SocketChannel channel) throws IOException { return channel.register(readSelector, SelectionKey.OP_READ); } public synchronized void finishAdd() { adding = false; this.notify(); } } /** cleanup connections from connectionList. Choose a random range * to scan and also have a limit on the number of the connections * that will be cleanedup per run. The criteria for cleanup is the time * for which the connection was idle. If 'force' is true then all * connections will be looked at for the cleanup. */ private void cleanupConnections(boolean force) { if (force || numConnections > thresholdIdleConnections) { long currentTime = System.currentTimeMillis(); if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) { return; } int start = 0; int end = numConnections - 1; if (!force) { start = rand.nextInt() % numConnections; end = rand.nextInt() % numConnections; int temp; if (end < start) { temp = start; start = end; end = temp; } } int i = start; int numNuked = 0; while (i <= end) { Connection c; synchronized (connectionList) { try { c = connectionList.get(i); } catch (Exception e) {return;} } if (c.timedOut(currentTime)) { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); closeConnection(c); numNuked++; end--; c = null; if (!force && numNuked == maxConnectionsToNuke) break; } else i++; } lastCleanupRunTime = System.currentTimeMillis(); } } @Override public void run() { LOG.info(getName() + ": starting"); SERVER.set(Server.this); while (running) { SelectionKey key = null; try { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) doAccept(key); } } catch (IOException e) { } key = null; } } catch (OutOfMemoryError e) { // we can run out of memory if we have too many threads // log the event and sleep for a minute and give // some thread(s) a chance to finish LOG.warn("Out of Memory in server select", e); closeCurrentConnection(key, e); cleanupConnections(true); try { Thread.sleep(60000); } catch (Exception ie) {} } catch (Exception e) { closeCurrentConnection(key, e); } cleanupConnections(false); } LOG.info("Stopping " + this.getName()); synchronized (this) { try { acceptChannel.close(); selector.close(); } catch (IOException e) { } selector= null; acceptChannel= null; // clean up all connections while (!connectionList.isEmpty()) { closeConnection(connectionList.remove(0)); } } } private void closeCurrentConnection(SelectionKey key, Throwable e) { if (key != null) { Connection c = (Connection)key.attachment(); if (c != null) { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); closeConnection(c); c = null; } } } InetSocketAddress getAddress() { return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress(); } void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null; ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); Reader reader = getReader(); try { reader.startAdd(); SelectionKey readKey = reader.registerChannel(channel); c = new Connection(readKey, channel, System.currentTimeMillis()); readKey.attach(c); synchronized (connectionList) { connectionList.add(numConnections, c); numConnections++; } if (LOG.isDebugEnabled()) LOG.debug("Server connection from " + c.toString() + "; # active connections: " + numConnections + "; # queued calls: " + callQueue.size()); } finally { reader.finishAdd(); } } } void doRead(SelectionKey key) throws InterruptedException { int count = 0; Connection c = (Connection)key.attachment(); if (c == null) { return; } c.setLastContact(System.currentTimeMillis()); try { count = c.readAndProcess(); } catch (InterruptedException ieo) { LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo); throw ieo; } catch (Exception e) { LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e); count = -1; //so that the (count < 0) block is executed } if (count < 0) { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": disconnecting client " + c + ". Number of active connections: "+ numConnections); closeConnection(c); c = null; } else { c.setLastContact(System.currentTimeMillis()); } } synchronized void doStop() { if (selector != null) { selector.wakeup(); Thread.yield(); } if (acceptChannel != null) { try { acceptChannel.socket().close(); } catch (IOException e) { LOG.info(getName() + ":Exception in closing listener socket. " + e); } } readPool.shutdown(); } // The method that will return the next reader to work with // Simplistic implementation of round robin for now Reader getReader() { currentReader = (currentReader + 1) % readers.length; return readers[currentReader]; } }
相关推荐
一个java NIO的例子 有很详细的每一步的描述,很好去理解
Java NIO 深入探讨了 1.4 版的 I/O 新特性,并告诉您如何使用这些特性来极大地提升您所写的 Java 代码的执行效率。这本小册子就程序员所面临的有代表性的 I/O 问题作了详尽阐述,并讲解了 如何才能充分利用新的 I/O ...
本例包含服务器端和客户端,多线程,每线程多次发送,Eclipse工程,启动服务器使用 nu.javafaq.server.NioServer,启动客户端使用 nu.javafaq.client.NioClient。另本例取自javafaq.nv上的程序修改而成
我研究并实现的Java Nio selector例子。java侧起server(NioUdpServer1.java),基于Java Nio的selector 阻塞等候,一个android app(NioUdpClient1文件夹)和一个java程序(UI.java)作为两个client分别向该server...
java NIO 消息推送实例代码,解压Tmp.zip Desk为桌面程序,DeskAppServer为服务端程序,江巅
java nio 附带例子 以及原理 java nio 附带例子 以及原理 java nio 附带例子 以及原理 java nio 附带例子 以及原理
JavaNIO服务器实例Java开发Java经验技巧共6页.pdf.zip
实例介绍了一个简单的nio实例,适合刚接触nio的童鞋们....
一个非常简单的java nio通信的例子,服务器接收到来自客户端的字符串后,计算该字符串的哈希值,然后返回给客户端
该资源包含了一个用javaNIO实现的读写文件以及复制文件的简单的demo,程序注释清晰,简单易懂,喜欢的下载!!!
实现java nio的标准例子,包括server端和client端。
java nio 编程一个实例子.服务端程序
java基于nio的通信实例,带UML结构图及server、client源码。
用java编写的nio通信的例子,nio是io编程的新版本,比io较流行。同时本例子是适用socket通信的。可以在此基础上,添加您的个人应用。本例子适用于:java通信的学习者,android平台通信的学习者。
nio代码实例,Java NIO 系列教程,买不了吃亏,买不了上当
使用NIO socket不需要多线程来处理多个连接的请求,效率非常高 可以作为NIO socket入门的例子,Reactor模式,重点理解key.attach, jar文件里包含了源代码 1,运行server.bat启动服务器,可以打开编辑,修改端口号 ...
举个例子吧 你服务器做一个聊天室 按照以前的阻塞式IO 你必须为每个连接创建一个线程 因为当你调用如 in read buf 时 线程会阻塞在这里 而采用nio 只要注册了事件 它内部采用反应模式 当有IO事件发生时 再调度它 而...
这本书是介绍java nio的基础书籍,原理讲的还是挺明白的,比较好懂,就是例子比较少。nio对于java程序员来说可能不是很好理解,但是对于C程序员来说,就是epoll的一个封装。 我本人是C程序员,对java比较感兴趣,...
Java NIO非堵塞应用通常适用用在I/O读写等方面,我们知道,系统运行的性能瓶颈通常在I/O读写,包括对端口和文件的操作上,过去,在打开一个I/O通道后,read()将一直等待在端口一边读取字节内容,如果没有内容进来,...
用nio想的一个不阻塞NIOSocket例子.。。希望对阁下有用