喜欢研究IM技术的同学们,欢迎加入群 310790965 一起来学习,有什么问题可以一起讨论。
本源码是基于tigase v7.0.0来分析的。
启动类XMPPServer.main(..)为入口,tigase是基于组件的xmpp服务器,在XMPPServer.start(),首先会加载两个组件分别为tigase.conf.Configurator,tigase.server.MessageRouter,源码如下
String config_class_name = System.getProperty(CONFIGURATOR_PROP_KEY,DEF_CONFIGURATOR );
config = (ConfiguratorAbstract) Class.forName( config_class_name ).newInstance();
String mr_class_name = config.getMessageRouterClassName();
router = (MessageRouterIfc) Class.forName( mr_class_name ).newInstance();
下图为Configurator继承结构,他们都实现了ServerComponent这个基本主键接口,由此可见他们都是属于组件的实现
接下来,就是由这两个组件一起来启动服务器程序,关键代码如下
router.setName( serverName );
router.setConfig( config );//初始化配置,加载服务所需要的其它组件,还包括启动服务端socket,进行监听等待请求到来。
router.start();//启动处理消息,路由功能
主要任务就是加载服务所需要的其它组件,启动服务端socket,把组件按名字和对应的类型绑定到相关的变量里,为后面xmpp packet路由做准备;
1.1 MessageRouter.setConfig( config );
MessageRouter::
private ConfiguratorAbstract config = null;
private Map<String, ServerComponent> components= new ConcurrentHashMap<>();//关联的组件
private ConcurrentHashMap<String, ComponentRegistrator> registrators = new ConcurrentHashMap<>();//注册的组件
public void setConfig(ConfiguratorAbstract config) throws ConfigurationException {
components.put(getName(), this);//将自己添加到自己的components成员变量
this.config = config;//将config赋值给自己的成员变量
addRegistrator(config);//config注册到自己的成员变量registrators
}
相互注册组件到各自组件的成员变量components中。
1.2 MessageRouter.addRegistrator(ComponentRegistrator registr);
上文提到启动是从加载Configurator和MessageRouter开始的,然而Configurator属于ComponentRegistrator组件,所以Configurator被放到了registrators中,其它的组件在加载过程中是都是由registrators中包含的ComponentRegistrator对象来启动的。可以知道MessageRouter负责加载其它相关的组件,而Configurator间接的启动了这些组件,从而完成系统的启动工作。
public void addRegistrator(ComponentRegistrator registr) throws ConfigurationException {
registrators.put(registr.getName(), registr);//1.registr注册到MessageRouter的registrators
addComponent(registr);//2.registr注册到MessageRouter的components和components_byId,同中也把this所指的组件,分别注册到registrators所包含ComponentRegistrator对象的components属性中,由此启动this组件进行服务。
//启动this组件中,所关联components的其它组件
for (ServerComponent comp : components.values()) {
// if (comp != registr) {
registr.addComponent(comp);
//3.把this组件,加入到ComponentRegistrator注册组件的components之中 //,并且启该组件进行提供服务
// } // end of if (comp != registr)
} // end of for (ServerComponent comp : components)
}
把关联的组件相互加入到各自对象的components里管理
1.2 MessageRouter.addComponent(ServerComponent component);
public void addComponent(ServerComponent component) throws ConfigurationException {
log.log(Level.INFO, "Adding component: ", component.getClass().getSimpleName());
for (ComponentRegistrator registr : registrators.values()) {
if (registr != component) {
if (log.isLoggable(Level.FINER)) {
log.log(Level.FINER, "Adding: {0} component to {1} registrator.",
new Object[] { component.getName(),
registr.getName() });
}
registr.addComponent(component);
//把component组件,加入到ComponentRegistrator注册组件的components之中,
//在此补充,如果registr为Configurator,则Configurator.addComponent()专门负责启 //动component组件,如果registr为VHostManager 或StatisticsCollector,则只是负责该对象自 // 身相关属性的设置
} // end of if (reg != component)
} // end of for ()
components.put(component.getName(), component);
components_byId.put(component.getComponentId(), component);
if (component instanceof XMPPService) {
xmppServices.put(component.getName(), (XMPPService) component);
}
}
注册组件的继承结构图,由图可见Configurator也是注册组件成员
ComponentRegistrator 组成
public interface ComponentRegistrator extends ServerComponent {
boolean addComponent(ServerComponent component) throws ConfigurationException;
boolean deleteComponent(ServerComponent component);
}
ComponentRegistrator. addComponent(ServerComponent component),主要是由 AbstractComponentRegistrator来实现
public abstract class AbstractComponentRegistrator<E extends ServerComponent>
extends BasicComponent implements ComponentRegistrator {
protected Map<String, E> components = new LinkedHashMap<String, E>();
public abstract void componentAdded(E component) throws ConfigurationException;//由子类实现
public boolean addComponent(ServerComponent component) throws ConfigurationException {
if (isCorrectType(component)) {
components.put(component.getName(), (E) component);//加入ComponentRegistrator对象的components属性中
componentAdded((E) component);//执行子类ConfiguratorAbstract.componentAdded(component);进行组件设置,其实这就是组件启动的重要入口了
return true;
} else {
return false;
}
}
}
ConfiguratorAbstract.componentAdded(component),组件启动开始进行服务的入口
public void componentAdded(Configurable component) throws ConfigurationException {
if (log.isLoggable(Level.CONFIG)) {
log.log(Level.CONFIG, " component: {0}", component.getName());
}
setup(component);//启动组件进行服务
}
2. ConfiguratorAbstract.setup(component)启动组件,以下代码为提取重要的来说
public void setup(Configurable component) throws ConfigurationException {
Map<String, Object> defs = component.getDefaults(getDefConfigParams());//2.1说明
Map<String, Object> prop = null;
try {
prop = configRepo.getProperties(compId);//每个组件的启动配置信息
}
...............
prop.put(RepositoryFactory.SHARED_USER_REPO_PROP_KEY, user_repo_impl);
prop.put(RepositoryFactory.SHARED_USER_REPO_PARAMS_PROP_KEY, user_repo_params);
prop.put(RepositoryFactory.SHARED_AUTH_REPO_PROP_KEY, auth_repo_impl);
prop.put(RepositoryFactory.SHARED_AUTH_REPO_PARAMS_PROP_KEY, auth_repo_params);
component.setProperties(prop);//执行具体的组件的setProperties方法进行初始化来启动组件
}
2.1 getDefConfigParams()
通过[BasicComponent和ConfigaratorAbstract].getDefaults 获取初始化默认参数列表 ,添加感兴趣的属性。
MessageRouter与registr在执行getDefaults不同的是,他还有获取AbstractMessageReceive的成员变量信息。添加了队列的相关信息。
再通过 MessageRouterConfig.getDefaults(defs, params, getName())设置MessageRouter配置信息。
2.2 MessageRouter.setProperties 进行组件初始化,下图都是有setProperties ()方法的组件图
3 MessageRouter.setProperties(),本方法是MessageRouter负责加载其它组件的重要入口。其它组件的清单如图,
图中MessageRouter负责加载两组组件分别为MessageReceiver和ComponentRegistrator组件
MessageReceiver继承结构
ComponentRegistrator的继承结构
MessageRouter.setProperties()方法加载其它组件分析
public void setProperties(Map<String, Object> props) throws ConfigurationException {
if (inProperties) {
return;
} else {
inProperties = true;
} // end of if (inProperties) else
connectionManagerNames.add("c2s");
connectionManagerNames.add("bosh");
connectionManagerNames.add("s2s");
if (props.get(DISCO_SHOW_VERSION_PROP_KEY) != null) {
disco_show_version = (Boolean) props.get(DISCO_SHOW_VERSION_PROP_KEY);
}
if (props.get(DISCO_NAME_PROP_KEY) != null) {
disco_name = (String) props.get(DISCO_NAME_PROP_KEY);
}
try {
super.setProperties(props);
updateServiceDiscoveryItem(getName(), null, getDiscoDescription(), "server", "im",
false);
if (props.size() == 1) {
return;
}
HashSet<String> inactive_msgrec = new HashSet<String>(receivers.keySet());
MessageRouterConfig conf= new MessageRouterConfig(props);
String[] reg_names= conf.getRegistrNames();//根据配置信息得到要加载ComponentRegistrator
for (String name : reg_names) {
inactive_msgrec.remove(name);
// First remove it and later, add it again if still active
ComponentRegistrator cr = registrators.remove(name);
String cls_name = (String) props.get(REGISTRATOR_PROP_KEY + name +".class");
try {
if ((cr == null) ||!cr.getClass().getName().equals(cls_name)) {
if (cr != null) {
cr.release();
}
cr = conf.getRegistrInstance(name);
cr.setName(name);
} // end of if (cr == null)
addRegistrator(cr);//其它组件也进行注册并启动入口
} ...
} // end of for (String name: reg_names)
String[] msgrcv_names = conf.getMsgRcvActiveNames();//根据配置信息得到要加载MessageReceiver组件
for (String name : msgrcv_names) {
if (log.isLoggable(Level.FINER)) {
log.log(Level.FINER, "Loading and registering message receiver: {0}", name);
}
inactive_msgrec.remove(name);
// First remove it and later, add it again if still active
ServerComponent mr = receivers.remove(name);
xmppServices.remove(name);
String cls_name = (String) props.get(MSG_RECEIVERS_PROP_KEY + name + ".class");
try {
// boolean start = false;
if ((mr == null) || ((mr != null) &&!conf.componentClassEquals(cls_name, mr
.getClass()))) {
if (mr != null) {
if (mr instanceof MessageReceiver) {
removeRouter((MessageReceiver) mr);
} else {
removeComponent(mr);
}
mr.release();
}
mr = conf.getMsgRcvInstance(name);
mr.setName(name);
if (mr instanceof MessageReceiver) {
((MessageReceiver) mr).setParent(this);//把MessageRouter设置到被加载的组件中
((MessageReceiver) mr).start();
}
} // end of if (cr == null)
//如果是messagerReceiver添加到组件的路由,否则加入到MessageRouter.components,并注册到MessageRouter.registrators的每一个对象中,由此启动该组件
if (mr instanceof MessageReceiver) {
addRouter((MessageReceiver) mr);
} else {
addComponent(mr);
}
System.out.println("Loading component: " + mr.getComponentInfo());
} // end of try
catch (ClassNotFoundException e) {
} // end of try
// String[] inactive_msgrec = conf.getMsgRcvInactiveNames();
for (String name : inactive_msgrec) {
ServerComponent mr = receivers.remove(name);
xmppServices.remove(name);
if (mr != null) {
try {
if (mr instanceof MessageReceiver) {
removeRouter((MessageReceiver) mr);
} else {
removeComponent(mr);
}
mr.release();
} catch (Exception ex) {
log.log(Level.WARNING, "exception releasing component:", ex);
}
}
}
// for (MessageReceiver mr : tmp_rec.values()) {
// mr.release();
// } // end of for ()
//
// tmp_rec.clear();
if ((Boolean) props.get(UPDATES_CHECKING_PROP_KEY)) {
installUpdatesChecker((Long) props.get(UPDATES_CHECKING_INTERVAL_PROP_KEY));
} else {
log.log(Level.INFO, "Disabling updates checker.");
stopUpdatesChecker();
}
} finally {
inProperties = false;
} // end of try-finally
//这是MessageRouter.components中每一个组件执行初始化完成工作。
for (ServerComponent comp : components.values()) {
log.log(Level.INFO, "Initialization completed notification to: {0}", comp
.getName());
comp.initializationCompleted();//完成初始化后进行相关的完善工作,
//以ConnectionManager.initializationCompleted()为代表实现来分析,该方法把初始化得到的配置信息加入相关的处理队列中,来启动服务端SOCKET监听事件操作等。
}
// log.info("Initialization completed notification to: " +
// config.getName());
// config.initializationCompleted();
}
public void addRouter(MessageReceiver receiver) throws ConfigurationException {
log.info("Adding receiver: " + receiver.getClass().getSimpleName());
addComponent(receiver);//可以参照前文所说
receivers.put(receiver.getName(), receiver);
}
ConnectionManager.initializationCompleted()组件初始化完成,执行定时任务注册监听事件到服务器端SOCKET线程,等待接受处理到来socket
private LinkedList<Map<String, Object>> waitingTasks = new LinkedList<Map<String,Object>>();
public void initializationCompleted() {
if (isInitializationComplete()) {
// Do we really need to do this again?
return;
}
super.initializationCompleted();
initializationCompleted = true;
for (Map<String, Object> params : waitingTasks) {
reconnectService(params, connectionDelay);
}
waitingTasks.clear();
if ( null != watchdog ){
watchdog.start();
}
}
总结:其实上面的可以归纳为,MessageRouter负责加载需要的组件, Configurator负责启动这些组件。
原理 是这样的:
MessageRouter Component中都有一个Registrators集合和一个components集合。
如果把加入Registrators集合中的对象叫登记者,加入components中的叫组件,则可以归纳如下:
如果是加入的对象是组件,则先注册到每一个登记者中,再加入组件components集合。
如果加入的对象是登记者,则先注册到每一个登记者中,然后加入组件components集合,最后再把所有的组件注册到该登记者自己本身组件中。
所以每一个新加入登记者都被注册到已存在的登记者中。每一个已存在组件都分别注册新加入的登记者中。
如果仅是组件,则只会分别注册到已存在的登记者中。并加入组件集合中。
附上图说明:
启动就说到这,详细往后再继续分析其它源码,本人为了记录下来以备忘记。
- 大小: 755.6 KB
- 大小: 792.7 KB
- 大小: 222.8 KB
- 大小: 555.2 KB
- 大小: 222.8 KB
- 大小: 545.1 KB
- 大小: 487 KB
- 大小: 16 KB
- 大小: 16.5 KB
分享到:
相关推荐
tigase相关jar包
Tigase XMPP服务器是高度优化,高度模块化且非常灵活的用Java编写的XMPP / Jabber服务器。 该存储库包含Tigase XMPP服务器主要部分的源代码。 该项目自2004年成立以来,我们最近已将其移至GitHub。 与XMPP相关的...
该资源是整合了tigase的java服务端源代码,环境为:idea + gradle + postgresql 注意,这部分项目只包括java源代码,而数据库备份将在下一个资源打包上传,有疑问请阅读相关博文: ...
描述了如何部署tigase http-api模块,此方式为源码部署
Tigase开源项目,使用java编写,是个标准的Jabber(XMPP)协议服务端项目,用户数,均衡,符合要求。主页http://www.tigase.org/ 除了tigase开源项目还有: Openfire (Wildfire) 3.x(http://www.igniterealtime.org/)...
Tigase 概述,描述了1、为什么选择Tigase 2、RFC的实现 3、Tigase实现的XMPP扩展协议等
Tigase Swift XMPP客户端库这是什么Tigase Swift XMPP客户端库是用编程语言编写的客户端库。 它提供了XMPP标准核心的实现并处理XML。 此外,它还提供了对许多流行扩展(XEP)的支持。 该存储库包含该库的源文件。...
tigase 集群设置,已实践测试过,本次测试 以两台机器测试的。
直接从eclipse开发环境中导出的版本,完整代码加上完整的mysql数据库,详细压缩包里面的README
tigase-server-tigase-server-8.0.0.zip 源码,不知道怎么设置不用积分下载,不还意思。。。。。。。。。
tigase-local
QuickBlox-Tigase-CustomFeatures 对于QuickBlox自定义功能的列表 tigase服务器: QBAuth-AuthRepository的自定义实现 CustomObjects插件-将聊天消息保存到QuickBlox CustomObjects模块 LastRequestAtPlugin-在...
要启动 tigase xmpp 服务器,请运行 docker run -d -p 5222:5222 --name tigase dictcp/tigase 您可以通过用户 (密码:123456)使用常规 XMPP 客户端(例如 Psi)连接到服务器(本地主机)。 您还可以通过 XMPP ...
全面:tigase 完全实现了XMPP协议,除了全面实施的两个核心协议,它支持大多数的你可能永远都需要的扩展协议。 开源:Tigase是开源的,如果你有有那能力,你可以定制自己的XMPPServer,虽然经过了很多次此时,但是...
Spark连接Tigase服务器,完整的步骤,很清晰的看到。大家可以参考。
Tigase Server 是一个轻量级的可伸缩的 Jabber/XMPP 服务器。无需其他第三方库支持,可以处理非常高的复杂和大量的用户数,可以根据需要进行水平扩展。
tigase 内部处理流程 详解,适合初学者参考。
tigase-server 配置相关内容 https://blog.csdn.net/w690333243/article/details/90550837
tigase 7.10 mongodb 3 配置
tigase 组件源代码,未编译,大家方便下载