`
lg_asus
  • 浏览: 184306 次
  • 性别: Icon_minigender_1
  • 来自: 苏州
社区版块
存档分类
最新评论

java NIO中的Selector SelectableChannel SelectionKey

 
阅读更多
推荐参考:
http://rox-xmlrpc.sourceforge.net/niotut/index.html

下面是我写的一个小demo:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author chega
 * 
 *         2012-12-3下午6:41:21
 * 
 * 为每个客户端分配一个线程来执行,执行完后客户端连接并不关闭(即长连接)
 */
public class MyServer
{
	Selector							selector;
	Map<SocketChannel, StringBuilder>	channelMap;
	CharsetDecoder						decoder		= Charset.forName("utf-8").newDecoder();
	ExecutorService						executors	= Executors.newCachedThreadPool();
	private MessageFormat				format		= new MessageFormat("{0, time, medium}, {1}");

	public static void main(String... args)
	{
		try
		{
			new MyServer().init();
		}
		catch (IOException e)
		{
			e.printStackTrace();
		}
	}

	private void init() throws IOException
	{
		this.channelMap = Collections.synchronizedMap(new HashMap<SocketChannel, StringBuilder>());

		selector = Selector.open();
		ServerSocketChannel channel = ServerSocketChannel.open();
		channel.configureBlocking(false);
		channel.socket().bind(new InetSocketAddress("localhost", 80));
		print("ServerSocket绑定在本机80端口,等待客户端连接");
		channel.register(selector, SelectionKey.OP_ACCEPT);
		while (true)
		{
			int count = this.selector.select();
			if (count == 0)
			{
				continue;
			}
			Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
			while (it.hasNext())
			{
				final SelectionKey key = it.next();
				it.remove();
				if (!key.isValid())
				{
					continue;
				}
				if (key.isAcceptable())
				{
					this.accept(key);
				}
				else if (key.isReadable())
				{
					this.read(key);
//					Server端用一个线程来处理一个客户端的请求,在read完之后立即向客户端写数据,因此不需要再注册SeletionKey.OP_WRITE
//					由于线程是在线程池中来运行的,因此一定要把当然key的SelectionKey.OP_READ事件给取消掉,否则会多次执行read方法
					key.interestOps(0);
				}
			}
		}
	}

	/**
	 * @param key
	 */
	private void accept(SelectionKey key)
	{
		try
		{
			SocketChannel c = ((ServerSocketChannel) key.channel()).accept();
			c.configureBlocking(false);
			c.register(selector, SelectionKey.OP_READ);
			this.print(c + "连接成功,并注册READ事件");
		}
		catch (IOException e)
		{
			e.printStackTrace();
		}
	}

	/**
	 * @param key
	 */
	private void read(final SelectionKey key)
	{
		this.executors.execute(new Runnable() {
			@Override
			public void run()
			{
				try
				{
					int num = -1;
					SocketChannel c = (SocketChannel) key.channel();
					ByteBuffer buffer = ByteBuffer.allocate(1024);
					while (c.isOpen())
					{
						num = c.read(buffer);
						if (num < 1)
						{
							break;
						}
						buffer.flip();
						MyServer.this.put2Map(c, buffer);
						buffer.clear();
					}
					if (num == -1)
					{
						c.close();
						key.channel();
						print(c + "has been closed");
						MyServer.this.channelMap.remove(c);
						return;
					}
					print("从" + c + "读取数据完毕");

					if (c.isOpen() && MyServer.this.channelMap.get(c) != null && MyServer.this.channelMap.get(c).length() != 0)
					{
						MyServer.this.channelMap.get(c).trimToSize();
						String str = "echo: " + MyServer.this.channelMap.get(c);
						c.write(ByteBuffer.wrap(str.getBytes()));
						print("向" + c + "写数据完毕");
						MyServer.this.put2Map(c, null);
//						由于Selector线程中已经取消了OP_READ事件,因此这里再加上OP_READ
						key.interestOps(SelectionKey.OP_READ);
						key.selector().wakeup();
					}
				}
				catch (IOException e)
				{
					e.printStackTrace();
					key.channel();
					try
					{
						key.channel().close();
					}
					catch (IOException e1)
					{
						e1.printStackTrace();
					}
				}
			}
		});
	}

	private void put2Map(SocketChannel channel, ByteBuffer buffer)
	{
		if (this.channelMap.get(channel) == null)
		{
			this.channelMap.put(channel, new StringBuilder());
		}
		if (buffer == null)
		{
			this.channelMap.get(channel).setLength(0);
			return;
		}
		try
		{
			this.channelMap.get(channel).append(decoder.decode(buffer).toString());
		}
		catch (CharacterCodingException e)
		{
			e.printStackTrace();
		}
	}

	private void print(String msg)
	{
		System.out.println(this.format.format(new Object[] { new Date(), msg }));
	}
}



import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.text.MessageFormat;
import java.util.Date;
import java.util.Iterator;

/**
 * chega
 * 2012-11-21下午3:19:52
 */

/**
 * @author chega
 * 
 *         2012-11-21下午3:19:52
 * 
 */
public class MyClient
{
	Selector				selector;
	InetSocketAddress		address;
	ByteBuffer				buffer;
	CharsetDecoder			decoder;
	private MessageFormat	format	= new MessageFormat("{0, time, medium}, {1}");
	final String			TEST	= "ntp";

	public static void main(String[] args) throws IOException
	{
		MyClient t = new MyClient();
		t.selector = Selector.open();
		t.address = new InetSocketAddress("localhost", 80);
		t.buffer = ByteBuffer.allocate(1024);
		t.decoder = Charset.forName("utf-8").newDecoder();
		t.run();
	}

	private void run() throws IOException
	{
		this.createNewConnection();
		while (true)
		{
			try
			{
				int selectedCount = selector.select();
				if (selectedCount == 0)
				{
					continue;
				}
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while (it.hasNext())
				{
					SelectionKey key = it.next();
					it.remove();// 从己选择键中删除,否则该键一直存在
					if (!key.isValid())
					{
						continue;
					}
					if (key.isConnectable())
					{
						this.connect(key);
					}
					else if (key.isReadable())
					{
						this.read(key);
					}
					else if (key.isWritable())
					{
						this.write(key);
					}
				}
			}
			catch (IOException e)
			{
				e.printStackTrace();
			}
		}
	}

	/**
	 * @param key
	 * @throws IOException
	 * 
	 */
	private void connect(SelectionKey key)
	{
		try
		{
			while (!((SocketChannel) key.channel()).finishConnect())
			{
				try
				{
					Thread.sleep(100);
				}
				catch (InterruptedException e)
				{
					e.printStackTrace();
				}
				continue;
			}
			key.interestOps(SelectionKey.OP_WRITE);
			print(key.channel() + "连接服务器成功,并注册WRITE事件");
		}
		catch (IOException e)
		{
			e.printStackTrace();
			print(key.channel() + "连接失败");
			System.out.println(((SocketChannel) key.channel()).socket().getLocalPort());
			System.exit(-1);
		}
	}

	/**
	 * @param key
	 */
	private void write(SelectionKey key)
	{
		try
		{
			// ((SocketChannel) key.channel()).write(ByteBuffer.wrap("GET / HTTP/1.0\r\n\r\n".getBytes()));
			((SocketChannel) key.channel()).write(ByteBuffer.wrap(TEST.getBytes()));
			key.interestOps(SelectionKey.OP_READ);
			print(key.channel() + "向服务器完数据完毕,并注册READ事件");
		}
		catch (IOException e)
		{
			e.printStackTrace();
			print(key.channel() + "写数据发生错误");
			try
			{
				key.channel().close();
			}
			catch (IOException e1)
			{
				e1.printStackTrace();
			}
		}
	}

	/**
	 * @param key
	 */
	private void read(SelectionKey key)
	{
		try
		{
			int num = -1;
			buffer.clear();
			while (key.channel().isOpen())
			{
				num = ((SocketChannel) key.channel()).read(buffer);
				if (num < 1)
				{
					break;
				}
				buffer.flip();
				String result = this.decoder.decode(buffer).toString();
				System.out.println(result);
				buffer.clear();
			}
//			sleep 5s and write again
			Thread.sleep(5000);
			key.interestOps(SelectionKey.OP_WRITE);
		}
		catch (IOException e)
		{
			e.printStackTrace();
			print(key.channel() + "读取数据发生错误");
			try
			{
				key.channel().close();
			}
			catch (IOException e1)
			{
				e1.printStackTrace();
			}
		}
		catch (InterruptedException e)
		{
			e.printStackTrace();
		}
	}

	private void createNewConnection() throws IOException
	{
		SocketChannel channel = SocketChannel.open();
		channel.configureBlocking(false);
		channel.connect(address);
		channel.register(selector, SelectionKey.OP_CONNECT);
	}

	private void print(String msg)
	{
		System.out.println(this.format.format(new Object[] { new Date(), msg }));
	}
}

分享到:
评论

相关推荐

    浅谈java中nio的使用方式

    NIO其核心概念包括Channel,Selector,SelectionKey,Buffer.

    Java NIO原理和使用

    NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册...

    Java NIO 聊天室 JSwing

    import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import com.nio.user.ClientUser; import ...

    用Java实现非阻塞通信

    java.nio包提供了支持非阻塞通信的类,主要包括: ● ServerSocketChannel:ServerSocket的替代类,支持阻塞通信与非阻塞通信。 ● SocketChannel:Socket的替代类,支持阻塞通信与非阻塞通信。 ● Selector:为...

    NIO-实践-多线程实例

    即需要采用SelectionKey.cancel()从注册的Selector中取消对该Selection的监视,防止同时多个线程获取到SelectionKey的事件 2.注册在向Selector注册通道的时候,如果register方法抛出KeyCancelledException表明,...

    基于Nio的多人聊天Demo

    Selector类似一个调度中心,所有Channel都需要注册到选择器中,并绑定一个SelectionKey,绑定时还会指定要监听的事件,如:连接就绪、读就绪、写就绪等。可以调用Selector提供的API实现对发生监听事件的连接进行处理...

    Java网络编程-Socket-文件传输小案例

    import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.ByteBuffer; import java.io.RandomAccessFile; import java.io.FileOutputStream; import java.io.File; import ...

    封装一套简单易于使用的流式异步通信框架,追求易于理解,方便使用.rar

    具体通信流程 1.客户端连接服务端,服务端通过Selector接收到连接请求,将其socketChannel通道保存到通道集合,并触发客户端连接事件 2.客户端发送数据包请求到服务端,服务端将...若要深入了解,请先了解Java NIO技术

    NIO网络编程

    NIO网络编程 Selector选择器 进行监听,是新连接或已经连接,读写数据 常用方法 public static Selector Open(); 得到一个选择器对象 public int select(long timeout); 监听所有注册通道,存在IO流操作,将对应...

    NIO实现网络聊天室

    1. NIO完成网络编程 1.1 Selector选择器老大 Selector 选择器,网络编程使用NIO的大哥!!! 服务器可以执行一个线程,运行Selector程序,进行监听操作。 新连接, 已经连接, 读取数据,写入数据 Selector常用...

    NIO完成网络编程

    NIO完成网络编程 1. Selector——选择器老大 ... 监听所有注册通道,存在IO流操作是,会将对应的信息SelectionKey存入到内部的集合中,参数是一个超时时间 public Set selectionKeys(); 返回当前Se

    网站开发基础知识

    NioSocket中服务端的处理过程可以分为5部: 1) 创建ServerSocketChannel,并设置相应参数 2) 创建Selector并注册到ServerSocketChannel上 3) 调用Selector的select方法等待请求 4) Selector接收到请求后使用...

Global site tag (gtag.js) - Google Analytics