`

Mina解析(二)

阅读更多

4.NIO 基础知识

        Java NIO(new IO)是JDK1.4引入的非阻塞IO机制,具体它如何的好,我就不说啦,百度一下看看就了解啦。

        Java NIO引入了两个新的概念:通道Channel和选择器Selector。

        通道是服务端和客户端进行通信的接口-----原来是直接的IO操作,客户端发信息给服务端,服务端从OutputStream中读取,然后向InputStream中写数据;现在则直接从Channel中读取或写入数据。

        选择器是一个多路复用器:所有的通道向它注册事件,因此它管理了所有的通道信息,并轮询各个通道的状态,一旦某个通道某事件发生(比如有数据读或可以写入数据),则通知该管道对应事件的处理器去处理它。

        如下图所示:

        客户端连接上服务端后,首先每个客户端都要与服务端建立一个通道(SocketChannel);然后每个通道向选择器(Selector)注册事件,注册器就会轮询查看每个通道是否有事件发生,一旦某通道有事件发生,比如Client1的SocketChannel有数据了,就触发了读就绪事件,可以进行读取的操作。

        选择器(Selector)是个典型的反应器模式(Reactor Pattern),它的实现原理可以参考该文章:http://bijian1013.iteye.com/blog/2277792

        先介绍下NIO的几个常用类:

(1).Buffer 缓冲区

        Java NIO的缓冲区Buffer基本和Mina的IoBuffer一样,但是准确的说,Mina的IoBuffer就是对Java NIO buffer的二次封装,使得它的功能更加强大。

        数据输入和输出往往是比较耗时的操作。缓冲区从两个方面提高I/O操作的效率:

        1).减少实际的物理读写次数

        这一点比较容易理解,我们经常使用的缓冲数组就是这个道理:

 

public void ioRead(String filePath) throws IOException {
	FileInputStream in = new FileInputStream(filePath);
	byte[] b = new byte[1024];
	int i = 0;
	while ((i = in.read(b)) != -1) {
		logger.info(new String(b, 0, i));
	}
}

 

        2).缓冲区在创建时被分配内存,这块内存区域一直被重用,这可以减少动态分配和回收内存区域的次数。

        这一点是Buffer的优势,也是buffer性能较高的一个原因。

        java.nio.Buffer类是一个抽象类,不能被实例化。共有8个具体的缓冲区类,其中最基本的缓冲区是ByteBuffer,它存放的数据单元是字节。ByteBuffer类并没有提供公开的构造方法,但是提供了两个获得ByteBuffer实例的静态工厂方法:

        a.allocate(int capacity):返回一个ByteBuffer对象,参数capacity指定缓冲区的容量。

        b.directAllocate(int capacity): 返回一个ByteBuffer对象,参数capacity指定缓冲区的容量。该方法返回的缓冲区称为直接缓冲区,它与当前操作系统能够更好的耦合,因此能进一步提高I/O操作的速度。但是分配直接缓冲区的系统开销很大,因此只有在缓冲区较大并且长期存在,或者需要经常重用时,才使用这种缓冲区。

        同样常用的是CharBuffer,使用基本和上面一样,所有不多说啦!其他实现类我也基本不怎么用。

        常用的方法也不多做解释了,翻看API文档吧。

(2).Charset 字符编码

        Charset就是根据指定的编码个数进行编解码的一个接口,不多作解释。

        a.Charset类的静态forName(String encode)方法返回一个Charset对象,它代表参数encode指定的编码类型。

        b.ByteBuffer encode(String str):对参数str指定的字符串进行编码,把得到的字节序列存放在一个ByteBuffer对象中,并将其返回。

        c.ByteBuffer encode(CharBuffer cb):对参数cb指定的字符缓冲区中的字符进行编码,把得到的字节序列存放在一个ByteBuffer对象中,并将其返回。

        d.CharBuffer decode(ByteBuffer bb):把参数bb指定的ByteBuffer中的字节序列进行解码,把得到的字符序列存放在一个CharBuffer对象中,并将其返回。

(3).Channel 通道

        通道在Java NIO的开始就做个解释,它是服务端和客户端进行通信的接口;通道Channel用来连接缓冲区与数据源或数据汇(即数据目的地)。如下图所示,数据源的数据经过通道到达缓冲区,缓冲区的数据经过通道到达数据汇。


        但是在实际的开发中,我们基本都是使用Channel的实现类:ServerSocketChannel和SocketChannel,类图如下:


        ServerSocketChannel从SelectableChannel中继承了configureBlocking()和register()方法。ServerSocketChannel是ServerSocket的替代类,也具有负责接收客户连接的accept()方法。

        ServerSocketChannel并没有public类型的构造方法,必须通过它的静态方法open()来创建ServerSocketChannel对象。

        每个ServerSocketChannel对象都与一个ServerSocket对象关联。ServerSocketChannel的socket()方法返回与它关联的ServerSocket对象。

private Selector selector;
private int PORT = 3015;
private ServerSocketChannel serverSocketChannel = null;
// 创建一个Selector对象
selector = Selector.open();
// 创建一个ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open();
// 使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,
// 可以顺利绑定到相同的端口
serverSocketChannel.socket().setReuseAddress(true);
// 使ServerSocketChannel工作于非阻塞模式
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));

        SocketChannel可看作是Socket的替代类,但它比Socket具有更多的功能;SocketChannel不仅从SelectableChannel父类中继承了configureBlocking()和register()方法,而且实现了ByteChannel接口,因此具有用于读写数据的read(ByteBuffer dst)和write(ByteBuffer src)方法。

        SocketChannel没有public类型的构造方法,必须通过它的静态方法open()来创建SocketChannel对象。

private String HOST = "127.0.0.1";
private int PORT = 3015;
private SocketChannel socketChannel = null;
// 创建一个SocketChannel对象
socketChannel = SocketChannel.open();
// 使SocketChannel工作于非阻塞模式
socketChannel.configureBlocking(false);
// InetAddress ia = InetAddress.getLocalHost();
// InetSocketAddress isa = new InetSocketAddress(ia, 3015);
InetSocketAddress isa = new InetSocketAddress(HOST, PORT);
socketChannel.connect(isa);
logger.info("与服务器建立连接成功....");
// 创建一个Selector对象
selector = Selector.open();

        其他不多做解释了,请参看NIO的API。

(4).Selector 选择器

        选择器在Java NIO的开始就做个解释,它是一个多路复用器,所有的通道向它注册事件;只要ServerSocketChannel以及SocketChannel向Selector注册了特定的事件,Selector就会监控这些事件是否发生。

        ServerSocketChannel以及SelectableChannel的register()方法负责注册事件,该方法返回一个SelectionKey对象,该对象是用于跟踪这些被注册事件的句柄。

        在SelectionKey对象的有效期间,Selector会一直监控与SelectionKey对象相关的事件,如果事件发生,就会把SelectionKey对象加入到selected-keys集合中。

        在以下情况,SelectionKey对象会失效,这意味着Selector再也不会监控与它相关的事件。

        a.程序调用SelectionKey的cancel()方法;

        b.关闭与SelectionKey关联的Channel;

        c.与SelectionKey关联的Selector被关闭;

        在SelectionKey中定义了四种事件,分别用4个int类型的常量来表示:

        a.SelectionKey.OP_ACCEPT:接收连接就绪事件,表示服务器监听到了客户连接,服务器可以接收这个连接了,常量值为16。

        b.SelectionKey.OP_CONNECT:连接就绪事件,表示客户与服务器的连接已经建立成功,常量值为8。

        c.SelectionKey.OP_READ:读就绪事件,表示通道中已经有了可读数据,可以执行读操作了,常量值为1。

        d.SelectionKey.OP_WRITE:写就绪事件,表示已经可以向通道写数据了,常量值为4。

        最常用的就是后三种事件:连接就绪事件和读写就绪事件;下面就是它们的具体应用啦,请结合实例理解NIO的用法。

 

5.基于NIO的阻塞服务器

        ServerSocketChannel与SocketChannel采用默认的阻塞模式,因此我们用NIO提供的API做一个阻塞服务器,和以前的阻塞服务器做个对比,以加深它们的区别;

        服务端代码:

package com.bijian.study.mina.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.log4j.Logger;

import com.bijian.study.mina.handler.Server05Handler;

/*
 * 使用NIO的ServerSocketChannel创建阻塞的Socket服务端
 * 使用JDK自带的线程池ExecutorService,多线程处理客户端请求
 */
public class EchoServer05 {
	
	private Logger logger = Logger.getLogger(EchoServer05.class);

	private int PORT = 3015;

	private ServerSocketChannel serverSocketChannel = null;

	private ExecutorService executorService; // 线程池

	private static final int POOL_MULTIPLE = 4; // 单个CPU时线程池中的工作线程个数

	public EchoServer05() throws IOException {
		// 创建线程池
		// Runtime的availableProcessors()方法返回当前系统的CPU格式
		// 系统的CPU越多,线程池中工作线程的数目也越多
		executorService = Executors.newFixedThreadPool(Runtime.getRuntime()
				.availableProcessors()
				* POOL_MULTIPLE);
		// ServerSocketChannel并没有public类型的构造方法,
		// 必须通过它的静态方法open()来创建ServerSocketChannel对象
		// 默认是阻塞模式的,通过configureBlocking(false)设置为非阻塞模式
		serverSocketChannel = ServerSocketChannel.open();
		// 使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,
		// 可以顺利绑定到相同的端口
		serverSocketChannel.socket().setReuseAddress(true);
		// 每个ServerSocketChannel对象都与一个ServerSocket对象关联
		// ServerSocketChannel的socket()方法返回与它关联的ServerSocket对象
		serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
		logger.info("服务端启动....   端口号:" + PORT);
	}

	public void service() {
		while (true) { // 阻塞
			SocketChannel socketChannel = null;
			try {
				socketChannel = serverSocketChannel.accept(); // 等待连接
				// 多线程处理
				executorService.execute(new Server05Handler(socketChannel));
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String args[]) throws IOException {
		new EchoServer05().service();
	}
}
        服务器端的业务处理代码:
package com.bijian.study.mina.handler;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.Date;

import org.apache.log4j.Logger;

public class Server05Handler implements Runnable {
	
	private Logger logger = Logger.getLogger(Server05Handler.class);

	private SocketChannel socketChannel;

	public Server05Handler(SocketChannel socketChannel) {
		this.socketChannel = socketChannel;
	}

	public void run() {
		try {
			Socket socket = socketChannel.socket();
			logger.info("一个新的请求达到并创建  " + socket.getInetAddress() + ":" + socket.getPort());
			InputStream socketIn = socket.getInputStream();
			BufferedReader br = new BufferedReader(new InputStreamReader(socketIn));
			OutputStream socketOut = socket.getOutputStream();
			PrintWriter pw = new PrintWriter(socketOut, true);

			String msg = null;
			while ((msg = br.readLine()) != null) {
				logger.info("服务端受到的信息为:" + msg);
				pw.println(new Date()); // 给客户端响应日期字符串
				if (msg.equals("bye"))
					break;
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			try {
				if (socketChannel != null)
					socketChannel.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
}
        客户端代码:
package com.bijian.study.mina.client;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

import org.apache.log4j.Logger;

/*
 * 使用NIO的SocketChannel创建阻塞的客户端
 */
public class EchoClient05 {
	
	private Logger logger = Logger.getLogger(EchoClient05.class);

	private String HOST = "localhost";

	private int PORT = 3015;

	private SocketChannel socketChannel;

	public EchoClient05() throws IOException {
		socketChannel = SocketChannel.open();
		// InetAddress ia = InetAddress.getLocalHost();
		InetSocketAddress isa = new InetSocketAddress(HOST, PORT);
		// socketChannel.connect()与远程主机建立连接
		// 默认采用阻塞模式
		socketChannel.connect(isa);
	}

	public void talk() throws IOException {
		try {
			// 通过socketChannel.socket()方法获得与SocketChannel关联的Socket对象,
			// 然后从这个Socket中获得输出流与输入流,再一行行的发送和接受数据。
			// 获得服务端响应信息的输入流
			InputStream socketIn = socketChannel.socket().getInputStream();
			BufferedReader br = new BufferedReader(new InputStreamReader(
					socketIn));
			// 给服务端发送信息的输出流
			OutputStream socketOut = socketChannel.socket().getOutputStream();
			PrintWriter pw = new PrintWriter(socketOut, true);
			BufferedReader localReader = new BufferedReader(
					new InputStreamReader(System.in));
			String msg = null;
			while ((msg = localReader.readLine()) != null) {
				pw.println(msg);
				logger.info(br.readLine());
				if (msg.equals("bye"))
					break;
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			try {
				socketChannel.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) throws IOException {
		new EchoClient05().talk();
	}
}
        启动服务端和客户端,测试,无疑是成功的!

        是不是感觉很别扭呀,明明两个ServerSocket和Socket解决的问题,却创建了一大堆的对象。NIO编程刚开始的却是感觉不爽呀,不过慢慢就适应啦。但如果使用Mina框架,你会发现,根本不需要关心什么NIO,它已经给你封装好啦。

 

6.基于NIO的非阻塞服务器

        待完善……

 

7.多线程的基于NIO的非阻塞服务器

        NIO有效解决了多线程服务器存在的线程开销问题,但在使用上略显得复杂一些。许多基于 NIO 的多线程服务器程序往往直接基于选择器(Selector)的 Reactor 模式实现。这种简单的事件机制对于较复杂的服务器应用,显然缺乏扩展性和可维护性, 而且缺乏直观清晰的结构层次。

 

二.异步操作分析

        待完善……

 

三.Mina内部实现分析

        待完善……

 

四.Mina的线程模型配置

        先看官方文档的描述:(这里我就纯粹翻译一下吧,注意Mina的线程模型配置是针对Mina2.0以前的版本而言的,使用2.0以后版本的可以跳过)

1.禁止缺省的ThreadModel设置

        MINA2.0及以后版本已经没有ThreadModel了,如果使用这些版本的话,可以跳过本节。

        ThreadModel设置是在MINA1.0以后引入的,但是使用ThreadModel增加了配置的复杂性,推荐禁止掉缺省的TheadModel配置。

IoAcceptor acceptor = new SocketAcceptor();
SocketAcceptorConfig cfg = new SocketAcceptorConfig();
cfg.setThreadModel(ThreadModel.MANUAL);// 禁止掉ThreadModel的缺省配置

 

2.配置I/O工作线程的数量

        这节只是NIO实现相关的,NIO数据包以及虚拟机管道等的实现没有这个配置。

        在MINA的NIO实现中,有三种I/O工作线程:

        a.Acceptor线程:接受进入连接,并且转给I/O处理器线程来进行读写操作。每一个SocketAcceptor产生一个Acceptor线程,线程的数目不能配置。

        b.Connector线程:尝试连接远程对等机,并且将成功的连接转给I/O处理器线程来进行读写操作。每一个SocketConnector产生一个Connector线程,这个的数目也不可以配置。

        c.I/O处理器线程:执行实际上的读写操作直到连接关闭。每一个SocketAcceptor或SocketConnector都产生它们自己的I/O处理线程。这个数目可以配置,缺省是1。

        因此,对于每个IoService,可以配置的就是I/O处理线程的数目。下面的代码产生一个有四个I/O处理线程的SocketAcceptor:

IoAcceptor acceptor = new SocketAcceptor(4,Executors.newCachedThreadPool());

        没有单凭经验来决定I/O处理线程数目的方法,一般设置为当前服务器CPU个数+1:

IoAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());

 

3.增加一个ExecutorFilter到IoFilterChain中

        ExecutorFilter是一个IoFilter,用于将进入的I/O事件转到一个 java.util.concurrent.Executor实现。事件会从这个Executor转到下一个IoFilter,通常是一个线程池。可以在 IoFilterChain的任何地方增加任意数目的ExecutorFilter,实现任何类型的线程模型,从简单的线程池到复杂的SEDA。

        到现在为止我们还没有增加ExecutorFilter,如果没有增加ExecutorFilter,事件会通过方法调用转到一个 IoHandler,这意味着在IoHandler实现中的业务逻辑会在I/O处理线程里运行。我们叫这种线程模型为"单线程模型"。单线程模型可以用来就会低反应网络应用程序,受CPU限制的业务逻辑(如,游戏服务器)。

        典型的网络应用需要一个ExecutorFilter插入到IoFilterChain中,因为业务逻辑和I/O处理线程有不同的资源使用模式。如果你用IoHandler的实现来执行数据库操作,而没有增加一个ExecutorFilter的话,那么,你整个服务器会在执行数据库操作的时候锁定,特别是数据库性能低的时候。下面的例子配置一个IoService在一个新的IoSession建立时增加一个ExecutorFilter。

cfg.getFilterChain().addLast("threadPool",new ExecutorFilter(Executors.newCachedThreadPool()));

        如果server关闭,则execute也需要关闭。

        使用一个ExecutorFilter通常不意味着要用一个线程池,对于Executor的实现没有任何限制。

 

4.应该把ExecutorFilter放在IoFilterChain的什么地方

        这个要根据于具体应用的情况来定。如果一个应用有一个ProtocolCodecFilter实现和一个常用的有数据库操作的IoHandler实现的话,那么就建议在ProtocolCodecFilter实现的后面增加一个ExecutorFilter,这是因为大部分的协议解码实现的性能特性是受CPU限制的,和I/O处理线程是一样的。

// Add CPU-bound job first
cfg.getFilterChain().addLast(
	"codec",
	new ProtocolCodecFilter(new VamsCodecFactory(Charset.forName("utf-8"), true)));
// and then a thread pool
cfg.getFilterChain().addLast("threadPool",new ExecutorFilter(Executors.newCachedThreadPool()));

 

5.选择IoService的线程池类型时要小心

        Executors.newCachedThreadPool()经常是IoService首选的。因为如果使用其它类型的话,可能会对 IoService产生不可预知的性能方面的影响。一旦池中的所有线程都在使用中,IoService会在向池尝试请求一个线程时开始锁定,然后会出现一个奇怪的性能下降,这有时是很难跟踪的。

 

6.不推荐IoServices和ExecutorFilters共享一个线程池

        你可以想让IoServices和ExecutorFilters共享一个线程池,而不是一家一个。这个是不禁止的,但是会出现很多问题,在这种情况下,除非你为IoServices建立一个缓冲线程池。

 

PS:Socket编程俗称Java网络编程,是Java Web开发的精髓!做J2EE的人可能很少关心多线程,NIO等等这些东西,但是不可否认它却实时与我们打交道,比如常用的web容器Tomcat。

        Mina解析完整代码见附件MinaDemo05.rar。

 

学习资料:http://wenku.baidu.com/link?url=VyKQnsn4b0BDJ8cQlLUu9cvpGz-Iou_499U4lJE9I0s5nPPY5kF5BDd8qo1yRMOiqsM8wxDPEL_S0koiFp8v5y36G9OGJydC2C12juo0bTW

  • 大小: 14.5 KB
  • 大小: 10.9 KB
  • 大小: 9 KB
  • 大小: 29.5 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics