这节主要是分析数据包packet是如何被SessionManager和插件处理的 ,首先分析一些开发的理论知识先:
一、 Tigase服务器插件开发重要的是要了解它是如何工作的。不同类型的插件负责处理数据流中不同阶段的数据包(packet)。
在Tigase服务器中, 插件代码负责处理特定的XMPP节。一个单独的插件可能会负责处理<message>,另一个用于处理<presence>,和还有些插件是单独负责<iq>处理,还有些处理不同版本等等。
一个插件应该提供它感兴趣信息,如哪些XML元素节点,和叫什么名,它具有什么样xmlns名称空间等,这将使得插件在sm中匹配到相对应的packet,从而可以对匹配的packet进行处理。所以你可以创建一个插件来处理包含有感兴趣的caps子节点的packet。也可能都没有一个插件处理某个特定xml节,然而系统会默认的行为是简单转发这个xml节到目的地址。也可能有一个以上的插件处理一个特定的XML元素,然后他们在单独的线程上同时处理同一xml节,所以不保证的不同的插件能按顺序处理xml节。
//一个插件要处理哪些xml元素,是通过以下两个方法来告知系统他将处理那些它感兴趣的xml元素的。 @Override public String[][] supElementNamePaths() { return ELEMENTS; //感兴趣的xml元素。如<message> <iq> } @Override public String[] supNamespaces() { return XMLNSS; //supElementNamePaths中一一对应的xmlns }
每个packet通过SessionManager组件时被处理的几个步骤:
照片显示,每个packet 通过SessionManager处理的4个步骤:
1,Pre-processing
为了不影响Session Manager的性能需要限制该方法处理时间为极小值,用于判断当前package是否应该被阻塞,如果返回为true,则表示阻塞。(只要有一个pre-processor阻塞就算阻塞)
2,Processing
如果一个Package没有被任何的pre-processors阻塞,则才继续执行该方法。所有对当前XML段感兴趣的processor都会将该段加入到独立的线程里运行,这些现成使用内部固定的队列。当所有感兴趣的processor都执行完后就可以得到通知进入下一步。
3,post-processor
对于在第2步中没有被任何processor处理的package将会通过所有的post-processors,并被最后一个post-processor转发到一个目的地,大多数情况是以<message/>的形式被转发。例如MessageAmp.postProcess(..),它判断用户不在线时,massage信息将被保存入数据库。
4,filter
对于以上三步任何形式的结果result输出,都会被所有的filters拦截过滤,这些结果可能最终被拦截也可能被放行。
由于session manager和processors都属于消费者,所以在所有的processors中应该至少有一个processor复制一个新package并发送给某个目标。当然processor处理过程中可以生成任意数量的packet。以上4个步骤任何一个processor都可以生成packet作为结果返出。
看看以下的图片:
如果UserA发送packet-P1到服务器以外的其它节点,例如另一个服务器的用户或一些组件(Muc,PubSub),那么其中一个processor必须新创建一个副本P2的packet并正确地设置所有属性和目的地地址。packet-P1 在被SessionManager处理后消毁了,然而某一个插件生成了一个新的packet-P2。
当然同理,packet从组件转发给用户:
packet的处理组件或插件,必须生成新packet的副本提供给用户。当然,如果没有被任何插件处理的package将会通过系统默认方式转发到一个目的地,注意实现这种方式,因为输入数据包P1可以被许多插件同时处理,因此实际上packet一旦到了SessionManager进行处理应该不得变它。
很明显,下图的处理流程是当用户发送请求到服务器和期望服务器的响应:
这里有一个令人惊讶的设计结果。如果你看看下图显示2个用户之间的交流,可以看到packet送到最终目的地前复制了两次:
上图方式的packet必须被Session Manager处理两次。处理第一次代表packet作为一个即将离开用户A的包和第二次处理它代表传入用户B的packet。 这是确保用户A有权限发送一个packet,确保用户B有权利接收一个packet。另外,如果用户B是离线或脱机的,<message> processors应该把packet信息保存到一个数据库。
二、当需要编写一个自己的PLUGIN的时候根据以上SM分析可以知道 ,
可以按需求来分别实现以下四个接口,可以实现一个也可以实现多个:
1,XMPPPreprocessorIfc
2,XMPPProcessorIfc
3,XMPPPostprocessorIfc
4,XMPPPacketFilterIfc
这四个接口各需要实现一个简单的方法,每个方法的参数类似,参数描述如下:
-Packet packet 需要被处理的PACKET,该PACKET不能为NULL虽然在PROCESS处理过程中无法修改它
-XMPPResourceConnection session 用于保存所有用户的数据,它提供权限访问用户的仓库数据,在没有在线用户SESSION的情况下该参数可以为NULL
-NonAuthUserRepository repo 该参数往往在参数session为NULL的时候被使用,它用于为不在线的用户保存私有或公开的数据信息。
-Queue<Packet> results 这个为输入packet的处理结果产生的packet集合,它总被要求一定要存放一个输入数据包packet的副本到里面,其实包含了所有需要进一步处理的packet,包括process生成的结果packet。
-Map<String, Object> settings 为PLUGIN制定配置信息,一般情况下不需要使用,然而如果需要访问额外的数据库则可以通过配置文件将数据库连接字符串传给plugin
以下就是具体的代码分析了:
SessionManager
public void SessionManager.processPacket(final Packet packet) { if (packet.isCommand() && processCommand(packet)) { packet.processedBy("SessionManager"); } // end of if (pc.isCommand()) //这个方法查找相对应的connection,这方法很重要,from-to ,to-from思想。后面详细分析 XMPPResourceConnection conn = getXMPPResourceConnection(packet); if ((conn == null) && (isBrokenPacket(packet)) || processAdminsOrDomains(packet)) { return; } processPacket(packet, conn); }
protected XMPPResourceConnection getXMPPResourceConnection(Packet p) { XMPPResourceConnection conn = null; //首先先根据发用户发送的packet信息来找连接session,这样的设计正如前面理论中所指出的,发用户是否有权利发packet,发用户A的packet到了SM这时处理为一个阶段,这个packet在这一阶段处理完成就会被消毁了,经过processor处理会生成该packet的副本继续往后面流转 JID from = p.getPacketFrom(); if (from != null) { conn = connectionsByFrom.get(from); if (conn != null) { return conn; } } // It might be a message _to_ some user on this server // so let's look for established session for this user... //如果packet没有含有发用户接连信息则查询接收目标用户连接session,这为下一阶段-这是经过某一个processor处理后转发出来的packet,也就是说packet由系统转发到目的地阶段,判断目的地 接收用户B 是否在线是否有连接在在,如果在线则packet再次经过processor就会转发到用户B了,如果不在线,则最终会被保存入库。 JID to = p.getStanzaTo(); if (to != null) { conn = getResourceConnection(to); } else { } // end of else return conn; } public XMPPResourceConnection getResourceConnection(JID jid) { XMPPSession session = getSession(jid.getBareJID()); if (session != null) { XMPPResourceConnection res = session.getResourceConnection(jid); } return res; } // end of if (session != null) // Maybe this is a call for the server session? if (isLocalDomain(jid.toString(), false)) { return smResourceConnection; } return null; }
protected void processPacket(Packet packet, XMPPResourceConnection conn) { long startTime = System.currentTimeMillis(); int idx = tIdx; tIdx = (tIdx + 1) % maxIdx; packet.setPacketTo(getComponentId()); Queue<Packet> results = new ArrayDeque<Packet>(2); boolean stop = false; if (!stop) { if (defPacketHandler.preprocess(packet, conn, naUserRepository, results)) { packet.processedBy("filter-foward"); addOutPackets(packet, conn, results); return; } } if (!stop) { for (XMPPPreprocessorIfc preproc : preProcessors.values()) { stop |= preproc.preProcess(packet, conn, naUserRepository, results, plugin_config .get(preproc.id())); if (stop && log.isLoggable(Level.FINEST)) { break; } } // end of for (XMPPPreprocessorIfc preproc: preProcessors) } // prepTm = System.currentTimeMillis() - startTime; if (!stop) { if (defPacketHandler.forward(packet, conn, naUserRepository, results)) { packet.processedBy("filter-foward"); addOutPackets(packet, conn, results); return; } } // defForwTm = System.currentTimeMillis() - startTime; if (!stop) { walk(packet, conn); try { if ((conn != null) && conn.getConnectionId().equals(packet.getPacketFrom())) { handleLocalPacket(packet, conn); } } catch (NoConnectionIdException ex) { } } if (!stop) { for (XMPPPostprocessorIfc postproc : postProcessors.values()) { String plug_id = postproc.id(); long[] postProcTime = null; synchronized (postTimes) { postProcTime = postTimes.get(plug_id); if (postProcTime == null) { postProcTime = new long[maxIdx]; postTimes.put(plug_id, postProcTime); } } long stTime = System.currentTimeMillis(); postproc.postProcess(packet, conn, naUserRepository, results, plugin_config.get(postproc.id())); postProcTime[idx] = System.currentTimeMillis() - stTime; } // end of for (XMPPPostprocessorIfc postproc: postProcessors) } // end of if (!stop) // postTm = System.currentTimeMillis() - startTime; if (!stop &&!packet.wasProcessed() && ((packet.getStanzaTo() == null) || (!isLocalDomain(packet.getStanzaTo().toString())))) { if (defPacketHandler.canHandle(packet, conn)) { ProcessingThreads<ProcessorWorkerThread> pt = workerThreads.get( defHandlerProc.id()); if (pt == null) { pt = workerThreads.get(defPluginsThreadsPool); } pt.addItem(defHandlerProc, packet, conn); packet.processedBy(defHandlerProc.id()); } } setPermissions(conn, results); addOutPackets(packet, conn, results); if (packet.wasProcessed() || processAdminsOrDomains(packet)) { Packet error = null; if (stop || ((conn == null) && (packet.getStanzaFrom() != null) && (packet .getStanzaTo() != null) &&!packet.getStanzaTo().equals(getComponentId()) && ((packet.getElemName() == Iq.ELEM_NAME) || (packet.getElemName() == Message .ELEM_NAME)))) { try { error = Authorization.SERVICE_UNAVAILABLE.getResponseMessage(packet, "Service not available.", true); } catch (PacketErrorTypeException e) { } // end of else }
private void walk(final Packet packet, final XMPPResourceConnection connection) { for (XMPPProcessorIfc proc_t : processors.values()) { XMPPProcessorIfc processor = proc_t; Authorization result = processor.canHandle(packet, connection); if (result == Authorization.AUTHORIZED) { ProcessingThreads<ProcessorWorkerThread> pt = workerThreads.get(processor.id()); //获取processor的处理线程,如果预先没有放入,则使用默认的处理线程集处理 if (pt == null) { pt = workerThreads.get(defPluginsThreadsPool); } //加入待处理队列中,则用单独线程去执行process()这样可以提高效率, //而不是用sessionmanager线程执行 if (pt.addItem(processor, packet, connection)) { packet.processedBy(processor.id()); } else { if (result != null) { // TODO: A plugin returned an error, the packet should be bounced back // with an appropriate error } } // end of for () }
相关推荐
tigase相关jar包
高度优化,高度模块化且非常灵活的XMPP / Jabber服务器 这是什么 Tigase XMPP服务器是... -SOCKS5字节流: -用于Tigase的组件 -该组件基于JDK内置HTTP服务器,提供易于使用的HTTP端点进行服务器管理和集成。 -高性
该资源是整合了tigase的java服务端源代码,环境为:idea + gradle + postgresql 注意,这部分项目只包括java源代码,而数据库备份将在下一个资源打包上传,有疑问请阅读相关博文: ...
Tigase开源项目,使用java编写,是个标准的Jabber(XMPP)协议服务端项目,用户数,均衡,符合要求。主页http://www.tigase.org/ 除了tigase开源项目还有: Openfire (Wildfire) 3.x(http://www.igniterealtime.org/)...
描述了如何部署tigase http-api模块,此方式为源码部署
Tigase Swift XMPP客户端库这是什么Tigase Swift XMPP客户端库是用编程语言编写的客户端库。 它提供了XMPP标准核心的实现并处理XML。 此外,它还提供了对许多流行扩展(XEP)的支持。 该存储库包含该库的源文件。...
QuickBlox-Tigase-CustomFeatures 对于QuickBlox自定义功能的列表 tigase服务器: QBAuth-AuthRepository的自定义实现 CustomObjects插件-将聊天消息保存到QuickBlox CustomObjects模块 LastRequestAtPlugin-在...
tigase-server-tigase-server-8.0.0.zip 源码,不知道怎么设置不用积分下载,不还意思。。。。。。。。。
Tigase 概述,描述了1、为什么选择Tigase 2、RFC的实现 3、Tigase实现的XMPP扩展协议等
tigase-local
tigase-server 配置相关内容 https://blog.csdn.net/w690333243/article/details/90550837
全面:tigase 完全实现了XMPP协议,除了全面实施的两个核心协议,它支持大多数的你可能永远都需要的扩展协议。 开源:Tigase是开源的,如果你有有那能力,你可以定制自己的XMPPServer,虽然经过了很多次此时,但是...
Tigase Server 是一个轻量级的可伸缩的 Jabber/XMPP 服务器。无需其他第三方库支持,可以处理非常高的复杂和大量的用户数,可以根据需要进行水平扩展。
Spark连接Tigase服务器,完整的步骤,很清晰的看到。大家可以参考。
tigase 集群设置,已实践测试过,本次测试 以两台机器测试的。
Tigase XMPP 服务器 Docker 映像 安装了 Tigase XMPP 服务器 (5.2.3) 的 Docker 映像用于评估目的。 请勿在生产环境中使用。 为帐户注册和配置存储设置了非持久性 Derby 数据库。 在此设置中创建了一个不存在的...
tigase 内部处理流程 详解,适合初学者参考。
tigase 7.10 mongodb 3 配置
Tigase学习笔记整理,主要分三部分组成: 1、组件(component,tigase的核心) 2、Plugin(插件,被sessionManager组件和C2S组件加载) 3、连接器(认证连接器和用户数据连接器)
tigase-utils-3.4.4.jar(Tigase相关客户端,java语言需要用到的工具类jar包,希望大家喜欢) 正好下载到,同步发出来给更多需要的人吧