`
sunnylocus
  • 浏览: 869669 次
  • 性别: Icon_minigender_1
  • 来自: 美国图森
社区版块
存档分类
最新评论

Socket通信模式:收发线程互斥

    博客分类:
  • Java
阅读更多

      有做过通信程序或着短信接入程序的程序员都知道,与之通信的每条命令都由消息头和消息尾构成,消息头一般包括整个消息体的长度、流水号、命令类型等信息,客户端向服务端发送一个请求,服务端返回一个响应,请求的流水号和返回的流水号为一一对应关系。如图:

 

 一般我们做法是写一个同步的方法用于发送命令和接收命令,如

 

  public synchronized String recMsg(String reqMsg) {
        //TODO:发送消息 ..... 
       //TODO:接收消息 
     return 收到的消息 
 }

 

  这样做虽然能满足要求,但是效率不高,因为每发送一次命令,需要等到命令成功响应后才能继续发送下一条命令。用收发线程来实现下(直接从项目copy的代码):

package com.bill99.svr;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.log4j.Logger;

/**
 *<p>title: socket通信包装类</p>
 *<p>Description: </p>
 *<p>CopyRight: CopyRight (c) 2009</p>
 *<p>Company: 99bill.com</p>
 *<p>Create date: 2009-10-14</P>
 *author     sunnylocus
 *                v0.10 2009-10-14 初类
 *                v0.11 2009-11-12 对命令收发逻辑及收发线程互斥机制进行了优化,处理命令速度由原来8~16个/秒提高到25~32个/秒
 */ public class SocketConnection {

	private volatile Socket socket;
	private int timeout = 1000*10; //超时时间,初始值10秒
	private boolean isLaunchHeartcheck = false;//是否已启动心跳检测
	private boolean isNetworkConnect = false; //网络是否已连接
	private static String host = "";
	private static int port;
	static InputStream inStream = null;
	static OutputStream outStream = null;
	private static Logger log =Logger.getLogger(SocketConnection.class);
	private static SocketConnection socketConnection = null;
	private static java.util.Timer heartTimer=null;
	//-------------------------------------------
	//private final Map<String, Object> recMsgMap= Collections.synchronizedMap(new HashMap<String, Object>());
	private final ConcurrentHashMap<String, Object> recMsgMap = new ConcurrentHashMap<String, Object>();
	private static Thread receiveThread = null;
	private final ReentrantLock lock = new ReentrantLock();
	
	private SocketConnection(){
		Properties conf = new Properties();
		try {
			conf.load(SocketConnection.class.getResourceAsStream("test.conf"));
			this.timeout = Integer.valueOf(conf.getProperty("timeout"));
			init(conf.getProperty("ip"),Integer.valueOf(conf.getProperty("port")));
		} catch(IOException e) {
			log.fatal("socket初始化异常!",e);
			throw new RuntimeException("socket初始化异常,请检查配置参数");
		}
	}
	/**
	 * 单态模式 
	 */
	public static SocketConnection getInstance() {
		if(socketConnection==null) {
			synchronized(SocketConnection.class) {
				if(socketConnection==null) {
					socketConnection = new SocketConnection();
					return socketConnection;
				}
			}
		}
		return socketConnection;
	}
	
	private void init(String host,int port) throws IOException {
		InetSocketAddress addr = new InetSocketAddress(host,port);
		socket = new Socket();
		synchronized (this) {
			log.info("【准备与"+addr+"建立连接】");
			socket.connect(addr, timeout);
			log.info("【与"+addr+"连接已建立】");
			inStream = socket.getInputStream();
			outStream = socket.getOutputStream();
			socket.setTcpNoDelay(true);//数据不作缓冲,立即发送
			socket.setSoLinger(true, 0);//socket关闭时,立即释放资源
			socket.setKeepAlive(true);
			socket.setTrafficClass(0x04|0x10);//高可靠性和最小延迟传输
			isNetworkConnect=true;
			receiveThread = new Thread(new ReceiveWorker());
			receiveThread.start();
			SocketConnection.host=host;
			SocketConnection.port=port;
			if(!isLaunchHeartcheck)
				launchHeartcheck();
		}
	}
	/**
	 * 心跳包检测
	 */
	private void launchHeartcheck() {
		if(socket == null)
			throw new IllegalStateException("socket is not established!");
		heartTimer  = new Timer();
		isLaunchHeartcheck = true;
		heartTimer.schedule(new TimerTask() {
			public void run() {
				String msgStreamNo = StreamNoGenerator.getStreamNo("kq");
				int mstType =9999;//999-心跳包请求
				SimpleDateFormat dateformate = new SimpleDateFormat("yyyyMMddHHmmss");
				String msgDateTime = dateformate.format(new Date());
				int msgLength =38;//消息头长度
				String commandstr = "00" +msgLength + mstType + msgStreamNo;
				log.info("心跳检测包 -> IVR "+commandstr);
				int reconnCounter = 1;
				while(true) {
					String responseMsg =null;
					try {
						responseMsg = readReqMsg(commandstr);
					} catch (IOException e) {
						log.error("IO流异常",e);
						reconnCounter ++;
					} 
					if(responseMsg!=null) {
						log.info("心跳响应包 <- IVR "+responseMsg);
						reconnCounter = 1;
						break;
					} else { 
						reconnCounter ++;
					}
					if(reconnCounter >3) {//重连次数已达三次,判定网络连接中断,重新建立连接。连接未被建立时不释放锁
						reConnectToCTCC(); break;
					}
				}
			}
		},1000 * 60*1,1000*60*2);
	}
	
	/**
	 * 重连与目标IP建立重连
	 */
	private void reConnectToCTCC() {
		new Thread(new Runnable(){
			public void run(){
				log.info("重新建立与"+host+":"+port+"的连接");
				//清理工作,中断计时器,中断接收线程,恢复初始变量
				heartTimer.cancel();
				isLaunchHeartcheck=false;
				isNetworkConnect = false;
				receiveThread.interrupt();
				try {
					socket.close();
				} catch (IOException e1) {log.error("重连时,关闭socket连接发生IO流异常",e1);}
				//----------------
				synchronized(this){
					for(; ;){
						try {
							Thread.currentThread();
							Thread.sleep(1000 * 1);
							init(host,port);
							this.notifyAll();
							break ;
						} catch (IOException e) {
							log.error("重新建立连接未成功",e);
						} catch (InterruptedException e){
							log.error("重连线程中断",e);
						}
					}
				}
			}
		}).start();
	}
	/**
	 * 发送命令并接受响应
	 * @param requestMsg
	 * @return
	 * @throws SocketTimeoutException
	 * @throws IOException
	 */
	public String readReqMsg(String requestMsg) throws IOException {
		if(requestMsg ==null) {
			return null;
		}
		if(!isNetworkConnect) {
			synchronized(this){
				try {
					this.wait(1000*5); //等待5秒,如果网络还没有恢复,抛出IO流异常
					if(!isNetworkConnect) {
						throw new IOException("网络连接中断!");
					}
				} catch (InterruptedException e) {
					log.error("发送线程中断",e);
				}
			}
		}
		String msgNo = requestMsg.substring(8, 8 + 24);//读取流水号
		outStream = socket.getOutputStream();
		outStream.write(requestMsg.getBytes());
		outStream.flush();
		Condition msglock = lock.newCondition(); //消息锁
		//注册等待接收消息
		recMsgMap.put(msgNo, msglock);
		try {
			lock.lock();
			msglock.await(timeout,TimeUnit.MILLISECONDS);
		} catch (InterruptedException e) {
			log.error("发送线程中断",e);
		} finally {
			lock.unlock();
		}
		Object respMsg = recMsgMap.remove(msgNo); //响应信息
		if(respMsg!=null &&(respMsg != msglock)) {
			//已经接收到消息,注销等待,成功返回消息
			return (String) respMsg;
		} else {
			log.error(msgNo+" 超时,未收到响应消息");
			throw new SocketTimeoutException(msgNo+" 超时,未收到响应消息");
		}
	}
	
	public void finalize() {
		if (socket != null) {
			try {
				socket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	//消息接收线程
	private class ReceiveWorker implements Runnable {
		String intStr= null;
		public void run() {
			while(!Thread.interrupted()){
				try {
					byte[] headBytes = new byte[4];
					if(inStream.read(headBytes)==-1){
						log.warn("读到流未尾,对方已关闭流!");
						reConnectToCTCC();//读到流未尾,对方已关闭流
						return;
					}
					byte[] tmp  =new byte[4];
					tmp = headBytes;
					String tempStr = new String(tmp).trim();
					if(tempStr==null || tempStr.equals("")) {
						log.error("received message is null");
						continue;
					}
					intStr = new String(tmp);
					int totalLength =Integer.parseInt(intStr);
					//----------------
					byte[] msgBytes = new byte[totalLength-4];
					inStream.read(msgBytes);
					String resultMsg = new String(headBytes)+ new String(msgBytes);
					//抽出消息ID
					String msgNo = resultMsg.substring(8, 8 + 24);
					Condition msglock =(Condition) recMsgMap.get(msgNo);
					if(msglock ==null) {
						log.warn(msgNo+"序号可能已被注销!响应消息丢弃");
						recMsgMap.remove(msgNo);
						continue;
					}
					recMsgMap.put(msgNo, resultMsg);
					try{
						lock.lock();
						msglock.signalAll();
					}finally {
						lock.unlock();
					}
				}catch(SocketException e){
					log.error("服务端关闭socket",e);
					reConnectToCTCC();
				} catch(IOException e) {
					log.error("接收线程读取响应数据时发生IO流异常",e);
				} catch(NumberFormatException e){
					log.error("收到没良心包,String转int异常,异常字符:"+intStr);
				} 
			}
		}
	}
}

 

使用收发线程,一个发一个收,能提高socket通信的效率,不过有隐患:

    比如在高并发环境下,每秒有100个线程调用 readReqMsg(String requestMsg)方法,也就是每秒会有100个需要发送的命令,由于网络传输、服务端处理命令等因素,接收线程每秒只能接收80个响应,将已返回命令的流水号从recMsgMap移除,但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机。

   怎么预防这个隐患,还没有找到合适的解决办法,还请有经验的朋友指点一二。

4
0
分享到:
评论
5 楼 caolongaaron 2011-11-02  
 
4 楼 caolongaaron 2011-11-02  
[b][/b]dsadsa
3 楼 lyy3323 2010-04-05  
对于发送消息而言,把处理完的命令仍到队列中,单独开一个线程读取队列中的值,然后write出去。
2 楼 sunnylocus 2010-03-08  
lyy3323 写道
大概看了下你的代码。感觉写的还是有点乱。
首先,概念上,收发同步改为收发异步的设计是正确的。
但是你的代码并没有真正做到。
发送指令应该通过队列的形式发送比较形象。即 指令拼装后---发送指令(在这里做同步,即将SEQ放在MAP中,吧指令扔到队列里)---轮询队列,发送指令。

接受队列----类似操作。

不能理解为什么会“但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机”。


recMsgMap中存几十W数据应该没问题,而且你的SEQ轮询已经也很快达到峰值吧。


另:尽量不要使用synchronized,完全可以用
CountDownLatch替代所有同步操作。

你是说将准备发送的信息放在一个队列中,然后让消费者线程去取队列中的信息么?
1 楼 lyy3323 2010-03-08  
大概看了下你的代码。感觉写的还是有点乱。
首先,概念上,收发同步改为收发异步的设计是正确的。
但是你的代码并没有真正做到。
发送指令应该通过队列的形式发送比较形象。即 指令拼装后---发送指令(在这里做同步,即将SEQ放在MAP中,吧指令扔到队列里)---轮询队列,发送指令。

接受队列----类似操作。

不能理解为什么会“但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机”。


recMsgMap中存几十W数据应该没问题,而且你的SEQ轮询已经也很快达到峰值吧。


另:尽量不要使用synchronized,完全可以用
CountDownLatch替代所有同步操作。

相关推荐

Global site tag (gtag.js) - Google Analytics