一、 ConnectionOpenThread 使用单例模式,他是负责建立服务端SOCKET和 接收连接客户端socket 线程。
在初始化ConnectionManager的时候ConnectionManager.connectThread 属性所引用的服务端SOCKET连接线程ConnectionOpenThread 就被初始化了
ConnectionManager:: private static ConnectionOpenThread connectThread = ConnectionOpenThread.getInstance();
ConnectionOpenThread .getInstance()的实现
private Selector selector= null; public static ConnectionOpenThread getInstance() { if (acceptThread == null) { acceptThread = new ConnectionOpenThread(); Thread thrd = new Thread(acceptThread); thrd.setName("ConnectionOpenThread"); thrd.start(); //启动ConnectionOpenThread线程,则this.run()方法将被被执行 if (log.isLoggable(Level.FINER)) { log.finer("ConnectionOpenThread started."); } } // end of if (acceptThread == null) return acceptThread; } private ConnectionOpenThread() { ....... try { selector = Selector.open();//得到一个选择器,可以去了解下nio api } catch (Exception e) { log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e); stopping = true; } // end of try-catch }
ConnectionOpenThread .run()的实现
在该方法中,selector管理的都是服务端SOCKET
public void run() { while (!stopping) { try { selector.select(); //此方法为阻塞方法,当选择器管理channel(也就是向selector注册的channel) 中发生读、写或异常事件时,select()将会被触发会往下执行 // Set<SelectionKey> selected_keys = selector.selectedKeys(); // for (SelectionKey sk : selected_keys) { //返回已此通道已准备就绪的键集,已选择始终是键集的一个子集。 //begin iterator for (Iterator i = selector.selectedKeys().iterator(); i.hasNext(); ) { SelectionKey sk = (SelectionKey) i.next(); i.remove(); SocketChannel sc = null; boolean throttled = false; int port_no = 0; if ((sk.readyOps() & SelectionKey.OP_ACCEPT) != 0) { //在此是否为被动SOCKET也就是服务端SOCKET,是则接受客户端socket ServerSocketChannel nextReady = (ServerSocketChannel) sk.channel(); port_no = nextReady.socket().getLocalPort(); sc = nextReady.accept();//得到一个客户端SOCKET ... } // end of if (sk.readyOps() & SelectionKey.OP_ACCEPT) if ((sk.readyOps() & SelectionKey.OP_CONNECT) != 0) { sk.cancel(); // 从Selector中删除指定的SelectionKey //所以这个普通的conect socket只会处理一次侦听到的发生事件 sc = (SocketChannel) sk.channel();//得到connect SOCKET } // end of if (sk.readyOps() & SelectionKey.OP_ACCEPT) if (sc != null) { //设置接收到的SOCKET的一些信息 try { sc.configureBlocking(false);//将客户端通道设置为非阻塞 sc.socket().setSoLinger(false, 0); sc.socket().setReuseAddress(true); //每个ServerSocketChannel在创建注册到selector 时就被绑定了一个ConnectionOpenListener对象, 用这个对象来处理该接受到的socket,该注册过程 在addAllWaiting()中进行 ConnectionOpenListener al = (ConnectionOpenListener) sk.attachment(); sc.socket().setTrafficClass(al.getTrafficClass()); sc.socket().setReceiveBufferSize(al.getReceiveBufferSize()); al.accept(sc);//此方法 为建立连接socket的进行后续处理的设定 } catch (java.net.SocketException e) { ConnectionOpenListener al = (ConnectionOpenListener) sk.attachment(); al.accept(sc); } } else { log.log(Level.INFO, "Can not obtain socket channel from selection key, throttling activated = {0}, for port: {1}", new Object[] { throttled, port_no }); } // end of if (sc != null) else ++accept_counter; } //end of iterator addAllWaiting();//加载要注册到selector中的ServerSocketChannel或connect socket } catch (IOException e) { log.log(Level.SEVERE, "Server I/O error.", e); // stopping = true; } // end of catch catch (Exception e) { log.log(Level.SEVERE, "Other service exception.", e); // stopping = true; } // end of catch } }
在说解到addAllWaiting();加载要注册到selector中的ServerSocketChannel时,先看下源码:
在waiting队列中如果有等待处理的ConnectionOpenListener对象,则创建一个对应的ServerSocketChannel
private void addAllWaiting() throws IOException { ConnectionOpenListener al = null; while ((al = waiting.poll()) != null) { try { addPort(al);//绑定相关的端口进行监听 } catch (Exception e) { log.log(Level.WARNING, "Error: creating connection for: " + al, e); al.accept(null); } // end of try-catch } // end of for () } private void addPort(ConnectionOpenListener al) throws IOException { if ((al.getConnectionType() == ConnectionType.connect) && (al.getRemoteAddress() != null)) { addISA(al.getRemoteAddress(), al); } else if ((al.getIfcs() == null) || (al.getIfcs().length == 0) || al.getIfcs()[0] .equals("ifc") || al.getIfcs()[0].equals("*")) { addISA(new InetSocketAddress(al.getPort()), al);//绑定到InetSocketAddress进行监听服务 } else { for (String ifc : al.getIfcs()) { addISA(new InetSocketAddress(ifc, al.getPort()), al); } // end of for () } // end of if (ip == null || ip.equals("")) else } //addISA(..)这才是真正创建ServerSocketChannel方法,绑定到服务器某一个端口上进行监听服务, //开启了服务端socket private void addISA(InetSocketAddress isa, ConnectionOpenListener al)throws IOException { switch (al.getConnectionType()) { case accept : ... ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().setReceiveBufferSize(al.getReceiveBufferSize()); ssc.configureBlocking(false);//服务端socket也是非阻塞方法 ssc.socket().bind(isa, (int) (port_throttling)); //绑定到相关地址的某一个端口上 ssc.register(selector, SelectionKey.OP_ACCEPT, al);//注册服务端socket到selector中, 并且附带绑定一个ConnectionOpenListener对象,该对象为服务端socket接收到新来的socket 进行后续处理。所以selector能监听这些已注册socket的事件发生 break; case connect : ... //服务器socket之间要进行通讯,则先要连接 SocketChannel sc = SocketChannel.open(); sc.socket().setReceiveBufferSize(al.getReceiveBufferSize()); sc.socket().setTrafficClass(al.getTrafficClass()); sc.configureBlocking(false); sc.connect(isa); sc.register(selector, SelectionKey.OP_CONNECT, al); //在此也注册到ConnectionOpenThread.selector中 break; default : .. break; } }
二、从以上addAllWaiting();分析中看到处理的都是waiting队列里的ConnectionOpenListener对象,那这个ConnectionOpenListener对象是什么时候就会被放到waiting队列的呢,这得从ConnectionManager.initializationCompleted()中说起,在启动章节中分析到MessageRouter.setProperties(map)负责加载了其它的组件最后对每一个组件都执行了初始化完成动作。从而ConnectionManager.initializationCompleted()将会被执行
MessageRouter::
for (ServerComponent comp : components.values()) {
comp.initializationCompleted();
}
ConnectionManager.initializationCompleted()源码如下
public void initializationCompleted() { if (isInitializationComplete()) { // Do we really need to do this again? return; } super.initializationCompleted(); initializationCompleted = true; //加载组件中的服务配置 for (Map<String, Object> params : waitingTasks) { //启动一个定时任务,设置准备加入ConnectionOpenThread.waiting的ConnectionListener对象 reconnectService(params, connectionDelay); } waitingTasks.clear(); if ( null != watchdog ){ watchdog.start(); } }
相关推荐
tigase相关jar包
该资源是整合了tigase的java服务端源代码,环境为:idea + gradle + postgresql 注意,这部分项目只包括java源代码,而数据库备份将在下一个资源打包上传,有疑问请阅读相关博文: ...
Tigase XMPP服务器是高度优化,高度模块化且非常灵活的用Java编写的XMPP / Jabber服务器。 该存储库包含Tigase XMPP服务器主要部分的源代码。 该项目自2004年成立以来,我们最近已将其移至GitHub。 与XMPP相关的...
Tigase开源项目,使用java编写,是个标准的Jabber(XMPP)协议服务端项目,用户数,均衡,符合要求。主页http://www.tigase.org/ 除了tigase开源项目还有: Openfire (Wildfire) 3.x(http://www.igniterealtime.org/)...
tigase 内部处理流程 详解,适合初学者参考。
描述了如何部署tigase http-api模块,此方式为源码部署
Tigase Swift XMPP客户端库这是什么Tigase Swift XMPP客户端库是用编程语言编写的客户端库。 它提供了XMPP标准核心的实现并处理XML。 此外,它还提供了对许多流行扩展(XEP)的支持。 该存储库包含该库的源文件。...
QuickBlox-Tigase-CustomFeatures 对于QuickBlox自定义功能的列表 tigase服务器: QBAuth-AuthRepository的自定义实现 CustomObjects插件-将聊天消息保存到QuickBlox CustomObjects模块 LastRequestAtPlugin-在...
这部分是tigase整合项目的数据库部分。用的是postgresql 额。。。mysql数据库用的人多,这里不不整理了。 具体用法或者疑问请访问: https://blog.csdn.net/cdnight/article/details/85222028
Tigase 概述,描述了1、为什么选择Tigase 2、RFC的实现 3、Tigase实现的XMPP扩展协议等
tigase-server-tigase-server-8.0.0.zip 源码,不知道怎么设置不用积分下载,不还意思。。。。。。。。。
tigase-local
性能和效率:单cpu 单用户连接,只需要10kb的内存就可以处理10000个数据包,安装Tigase的时候已经知道,Tigase支持单机50W并发,另外,Tigase还可以部署在只有10M内存的机器上。Tigase支持虚拟域,单服务器可以安装...
Tigase Server 是一个轻量级的可伸缩的 Jabber/XMPP 服务器。无需其他第三方库支持,可以处理非常高的复杂和大量的用户数,可以根据需要进行水平扩展。
tigase-server 配置相关内容 https://blog.csdn.net/w690333243/article/details/90550837
Tigase XMPP 服务器 Docker 映像 安装了 Tigase XMPP 服务器 (5.2.3) 的 Docker 映像用于评估目的。 请勿在生产环境中使用。 为帐户注册和配置存储设置了非持久性 Derby 数据库。 在此设置中创建了一个不存在的...
Spark连接Tigase服务器,完整的步骤,很清晰的看到。大家可以参考。
tigase 集群设置,已实践测试过,本次测试 以两台机器测试的。
tigase跨服器发送消息流程日志,120服务器的账号发送消息给130服务器的账号
tigase 7.10 mongodb 3 配置