1.前言
JDK1.4之前的传统阻塞IO(BIO),服务端需要为每一个客户端连接创建单独的线程为其服务,从JDK1.4开始NIO非阻塞式IO出现,它只需要单独的一个线程就能接收多个客户端请求,而真正处理各个请求的细节可以使用多线程的方式高效率的完成,这些处理线程与具体的业务逻辑分离,做到了IO的复用。
2.源码分析
首先以一段典型的NIO使用代码开始:
Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.socket().bind(new InetSocketAddress(9527)); ssc.register(selector, SelectionKey.OP_ACCEPT); while(true){ int n = selector.select(); if (n <= 0) continue; Iterator it = selector.selectedKeys().iterator(); while(it.hasNext()){ SelectionKey key = (SelectionKey)it.next(); if (key.isAcceptable()){ SocketChannel sc= ((ServerSocketChannel) key.channel()).accept(); sc.configureBlocking(false); sc.register(key.selector(), SelectionKey.OP_READ|SelectionKey.OP_WRITE); } if (key.isReadable()){ SocketChannel channel = ((SocketChannel) key.channel()); ByteBuffer bf = ByteBuffer.allocate(10); int read = channel.read(bf); System.out.println("read "+read+" : "+new String(bf.array()).trim()); } if (key.isWritable()){ SocketChannel channel = ((SocketChannel) key.channel()); channel.write(ByteBuffer.wrap(new String("hello client").getBytes())); } it.remove(); } }
2.1 Selector.open() 获取选择器。
public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); } public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
从Selector源码中可以看到,open方法是交给selectorProvider处理的。 其中provider = sun.nio.ch.DefaultSelectorProvider.create();会根据操作系统来返回不同的实现类,windows平台就返回WindowsSelectorProvider;Linux平台会根据不同的内核版本选择是使用select/poll模式还是epoll模式。
public static SelectorProvider create() { PrivilegedAction pa = new GetPropertyAction("os.name"); String osname = (String) AccessController.doPrivileged(pa); if ("SunOS".equals(osname)) { return new sun.nio.ch.DevPollSelectorProvider(); } // use EPollSelectorProvider for Linux kernels >= 2.6 if ("Linux".equals(osname)) { pa = new GetPropertyAction("os.version"); String osversion = (String) AccessController.doPrivileged(pa); String[] vers = osversion.split("\\.", 0); if (vers.length >= 2) { try { int major = Integer.parseInt(vers[0]); int minor = Integer.parseInt(vers[1]); if (major > 2 || (major == 2 && minor >= 6)) { return new sun.nio.ch.EPollSelectorProvider(); } } catch (NumberFormatException x) { // format not recognized } } } return new sun.nio.ch.PollSelectorProvider(); } sun.nio.ch.EPollSelectorProvider public AbstractSelector openSelector() throws IOException { return new EPollSelectorImpl(this); } sun.nio.ch.PollSelectorProvider public AbstractSelector openSelector() throws IOException { return new PollSelectorImpl(this); }
可以看到,如果Linux内核版本>=2.6则,具体的SelectorProvider为EPollSelectorProvider,否则为默认的PollSelectorProvider,实际上这是在JDK5U9之后才有这样的更新。
public static SelectorProvider create() { return new sun.nio.ch.WindowsSelectorProvider(); } sun.nio.ch.WindowsSelectorProvider public AbstractSelector openSelector() throws IOException { return new WindowsSelectorImpl(this); } WindowsSelectorImpl(SelectorProvider sp) throws IOException { super(sp); pollWrapper = new PollArrayWrapper(INIT_CAP); wakeupPipe = Pipe.open(); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); // Disable the Nagle algorithm so that the wakeup is more immediate SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); (sink.sc).socket().setTcpNoDelay(true); wakeupSinkFd = ((SelChImpl)sink).getFDVal(); pollWrapper.addWakeupSocket(wakeupSourceFd, 0); } void addWakeupSocket(int fdVal, int index) { putDescriptor(index, fdVal); putEventOps(index, POLLIN); }
接下来,以Windows的实现为准进行分析。在openSelector方法里面实例化WindowsSelectorImpl的过程中,
1).实例化了PollWrapper,pollWrapper用Unsafe类申请一块物理内存,用于存放注册时的socket句柄fdVal和event的数据结构pollfd.
2)Pipe.open()打开一个管道(打开管道的实现后面再看);拿到wakeupSourceFd和wakeupSinkFd两个文件描述符;把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里.addWakeupSocket方法将source的POLLIN事件(有数据可读)标识为感兴趣的,当sink端有数据写入时,source对应的文件描述描wakeupSourceFd就会处于就绪状态.
public static Pipe open() throws IOException { return SelectorProvider.provider().openPipe(); } public Pipe openPipe() throws IOException { return new PipeImpl(this); } PipeImpl(final SelectorProvider sp) throws IOException { try { AccessController.doPrivileged(new Initializer(sp)); } catch (PrivilegedActionException x) { throw (IOException)x.getCause(); } } private Initializer(SelectorProvider sp) { this.sp = sp; } public Void run() throws IOException { LoopbackConnector connector = new LoopbackConnector(); connector.run(); ....//省略 } private class LoopbackConnector implements Runnable { @Override public void run() { ServerSocketChannel ssc = null; SocketChannel sc1 = null; SocketChannel sc2 = null; try { // Loopback address InetAddress lb = InetAddress.getByName("127.0.0.1"); assert(lb.isLoopbackAddress()); InetSocketAddress sa = null; for(;;) { // Bind ServerSocketChannel to a port on the loopback // address if (ssc == null || !ssc.isOpen()) { ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(lb, 0)); sa = new InetSocketAddress(lb, ssc.socket().getLocalPort()); } // Establish connection (assume connections are eagerly // accepted) sc1 = SocketChannel.open(sa); ByteBuffer bb = ByteBuffer.allocate(8); long secret = rnd.nextLong(); bb.putLong(secret).flip(); sc1.write(bb); // Get a connection and verify it is legitimate sc2 = ssc.accept(); bb.clear(); sc2.read(bb); bb.rewind(); if (bb.getLong() == secret) break; sc2.close(); sc1.close(); } // Create source and sink channels source = new SourceChannelImpl(sp, sc1); sink = new SinkChannelImpl(sp, sc2); } catch (IOException e) { try { if (sc1 != null) sc1.close(); if (sc2 != null) sc2.close(); } catch (IOException e2) {} ioe = e; } finally { try { if (ssc != null) ssc.close(); } catch (IOException e2) {} } } } }
通过创建管道的代码分析:创建管道的具体实现方式也是与具体的操作系统紧密相关的,这里以Windows为例,创建了一个PipeImpl对象, AccessController.doPrivileged调用后紧接着会执行initializer的run方法,在run方法里面,windows下的实现是创建两个本地的socketChannel,然后连接(链接的过程通过写一个随机long做两个socket的链接校验),两个socketChannel分别实现了管道的source与sink端。通过查阅资料,而在Linux下则是直接使用操作系统提供的管道。
到这里,Selector.open()就完成了,总结一下,主要完成以下几件事:
1.实例化pollWrapper对象,用于将来存放注册时的socket句柄fdVal和event的数据结构pollfd。
2.根据不同操作系统实现了用于自我唤醒的管道,Windows通过创建一对自己连着自己的socket通道,Linux直接使用系统提供的管道。同时,根据linux的不同内核版本还会选择底层进行事件通知的不同机制select/poll或者epoll。
2.2 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);通道注册
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException{ synchronized (regLock) { SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } }如果该channel和selector已经注册过,则直接添加事件和附件。否则通过selector实现注册过程。
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) { if (!(ch instanceof SelChImpl)) throw new IllegalSelectorException(); SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); k.attach(attachment); synchronized (publicKeys) { implRegister(k); } k.interestOps(ops); return k; } protected void implRegister(SelectionKeyImpl ski) { synchronized (closeLock) { if (pollWrapper == null) throw new ClosedSelectorException(); growIfNeeded(); channelArray[totalChannels] = ski; ski.setIndex(totalChannels); fdMap.put(ski); keys.add(ski); pollWrapper.addEntry(totalChannels, ski); totalChannels++; } } private void growIfNeeded() { if (channelArray.length == totalChannels) { int newSize = totalChannels * 2; // Make a larger array SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize]; System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1); channelArray = temp; pollWrapper.grow(newSize); } if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels); totalChannels++; threadsCount++; } } void addEntry(int index, SelectionKeyImpl ski) { putDescriptor(index, ski.channel.getFDVal()); }通过selector注册的过程主要完成以下几件事:
- 以当前channel和selector为参数,初始化 SelectionKeyImpl 对象,并添加附件attachment。
- 如果当前channel的数量totalChannels等于SelectionKeyImpl数组大小,对SelectionKeyImpl数组和pollWrapper进行扩容操作。
- 如果totalChannels % MAX_SELECTABLE_FDS == 0,则多开一个线程处理selector。windows上select系统调用有最大文件描述符限制,一次只能轮询1024个文件描述符,如果多于1024个,需要多线程进行轮询。
- ski.setIndex(totalChannels)选择键记录下在数组中的索引位置。
- keys.add(ski);将选择键加入到已注册键的集合中。
- fdMap.put(ski);保存选择键对应的文件描述符与选择键的映射关系。
- pollWrapper.addEntry将把selectionKeyImpl中的socket句柄添加到对应的pollfd。
- k.interestOps(ops)方法最终也会把event添加到对应的pollfd。
2.3 selector.select();
public int select() throws IOException { return select(0); } public int select(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("Negative timeout"); return lockAndDoSelect((timeout == 0) ? -1 : timeout); } private int lockAndDoSelect(long timeout) throws IOException { synchronized (this) { if (!isOpen()) throw new ClosedSelectorException(); synchronized (publicKeys) { synchronized (publicSelectedKeys) { return doSelect(timeout); } } } }当调用selector.select()以及select(0)时,JDK对参数进行修正,其实传给doSelect的timeout为-1。当调用的是selectNow()的时候,timeout则为0,直接以负数作为参数则会抛出异常,其中的doSelector又回到我们的Windows实现:
相关推荐
NULL 博文链接:https://goon.iteye.com/blog/1775421
NULL 博文链接:https://dengqsintyt.iteye.com/blog/2083316
[第4节] JavaNIO流-通道1.flv [第5节] Java NIO流-通道2.flv [第6节] Java NIO流-socket通道操作.flv [第7节] Java NIO流-文件通道操作.flv [第8节] Java NIO流-选择器 .flv [第9节] Java NIO流-选择器操作.flv...
本文将主要分析Netty实现方面的东西,由于精力有限,本人并没有对其源码做了极细 致的研 究。如果下面的内容有错误或不严谨的地方,也请指正和谅解。对于Netty使用者来说,Netty提供了几个典型的example,并有详尽的...
Java 源码包 Applet钢琴模拟程序java源码 2个目标文件,提供基本的音乐编辑功能。编辑音乐软件的朋友,这款实例会对你有所帮助。 Calendar万年历 1个目标文件 EJB 模拟银行ATM流程及操作源代码 6个目标文件,EJB来...
Java生成密钥、保存密钥的实例源码,通过本源码可以了解到Java如何产生单钥加密的密钥(myKey)、产生双钥的密钥对(keyPair)、如何保存公钥的字节数组、保存私钥到文件privateKey.dat、如何用Java对象序列化保存私钥...
java注解源码分析 javapro 1、ArrayList 分析 源码注释 2)常用方法示例 3)常见bug分析 2、NIO 内容 1)基础知识点 2)实例
集合源码分析 JavaBooks 推荐书单 书单 01.JVM 深入理解Java虚拟机 02.NIO Netty实战 Netty权威指南 03.Java并发编程 Java并发编程实践 04.技术框架 Spring 实战 Spring Boot编程思想 05.数据库 高可用MySQL 高性能...
Java-NIO-Netty框架学习资源目录:【】Netty5.0架构剖析和源码解读【】Netty5用户指南【】Netty_in_Action(第五版-目录修正版)【】Netty_in_Action_v08_MEAP【】Netty_in_Action_v10_MEAP【】Netty_代码分析【】...
今天我们继续就Android DDMS源码一起分析NIO非阻塞通讯方式,Android123也会给大家分享下手机和PC互通中的一些技术
Java并发编程相关的内容,并发包源码分析等 集合框架 Java集合框架,并发容器,同步容器等 IO框架 Java基础字节流 字符流 NIO等 Java8 Java8语言的行为参数化和流编程等 Java虚拟机 Java虚拟机相关,内存模型,类...
core-niojava nio 客户端和服务端交互实现主要通过长连接的方式进行数据交换,采用多路复用技术,同步非阻塞模式。主要有以下几个概念Channel(渠道,类似于高速公路可以处理很多线程io);Selector(选择器,可以...
│ │ 13.RPC底层通讯原理之Netty线程模型源码分析.wmv │ │ │ ├─14.分库分表之后分布式下如何保证ID全局唯一性 │ │ 14.分库分表之后分布式下如何保证ID全局唯一性.mp4 │ │ │ └─15.大型公司面试必答之...
jdk sun 开头的源码 有利于分析sun 底层的相关实现, 如channel ,nio等
java7 hashmap源码 Excellent-Blog-share Welcome to share ...[Netty源码分析目录]() Spring 系列 Spring Boot系列 Spring Cloud系列 微服务系列 Excellent-Blogs Excellent-Githubs ###企业 ###个人
java源码 uml 工具 :sparkling_heart:说明 该仓库作为我的文章导航页面,文章均为原创,记录了我学习编程的点点滴滴,希望可以帮助到大家,共同进步 ! :hot_beverage:Java基础 Java基础文章:没写系列文章的都放在了...
JCarder 是一个用来查找多线程应用程序中一些潜在的死锁,通过对 Java 字节码的动态分析来完成死锁分析。 Java的Flash解析、生成器 jActionScript jActionScript 是一个使用了 JavaSWF2 的 Flash 解析器和生成器。...
传统BIO通信的弊端 在JDK 1.4推出JAVANIO1.0之前,基于JAVA 的所有Socket通信都采用 BIO 了同步阻塞模式( ),这种一请求一应答的通信模型简化了上层的应用开发, 但是在可靠性和性能方面存在巨大的弊端。...
对 NIO 模式,请求的流程描述的很详细。值得去仔细的研究。