`
to_zoe_yang
  • 浏览: 139370 次
  • 性别: Icon_minigender_2
  • 来自: 01
社区版块
存档分类
最新评论

非阻塞通信

阅读更多
   最近看孙老师的《Java网络编程精解》,读到非阻塞通信!感觉框架的重要性!服务器端和客户端的开发可以有框架遵循!开始写歌简单的,然后逐渐添加功能!

package nonblock;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;

public class MyServer {
	
	private ServerSocketChannel ssc ;
	private Selector selector ;
	private int port = 8000;
	private Charset charset = Charset.forName("GBK");
	public static int num = 1;
	
	public MyServer() throws IOException{
		selector = Selector.open();
		ssc = ServerSocketChannel.open();
		ssc.socket().setReuseAddress(true);
		ssc.configureBlocking(false);
		ssc.socket().bind(new InetSocketAddress(port));
		System.out.println("Serve 启动了!");
	}
	
	public void server() throws IOException{
		ssc.register(selector, SelectionKey.OP_ACCEPT);
		while(selector.select()>0){
			Set keySet = selector.selectedKeys();
			Iterator iter = keySet.iterator();
			while(iter.hasNext()){
				SelectionKey key = (SelectionKey)iter.next();
				//一定要remove,否则会一直存在
				iter.remove();
				if(key.isAcceptable()){
					ServerSocketChannel s = (ServerSocketChannel)key.channel();
					SocketChannel sc = (SocketChannel)s.accept();
					System.out.println("服务器端接收到连接...");
					System.out.println("客户端信息"+sc.socket().getLocalAddress()+":"+sc.socket().getPort());
					sc.configureBlocking(false);
					ByteBuffer buffer = ByteBuffer.allocate(1024);
					sc.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE, buffer);
					System.out.println("服务器完成注册...");
				}
				if(key.isReadable()){
				 //        receive(key);
				//	System.out.println("服务器端可以读取信息...");
				}
				if(key.isWritable()){
				//	System.out.println("服务器端可以写入信息...");
				}
			}
		}
	}
	
	public void receive(SelectionKey key) throws IOException {
		ByteBuffer buffer = (ByteBuffer)key.attachment();
		SocketChannel socketChannel = (SocketChannel)key.channel();
		ByteBuffer readBuffer = ByteBuffer.allocate(2);
		socketChannel.read(readBuffer);
		readBuffer.flip();
		
		System.out.println((num++)+"From client:"+charset.decode(readBuffer).toString());
		buffer.limit(readBuffer.capacity());
		buffer.put(readBuffer);
		
	}
	
	public void send(SelectionKey key) throws IOException {
		ByteBuffer buffer = (ByteBuffer)key.attachment();
		String str = charset.decode(buffer).toString();
		
		if(str.indexOf("\r\n")==-1)
			return ;
		String output = str.substring(0, str.indexOf("\r\n"));
		ByteBuffer outBuffer = charset.encode(output);
		SocketChannel socketChannel = (SocketChannel)key.channel();
		while (outBuffer.hasRemaining())
			socketChannel.write(outBuffer);
		
		if (output.equals("bye\r\n")) {
			key.cancel();
			socketChannel.close();
			System.out.println("关闭与客户的连接");
		}
	}
	
	public static void main(String[] arg){
		try {
			new MyServer().server();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}



客户端

package nonblock;
import java.net.*;
import java.nio.channels.*;
import java.nio.*;
import java.io.*;
import java.nio.charset.*;
import java.util.*;

public class MyClient {
	private SocketChannel socketChannel = null;
	private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
	private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
	private Charset charset = Charset.forName("GBK");
	private Selector selector;
	
	public MyClient() throws IOException {
		socketChannel = SocketChannel.open();
		InetAddress ia = InetAddress.getLocalHost();
		InetSocketAddress isa = new InetSocketAddress(ia, 8000);
		socketChannel.connect(isa);
		socketChannel.configureBlocking(false);
		System.out.println("与服务器的连接建立成功");
		selector = Selector.open();
	}
	
	public static void main(String args[]) throws IOException {
		final MyClient client = new MyClient();
//		Thread receiver = new Thread() {
//			public void run() {
//				client.receiveFromUser();
//			}
//		};
//
//		receiver.start();
		client.talk();
	}

	private void talk() throws IOException {
		// TODO Auto-generated method stub
		socketChannel.register(selector, SelectionKey.OP_READ
				| SelectionKey.OP_WRITE);
		while(selector.select()>0){
			Set keySet = selector.selectedKeys();
			Iterator iter = keySet.iterator();
			while(iter.hasNext()){
				SelectionKey key = (SelectionKey)iter.next();
				iter.remove();
				if(key.isWritable()){
				//	send(key);
					System.out.println("客户端可写...");
				}
				if(key.isReadable()){
					System.out.println("客户端可读...");
				}		
			}			
		}
	}
	
	public void send(SelectionKey key) throws IOException {
		
		
		SocketChannel socketChannel = (SocketChannel)key.channel();
		String input ;
		BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
		
		Charset charset = Charset.forName("GBK");
		
		while((input=br.readLine())!=null){
			ByteBuffer buffer = ByteBuffer.allocate(2);
			ByteBuffer outBuffer = ByteBuffer.allocate(2);
			buffer= charset.encode(input+"\r\n");
		//	outBuffer.put(buffer);
		//	buffer.flip();
			socketChannel.write(buffer);
			buffer.flip();
			System.out.println("I say:"+ charset.decode(buffer).toString());
		}	
	}
}


运行后,服务器端显示:
Serve 启动了!
此时运行客户端,服务器端显示:
服务器端接收到连接...
客户端信息/172.30.0.8:1766
服务器完成注册...
服务器端可以写入信息...
服务器端可以写入信息...
.
.
.
而客户端一直循环显示:
客户端可写...
客户端可写...
客户端可写...
.
.
.

当服务器端运行后,便首先使用register方法,是的当前的ServerSocketChannel可以进行OP_ACCEPT,此时运行客户端,则客户端便SocketChannel使用register方法,是的可以Selector可以对OP-READ和OP_WRITE进行等待并执行,与此同时,服务器发现客户端有链接,激活了Selectot的OP_ACCEPT,便建立连接,使用accept接收了SocketChannel的连接,此后SocketChannel便成了服务器端与客户端操作的通道!此时服务器端和当前的客户端都有一个SocketChannel,而SocketChannel是联通的!而服务器端和客户端此时共同维持一个缓冲区,可以对这个缓冲区进行读写操作。而这个缓冲区就是SocketChannel的register操作时的产生的!
个人理解,如有不对的,望大家指出!
分享到:
评论
2 楼 to_zoe_yang 2011-03-27  
试验了下,都不执行操作的时候,服务器端和客户端都是 可写状态,不可写,
因为此时客户端循环显示:客户可写;服务器端显示:服务器可写

但是如果此时断了客户端,服务器轮流显示:服务器可写和服务器可读

现在我觉得,客户端和服务器端是全双工的,有两个通道!
服务器端一个负责写,对应的客户端一个负责读;
服务器端另一个负责读,对应的客户端一个负责写!
而register时的attachment则是用于自己内部通信的!

想明白了!
呵呵~~
不错
今天有成就!

当然,哪一天如果发现自己这个错误了,希望大家指正啊!
可不敢误导别人啊!
1 楼 to_zoe_yang 2011-03-27  
服务器端
package nonblock;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;

public class MyServer {
	
	private ServerSocketChannel ssc ;
	private Selector selector ;
	private int port = 8000;
	private Charset charset = Charset.forName("GBK");
	public static int num = 1;
	
	public MyServer() throws IOException{
		selector = Selector.open();
		ssc = ServerSocketChannel.open();
		ssc.socket().setReuseAddress(true);
		ssc.configureBlocking(false);
		ssc.socket().bind(new InetSocketAddress(port));
		System.out.println("Serve 启动了!");
	}
	
	public void server() throws IOException{
		ssc.register(selector, SelectionKey.OP_ACCEPT);
		while(selector.select()>0){
			Set keySet = selector.selectedKeys();
			Iterator iter = keySet.iterator();
			while(iter.hasNext()){
				SelectionKey key = (SelectionKey)iter.next();
				//一定要remove,否则会一直存在
				iter.remove();
				if(key.isAcceptable()){
					ServerSocketChannel s = (ServerSocketChannel)key.channel();
					SocketChannel sc = (SocketChannel)s.accept();
					System.out.println("服务器端接收到连接...");
					System.out.println("客户端信息"+sc.socket().getLocalAddress()+":"+sc.socket().getPort());
					sc.configureBlocking(false);
					ByteBuffer buffer = ByteBuffer.allocate(1024);
					sc.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE, buffer);
					System.out.println("服务器完成注册...");
				}
				if(key.isReadable()){
				//	receive(key);
				System.out.println("服务器端可以读取信息...");
				}
				if(key.isWritable()){
				//	send1(key);
					System.out.println("服务器端可以写入信息...");
				}
			}
		}
	}
	
	public void receive(SelectionKey key) throws IOException {
//		ByteBuffer buffer = (ByteBuffer)key.attachment();
//		SocketChannel socketChannel = (SocketChannel)key.channel();
//		ByteBuffer readBuffer = ByteBuffer.allocate(2);
//		socketChannel.read(readBuffer);
//		readBuffer.flip();
//		
//		System.out.println((num++)+"From client:"+charset.decode(readBuffer).toString());
//		buffer.limit(readBuffer.capacity());
//		buffer.put(readBuffer);
		System.out.println("Reading");
		
	}
	
	public void send1(SelectionKey key) throws IOException {
		
		
		SocketChannel socketChannel = (SocketChannel)key.channel();
		String input = "Hello,Client22";
		
			ByteBuffer buffer = ByteBuffer.allocate(200);
			
			buffer= charset.encode(input+"\r\n");
		//	outBuffer.put(buffer);
		//	buffer.flip();
			socketChannel.write(buffer);
			buffer.flip();
			System.out.println("I say:"+ charset.decode(buffer).toString()+","+buffer.position()+","+buffer.limit());
		
			buffer.flip();
	}
	
	public void send(SelectionKey key) throws IOException {
		ByteBuffer buffer = (ByteBuffer)key.attachment();
		String str = charset.decode(buffer).toString();
		
		if(str.indexOf("\r\n")==-1)
			return ;
		String output = str.substring(0, str.indexOf("\r\n"));
		ByteBuffer outBuffer = charset.encode(output);
		SocketChannel socketChannel = (SocketChannel)key.channel();
		while (outBuffer.hasRemaining())
			socketChannel.write(outBuffer);
		
		if (output.equals("bye\r\n")) {
			key.cancel();
			socketChannel.close();
			System.out.println("关闭与客户的连接");
		}
	}
	
	public static void main(String[] arg){
		try {
			new MyServer().server();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}



客户端

package nonblock;
import java.net.*;
import java.nio.channels.*;
import java.nio.*;
import java.io.*;
import java.nio.charset.*;
import java.util.*;

public class MyClient {
	private SocketChannel socketChannel = null;
	private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
	private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
	private Charset charset = Charset.forName("GBK");
	private Selector selector;
	
	public MyClient() throws IOException {
		socketChannel = SocketChannel.open();
		InetAddress ia = InetAddress.getLocalHost();
		InetSocketAddress isa = new InetSocketAddress(ia, 8000);
		socketChannel.connect(isa);
		socketChannel.configureBlocking(false);
		System.out.println("与服务器的连接建立成功");
		selector = Selector.open();
	}
	
	public static void main(String args[]) throws IOException {
		final MyClient client = new MyClient();
//		Thread receiver = new Thread() {
//			public void run() {
//				client.receiveFromUser();
//			}
//		};
//
//		receiver.start();
		client.talk();
	}

	private void talk() throws IOException {
		// TODO Auto-generated method stub
		socketChannel.register(selector, SelectionKey.OP_READ
				| SelectionKey.OP_WRITE);
		while(selector.select()>0){
			Set keySet = selector.selectedKeys();
			Iterator iter = keySet.iterator();
			while(iter.hasNext()){
				SelectionKey key = (SelectionKey)iter.next();
				iter.remove();
				if(key.isWritable()){
			//		send(key);
					System.out.println("客户端可写...");
				}
				if(key.isReadable()){
			//		receive(key);
					System.out.println("客户端可读...");
				}		
			}			
		}
	}
	
	public void receive(SelectionKey key) throws IOException {
		
		SocketChannel socketChannel = (SocketChannel)key.channel();
		ByteBuffer readBuffer = ByteBuffer.allocate(150);
		ByteBuffer buffer = ByteBuffer.allocate(20);
		socketChannel.read(readBuffer);
		readBuffer.flip();
		
		System.out.println(("From server:"+charset.decode(readBuffer).toString()));
		System.out.println("Cap:"+readBuffer.capacity());
		
		readBuffer.flip();
		
		buffer.put(readBuffer);
		System.out.println(("From"+charset.decode(buffer).toString()));
	}
	
	public void send(SelectionKey key) throws IOException {
		
		
		SocketChannel socketChannel = (SocketChannel)key.channel();
		String input ;
		BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
		
		Charset charset = Charset.forName("GBK");
		
		while((input=br.readLine())!=null){
			ByteBuffer buffer = ByteBuffer.allocate(2);
			ByteBuffer outBuffer = ByteBuffer.allocate(2);
			buffer= charset.encode(input+"\r\n");
		//	outBuffer.put(buffer);
		//	buffer.flip();
			socketChannel.write(buffer);
			buffer.flip();
			System.out.println("I say:"+ charset.decode(buffer).toString());
		}	
	}
}

相关推荐

Global site tag (gtag.js) - Google Analytics