`
sunnylocus
  • 浏览: 869654 次
  • 性别: Icon_minigender_1
  • 来自: 美国图森
社区版块
存档分类
最新评论

非阻塞通信

    博客分类:
  • Java
阅读更多

     对于用ServerSocket和Socket写的服务器程序或着客户端程序,在运行的时候常常会阻塞,如当一个线程执行ServerSocket的accept()方法,如果没有客户机连接,该线程就会一直阻塞直到有了客户机连接才从accept()方法返回,再如,当线程执行Socket的read()方法,如果输入流中没有数据,该线程就会一直等到有数据可读时才从read()方法返回。

    如果服务器要与多个客户机通信,通常做法为每个客户机连接开启一个服务线程,每个工作线程都有可能经常处于长时间的阻塞状态。

    从JDK1.4版本开始,引入了非阻塞的通信机制。服务器程序接收客户连接、客户程序建立与服务器的连接,以及服务器程序和客户程序收发数据的操作都可以按非阻塞的方式进行。服务器程序只需要创建一个线程,就能完成同时与多个客户通信的任务。

    非阻塞通信要比传统的阻塞方式效率要高,Apache的MIMA框架就是以java.nio包中的类编写的。

    不知道是否有朋友看过 孙卫琴写的《Java网络编程精解》,在提到线程阻塞的时

Java网络编程精解 第84页 写道
线程从Socket的输入流读入数据时,如果没有足够的数据,就会进入阻塞状态,直到读到了足够的数据,或者到达输入流的未尾,或者出现了异常,才从输入流的read()方法返回或异常中断。输入流中有多少数据才算足够呢,这要看线程执行read方法的类型。
    int read():只要输入有一个字节,就算足够。
    int read(byte[] buff):只要输入流中的字节数目与参数buff数组的长度相同,就算足够。

 我对描红的描述持不同的意见

  byte[] msgBytes = new byte[512];
  inputStream.read(msgBytes); 

如果按书中描述,这行代码必须读到512个字节后才从阻塞状态中返回,如果没有读到足够的512个字节,则一直阻塞。但实际情况却不是这样的,只要流里哪怕只有一个字节 ,inputStream.read(msgBytes)也会立即返回,返回值为读到的字节数。

下面是简单的NIO示例

1、服务端程序,非阻塞方式

package com.bill99.nioserver;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.nio.charset.Charset;
import java.util.Iterator;

public class NIOServer {
	private Selector socketSelector = null;
	private ServerSocketChannel ssChannel = null;
	private SocketChannel socketChannel =null;
	private static SelectionKey key = null;
	private int port =5512;
	private int backlog = 100;
	private Charset charset = Charset.defaultCharset();
	private ByteBuffer shareBuffer = ByteBuffer.allocate(1024);//生成1kb的缓冲区,可以根据实际情况调的更大些
	
	public NIOServer()  {
		try {
			socketSelector = SelectorProvider.provider().openSelector();
			ssChannel =ServerSocketChannel.open() ;
			ssChannel.socket().bind(new InetSocketAddress(port),100);
			System.out.println(String.format("NIO服务器启动,监听端口%1$s,最大连接数%2$s", port,backlog));
		} catch(IOException e){
			throw new ExceptionInInitializerError(e);
		}
	}
	/**
	 * 接收客户端连接
	 */
	public void acceptConnect() {
		while(true) {
			try {
				SocketChannel socketChannel = ssChannel.accept();//阻塞模式,直到有连接进入
				System.out.println("收到客户机连接,来自:"+ssChannel.socket().getInetAddress());
				socketChannel.configureBlocking(false);//设置非阻塞模式
				synchronized(this){
					socketSelector.wakeup();
					socketChannel.register(socketSelector,SelectionKey.OP_READ|
														  SelectionKey.OP_WRITE);
				}
			} catch(IOException e){e.printStackTrace();}
		}
	}
	/**
	 * 读写服务
	 * @throws IOException
	 */
	public void service() throws IOException{
		while (true) {
			synchronized (this) {//空的同步块,目的是为了避免死锁
			}
			if (!(socketSelector.select() > 0)) {
				continue;
			}
			Iterator<SelectionKey> it = socketSelector.selectedKeys().iterator();
			while (it.hasNext()) {
				key = it.next();
				it.remove();
				if(key.isReadable()) {// 读就绪
					this.readDataFromSocket(key);
				}
				if(key.isWritable()){//写就绪
					this.sayWelcome(key);
				}
			}
		}
	}
	//读取客户机发来的数据
	private void readDataFromSocket(SelectionKey key) throws IOException {
		shareBuffer.clear();//清空buffer
		socketChannel=(SocketChannel) key.channel();
		int num=0;
		while((num = socketChannel.read(shareBuffer))>0){
			shareBuffer.flip();//将当前极限设置为位置,并把设置后的位置改为0
		}
		if(num ==-1){//读取流的未尾,对方已关闭流
			socketChannel.close();
			return;
		}
		System.out.println("client request:"+charset.decode(shareBuffer).toString());
	} 
	//向客户机发响应信息
	private void sayWelcome(SelectionKey key) throws IOException {
		shareBuffer.clear();//清空buffer
		socketChannel=(SocketChannel) key.channel();
		shareBuffer.put("Welcome to china!this is a greate and very beautifual country!\n".getBytes());
		shareBuffer.flip();//将当前极限设置为位置,并把设置后的位置改为0
		socketChannel.write(shareBuffer);
	}
	//Main方法
	public static void main(String[] args) {
		final NIOServer server = new NIOServer();
		Runnable task = new  Runnable() {
			public void run() {
				server.acceptConnect();
			}
		};
		new Thread(task).start();//启动处理客户机连接的线程
		try {
			server.service();
		} catch (IOException e) {//发生IO流异常时,关闭对应的socket
			try {
				key.channel().close();
				key.cancel();
			} catch (IOException e1) {
				e1.printStackTrace();
			}
		}
	}
}

 2、客户端程序,阻塞方式

package com.bill99.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.CharBuffer;

import javax.net.SocketFactory;
//测试类
public class BlockingClient {
	private Socket socket = null;
	private OutputStream out = null;
	private InputStream in = null;
	
	public BlockingClient() {
		try {
			socket= SocketFactory.getDefault().createSocket("127.0.0.1", 5512);
			out = socket.getOutputStream();
			in = socket.getInputStream();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	//发送请求并接收应答
	public String receiveRespMsg(String reqMsg) throws IOException{
		out.write(reqMsg.getBytes());
		out.flush();
		in = socket.getInputStream();
		int c =0;
		CharBuffer buffer = CharBuffer.allocate(1024);
		while((c=in.read())!=-1 && c!=10){
			buffer.put((char)c);
		}
		return new String(buffer.array()).trim();
	}
	
	public static void main(String[] args) throws Exception{
		BlockingClient client = new BlockingClient();
		System.out.println("服务器响应:"+client.receiveRespMsg("hello\n"));
	}
}

 

    总体而信,阻塞模式和非阻塞模式都可以同时处理多个客户机的连接,但阻塞模式需要较多的线程许多时间都浪费在阻塞I/O操作上,Java虚拟机需要频繁地转让CPU的使用权,而非阻塞模式只需要少量线程即可完成所有任务,非阻塞模式能更有效的利用CPU,系统开销小,能够提高程序的并发性能。

 

4
0
分享到:
评论
2 楼 to_zoe_yang 2011-01-14  
不错~
写的很好
1 楼 gsb 2009-12-31  

相关推荐

Global site tag (gtag.js) - Google Analytics