`

Java NIO 编程实例

    博客分类:
  • java
NIO 
阅读更多
1.编写服务端

package com.boce.nio.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.util.StringUtils;

/**
 * Selector-driven NIO server that accepts connections, reads one request per
 * connection on the selector thread, and hands the reply to a worker pool.
 *
 * <p>The protocol has no framing: whatever bytes are available when the
 * channel becomes readable are treated as the complete request. Once a request
 * has been read, the connection's key is cancelled and a
 * {@code ServerWriteHandle} task takes over the channel to send the reply and
 * close it.
 */
public class ServerNio implements Runnable {

    /** Charset used to decode incoming requests. */
    private static final Charset UTF8 = Charset.forName("UTF-8");

    /** Multiplexer for the listening channel and all accepted channels. */
    private final Selector selector;

    /** Listening channel registered for OP_ACCEPT. */
    private final ServerSocketChannel servChannel;

    /** Worker pool that sends replies off the selector thread. */
    private final ExecutorService exeService =
            Executors.newFixedThreadPool(ServerConfig.SERVER_WRITE_BOOL);

    /** Written by other threads through {@link #setStop}, so it must be volatile. */
    private volatile boolean stop;

    /**
     * @return {@code true} once the server has been asked to stop
     */
    public boolean isStop() {
        return stop;
    }

    /**
     * Sets the stop flag. When stopping, the selector is woken up so that the
     * event loop notices the flag instead of staying blocked in {@code select()}.
     *
     * @param stop {@code true} to make {@link #run()} leave its loop
     */
    public void setStop(boolean stop) {
        this.stop = stop;
        if (stop && selector.isOpen()) {
            selector.wakeup();
        }
    }

    /**
     * Opens the selector and a non-blocking listening channel bound to
     * {@code port} (backlog 1024), registered for accept events.
     *
     * @param port TCP port to listen on
     * @throws IllegalStateException if the selector or the listening socket
     *         cannot be set up; the cause is preserved
     */
    public ServerNio(int port) {
        Selector openedSelector = null;
        ServerSocketChannel openedChannel = null;
        try {
            openedSelector = Selector.open();
            openedChannel = ServerSocketChannel.open();
            openedChannel.configureBlocking(false);
            openedChannel.socket().bind(new InetSocketAddress(port), 1024);
            // OP_ACCEPT: the selector reports when a client connection is pending.
            openedChannel.register(openedSelector, SelectionKey.OP_ACCEPT);
        } catch (IOException | RuntimeException e) {
            // Fail fast: a half-built server would only fail later inside run().
            closeQuietly(openedChannel);
            closeQuietly(openedSelector);
            throw new IllegalStateException("Unable to start NIO server on port " + port, e);
        }
        this.selector = openedSelector;
        this.servChannel = openedChannel;
    }

    /**
     * Event loop: waits for ready keys and dispatches each one until the stop
     * flag is set, then closes the listening channel and the selector and
     * shuts the reply pool down (already queued replies still run).
     */
    @Override
    public void run() {
        try {
            while (!stop) {
                try {
                    System.out.println("---------------start");
                    selector.select();
                    Iterator<SelectionKey> items = selector.selectedKeys().iterator();
                    while (items.hasNext()) {
                        SelectionKey key = items.next();
                        items.remove();
                        try {
                            handleInput(key);
                        } catch (Exception ex) {
                            // One broken connection must not stop the loop: report it and drop it.
                            ex.printStackTrace();
                            key.cancel();
                            closeQuietly(key.channel());
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } finally {
            closeQuietly(servChannel);
            closeQuietly(selector);
            exeService.shutdown();
        }
    }

    /**
     * Dispatches one ready key to the accept and/or read handling.
     *
     * @param key a key taken from the selector's selected-key set
     * @throws IOException if reading fails or the request is too large
     */
    private void handleInput(final SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            acceptClient((ServerSocketChannel) key.channel());
        }
        if (key.isReadable()) {
            readRequest(key);
        }
    }

    /** Accepts a pending connection and registers it for read events. */
    private void acceptClient(ServerSocketChannel ssc) throws IOException {
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            return;
        }
        try {
            sc.configureBlocking(false);
            sc.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            // Only this client is affected; do not let the failure reach the listening key.
            e.printStackTrace();
            closeQuietly(sc);
        }
    }

    /** Drains the readable channel and, if any bytes arrived, schedules the reply. */
    private void readRequest(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        // Per-read scratch buffer.
        ByteBuffer byteBuffer = ByteBuffer.allocate(ServerConfig.SERVER_PER_READ);
        // Bytes of the whole request, assembled from the individual reads.
        byte[] temp = new byte[ServerConfig.SERVER_TOTAL_READ];
        int total = 0;
        int len;
        while ((len = sc.read(byteBuffer)) > 0) {
            byteBuffer.flip();
            int chunk = byteBuffer.remaining();
            if (total + chunk > temp.length) {
                // Reject explicitly instead of failing with an array index error.
                throw new IOException("request exceeds " + temp.length + " bytes");
            }
            byteBuffer.get(temp, total, chunk);
            total += chunk;
            byteBuffer.clear();
        }

        if (total > 0) {
            // Decode only the bytes actually received, not the zero padding behind them.
            String request = new String(temp, 0, total, UTF8);
            System.out.println("read ==>" + request);
            // The write task owns the channel from here on and closes it after replying,
            // so the selector must stop watching it.
            key.cancel();
            exeService.execute(new ServerWriteHandle(sc, "server is back==>" + request));
        } else if (len < 0) {
            // Peer closed without sending anything: nothing to answer, just clean up.
            key.cancel();
            closeQuietly(sc);
        }
        // total == 0 && len == 0: nothing available yet; stay registered and wait.
    }

    /** Closes {@code resource} if it is non-null, reporting but not propagating failures. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}



package com.boce.nio.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Random;

import org.springframework.util.StringUtils;
/**
 * Task that sends one reply over a socket channel and then closes the channel.
 *
 * <p>The channel is always closed when the task ends, including when the
 * message is empty, the write fails, or the thread is interrupted.
 *
 * @author gjp
 */
public class ServerWriteHandle implements Runnable {

    /** Channel to reply on; closed by {@link #run()}. */
    private final SocketChannel sc;

    /** Reply text, sent UTF-8 encoded; may be null or empty, in which case nothing is sent. */
    private final String msg;

    /**
     * @param sc  channel to write the reply to
     * @param msg reply text
     */
    public ServerWriteHandle(SocketChannel sc, String msg) {
        super();
        this.sc = sc;
        this.msg = msg;
    }

    /**
     * Sleeps a random 0-29 ms, writes the message if there is one, and closes
     * the channel. If the thread is interrupted during the delay, the reply is
     * skipped and the interrupt flag is restored.
     */
    @Override
    public void run() {
        try {
            // Random short delay before replying.
            Thread.sleep(new Random().nextInt(30));

            if (msg != null && !msg.isEmpty()) {
                ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8")));
                // A non-blocking channel may accept only part of the buffer per call;
                // keep writing so the close below cannot truncate the reply.
                while (byteBuffer.hasRemaining()) {
                    if (sc.write(byteBuffer) == 0) {
                        Thread.yield();
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                sc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}


package com.boce.nio.server;

/**
 * Shared constants for the NIO server side.
 *
 * @author gjp
 */
public class ServerConfig {

    /** Number of threads in the pool that writes replies. */
    public static final int SERVER_WRITE_BOOL = 300;

    /** Number of bytes fetched from a channel per individual read. */
    public static final int SERVER_PER_READ = 256;

    /** Size, in bytes, of the array allocated for assembling one request. */
    public static final int SERVER_TOTAL_READ = 1024;
}


服务端启动程序:

package com.boce.nio.server;

import java.util.concurrent.Executors;

public class ServerStart {

public static void main(String[] args) {
ServerNio serverNio = new ServerNio(9999);
Executors.newFixedThreadPool(1).execute(serverNio);
}

}



2.编写客户端:

package com.boce.nio.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * One-shot NIO client: connects, sends a single request, prints the first
 * chunk of the reply, and terminates.
 *
 * <p>{@link #run()} always returns once the exchange is over or has failed
 * (connection refused, peer closed without replying, I/O error), and always
 * closes the socket channel and the selector. It never terminates the JVM.
 */
public class ClientNio implements Runnable {

    /** Charset used to decode the reply. */
    private static final java.nio.charset.Charset UTF8 = java.nio.charset.StandardCharsets.UTF_8;

    private final String host;
    private final int port;

    private final Selector selector;
    private final SocketChannel socketChannel;

    /** Set when the exchange has finished, successfully or not. */
    private volatile boolean stop;

    /** Number appended to the request text to tell clients apart. */
    private final int step;

    /**
     * Opens the selector and a non-blocking socket channel; the connection
     * itself is started by {@link #run()}.
     *
     * @param host server host; {@code null} means {@code 127.0.0.1}
     * @param port server port
     * @param step number appended to the request text
     * @throws IllegalStateException if the selector or channel cannot be opened
     */
    public ClientNio(String host, int port, int step) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        this.step = step;

        Selector openedSelector = null;
        SocketChannel openedChannel = null;
        try {
            openedSelector = Selector.open();
            openedChannel = SocketChannel.open();
            openedChannel.configureBlocking(false);
        } catch (IOException e) {
            // Fail fast: run() cannot do anything useful without these.
            closeQuietly(openedChannel);
            closeQuietly(openedSelector);
            throw new IllegalStateException("Unable to open NIO client resources", e);
        }
        this.selector = openedSelector;
        this.socketChannel = openedChannel;
    }

    /**
     * Starts the connection and processes selector events until the exchange
     * completes. Any failure is reported and ends this client only.
     */
    @Override
    public void run() {
        try {
            doConnect();
            while (!stop) {
                System.out.println("---------------start");
                selector.select();
                Iterator<SelectionKey> items = selector.selectedKeys().iterator();
                while (items.hasNext()) {
                    SelectionKey key = items.next();
                    items.remove();
                    handleInput(key);
                }
            }
        } catch (Exception e) {
            // This client owns a single channel, so any failure ends the exchange.
            e.printStackTrace();
        } finally {
            stop = true;
            closeQuietly(socketChannel);
            closeQuietly(selector);
        }
    }

    /**
     * Handles a ready key: completes a pending connect (then sends the
     * request) and/or reads the reply.
     *
     * @param key a key taken from the selector's selected-key set
     * @throws IOException if connecting, writing or reading fails
     */
    private void handleInput(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        SocketChannel sc = (SocketChannel) key.channel();

        if (key.isConnectable()) {
            if (!sc.finishConnect()) {
                // Not finished yet (a failure would have thrown): wait for the next event.
                return;
            }
            sc.register(selector, SelectionKey.OP_READ);
            System.out.println("----------------handleInput***************doWrite");
            doWrite(sc);
        }

        if (key.isReadable()) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int readLen = sc.read(byteBuffer);
            if (readLen > 0) {
                byteBuffer.flip();
                byte[] bytes = new byte[byteBuffer.remaining()];
                byteBuffer.get(bytes);

                String body = new String(bytes, UTF8);
                System.out.println("read sever back msg :: " + body);

                // Release the connection right away; the original author notes that
                // clients left open eventually exhaust the available connections.
                stop = true;
                key.cancel();
                sc.close();
            } else if (readLen < 0) {
                // Peer closed without replying: nothing more will arrive, so finish.
                stop = true;
                key.cancel();
                sc.close();
            } else {
                System.out.println("读取数据 为0");
            }
        }
    }

    /**
     * Starts the non-blocking connect. If it completes immediately the request
     * is sent at once; otherwise the channel waits for an OP_CONNECT event.
     */
    private void doConnect() throws IOException {
        boolean isConn = socketChannel.connect(new InetSocketAddress(host, port));
        if (isConn) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            System.out.println("doconnect .....");
            doWrite(socketChannel);
        } else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    /**
     * Sends the request text UTF-8 encoded, printing the buffer state before
     * and after the flip for demonstration.
     */
    private void doWrite(SocketChannel sc) throws IOException {
        String send = "requst time 请求===写入客户端信息内容************************************===========================" + step;
        System.out.println("发送信息:" + send);
        byte[] msg = send.getBytes("UTF-8");
        ByteBuffer byteBuffer = ByteBuffer.allocate(msg.length);
        byteBuffer.put(msg);
        // The buffer is printed directly; calling mark() here would move its mark.
        String position1 = "position" + byteBuffer.position() + ";capacity=" + byteBuffer.capacity()
                + ";limit=" + byteBuffer.limit() + ";mark" + byteBuffer
                + ";remain" + byteBuffer.hasRemaining();
        System.out.println(position1);

        byteBuffer.flip();
        String position2 = byteBuffer.position() + ";capacity=" + byteBuffer.capacity()
                + ";limit=" + byteBuffer.limit() + ";mark" + byteBuffer
                + ";remain" + byteBuffer.hasRemaining();
        System.out.println(position2);

        // A non-blocking write may take only part of the buffer; repeat until it is drained.
        while (byteBuffer.hasRemaining()) {
            if (sc.write(byteBuffer) == 0) {
                Thread.yield();
            }
        }
        System.out.println(" send order 2 success");
    }

    /** Closes {@code resource} if it is non-null, reporting but not propagating failures. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


客户端启动程序:
package com.boce.nio.server;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ClientStart {

public static void main(String[] args) {
String host="127.0.0.1";
int port =9999;
AtomicInteger ac = new AtomicInteger(0);
ExecutorService exe = Executors.newFixedThreadPool(100);
for(int i=0;i<1;i++){
ClientNio client = new ClientNio(host, port,ac.getAndIncrement());
exe.execute(client);
}

exe.shutdown();

}

}


测试发送记录条数:



  • 大小: 38 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics