`
sunlujing
  • 浏览: 177931 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

NIO socket 的简单连接池

    博客分类:
  • j2se
阅读更多

      在最近的项目中,需要写一个socket 与 底层服务器通信的模块。在设计中,请求对象被封装 xxxRequest,消息返回被封装为 xxxResponse. 由于socket的编程开发经验少,一开始我使用了短连接的方式,每个请求建立一个socket通信,由于每个socket只进行一次读写,这大大浪费了系统资源。

      于是考虑使用长连接,系统公用一个client socket 并对send 操作进行加锁,结果在处理并发的时候,各种慢,各种等待。没有办法,考虑使用两节池,预先创建多个 client socket 放入 连接池,需要发送请求时从连接池获取一个socket,完成请求时放入连接池中。下面是一个简单的实现。

       

        private  static String IP=GlobalNames.industryIP;
 private  static int PORT =Integer.parseInt(GlobalNames.industryPort);
 
 private static  int CONNECTION_POOL_SIZE = 10;
 private static NIOConnectionPool self = null;
 private Hashtable<Integer, SocketChannel> socketPool = null; // 连接池
 private boolean[] socketStatusArray = null; // 连接的状态(true-被占用,false-空闲)
 private static Selector selector  = null;
 private static InetSocketAddress SERVER_ADDRESS = null;
 
 /**
  * 初始化连接池,最大TCP连接的数量为10
  *
  * @throws IOException
  */
 public static synchronized void init() throws Exception {
  self = new NIOConnectionPool();
  self.socketPool = new Hashtable<Integer, SocketChannel>();
  self.socketStatusArray = new boolean[CONNECTION_POOL_SIZE];
  buildConnectionPool();
 }

 /**
  * 建立连接池
  */
 public synchronized static void buildConnectionPool() throws Exception {
  if (self == null) {
   init();
  }
  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
   SocketChannel client = allocateSocketChannel();
   self.socketPool.put(new Integer(i), client);
   self.socketStatusArray[i] = false;
  }
 }

 /**
  * 从连接池中获取一个空闲的Socket
  *
  * @return 获取的TCP连接
  */
 public static SocketChannel getConnection() throws Exception {
  if (self == null)
   init();
  int i = 0;
  for (i = 0; i < CONNECTION_POOL_SIZE; i++) {
   if (!self.socketStatusArray[i]) {
    self.socketStatusArray[i] = true;
    break;
   }
  }
  if (i < CONNECTION_POOL_SIZE) {
   return self.socketPool.get(new Integer(i));
   
  } else {

  //目前连接池无可用连接时只是简单的新建一个连接
   SocketChannel newClient = allocateSocketChannel();
   CONNECTION_POOL_SIZE++;
   self.socketPool.put(CONNECTION_POOL_SIZE, newClient);
   return newClient;
  }
 }

 /**
  * 当获得的socket不可用时,重新获得一个空闲的socket。
  *
  * @param socket
  *            不可用的socket
  * @return 新得到的socket
  * @throws Exception
  */
 public static SocketChannel rebuildConnection(SocketChannel socket)
   throws Exception {
  if (self == null) {
   init();
  }
  SocketChannel newSocket = null;
  try {
   for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
    if (self.socketPool.get(new Integer(i)) == socket) {
     newSocket = allocateSocketChannel();
     self.socketPool.put(new Integer(i), newSocket);
     self.socketStatusArray[i] = true;
    }
   }

  } catch (Exception e) {
   System.out.println("重建连接失败!");
   throw new RuntimeException(e);
  }
  return newSocket;
 }


 /**
  * 将用完的socket放回池中,调整为空闲状态。此时连接并没有断开。
  *
  * @param socket
  *            使用完的socket
  * @throws Exception
  */
 public static void releaseConnection(SocketChannel socket) throws Exception {
  if (self == null) {
   init();
  }
  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
   if (self.socketPool.get(new Integer(i)) == socket) {
    self.socketStatusArray[i] = false;
    break;
   }
  }
 }

 /**
  * 断开池中所有连接
  *
  * @throws Exception
  */
 public synchronized static void releaseAllConnection() throws Exception {
  if (self == null)
   init();

  // 关闭所有连接
  SocketChannel socket = null;
  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
   socket = self.socketPool.get(new Integer(i));
   try {
    socket.close();
    self.socketStatusArray[i] = false;
   } catch (Exception e) {
    e.printStackTrace();
   }
  }
 }
   
 
 public static SocketChannel allocateSocketChannel(){
  
   SERVER_ADDRESS = new InetSocketAddress(  
                IP, PORT);  
  SocketChannel socketChannel = null;
     SocketChannel client = null;
     try{
     socketChannel = SocketChannel.open();  
        socketChannel.configureBlocking(false);  
        selector = Selector.open();  
        socketChannel.register(selector, SelectionKey.OP_CONNECT);  
        socketChannel.connect(SERVER_ADDRESS);
        Set<SelectionKey> selectionKeys;  
        Iterator<SelectionKey> iterator;  
        SelectionKey selectionKey;
        selector.select();  
        selectionKeys = selector.selectedKeys();  
        iterator = selectionKeys.iterator();  
        while (iterator.hasNext()) {  
            selectionKey = iterator.next();  
            if (selectionKey.isConnectable()) {  
                client = (SocketChannel) selectionKey.channel();  
                if (client.isConnectionPending()) {  
                    client.finishConnect();
                    client.register(selector, SelectionKey.OP_WRITE);  
                    break;
                }
            }
        }
 }catch(Exception e){
  e.printStackTrace();
 }
 return client;
  }

 public static Selector getSelector() {
  return selector;
 }

 

使用连接池进行通信:

 /*缓冲区大小*/ 
     private static int BLOCK = 8*4096;  
     /*发送数据缓冲区*/ 
     private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);  
   
     /*接受数据缓冲区*/
     private static ByteBuffer protocalNum = ByteBuffer.allocate(4);
     private static ByteBuffer functionNum = ByteBuffer.allocate(4);
     private static ByteBuffer messageLen = ByteBuffer.allocate(4);
     private static ByteBuffer receivebuffer = null;
     private  SocketChannel client = null;
     private Selector selector = null;
    
     private boolean readable = true;
     private boolean writable = true;
    
    
     public NIOSocketBackUp() throws Exception{
      client = NIOConnectionPool.getConnection();
      selector = NIOConnectionPool.getSelector();
     }
    
     public String send(ServiceRequest request) throws Exception {   
             
         Set<SelectionKey> selectionKeys;  
         Iterator<SelectionKey> iterator;  
         SelectionKey selectionKey;  
         int count=0;  
         boolean flag = true;
         String receiveText="";  
          while (flag) {  
             selector.select();  
             //返回此选择器的已选择键集。  
             selectionKeys = selector.selectedKeys();  
             iterator = selectionKeys.iterator();  
             while (iterator.hasNext()) {  
                 selectionKey = iterator.next();  
                 if (selectionKey.isWritable() && (writable)) {  
                         sendbuffer.clear();  
                         sendbuffer.put(request.getProtocalNum());
                         sendbuffer.put(request.getFunctionNum());
                         sendbuffer.put(request.getMessageLen());
                         sendbuffer.put(request.getXmlbytes());
                         sendbuffer.flip();  
                         client.write(sendbuffer);  
                         client.register(selector, SelectionKey.OP_READ);  
                         writable = false;
                 } else if (selectionKey.isReadable() && (readable) ) {  
                     protocalNum.clear();
                     functionNum.clear();
                     messageLen.clear();
                    
                    
                     count=client.read(protocalNum);  
                     count=client.read(functionNum);
                     count=client.read(messageLen);
                     messageLen.rewind();
                     int length = messageLen.asIntBuffer().get(0);
                     receivebuffer = ByteBuffer.allocate(length-12);
                     receivebuffer.clear();
                    
                     //read方式竟然不阻塞
                     int total=0;
                     while(total!=(length-12)){
                       count=client.read(receivebuffer);
                       total+=count;
                     }
                     client.register(selector, SelectionKey.OP_WRITE);  
                     receiveText = new String(receivebuffer.array(),"GBK");
                     flag = false;
                     readable = false;
                     break;
                 }
             }  
         }

      
      NIOConnectionPool.releaseConnection(client);
         return receiveText.trim();
     }  

 

 

 

 

2
1
分享到:
评论
1 楼 zhaoqiubo 2015-08-03  
我理解长连接就是链路要维持着,怎么能一次写入就关闭了呢?你可以再次写入啊,搞不懂为啥要用连接池。望不吝赐教。

相关推荐

    rewin.zwgtools.code.jar

    rewin.zwgtools.code....读写xml文档,图片管理,Ftp管理,可以和一个Ftp服务端建立连接实现文件的上传下载,TSF服务提供,两个数据库连接池管理,Socket连接池的管理,等等,皆在为开发人员提供高度集成的开发工具。

    XSocket的学习和总结

    XSocket的学习和总结 应用服务器网络应用网络协议.net编程 ... 检测问题,如低水平NIO选择编程,连接池管理,连接超时被封装的xSocket。 我从它的官网上面下载了两个JAR一个是其核心JAR包xSocket (core) 下载地址是:

    IO_deep_learning_notes.zip

    196 全手写基于Netty的RPC框架自定义协议,连接池 地址 198 全手写基于Netty的RPC框架 协议编解码问题 粘包拆包与内核关系 地址 203 全手写基于Netty的RPC框架 provider端简单dispatcher实现RPC调用全流程 地址 ...

    基于SpringMVC的一个web框架

    1.0.5 从web项目迁移成maven项目 1.0.6 增加菜单框架ext实现,类路径调整 1.0.7 ...1.0.8 socket工具类,权限组件,菜单组件,jdbc分页支持多种数据库,ant路径工具类,增加jquery easyUI ...使用数据库连接池druid dubbo使用

    基于Spring MVC的web框架 1.1.11

    使用数据库连接池druid dubbo使用 1.1.11 集成Spring Cache,FastJson Spring Cache增加redis缓存实现 Mybatis使用二级缓存,增加redis实现 增加reactJs 增加Mybatis插件pageHelper,Mapper doc内有相关文档

    一个可以直接运行的基于SpringMVC的web框架1.1.12

    使用数据库连接池druid dubbo使用 1.1.11 集成Spring Cache,FastJson Spring Cache增加redis缓存实现 Mybatis使用二级缓存,增加redis实现 增加reactJs 增加Mybatis插件pageHelper,Mapper 1.1.12 使用draft富文本编辑...

    可以直接运行的基于SpringMVC的web框架示例,也可以直接当公司框架

    使用数据库连接池druid dubbo使用 1.1.11 集成Spring Cache,FastJson Spring Cache增加redis缓存实现 Mybatis使用二级缓存,增加redis实现 增加reactJs 增加Mybatis插件pageHelper,Mapper 1.1.12 使用draft富文本编辑...

    cl-roadshow:通过代码总结具备路演的能力

    DES、MD5、AESXML/JSON:fastjson、XStreamLog4j:常用的xml配置、输出栈桢的用法Commons BeanUtils:General:比较器、org.json.JSONObject、Google Gson、Alibaba Fastjson、时间戳、IP地址解析Thrift:连接池,...

    精通并发与netty视频教程(2018)视频教程

    7_Netty的Socket编程详解 8_Netty多客户端连接与通信 9_Netty读写检测机制与长连接要素 10_Netty对WebSocket的支援 11_Netty实现服务器端与客户端的长连接通信 12_Google Protobuf详解 13_定义Protobuf文件及消息...

    精通并发与 netty 视频教程(2018)视频教程

    7_Netty的Socket编程详解 8_Netty多客户端连接与通信 9_Netty读写检测机制与长连接要素 10_Netty对WebSocket的支援 11_Netty实现服务器端与客户端的长连接通信 12_Google Protobuf详解 13_定义Protobuf文件及消息...

    精通并发与netty 无加密视频

    第7讲:Netty的Socket编程详解 第8讲:Netty多客户端连接与通信 第9讲:Netty读写检测机制与长连接要素 第10讲:Netty对WebSocket的支援 第11讲:Netty实现服务器端与客户端的长连接通信 第12讲:Google ...

    JAVA上百实例源码以及开源项目源代码

    简单聊天软件CS模式 2个目标文件 一个简单的CS模式的聊天软件,用socket实现,比较简单。 凯撒加密解密程序 1个目标文件 1、程序结构化,用函数分别实现 2、对文件的加密,解密输出到文件 利用随机函数抽取幸运数字 ...

    JAVA上百实例源码以及开源项目

    一个简单的CS模式的聊天软件,用socket实现,比较简单。 凯撒加密解密程序 1个目标文件 1、程序结构化,用函数分别实现 2、对文件的加密,解密输出到文件 利用随机函数抽取幸运数字 简单 EJB的真实世界模型(源代码...

    疯狂JAVA讲义

    第1章 Java概述 1 1.1 Java语言的发展简史 2 1.2 Java的竞争对手及各自优势 4 1.2.1 C#简介和优势 4 1.2.2 Ruby简介和优势 4 1.2.3 Python的简介和优势 5 ...学生提问:当我们使用编译C程序时,不仅需要指定存放...

    NettyChat:基于Netty + TCP + Protobuf实现的Android IM库,包含Protobuf序列化,TCP拆包与粘包,长连接握手认证,心跳机制,断线重连机制,消息重发机制,读写超时机制,离线消息,螺纹池等功能

    自定义IMSConnectStatusListener,实现IMSConnectStatusCallback,重现对应的方法监听ims连接状态4. 调用IMSClientInterface.init(Vector&lt;String&gt; serverUrlList, OnEventListener listener, ...

Global site tag (gtag.js) - Google Analytics