`
zjxs_sky100
  • 浏览: 28225 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

使用 NIO 简单实现服务端与客户端通信

    博客分类:
  • java
NIO 
阅读更多
服务端代码如下:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/**
 * Minimal single-threaded NIO echo server.
 *
 * <p>One {@link Selector} multiplexes the listening channel and every accepted
 * client channel. Whatever a client sends is decoded as UTF-8, printed to
 * standard output and written straight back to that client.
 *
 * @author sky
 * @date 2015-6-27 2:56:34 PM
 */
public class MultiTimeServer implements Runnable {

    /** Charset used to decode incoming messages and encode responses. */
    private static final Charset UTF_8 = Charset.forName("utf-8");

    /** Size in bytes of the buffer allocated for each read. */
    private static final int READ_BUFFER_SIZE = 1024;

    /** How long a single {@code select} call may block, in milliseconds. */
    private static final long SELECT_TIMEOUT_MILLIS = 1000;

    /** Server-side selector. */
    private Selector selector;

    /** Listening server socket channel. */
    private ServerSocketChannel serverSocketChannel;

    /** Set to {@code true} to make the event loop exit. */
    private volatile boolean stop;

    public static void main(String[] args) {
        new Thread(new MultiTimeServer(8889)).start();
    }

    /**
     * Opens the selector, binds the listening channel and registers it for
     * accept events. If any of that fails the stack trace is printed and the
     * JVM exits with status 1.
     *
     * @param ipAddr address to bind to; {@code null} or empty means 127.0.0.1
     * @param port   port to listen on
     */
    public MultiTimeServer(String ipAddr, int port) {
        if (ipAddr == null || "".equals(ipAddr)) {
            ipAddr = "127.0.0.1";
        }

        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(ipAddr, port));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("开启服务端:MultiTimeServer, 监听端口:" + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Creates a server bound to 127.0.0.1 on the given port.
     *
     * @param port port to listen on
     */
    public MultiTimeServer(int port) {
        this("", port);
    }

    /**
     * Asks the event loop to terminate. The selector is woken up so the loop
     * notices the flag without waiting for the current select timeout.
     */
    public void stop() {
        this.stop = true;
        if (selector != null) {
            selector.wakeup();
        }
    }

    /**
     * Event loop: waits for ready keys and dispatches each one to
     * {@link #handleInput(SelectionKey)} until {@link #stop()} is called, then
     * closes every registered channel and the selector.
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        try {
            while (!stop) {
                try {
                    selector.select(SELECT_TIMEOUT_MILLIS); // wait at most one second per round
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();

                        try {
                            handleInput(key);
                        } catch (IOException e) {
                            // A failure on one channel must not take down the loop:
                            // drop that channel and carry on with the others.
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            shutdown();
        }
    }

    /** Closes every channel still registered with the selector, then the selector itself. */
    private void shutdown() {
        // Snapshot the key set so closing channels cannot disturb the iteration.
        for (SelectionKey key : selector.keys().toArray(new SelectionKey[0])) {
            closeQuietly(key.channel());
        }
        closeQuietly(serverSocketChannel);
        closeQuietly(selector);
    }

    /** Closes a resource during shutdown, reporting but not propagating failures. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles one ready key: accepts a new connection, or reads a message from
     * a client and echoes it back.
     *
     * @author sky
     * @date 2015-6-27 3:13:46 PM
     * @param key the selected key
     * @throws IOException if accepting, reading or writing fails
     */
    private void handleInput(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }

        if (key.isAcceptable()) { // a new connection request
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel client = server.accept();
            if (client == null) {
                // Non-blocking accept found no pending connection after all.
                return;
            }
            client.configureBlocking(false); // must be non-blocking to register with a selector
            client.register(selector, SelectionKey.OP_READ);
        } else if (key.isReadable()) { // the channel has data (or end-of-stream) to read
            SocketChannel client = (SocketChannel) key.channel();

            ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
            int readByteNum = client.read(readBuffer);
            if (readByteNum > 0) {
                readBuffer.flip();
                byte[] bytes = new byte[readBuffer.remaining()];
                readBuffer.get(bytes);
                String body = new String(bytes, UTF_8);
                System.out.println(System.currentTimeMillis() + ":" + body);
                // The key stays registered for OP_READ, so no re-registration is needed.
                doWrite(body, client);
            } else if (readByteNum < 0) {
                // End of stream: the peer closed the connection. Without this the
                // key would be reported readable on every select round forever.
                key.cancel();
                client.close();
            }
            // readByteNum == 0: nothing available right now; wait for the next event.
        }
    }

    /**
     * Writes the response to the client. A single non-blocking write is
     * attempted; the confirmation line is printed only when the whole message
     * was written by that call.
     *
     * @author sky
     * @date 2015-6-27 3:21:48 PM
     * @param body   text to send; {@code null} or empty sends nothing
     * @param client channel to write to
     * @throws IOException if the write fails
     */
    private void doWrite(String body, SocketChannel client) throws IOException {
        if (body != null && !body.isEmpty()) {
            byte[] responseBytes = body.getBytes(UTF_8);
            ByteBuffer byteBuffer = ByteBuffer.wrap(responseBytes);
            System.out.println(byteBuffer.position() + "," + byteBuffer.limit());
            client.write(byteBuffer);
            if (!byteBuffer.hasRemaining()) {
                System.out.println("sender to client");
            }
        }
    }

}


客户端代码如下:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/**
 * Minimal NIO client for the echo server.
 *
 * <p>After connecting it sends a greeting, then for every reply it receives it
 * prints the reply, prompts for a line on the console and sends that line.
 *
 * @author sky
 * @date 2015-6-27 3:26:08 PM
 */
public class MultiTimeClient implements Runnable {

    /** Charset used to encode outgoing and decode incoming messages. */
    private static final Charset UTF_8 = Charset.forName("utf-8");

    /** Size in bytes of the buffer allocated for each read. */
    private static final int READ_BUFFER_SIZE = 1024;

    /** Selector. */
    private Selector selector;

    /** Local socket channel. */
    private SocketChannel socketChannel;

    /** Console input. */
    private BufferedReader reader;

    /** Server port. */
    private int port;

    /** Server address. */
    private String ipAddr;

    /** Set to {@code true} to make the event loop exit. */
    private volatile boolean stop;

    public static void main(String[] args) {
        new Thread(new MultiTimeClient(8889)).start();
    }

    /**
     * Prepares the selector, a non-blocking socket channel and the console
     * reader. The connection itself is made in {@link #run()}.
     *
     * @param ipAddr server address; {@code null} or empty means 127.0.0.1
     * @param port   server port
     */
    public MultiTimeClient(String ipAddr, int port) {
        if (ipAddr == null || "".equals(ipAddr)) {
            ipAddr = "127.0.0.1";
        }

        // Both fields are read by doConnect(); leaving ipAddr unset made the
        // InetSocketAddress constructor reject a null host name.
        this.ipAddr = ipAddr;
        this.port = port;

        // Create the client socket channel used to connect to the server.
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);

            reader = new BufferedReader(new InputStreamReader(System.in));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a client that connects to 127.0.0.1 on the given port.
     *
     * @param port server port
     */
    public MultiTimeClient(int port) {
        this("", port);
    }

    /**
     * Asks the event loop to terminate. The selector is woken up so the loop
     * notices the flag without waiting for the current select timeout.
     */
    public void stop() {
        this.stop = true;
        if (selector != null) {
            selector.wakeup();
        }
    }

    /**
     * Connects to the server and runs the event loop until the connection is
     * closed, console input ends, an I/O error occurs or {@link #stop()} is
     * called. The channel and selector are always closed on exit.
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        if (selector == null || socketChannel == null) {
            // Construction failed and was already reported; nothing to run.
            return;
        }

        try {
            doConnect();

            while (!stop) {
                selector.select(1000);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();

                    try {
                        doInputHand(key);
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();
                        if (key.channel() != null) {
                            key.channel().close();
                        }
                        // The only channel is gone, so there is nothing left to wait for.
                        stop = true;
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(socketChannel);
            closeQuietly(selector);
        }
    }

    /** Closes a resource during shutdown, reporting but not propagating failures. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles one ready key: completes a pending connection, or reads a reply
     * from the server and sends the next line typed on the console.
     *
     * @param key the selected key
     * @throws IOException if connecting, reading or writing fails
     */
    private void doInputHand(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }

        SocketChannel channel = (SocketChannel) key.channel();
        if (key.isConnectable()) { // connection attempt has progressed
            if (channel.finishConnect()) {
                // Connected: from now on we only care about incoming data.
                key.interestOps(SelectionKey.OP_READ);
                doWrite(channel, "hello server!");
            } else {
                System.exit(1);
            }
        } else if (key.isReadable()) {
            ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
            int readByteNum = channel.read(readBuffer);
            if (readByteNum > 0) {
                readBuffer.flip();
                byte[] bytes = new byte[readBuffer.remaining()];
                readBuffer.get(bytes);
                String body = new String(bytes, UTF_8);
                System.out.println("Now is:" + body);

                String msg = promptForMessage();
                if (msg == null) {
                    // Console reached end of input; there is nothing more to send.
                    stop = true;
                    return;
                }
                doWrite(channel, msg);
            } else if (readByteNum < 0) {
                // End of stream: the server closed the connection.
                key.cancel();
                channel.close();
                stop = true;
            }
            // readByteNum == 0: nothing available right now; wait for the next event.
        } else if (key.isWritable()) {
            System.out.println("isWritable");
        }
    }

    /**
     * Prompts on the console until a non-empty line is entered. An empty line
     * would put no bytes on the wire, so the server would never reply and the
     * client would wait forever.
     *
     * @return the entered line, or {@code null} if console input has ended
     * @throws IOException if reading the console fails
     */
    private String promptForMessage() throws IOException {
        String msg;
        do {
            System.out.println("请输入发送:");
            msg = reader.readLine();
        } while (msg != null && msg.isEmpty());
        return msg;
    }

    /**
     * Starts the non-blocking connect. If it completes immediately the channel
     * is registered for reads and the greeting is sent; otherwise the channel
     * is registered for the connect event and the handshake is finished in
     * {@link #doInputHand(SelectionKey)}.
     *
     * @throws IOException if the connection attempt fails
     */
    private void doConnect() throws IOException {
        if (socketChannel.connect(new InetSocketAddress(ipAddr, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel, "hello server!");
        } else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    /**
     * Sends a message to the server. A single non-blocking write is attempted;
     * the confirmation line is printed only when the whole message was written
     * by that call.
     *
     * @param sc   channel to write to
     * @param body text to send
     * @throws IOException if the write fails
     */
    private void doWrite(SocketChannel sc, String body) throws IOException {
        ByteBuffer writeBuffer = ByteBuffer.wrap(body.getBytes(UTF_8));
        sc.write(writeBuffer);
        if (!writeBuffer.hasRemaining()) {
            System.out.println("send to server success!");
        }
    }
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics