Java NIO非堵塞应用通常适用用在I/O读写等方面,我们知道,系统运行的性能瓶颈通常在I/O读写,包括对端口和文件的操作上,过去,在打开一个I/O通道后,read()将一直等待在端口一边读取字节内容,如果没有内容进来,read()也是傻傻的等,这会影响我们程序继续做其他事情,那么改进做法就是开设线程,让线程去等待,但是这样做也是相当耗费资源的。
Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。
Java NIO出现不只是一个技术性能的提高,你会发现网络上到处在介绍它,因为它具有里程碑意义,从JDK1.4开始,Java开始提高性能相关的功能,从而使得Java在底层或者并行分布式计算等操作上已经可以和C或Perl等语言并驾齐驱。
![](http://dl.iteye.com/upload/attachment/598672/428eea1f-27fb-3c51-91f0-3a112a20490b.jpg)
图 1 类结构图
package cn.chenkangxian.nioconcurrent;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;
/**
* @Project: testNio
*
* @Author: chenkangxian
*
* @Annotation: 使用线程池来处理大量channel并发
*
* @Date:2011-7-5
*
* @Copyright: 2011 chenkangxian, All rights reserved.
*
*/
public class SelectSocketsThreadPool extends SelectSockets {
private static final int MAX_THREADS = 5;
private ThreadPool pool = new ThreadPool(MAX_THREADS);
/**
* 从socket中读数据
*/
protected void readDataFromSocket(SelectionKey key) throws Exception {
WorkerThread worker = pool.getWorker();
if (worker == null) {
return;
}
worker.serviceChannel(key);
}
/**
*
* @Project: concurrentnio
*
* @Author: chenkangxian
*
* @Annotation:线程池
*
* @Date:2011-7-20
*
* @Copyright: 2011 chenkangxian, All rights reserved.
*
*/
private class ThreadPool {
List idle = new LinkedList();
/**
* 线程池初始化
*
* @param poolSize 线程池大小
*/
ThreadPool(int poolSize) {
for (int i = 0; i < poolSize; i++) {
WorkerThread thread = new WorkerThread(this);
thread.setName("Worker" + (i + 1));
thread.start();
idle.add(thread);
}
}
/**
* 获得工作线程
*
* Author: chenkangxian
*
* Last Modification Time: 2011-7-20
*
* @return
*/
WorkerThread getWorker() {
WorkerThread worker = null;
synchronized (idle) {
if (idle.size() > 0) {
worker = (WorkerThread) idle.remove(0);
}
}
return (worker);
}
/**
* 送回工作线程
*
* Author: chenkangxian
*
* Last Modification Time: 2011-7-20
*
* @param worker
*/
void returnWorker(WorkerThread worker) {
synchronized (idle) {
idle.add(worker);
}
}
}
private class WorkerThread extends Thread {
private ByteBuffer buffer = ByteBuffer.allocate(1024);
private ThreadPool pool;
private SelectionKey key;
WorkerThread(ThreadPool pool) {
this.pool = pool;
}
public synchronized void run() {
System.out.println(this.getName() + " is ready");
while (true) {
try {
this.wait();//等待被notify
} catch (InterruptedException e) {
e.printStackTrace();
this.interrupt();
}
if (key == null) {//直到有key
continue;
}
System.out.println(this.getName() + " has been awakened");
try {
drainChannel(key);
} catch (Exception e) {
System.out.println("Caught '" + e + "' closing channel");
try {
key.channel().close();
} catch (IOException ex) {
ex.printStackTrace();
}
key.selector().wakeup();
}
key = null;
this.pool.returnWorker(this);
}
}
synchronized void serviceChannel(SelectionKey key) {
this.key = key;
//消除读的关注
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
this.notify();
}
void drainChannel(SelectionKey key) throws Exception {
SocketChannel channel = (SocketChannel) key.channel();
int count;
buffer.clear();
while ((count = channel.read(buffer)) > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
channel.write(buffer);
}
buffer.clear();
}
if (count < 0) {
channel.close();
return;
}
//重新开始关注读事件
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
key.selector().wakeup();
}
}
public static void main(String[] args) throws Exception {
new SelectSocketsThreadPool().go(args);
}
}
package cn.chenkangxian.nioconcurrent;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
*
* @Project: concurrentnio
*
* @Author: chenkangxian
*
* @Annotation:
*
* @Date:2011-7-11
*
* @Copyright: 2011 chenkangxian, All rights reserved.
*
*/
public class SelectSockets {
public static int PORT_NUMBER = 1234;
private ByteBuffer buffer = ByteBuffer.allocate(1024);
public static void main(String[] args) throws Exception {
new SelectSockets().go(args);
}
public void go(String[] args) throws Exception{
int port = PORT_NUMBER;
// if(args.length > 0){
// port = Integer.parseInt(args[0]);
// }
// System.out.println("Listening on port " + port);
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverChannel.socket();
Selector selector = Selector.open();
serverSocket.bind(new InetSocketAddress(port));
serverChannel.configureBlocking(false);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
int n = selector.select(); //没有轮询,单个selector
if(n == 0){
continue;
}
Iterator it = selector.selectedKeys().iterator();
while(it.hasNext()){
SelectionKey key = (SelectionKey)it.next();
if(key.isAcceptable()){
ServerSocketChannel server =
(ServerSocketChannel)key.channel();
SocketChannel channel = server.accept();
registerChannel(selector,channel
,SelectionKey.OP_READ);
sayHello(channel);
}
if(key.isReadable()){
readDataFromSocket(key);
}
it.remove();
}
}
}
/**
* 在selector上注册channel,并设置interest
*
* Author: chenkangxian
*
* Last Modification Time: 2011-7-11
*
* @param selector 选择器
*
* @param channel 通道
*
* @param ops interest
*
* @throws Exception
*/
protected void registerChannel(Selector selector,
SelectableChannel channel, int ops) throws Exception{
if(channel == null){
return ;
}
channel.configureBlocking(false);
channel.register(selector, ops);
}
/**
* 处理有可用数据的通道
*
* Author: chenkangxian
*
* Last Modification Time: 2011-7-11
*
* @param key 可用通道对应的key
*
* @throws Exception
*/
protected void readDataFromSocket(SelectionKey key) throws Exception{
SocketChannel socketChannel = (SocketChannel)key.channel();
int count;
buffer.clear(); //Empty buffer
while((count = socketChannel.read(buffer)) > 0){
buffer.flip();
while(buffer.hasRemaining()){
socketChannel.write(buffer);
}
buffer.clear();
}
if(count < 0){
socketChannel.close();
}
}
/**
* 打招呼
*
* Author: chenkangxian
*
* Last Modification Time: 2011-7-11
*
* @param channel 客户端channel
*
* @throws Exception
*/
private void sayHello(SocketChannel channel) throws Exception{
buffer.clear();
buffer.put("Hello 哈罗! \r\n".getBytes());
buffer.flip();
channel.write(buffer);
}
}
![点击查看原始大小图片](http://dl2.iteye.com/upload/attachment/0059/8672/428eea1f-27fb-3c51-91f0-3a112a20490b-thumb.jpg)
- 大小: 28.7 KB
分享到:
相关推荐
Java NIO反应器模式讲解,目前热门的Java网络通信框架中Mina,Netty等都采用NIO
NULL 博文链接:https://navylee.iteye.com/blog/914195
java解读NIOSocket非阻塞模式宣贯.pdf
NIO非阻塞通讯模式!NIO非阻塞通讯模式!
ScalableIOJava中文版本的,跟着自己的理解翻译了一下,是编写NIO的作者的一个文档
jdk供的无阻塞I/O(NIO)有效解决了多线程服务器存在的线程开销问题,但在使用上略显得复杂一些。在NIO中使用多线程,主要目的已不是为了应对每个客户端请求而分配独立的服务...非阻塞的NIO有何神秘之处?直接上代码!
从Jetty、Tomcat和Mina中提炼NIO构架网络服务器的经典模式.doc
NIO的工作方式
java NIO Selector选择器简介.pdf
socket通信nio模式有很多实现方式,但是在性能上、资源上一般很少考虑,这里封装了一个性能极强的程序。
NIO 入门笔记 Reactor模式概念介绍
Java NIO系列教程(一) Java NIO 概述 Java NIO系列教程(二) Channel Java NIO系列教程(三) Buffer Java NIO系列教程(四) Scatter/Gather Java NIO系列教程(五) 通道之间的数据传输 Java NIO系列教程(六)...
Reactor模式和NIO Java的NIO为reactor模式提供了实现的基础机制,它的Selector当发现某个channel有数据时,会通过SlectorKey来告知我们,在此我们实现事件和handler的绑定
Java NIO(New IO)是一个可以替代标准Java IO API的IO API(从Java 1.4开始),Java...Java NIO引入了选择器的概念,选择器用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个的线程可以监听多个数据通道。
CS模式聊天程序代码,实现语言为Java,前端UI界面用Java Swing框架实现,服务端与客户端通信采用Java NIO,自定义按分隔符\n读取消息的消息读取格式解决TCP粘包拆包问题。
netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty开发之nio netty...
java nio编程 非阻塞模式的通信 电子书 带目录标签
NIO入门.chm NIO入门.chm NIO入门.chm
IO和NIO区别 Java 中的 IO 和 NIO 是两个不同的输入/输出机制,它们之间有许多区别。下面我们将详细讲解 IO 和 NIO 的区别。...在选择 IO 或 NIO 时,需要考虑具体的应用场景和需求,并根据实际情况选择合适的机制。