- 浏览: 869669 次
- 性别:
- 来自: 美国图森
最新评论
-
jnjeC:
jake_12345 写道大哥,这写错了吧Class.isAs ...
Class.isAssignableFrom(Class clz)方法 与 instanceof 关键字的区别 -
lgh1992314:
https://my.oschina.net/xianggao ...
Servlet生命周期 -
qq412796770:
大哥,百度第一条就是你的,好歹也修改一下吧
Class.isAssignableFrom(Class clz)方法 与 instanceof 关键字的区别 -
技术无涯苦作舟:
大哥,百度第一条就是你的,好歹也修改一下吧
Class.isAssignableFrom(Class clz)方法 与 instanceof 关键字的区别 -
lgh1992314:
大哥,百度第一条就是你的,好歹也修改一下吧
Class.isAssignableFrom(Class clz)方法 与 instanceof 关键字的区别
有做过通信程序或着短信接入程序的程序员都知道,与之通信的每条命令都由消息头和消息尾构成,消息头一般包括整个消息体的长度、流水号、命令类型等信息,客户端向服务端发送一个请求,服务端返回一个响应,请求的流水号和返回的流水号为一一对应关系。如图:
一般我们做法是写一个同步的方法用于发送命令和接收命令,如
public synchronized String recMsg(String reqMsg) { //TODO:发送消息 ..... //TODO:接收消息 return 收到的消息 }
这样做虽然能满足要求,但是效率不高,因为每发送一次命令,需要等到命令成功响应后才能继续发送下一条命令。用收发线程来实现下(直接从项目copy的代码):
package com.bill99.svr; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; /** *<p>title: socket通信包装类</p> *<p>Description: </p> *<p>CopyRight: CopyRight (c) 2009</p> *<p>Company: 99bill.com</p> *<p>Create date: 2009-10-14</P> *author sunnylocus * v0.10 2009-10-14 初类 * v0.11 2009-11-12 对命令收发逻辑及收发线程互斥机制进行了优化,处理命令速度由原来8~16个/秒提高到25~32个/秒 */ public class SocketConnection { private volatile Socket socket; private int timeout = 1000*10; //超时时间,初始值10秒 private boolean isLaunchHeartcheck = false;//是否已启动心跳检测 private boolean isNetworkConnect = false; //网络是否已连接 private static String host = ""; private static int port; static InputStream inStream = null; static OutputStream outStream = null; private static Logger log =Logger.getLogger(SocketConnection.class); private static SocketConnection socketConnection = null; private static java.util.Timer heartTimer=null; //------------------------------------------- //private final Map<String, Object> recMsgMap= Collections.synchronizedMap(new HashMap<String, Object>()); private final ConcurrentHashMap<String, Object> recMsgMap = new ConcurrentHashMap<String, Object>(); private static Thread receiveThread = null; private final ReentrantLock lock = new ReentrantLock(); private SocketConnection(){ Properties conf = new Properties(); try { conf.load(SocketConnection.class.getResourceAsStream("test.conf")); this.timeout = Integer.valueOf(conf.getProperty("timeout")); init(conf.getProperty("ip"),Integer.valueOf(conf.getProperty("port"))); } catch(IOException e) { log.fatal("socket初始化异常!",e); throw new RuntimeException("socket初始化异常,请检查配置参数"); } } /** * 单态模式 */ public static SocketConnection getInstance() { if(socketConnection==null) { synchronized(SocketConnection.class) { if(socketConnection==null) { socketConnection = new SocketConnection(); return socketConnection; } } } return socketConnection; } private void init(String host,int port) throws IOException { InetSocketAddress addr = new InetSocketAddress(host,port); socket = new Socket(); synchronized (this) { log.info("【准备与"+addr+"建立连接】"); socket.connect(addr, timeout); log.info("【与"+addr+"连接已建立】"); inStream = socket.getInputStream(); outStream = socket.getOutputStream(); socket.setTcpNoDelay(true);//数据不作缓冲,立即发送 socket.setSoLinger(true, 0);//socket关闭时,立即释放资源 socket.setKeepAlive(true); socket.setTrafficClass(0x04|0x10);//高可靠性和最小延迟传输 isNetworkConnect=true; receiveThread = new Thread(new ReceiveWorker()); receiveThread.start(); SocketConnection.host=host; SocketConnection.port=port; if(!isLaunchHeartcheck) launchHeartcheck(); } } /** * 心跳包检测 */ private void launchHeartcheck() { if(socket == null) throw new IllegalStateException("socket is not established!"); heartTimer = new Timer(); isLaunchHeartcheck = true; heartTimer.schedule(new TimerTask() { public void run() { String msgStreamNo = StreamNoGenerator.getStreamNo("kq"); int mstType =9999;//999-心跳包请求 SimpleDateFormat dateformate = new SimpleDateFormat("yyyyMMddHHmmss"); String msgDateTime = dateformate.format(new Date()); int msgLength =38;//消息头长度 String commandstr = "00" +msgLength + mstType + msgStreamNo; log.info("心跳检测包 -> IVR "+commandstr); int reconnCounter = 1; while(true) { String responseMsg =null; try { responseMsg = readReqMsg(commandstr); } catch (IOException e) { log.error("IO流异常",e); reconnCounter ++; } if(responseMsg!=null) { log.info("心跳响应包 <- IVR "+responseMsg); reconnCounter = 1; break; } else { reconnCounter ++; } if(reconnCounter >3) {//重连次数已达三次,判定网络连接中断,重新建立连接。连接未被建立时不释放锁 reConnectToCTCC(); break; } } } },1000 * 60*1,1000*60*2); } /** * 重连与目标IP建立重连 */ private void reConnectToCTCC() { new Thread(new Runnable(){ public void run(){ log.info("重新建立与"+host+":"+port+"的连接"); //清理工作,中断计时器,中断接收线程,恢复初始变量 heartTimer.cancel(); isLaunchHeartcheck=false; isNetworkConnect = false; receiveThread.interrupt(); try { socket.close(); } catch (IOException e1) {log.error("重连时,关闭socket连接发生IO流异常",e1);} //---------------- synchronized(this){ for(; ;){ try { Thread.currentThread(); Thread.sleep(1000 * 1); init(host,port); this.notifyAll(); break ; } catch (IOException e) { log.error("重新建立连接未成功",e); } catch (InterruptedException e){ log.error("重连线程中断",e); } } } } }).start(); } /** * 发送命令并接受响应 * @param requestMsg * @return * @throws SocketTimeoutException * @throws IOException */ public String readReqMsg(String requestMsg) throws IOException { if(requestMsg ==null) { return null; } if(!isNetworkConnect) { synchronized(this){ try { this.wait(1000*5); //等待5秒,如果网络还没有恢复,抛出IO流异常 if(!isNetworkConnect) { throw new IOException("网络连接中断!"); } } catch (InterruptedException e) { log.error("发送线程中断",e); } } } String msgNo = requestMsg.substring(8, 8 + 24);//读取流水号 outStream = socket.getOutputStream(); outStream.write(requestMsg.getBytes()); outStream.flush(); Condition msglock = lock.newCondition(); //消息锁 //注册等待接收消息 recMsgMap.put(msgNo, msglock); try { lock.lock(); msglock.await(timeout,TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("发送线程中断",e); } finally { lock.unlock(); } Object respMsg = recMsgMap.remove(msgNo); //响应信息 if(respMsg!=null &&(respMsg != msglock)) { //已经接收到消息,注销等待,成功返回消息 return (String) respMsg; } else { log.error(msgNo+" 超时,未收到响应消息"); throw new SocketTimeoutException(msgNo+" 超时,未收到响应消息"); } } public void finalize() { if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } //消息接收线程 private class ReceiveWorker implements Runnable { String intStr= null; public void run() { while(!Thread.interrupted()){ try { byte[] headBytes = new byte[4]; if(inStream.read(headBytes)==-1){ log.warn("读到流未尾,对方已关闭流!"); reConnectToCTCC();//读到流未尾,对方已关闭流 return; } byte[] tmp =new byte[4]; tmp = headBytes; String tempStr = new String(tmp).trim(); if(tempStr==null || tempStr.equals("")) { log.error("received message is null"); continue; } intStr = new String(tmp); int totalLength =Integer.parseInt(intStr); //---------------- byte[] msgBytes = new byte[totalLength-4]; inStream.read(msgBytes); String resultMsg = new String(headBytes)+ new String(msgBytes); //抽出消息ID String msgNo = resultMsg.substring(8, 8 + 24); Condition msglock =(Condition) recMsgMap.get(msgNo); if(msglock ==null) { log.warn(msgNo+"序号可能已被注销!响应消息丢弃"); recMsgMap.remove(msgNo); continue; } recMsgMap.put(msgNo, resultMsg); try{ lock.lock(); msglock.signalAll(); }finally { lock.unlock(); } }catch(SocketException e){ log.error("服务端关闭socket",e); reConnectToCTCC(); } catch(IOException e) { log.error("接收线程读取响应数据时发生IO流异常",e); } catch(NumberFormatException e){ log.error("收到没良心包,String转int异常,异常字符:"+intStr); } } } } }
使用收发线程,一个发一个收,能提高socket通信的效率,不过有隐患:
比如在高并发环境下,每秒有100个线程调用 readReqMsg(String requestMsg)方法,也就是每秒会有100个需要发送的命令,由于网络传输、服务端处理命令等因素,接收线程每秒只能接收80个响应,将已返回命令的流水号从recMsgMap移除,但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机。
怎么预防这个隐患,还没有找到合适的解决办法,还请有经验的朋友指点一二。
评论
5 楼
caolongaaron
2011-11-02
4 楼
caolongaaron
2011-11-02
[b][/b]dsadsa
3 楼
lyy3323
2010-04-05
对于发送消息而言,把处理完的命令仍到队列中,单独开一个线程读取队列中的值,然后write出去。
2 楼
sunnylocus
2010-03-08
lyy3323 写道
大概看了下你的代码。感觉写的还是有点乱。
首先,概念上,收发同步改为收发异步的设计是正确的。
但是你的代码并没有真正做到。
发送指令应该通过队列的形式发送比较形象。即 指令拼装后---发送指令(在这里做同步,即将SEQ放在MAP中,吧指令扔到队列里)---轮询队列,发送指令。
接受队列----类似操作。
不能理解为什么会“但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机”。
recMsgMap中存几十W数据应该没问题,而且你的SEQ轮询已经也很快达到峰值吧。
另:尽量不要使用synchronized,完全可以用
CountDownLatch替代所有同步操作。
首先,概念上,收发同步改为收发异步的设计是正确的。
但是你的代码并没有真正做到。
发送指令应该通过队列的形式发送比较形象。即 指令拼装后---发送指令(在这里做同步,即将SEQ放在MAP中,吧指令扔到队列里)---轮询队列,发送指令。
接受队列----类似操作。
不能理解为什么会“但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机”。
recMsgMap中存几十W数据应该没问题,而且你的SEQ轮询已经也很快达到峰值吧。
另:尽量不要使用synchronized,完全可以用
CountDownLatch替代所有同步操作。
你是说将准备发送的信息放在一个队列中,然后让消费者线程去取队列中的信息么?
1 楼
lyy3323
2010-03-08
大概看了下你的代码。感觉写的还是有点乱。
首先,概念上,收发同步改为收发异步的设计是正确的。
但是你的代码并没有真正做到。
发送指令应该通过队列的形式发送比较形象。即 指令拼装后---发送指令(在这里做同步,即将SEQ放在MAP中,吧指令扔到队列里)---轮询队列,发送指令。
接受队列----类似操作。
不能理解为什么会“但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机”。
recMsgMap中存几十W数据应该没问题,而且你的SEQ轮询已经也很快达到峰值吧。
另:尽量不要使用synchronized,完全可以用
CountDownLatch替代所有同步操作。
首先,概念上,收发同步改为收发异步的设计是正确的。
但是你的代码并没有真正做到。
发送指令应该通过队列的形式发送比较形象。即 指令拼装后---发送指令(在这里做同步,即将SEQ放在MAP中,吧指令扔到队列里)---轮询队列,发送指令。
接受队列----类似操作。
不能理解为什么会“但是发送的速度>接收的速度,每秒仍会向recMsgMap插入20个流水号,直到把服务器内存耗尽,宕机”。
recMsgMap中存几十W数据应该没问题,而且你的SEQ轮询已经也很快达到峰值吧。
另:尽量不要使用synchronized,完全可以用
CountDownLatch替代所有同步操作。
发表评论
-
人在江湖:如何用代码保护自己
2011-10-12 16:30 11231现在上一点规模的 ... -
Spring freemarker页面乱码解决
2011-01-13 11:56 7479在开发过程中遇到乱码十分的头痛,如果你在开发过程中也遇 ... -
数据漂白算法研究
2010-12-07 18:05 3765你的手机是不是 ... -
理解使用static import 机制
2010-11-09 08:48 3182J2SE 1.5里引入了“Sta ... -
理解多线程设计模式
2010-11-08 17:43 10423多线程设计模式:1.Single Threaded Execu ... -
理解ThreadLocal
2010-11-03 17:04 1914ThreadLocal是什么 早在JDK 1 ... -
经验总结:高性能的数据同步
2010-11-03 10:03 6404最近在做一个银行的生产数据脱敏系统,今天写代码时遇到 ... -
用JSSE实现网络安全通信
2010-06-25 15:11 3822在网络上信息由源主机到目标主机要经过很多路由和计算机, ... -
Java实时监控日志文件并输出
2010-06-19 17:21 61123最近有一个银行数据漂白系统,要求操作人员在页面调用远端 ... -
Junit测试private方法
2010-04-28 14:09 8010package com.bill99.junit; pu ... -
保护眼睛的豆沙色
2010-03-19 09:46 3559作我们IT这行的,一天要盯着电脑看,时间长了眼睛会感觉发酸 ... -
中国联通短信网关接入程序源代码(SGIP1.2协议)
2010-01-11 12:23 42852自从我发了博文“中国联通SP业务开发总结”后有很多的朋友问 ... -
Class.isAssignableFrom(Class clz)方法 与 instanceof 关键字的区别
2009-12-24 13:14 67450原地址:http://topic.csdn.net/t/200 ... -
非阻塞通信
2009-12-03 11:43 4615对于用ServerSocket和Socket写的服务 ... -
处理线程泄露
2009-12-01 15:10 8563当一个单线程化 ... -
在Timer和ScheduledExecutorService间决择
2009-11-27 10:25 13372java.util.Timer计时器有管理任务延迟执行(& ... -
ASCII码对照表
2009-11-12 11:26 2518ASCII表 ASCII值 控制字符 ASC ... -
java.net.SocketException: Software caused connection abort: recv failed 异常分析
2009-11-12 11:01 15680java.net.SocketException: Softw ... -
用State模式减少if..elseif语句
2009-11-03 17:20 7074我们在写程序的过 ... -
HttpURLConnection设置网络超时
2009-10-29 17:30 9468Java中可以使用HttpURLConnection来请 ...
相关推荐
g: 利用互斥量来解决线程同步互斥问题 h: problem1 生产者消费者问题 (1生产者 1消费者 1缓冲区) problem1 more 生产者消费者问题 (1生产者 2消费者 4缓冲区) problem2 读者与写着问题 I: 信号量 semaphore ...
2.3 实验三:线程的互斥.doc
小实验二:使用Windows互斥信号量操作函数解决上述线程并发问题,并分析、尝试和讨论线程执行体中有关信号量操作函数调用的正确位置 小实验三:根据同步机制的Peterson软件解决方案尝试自己编程实现线程同步机制和...
实验五:线程的互斥(2).doc
操作系统实验(三)线程的互斥 操作系统实验(三)线程的互斥
一、题目: 创建线程,利用互斥实现线程共享变量通信 二、目的 掌握线程创建和终止,加深对线程和进程概念的理解,会用同步与互斥方法实现线程之间的通信。 三、内容和要求 软件界面上点“创建线程” 按钮,创建三个...
该程序是我写的博客“一起talk C栗子吧(第一百一十六回:C语言实例--线程同步之互斥量二)”的配套程序,共享给大家使用
易语言 线程池 多线程 线程互斥 易语言线程互斥的解决办法
linux内核知识系列:同步与互斥 华嵌智能提供 www.embedded-cn.com http://embedded-cn.taobao.com
一看就懂,一做就会:临界资源互斥
C 创建线程互斥对象的实例源码下载,声明线程函数,创建线程,程序睡眠,释放互斥对象,设置事件对象为无信号状态,生成控制台程序,仅供参考。
这个多线程例子。C#的。主要讲述线程互斥的问题 。只是个简单的例子。
C#多线程互斥实例 多线程获取同一变量(不重复)。是一个很好的学习例子
4个线程互斥类,代码例子,完整的工程,详细说明了4中互斥对象的作用,对学习多线程编程的朋友很有用处............
C#.NET多线程实例6个(包括多线程基本使用,多线程互斥等全部多线程使用实例)
C# 多线程互斥 两个线程交替工作 C#多线程互斥,两个线程交替工作,如上图所示,挺有意思的。
线程互斥测试2!
多线程中互斥量的使用。 涉及到得知识有:线程的创建、互斥量的创建和使用,线程等待等。
线程的互斥和协作,是线程学习的一个重要关节,祝大家能学好