`
fokman
  • 浏览: 239141 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

Java nio 客户端连接Server

阅读更多

在做通信系统的开发过程中,经常需要使用Socket通信。java新的io机制给我提供了一个很好的异步socket通信方式,这段时间用java写了一个客户端用来连接server。发现运行效率还比较让人满意。下面是我实现的部分功能。

连接服务器的socket,多线程启动。如果连接失败就重连。

public class CommonSocket extends Thread {
	private SocketChannel socketChannel;
	private boolean stop = false;
	private int port = 0;
	private String ip = "";
	private Selector selector = null;
	private SocketAddress socketAddress = null;
	private Logger logger = Logger.getLogger(CommonSocket.class);

	public CommonSocket() {
		this.ip = SocketInfoUtils.TCP_IP;
		this.port = SocketInfoUtils.TCP_PORT;
	}

	public void run() {
		while (!stop) {
			socketConnet();
			try {
				sleep(5000);
			} catch (InterruptedException e) {
				logger.error("SocketConnect run error: InterruptedException");
			}
		}
	}

	public void socketBuilder() {
		try {
			selector = Selector.open();
		} catch (IOException e) {
			e.printStackTrace();
			logger.error("Open to selector failed: IOException");
		}
	}

	private void openSocketChannel() {
		try {
			socketAddress = new InetSocketAddress(ip, port);
			socketChannel = SocketChannel.open();
			socketChannel.socket().setReuseAddress(true);
			socketChannel.connect(socketAddress);
		} catch (ClosedChannelException e) {
			logger.warn("Channel is closed: ClosedChannelException");
		} catch (IOException e) {
			logger
					.warn("Connet is failed or time out,the system will automatically re-connected : IOException");
		}
	}

	/**
	 * do ClientBuilder if socket conncte success
	 */
	public void socketConnet() {
		try {
			openSocketChannel();
			if (socketChannel.isOpen()) {
				this.stop = true;
				socketBuilder();
				socketChannel.configureBlocking(false);
				socketChannel.register(selector, SelectionKey.OP_READ
						| SelectionKey.OP_WRITE);
				PackageBuilder clientBuilder = new PackageBuilder(socketChannel,
						selector);
				clientBuilder.start();
				logger.info("Has been successfully connected to " + ip
						+ "and port:    " + port);
			} else {
				socketChannel.close();
			}
		} catch (ClosedChannelException e) {
			logger.warn("Channel is closed: ClosedChannelException");
		} catch (IOException e) {
			logger
					.warn("Connet is failed or time out,the system will automatically re-connected : IOException");
		}

	}
}

 发送和接收事件处理,NIO是基于事件的驱动模型,这个类就是专门处理收发的。

public class PackageBuilder  extends Thread{
	private SocketChannel socketChannel = null;
	private Selector selector = null;
	private boolean stop = false;
	private byte[] array = new byte[1024];
	private ByteBuffer byteBuffer;
	private PackageQueue packageQueue;
	private Logger logger = Logger.getLogger(PackageBuilder.class);
	
	public PackageBuilder(SocketChannel socketChannel,Selector selectore){
		this.socketChannel = socketChannel;
		this.selector = selectore;
		packageQueue=new PackageQueue();
	}
	public void run(){
		try {
			while (!stop) {
				Thread.sleep(1);
				if(!socketChannel.isOpen()){
					reconnect();//通道没打开或者断开执行重连工作(Channel did not open the work of the implementation of re-connection )
					break;
				}
				if (selector.select(30) > 0) {
					doSelector();
				}
			}
		} catch (IOException e) {
			logger.error("CameraBuilder run error: IOException");
		} catch (InterruptedException e){
			logger.error("CameraBuilder run error: InterruptedException");
		}
	}
	public void doSelector(){
		for(SelectionKey key:selector.selectedKeys()){
			selector.selectedKeys().remove(key);
			if(!key.isValid()){
				continue;
			}
			doKeys(key);
		}
	}
	
	public void doKeys(SelectionKey key){
		SocketChannel channel = (SocketChannel)key.channel();
		if(key.isReadable()){
			readResponse(channel);
		}
		if(key.isWritable()){
			sendRequest(channel);
		}
	}
	private void readResponse(SocketChannel channel) {
		byteBuffer=ByteBuffer.wrap(array);
		byteBuffer.clear();
		int count = 0;
		try {
			count = channel.read(byteBuffer);
		} catch (IOException e) {
			reconnect();//通道没打开或者断开执行重连工作(Channel did not open the work of the implementation of re-connection )
			logger.error("Connection reset by peer: IOException");
		}
		if(count != -1){
			byteBuffer.flip();
			byte[] bs = new byte[count];
			byteBuffer.get(bs);
			ByteBuffer returnBuffer = ByteBuffer.allocate(count);
			returnBuffer.clear();
			returnBuffer.put(bs);
			returnBuffer.flip();
			PrintUtil.printBf(returnBuffer.array());
			ParseBufferData parseData=new ParseBufferData(returnBuffer);		
			parseData.parseBuffer();			
	  }
		if(count < 0){
			reconnect();
		}
	}
	/**
	 * send pakcet of request
	 * @param channel
	 */
	public void sendRequest(SocketChannel channel){
		byte[] array = packageQueue.takeMsgs();
		if(array!=null){
		ByteBuffer byteBuffer = ByteBuffer.wrap(array);
			try {
				channel.write(byteBuffer);
			 } catch (IOException e) {
				 reconnect();//通道没打开或者断开执行重连工作(Channel did not open the work of the implementation of re-connection )
				logger.warn("socket not connected or has been closed: IOException");
			 }
		 }
	}
	
	public void reconnect(){
		stopClient();
		logger.warn("socket not connected or has been closed");
		ThreadPoolUtil.getExecutor().execute(new CameraSocket());
	}
	
	public void stopClient(){
		this.stop = true;
		if(socketChannel.isConnected() && !socketChannel.isOpen()){
			try {
				socketChannel.close();
				logger.info("server_socket has connected");
			} catch (IOException e) {
				logger.warn("Channel closed to failed: IOException");
			}
		}
	}
}

 发送和接收数据存放在缓存中

public class PackageQueue {
	private static  List<byte[]> queue = new ArrayList<byte[]>();
	
	public PackageQueue(){	
	}
	
	public void pushMsgs(byte[] array){
		synchronized(queue){
			queue.add(array);
		}
	}
	
	public byte[] takeMsgs() {
		synchronized (queue) {
			byte[] sd=null;
			if(queue != null){
				if(queue.size() > 0){
					sd = queue.get(0);
					queue.remove(0);
				}
			}
			return sd;
		}
		
	}

	public static List<byte[]> getQueue() {
		return queue;
	}

	public static void setQueue(List<byte[]> queue) {
		PackageQueue.queue = queue;
	}
}

 以上就是客户端连接、发送、接收的代码。希望对大家有所帮助

分享到:
评论
3 楼 charlotte 2012-05-14  
学习了!!!!!
2 楼 fokman 2011-08-17  
一江春水邀明月 写道
缺少SocketInfoUtils  ThreadPoolUtil CameraSocket  三个类的代码啊, 博主能把这三个类也贴一下吗? 谢谢了

public class CameraSocket extends Thread {
private int cmdPort = SocketInfoUtils.CMD_PORT; // 5554
private String host = SocketInfoUtils.HOST; // 172.16.163.38
ByteBuffer buffer = ByteBuffer.allocate(1024);
// DatagramChannel dataChannel;
DatagramChannel cmdChannel;
Selector selector;
CameraQueue cameraQueue;

public CameraSocket() throws Exception {
selector = Selector.open();
cameraQueue = new CameraQueue();
cmdChannel = DatagramChannel.open();
cmdChannel.configureBlocking(false);
SocketAddress target = new InetSocketAddress(host, cmdPort);
cmdChannel.connect(target);
cmdChannel.register(selector, SelectionKey.OP_WRITE);
}

@Override
public void run() {
boolean flag = true;
while (flag) {
try {
doSelector();
} catch (IOException e) {
flag = false;
e.printStackTrace();
}
}
}

private void doSelector() throws IOException {
if (selector.select(1000) > 0) {
for (SelectionKey key : selector.selectedKeys()) {
if (key.isWritable()) {
writeEvent(cmdChannel);
}
}
selector.selectedKeys().clear();
}
}

// private void readEvent(SelectionKey key) throws IOException {
// ByteBuffer buffer = ByteBuffer.allocate(1024);
// dataChannel.receive(buffer);
// buffer.flip();
// ParseBufferData parseBufferData=new ParseBufferData(buffer);
// parseBufferData.parseBuffer();
// }

private void writeEvent(DatagramChannel channel) throws IOException {
byte[] array = cameraQueue.takeMsgs();
if (array != null) {
ByteBuffer byteBuffer = ByteBuffer.wrap(array);
try {
channel.write(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}

}
public class SocketInfoUtils {

public static Properties factory = SocketPropertiesFactory.getInstance().getBoundle();
public static String TCP_IP = factory.getProperty("tcp_ip");
public static int TCP_PORT = Integer.parseInt(factory.getProperty("tcp_port"));
public static int CAMERA_PORT=Integer.parseInt(factory.getProperty("camera_port"));

//public static int UDP_PORT = Integer.parseInt(factory.getProperty("udp_port"));
public static int HIS_UDP_PORT = Integer.parseInt(factory.getProperty("his_udp_port"));

public static int CMD_PORT = Integer.parseInt(factory.getProperty("cmd_port"));
public static int DATA_PORT = Integer.parseInt(factory.getProperty("data_port"));
public static final String HOST = factory.getProperty("host");
}
public class ThreadPoolUtil {

private static ThreadPoolExecutor executor;
static{
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1);
executor = new ThreadPoolExecutor(5,100,500,TimeUnit.MILLISECONDS,queue);
RejectedExecutionHandler rejected = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(String.format("======= Task %d rejected.======", r.hashCode()));
}
};
executor.setRejectedExecutionHandler(rejected);
}

public static ThreadPoolExecutor getExecutor() {
return executor;
}

public static void setExecutor(ThreadPoolExecutor executor) {
ThreadPoolUtil.executor = executor;
}

}
1 楼 一江春水邀明月 2011-06-30  
缺少SocketInfoUtils  ThreadPoolUtil CameraSocket  三个类的代码啊, 博主能把这三个类也贴一下吗? 谢谢了

相关推荐

    JAVA NIO客户端服务端完整项目工程(ServerandClientNIOV1.0.zip)包下载.txt

    该JAVA NIO项目包含server服务端完整项目源码、client客户端项目工程源码。

    java nio socket 例子

    本例包含服务器端和客户端,多线程,每线程多次发送,Eclipse工程,启动服务器使用 nu.javafaq.server.NioServer,启动客户端使用 nu.javafaq.client.NioClient。另本例取自javafaq.nv上的程序修改而成

    java NIO socket聊天室

    使用NIO socket不需要多线程来处理多个连接的请求,效率非常高 ...4,修改封装http做成短连接处理,就是一个小型的webserver,或者结合java2D和robot做远程监控 5,封装自己的协议可以做成自己需要的服务器端程序,

    Java NIO原理和使用

    这是一个守候在端口9000的noblock server例子,如果我们编制一个客户端程序,就可以对它进行互动操作,或者使用telnet 主机名 90000 可以链接上。 通过仔细阅读这个例程,相信你已经大致了解NIO的原理和使用方法,...

    Java NIO 聊天室 JSwing

    // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调 //用channel.finishConnect();才能完成连接 channel.connect(new InetSocketAddress(ip,port)); //将通道管理器和该通道绑定,并...

    JavaNioExample:使用 java NIO 的简单客户端和服务器

    使用 java NIO 的简单客户端和服务器 用法示例: java -jar server.jar 本地主机 8080 java -jar client.jar 本地主机 8080

    java解读NIOSocket非阻塞模式.zip

    在NIO中使用多线程,主要目的已不是为了应对每个客户端请求而分配独立的服务线程,而是通过多线程充分使用用多个CPU的处理能力和处理中的等待时间,达到提高服务能力的目的。 client多线程请求server端,server接收...

    JAVA_API1.6文档(中文)

    java.nio.channels 定义了各种通道,这些通道表示到能够执行 I/O 操作的实体(如文件和套接字)的连接;定义了用于多路复用的、非阻塞 I/O 操作的选择器。 java.nio.channels.spi 用于 java.nio.channels 包的服务...

    Server.rar

    Java NIO使用线程池实现服务端多连接,并异步处理客户端发送的数据

    Netty权威指南

    Netty是基于Java NIO client-server的网络应用框架,使用Netty可以快速开发网络应用,例如服务器和客户端协议。Netty提供了一种新的方式来开发网络应用程序,这种新的方式使它很容易使用和具有很强的扩展性。Netty的...

    mina高性能Java网络框架 v2.1.3

    当前发行的MINA版本支持基于Java NIO技术的TCP/UDP应用程序开发、串口通讯程序(只在最新的预览版中提供),MINA所支持的功能也在进一步的扩展中。 Apache MINA是一个网络应用程序框架,可帮助用户轻松开发高性能和...

    基于Java实现的端到端加密的聊天室系统完整源码+项目说明.7z

    NIO实现的聊天客户端和服务器 日志解密工具 【打包文件】 netty-chat-server.jar: Netty实现的服务器 netty-chat-client.jar: Netty实现的客户端 chat-server.jar: NIO实现的服务器 chat-client.jar: NIO实现的...

    NettyInAction中文版.docx

    Netty是一个NIO client-server(客户端服务器)框架,使用Netty可以快速开发网络应用,例如服务器和客户端协议。Netty提供了一种新的方式来使开发网络应用程序,这种新的方式使得它很容易使用和有很强的扩展性。Netty...

    Netty In Action中文版.docx

     Netty是一个NIO client-server(客户端服务器)框架,使用Netty可以快速开发网络应用,例如服务器和客户端协议。Netty提供了一种新的方式来使开发网络应用程序,这种新的方式使得它很容易使用和有很强的扩展性。...

    Netty In Action中文版

    Netty是一个NIO client-server(客户端服务器)框架,使用Netty可以快速开发网络应用,例如服务器和客户端协议。Netty提 供了一种新的方式来使开发网络应用程序,这种新的方式使得它很容易使用和有很强的扩展性。...

    JAVA上百实例源码以及开源项目

    第一步:运行ServerData.java 启动服务器,然后服务器处于等待状态 第二步:运行LoginData.java 启动(客户端)登陆界面 输入用户名 ip为本机localhost 第三步:在登陆后的界面文本框输入文本,然后发送 可以同时启动...

    《netty in action中文版》13章全

    Netty是一个NIO client-server(客户端服务器)框架,使用Netty可以快速开发网络应用,例如服务器和客户端协议。Netty提供了一种新的方式来使开发网络应用程序,这种新的方式使得它很容易使用和有很强的扩展性。Netty...

    JAVA上百实例源码以及开源项目源代码

     Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 3...

    scala-nio-server:Scala中的一个Nio服务器示例

    Scala Nio服务器示例 此项目服务于。 在该项目中,我使用java选择器接口来实现一个简单的Nio Server。 用 ... 然后运行src/main/java/test/SocketClientExample.java ,运行与服务器连接的客户端。

    Java 1.6 API 中文 New

    java.nio.channels 定义了各种通道,这些通道表示到能够执行 I/O 操作的实体(如文件和套接字)的连接;定义了用于多路复用的、非阻塞 I/O 操作的选择器。 java.nio.channels.spi 用于 java.nio.channels 包的服务...

Global site tag (gtag.js) - Google Analytics