`
assertmyself
  • 浏览: 28636 次
  • 性别: Icon_minigender_1
  • 来自: 南京
文章分类
社区版块
存档分类
最新评论

NIO-Reactor模式介绍

阅读更多
Reactor模式常用于java nio编程中,跟生产者消费者模式有点类似,可以认为是只有一个线程的生产者消费者模型,netty底层也是使用Reactor模式作为nio部分的开发


一个简单的Reactor模式


Reactor.java
package com.gbcom.protocol.nio.core;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * 反应器模式,适合nio编码,类似事件驱动编程方式,适合非阻塞IO
 * 
 * 相比较传统的生产者消费者模式,,由于结合nio,不需要开启多个消费者线程,仅仅需要开启一个Reactor线程进行轮询
 * 性能会更高,(reactor 模式 必须结合 nio 也就是 非阻塞的模式使用)
 * 
 * 
 * netty 使用 reactor模式nio部分的编码
 * @author SYZ
 * @date 2016-11-1 上午10:52:58
 * @version 1.0.0
 * @see com.gbcom.protocol.nio.core.Reactor
 */
class Reactor implements Runnable {
	final Selector selector;
	final ServerSocketChannel serverSocket;

	Reactor(int port) throws IOException {
		selector = Selector.open();
		serverSocket = ServerSocketChannel.open();
		serverSocket.socket().bind(new InetSocketAddress(port));
		serverSocket.configureBlocking(false);
		SelectionKey sk = serverSocket.register(selector,
				SelectionKey.OP_ACCEPT);
		sk.attach(new Acceptor());
	}

	/*
	 * Alternatively, use explicit SPI provider: SelectorProvider p =
	 * SelectorProvider.provider(); selector = p.openSelector(); serverSocket =
	 * p.openServerSocketChannel();
	 */
	// class Reactor continued
	public void run() { // normally in a new Thread
		try {
			while (!Thread.interrupted()) {
				selector.select(); //select这个函数是block,
				Set selected = selector.selectedKeys();
				Iterator it = selected.iterator();
				while (it.hasNext())
					dispatch((SelectionKey) (it.next()));
				selected.clear();
			}
		} catch (IOException ex) { /* ... */
		}
	}

	void dispatch(SelectionKey k) {
		Runnable r = (Runnable) (k.attachment());
		if (r != null)
			r.run();
	}

	// class Reactor continued //接收者处理方法
	class Acceptor implements Runnable {
		// inner
		public void run() {
			try {
				SocketChannel c = serverSocket.accept();//1.select()方法仅仅通知有事件到来, 真正的接受 还是要使用 accept
				//每个客户端 应该只执行一次,也就是 new Handler 会重新注册到该select中,以后不会再重复创建了。。。(待测试)
				if (c != null)
					new Handler(selector, c);
			} catch (IOException ex) { /* ... */
			}

		}
	}
	
	/**
	 * Test: (Reactor.main)
	 * @param args
	 * @throws IOException
	 */
	public static void main(String args[]) throws IOException{
		System.out.println(SelectionKey.OP_ACCEPT);
		Reactor reactor = new Reactor(1107);
		Thread t = new Thread(reactor);
//		t.setDaemon(true);
		t.start();
	}
}



Handler.java
package com.gbcom.protocol.nio.core;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 * 请求的处理,sk.attach(this);是关键,,也就是直接添加到 选择器中,,如果有事件到来,,直接会调用run方法
 * 
 * 注意 该方法并没有开启线程处理
 * 
 * @author SYZ
 * @date 2016-11-1 上午11:08:04
 * @version 1.0.0
 * @see com.gbcom.protocol.nio.core.Handler
 */
final class Handler implements Runnable {
	final SocketChannel socket;
	final SelectionKey sk;
	ByteBuffer input = ByteBuffer.allocate(0);
	ByteBuffer output = ByteBuffer.allocate(100);
	static final int READING = 0, SENDING = 1;
	int state = READING;

	Handler(Selector sel, SocketChannel c) throws IOException {
		socket = c;
		c.configureBlocking(false);
		// Optionally try first read now
		sk = socket.register(sel, 0);//把channel 注册到选择器中
		sk.attach(this);// 添加到 select中,公用 reactor中的selector线程
		sk.interestOps(SelectionKey.OP_READ);
		sel.wakeup();// 立即执行
	}

	boolean inputIsComplete() {
		return false; /* ... */
	}

	boolean outputIsComplete() {
		return false; /* ... */
	}

	void process() {
		System.out.println("process!");
		/* ... */
	}

	// class Handler continued
	public void run() {
		try {
			if (state == READING)
				read();//IO的read是非阻塞的,但是逻辑是同步,所以有耗时的可能
			else if (state == SENDING)
				send();
		} catch (IOException ex) { /* ... */
		}
	}

	void read() throws IOException {
		socket.read(input);
		if (inputIsComplete()) {
			process();

			state = SENDING;// Normally also do first write now
			sk.interestOps(SelectionKey.OP_WRITE);

			// 设计模式方式
			/*
			 * sk.attach(new Sender()); sk.interest(SelectionKey.OP_WRITE);
			 * sk.selector().wakeup();
			 */

		}
	}

	void send() throws IOException {
		socket.write(output);
		if (outputIsComplete())
			sk.cancel();
	}

	/**
	 * 简单封装,并没有使用
	 * @author SYZ
	 * @date 2016-11-1 上午11:21:38
	 * @version 1.0.0
	 * @see com.gbcom.protocol.nio.core.Sender
	 */
	class Sender implements Runnable {
		public void run() { // ...
			try {
				socket.write(output);
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			if (outputIsComplete())
				sk.cancel();
		}
	}

}


以上模式在正常非阻塞IO的情况效果还可以,但是要提供效率,可以使用多线程处理 io之后的业务逻辑,下面看一下多线程Reactor模式

多线程Reactor模式
ReactorWithPool.java
package com.gbcom.protocol.nio.core;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * 反应器的多线程版本,
 * 
 * http://gee.cs.oswego.edu Multiple Reactor Threads " Using Reactor Pools Use
 * to match CPU and IO rates Static or dynamic construction " Each with own
 * Selector, Thread, dispatch loop Main acceptor distributes to other reacto
 * 
 * 
 * 选择器也会资源紧张(未深究),所以可以创建多个
 * @author SYZ
 * @date 2016-11-1 上午11:46:49
 * @version 1.0.0
 * @see com.gbcom.protocol.nio.core.ReactorWithPool
 */
class ReactorWithPool implements Runnable {
	final Selector mainSelector;
	Selector[] subselectors = new Selector[10];//业务逻辑相关的选择器,,如何创建??
	// also create threads
	int next = 0;

	final ServerSocketChannel serverSocket;

	ReactorWithPool(int port) throws IOException {
		mainSelector = Selector.open();
		
		for(int i=0;i<10;i++){ //add by myself
			subselectors[i] = Selector.open();
		}
		
		serverSocket = ServerSocketChannel.open();
		serverSocket.socket().bind(new InetSocketAddress(port));
		serverSocket.configureBlocking(false);
		SelectionKey sk = serverSocket.register(mainSelector,
				SelectionKey.OP_ACCEPT);
		sk.attach(new Acceptor());
	}

	/*
	 * Alternatively, use explicit SPI provider: SelectorProvider p =
	 * SelectorProvider.provider(); selector = p.openSelector(); serverSocket =
	 * p.openServerSocketChannel();
	 */
	// class Reactor continued
	public void run() { // normally in a new Thread
		try {
			while (!Thread.interrupted()) {
				mainSelector.select();
				Set selected = mainSelector.selectedKeys();
				Iterator it = selected.iterator();
				while (it.hasNext())
					dispatch((SelectionKey) (it.next()));
				selected.clear();
			}
		} catch (IOException ex) { /* ... */
		}
	}

	void dispatch(SelectionKey k) {
		Runnable r = (Runnable) (k.attachment());
		if (r != null)
			r.run();
	}

	// class Reactor continued //接收者处理方法
	class Acceptor implements Runnable {
		// inner
		public void run() {
			try {
				SocketChannel connection = serverSocket.accept();
				if (connection != null)
					new Handler(subselectors[next], connection);
				if (++next == subselectors.length)
					next = 0;
			} catch (IOException e) {
				e.printStackTrace();
			}

		}
	}

	/**
	 * Test: (Reactor.main)
	 * 
	 * @param args
	 * @throws IOException
	 */
	public static void main(String args[]) throws IOException {
		System.out.println(SelectionKey.OP_ACCEPT);
		ReactorWithPool reactor = new ReactorWithPool(1107);
		Thread t = new Thread(reactor);
		t.setDaemon(true);
		t.start();
	}
}



HandlerWithPool.java
package com.gbcom.protocol.nio.core;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;


/**
 * 使用线程池处理read之后的事情,主要包括业务逻辑,
 * 注意是处理IO read完之后的事情,不是处理IO的读写操作,适用于业务逻辑比较复杂的情况,
 * 
 * 解决逻辑耗时问题
 * 
 * @author SYZ
 * @date 2016-11-1 上午11:33:03
 * @version 1.0.0
 * @see com.gbcom.protocol.nio.core.HandlerWithPool
 */
public class HandlerWithPool implements Runnable{

	  // uses util.concurrent thread pool
	  static PooledExecutor pool = new PooledExecutor();
	  static final int PROCESSING = 3;
	  
	final SocketChannel socket;
	final SelectionKey sk;
	ByteBuffer input = ByteBuffer.allocate(0);
	ByteBuffer output = ByteBuffer.allocate(100);
	static final int READING = 0, SENDING = 1;
	int state = READING;

	HandlerWithPool(Selector sel, SocketChannel c) throws IOException {
		socket = c;
		c.configureBlocking(false);
		// Optionally try first read now
		sk = socket.register(sel, 0);
		sk.attach(this);// 添加到 select中,公用 reactor中的selector线程
		sk.interestOps(SelectionKey.OP_READ);
		sel.wakeup();// 立即执行
	}

	boolean inputIsComplete() {
		return false; /* ... */
	}

	boolean outputIsComplete() {
		return false; /* ... */
	}

	void process() {
		System.out.println("process!");
		/* ... */
	}

	// class Handler continued
	public void run() {
		try {
			if (state == READING)
				read();
			else if (state == SENDING)
				send();
		} catch (IOException ex) { /* ... */
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	synchronized  void read() throws IOException, InterruptedException {
	    socket.read(input);//非阻塞,模式,直接获取了数据,
	    if (inputIsComplete()) {
	      state = PROCESSING;
	      pool.execute(new Processer());
	    }
	}
	
  synchronized void processAndHandOff() {
	    process();
	    state = SENDING; // or rebind attachment
	    sk.interestOps(SelectionKey.OP_WRITE);
	  }
	  class Processer implements Runnable {
	    public void run() { processAndHandOff(); }
	  }
		  

	void send() throws IOException {
		socket.write(output);
		if (outputIsComplete())
			sk.cancel();
	}

	/**
	 * 简单封装,并没有使用
	 * @author SYZ
	 * @date 2016-11-1 上午11:21:38
	 * @version 1.0.0
	 * @see com.gbcom.protocol.nio.core.Sender
	 */
	class Sender implements Runnable {
		public void run() { // ...
			try {
				socket.write(output);
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			if (outputIsComplete())
				sk.cancel();
		}
	}


	

}
分享到:
评论

相关推荐

    socket-nio-single-reactor.zip

    reactor多线程,java代码demo,帮助理解reactor模式;由于是测试代码,故不保证一定正确,能正常接入传输数据,目前数据包处理没做,故会出现数据截断

    Reactor模式和NIO

    Reactor模式和NIO Java的NIO为reactor模式提供了实现的基础机制,它的Selector当发现某个channel有数据时,会通过SlectorKey来告知我们,在此我们实现事件和handler的绑定

    bio-nio-aio.zip

    【Java IO】从NIO到Reactor三种模式 博客地址:https://blog.csdn.net/qq_36963950/article/details/107998164

    reactor:NIO 编程模型 - Reactor,各版本实现

    依据 Doug Lea 的 基于 NIO 实现的 Reacotr 模式的回显服务器 BasicHandler: 单线程处理器 MultiReactor: 主从 Reactor MultithreadHandler: 线程池处理器 Reactor: 接收连接,I/O 读写 Reactor 模型的说明: ...

    深入浅出NIO

    NIO 入门笔记 Reactor模式概念介绍

    java NIO socket聊天室

    可以作为NIO socket入门的例子,Reactor模式,重点理解key.attach, jar文件里包含了源代码 1,运行server.bat启动服务器,可以打开编辑,修改端口号 2,运行client.bat启动客户端,可以打开编辑,ip,和端口号 3...

    disrupted-reactor:Java异步。 IO(基于选择器)+ LMAX Disruptor

    实施的想法: select()的专用线程-React堆模式,通过特殊的WaitStrategy实现为一个中断实例。 N个线程(即处理器)处理IO事件。 一个NIO通道的处理始终在一个线程中进行(这意味着代码不必是线程安全的-&gt;性能提高...

    nioreactor:无阻塞IOReact器

    由于React堆线程可以在执行IO时饱和,因此nioreactor使用接受器线程将新连接转发到可以在非阻塞模式下处理读取和写入的React堆池。 建筑分布 要求 2.2.0或以上 Java 8或以上 建立: git clone mvn clean install ...

    Doug Lea Scalable IO in Java

    Scalable IO in Java -Doug Lea 学习NIO必看经典 描述java nio 和reactor 设计模式之间的关系

    基于事件的NIO多线程服务器

    许多基于NIO的多线程服务器程序往往直接基于选择器(Selector)的Reactor模式实现。这种简单的事件机制对于较复杂的服务器应用,显然缺乏扩展性和可维护性, 而且缺乏直观清晰的结构层次。本文将通过一个基于事件回...

    Java NIO原理和使用

    本文主要简单介绍NIO的基本原理,在下一篇文章中,将结合Reactor模式和著名线程大师Doug Lea的一篇文章深入讨论。 NIO主要原理和适用。 NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的...

    C++从0实现百万并发Reactor服务器完结13章

    Reactor 模式也叫做反应器设计模式,它是一种为处理服务请求并发提交到一个或者多个服务处理程序的事件设计模式。当请求抵达后,服务处理程序使用解多路分配策略,然后同步地派发这些请求至相关的请求处理程序。 ...

    Scalable IO in Java -Doug Lea

    Scalable IO in Java -Doug Lea 描述java nio 和reactor 设计模式之间的关系

    Observer and Reactor 观察者和recator的比较

    Observer and Reactor 观察者和recator的比较。想从菜鸟变高手,请下载,只要5分钟,看一下。

    Java_NIO原理解析

    Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer(观察员)模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必 开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。

    Apache-Tomcat-8.5.5(Linux )

    NIO是reactor模式,平常说的基本是只这个,比如说典型的Netty便是NIO扩建。NIO2则不同,是preator模式,是async io。async io是指client端只需提交请求,由操作系统来负责在事件就绪后回调应用逻辑。事实上,目前...

    简单了解Java Netty Reactor三种线程模型

    1. Reactor三种线程模型 ...由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会导致阻塞,理论上一个线程可以独立处理所有IO相关的操作。从架构层面看,一个NIO线程确实可以完成其承担的职责。例如,通过Ac

    Netty高性能网络应用框架.rar

    NIO是基于Reactor模式,面向缓冲区并结合通道的IO模型。客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有IO请求就进行处理。 AIO : Asynchronous IO,即异步非阻塞,采用了 Proactor 模式,特点...

    精通并发与netty视频教程(2018)视频教程

    63_Reactor模式与Netty组件对比及Acceptor组件的作用分析 64_Channel与ChannelPipeline关联关系及模式运用 65_ChannelPipeline创建时机与高级拦截过滤器模式的运用 66_Netty常量池实现及ChannelOption与Attribute...

    精通并发与 netty 视频教程(2018)视频教程

    57_Reactor模式透彻理解及其在Netty中的应用 58_Reactor模式与Netty之间的关系详解 59_Acceptor与Dispatcher角色分析 60_Netty的自适应缓冲区分配策略与堆外内存创建方式 61_Reactor模式5大角色彻底分析 62_Reactor...

Global site tag (gtag.js) - Google Analytics