Protocol: 一些常量的定义
Config: 加载和维护配置信息,如加载Pushlet.properties配置文件
SessionManager: 管理session的整个生命周期
EventSourceManager: 事件源管理类
Controller: 对来自客户端的请求的处理类,包含请求的事件的各种处理方法(封装响应事件ResponseEvent)。
Subscriber: 订阅信息的添加删除,及事件推送等。
Dispatcher: 分配事件给订阅者
首先加载Pushlet类,它为一个servlet实现Protocol接口。
Pushlet中的init方法:
1. 取得路径 2. 加载配置文件 3.日志初始化 4. 初始化SessionManager 5. 初始化Dispatcher 6. 加载事件源
public void init() throws ServletException {
try {
// Load configuration (from classpath or WEB-INF root path)
String webInfPath = getServletContext().getRealPath("/") + "/WEB-INF";
Config.load(webInfPath);
Log.init();
// Start session manager 通过单例模式获取sessionManager实例
SessionManager.getInstance().start();
// Start event Dispatcher
Dispatcher.getInstance().start();
if (Config.getBoolProperty(Config.SOURCES_ACTIVATE)) {
//判断配置文件中是否设定为使用事件源,是则加载sources.properties
//并对其中的所有事件源创建单独的守护进程并激活。
EventSourceManager.start(webInfPath);
} else {
Log.info("Not starting local event sources");
}
}
SessionManager中的start方法通过JDK中的Timer类,重复执行任务。
public void start() throws PushletException {
if (timer != null) {
stop();
}
timer = new Timer(false);
timer.schedule(new AgingTimerTask(), TIMER_INTERVAL_MILLIS, TIMER_INTERVAL_MILLIS);
info("started; interval=" + TIMER_INTERVAL_MILLIS + "ms");
}
AgingTimerTask类即定义的任务,在其中不断判断sessionCache中的所有session是否超时,若超时则停止session。
在定时器的run方法中调用apply方法,在这里应用了Visitor模式,apply方法即visitor提供的对所有元素访问的接口,Session即被访问的元素。在SessionManager中有Session[] sessionCache和Map sessions变量,前者用于缓存可修改的session,后者保存活跃的session。在apply中访问所有sessionCache中的session,通过反射访问AgingTimerTask中的回调方法visit。Visist方法则用于判断此session是否已超时。
public AgingTimerTask() throws PushletException {
try {
// Setup Visitor Methods for callback from SessionManager
Class[] argsClasses = {Session.class};
visitMethod = this.getClass().getMethod("visit", argsClasses);//通过反射取得此类中的visit方法
} catch (NoSuchMethodException e) {
throw new PushletException("Failed to setup AgingTimerTask", e);
}
}
/**
* Clock tick callback from Timer.
*/
public void run() {
long now = Sys.now(); //获取当前时间
delta = now - lastRun; //两次时间差
lastRun = now; //将当前时间设为上一次是时间
debug("AgingTimerTask: tick");
// Use Visitor pattern to loop through Session objects (see visit() below)
getInstance().apply(this, visitMethod, new Object[1]);
}
/**
* Callback from SessionManager during apply()
*/
public void visit(Session aSession) {
try {
// Age the lease
aSession.age(delta); //减少session的生存时间
debug("AgingTimerTask: visit: " + aSession);
// Stop session if lease expired 超时则停止session
if (aSession.isExpired()) {
info("AgingTimerTask: Session expired: " + aSession);
aSession.stop();
}
} catch (Throwable t) {
warn("AgingTimerTask: Error in timer task : " + t);
}
}
Dispatcher初始化很简单,创建一个SessionManagerVisitor实例,其为visitor模式中的visitor。其构造方法中将visitMulticast和visitBroadcast方法以Method存入一个Map中。这两个方法即在visitor中单独访问元素的visit方法。
最后若使用事件源(sources.activate为true),将加载事件源配置文件,并对其中的所有事件创建守护线程并激活线程。在EventSourceManager中创建一个全局的事件源集合,现调用activate()激活事件源,此方法应用templates模式其实现由其子类EventPullSource实现,此类中便是对线程进行管理。在线程的run()中产生事件并多点发布。线程每睡眠一段时间便发布一次时间。
public void run() {
Log.debug(getClass().getName() + ": starting...");
alive = true;
while (alive) {
try {//getSleepTime()由继承EventPullSource的子类实现
Thread.sleep(getSleepTime());
// Stopped during sleep: end loop.
if (!alive) {
break;
}
// If passivated wait until we get
// get notify()-ied. If there are no subscribers
// it wasts CPU to remain producing events...
synchronized (this) {
while (!active) {
Log.debug(getClass().getName() + ": waiting...");
wait();
}
}
} catch (InterruptedException e) {
break;
}
try {
//pullEvent()由继承EventPullSource的子类实现
Event event = pullEvent();
// 多点发布事件
Dispatcher.getInstance().multicast(event);
} catch (Throwable t) {
Log.warn("EventPullSource exception while multicasting ", t);
t.printStackTrace();
}
}
Log.debug(getClass().getName() + ": stopped");
}
至此Pushlet的初始化即完成。
当收到http请求时无论doGet还是doPost都是创建一个事件,事件Event通过一个HashMap来保存”p_event”事件类型及其属性,最后都调用doRequest进行处理。其中doGet针对普通请求,doPost针对xml处理。
在doRequest处理中,当事件类开为”join”时说明还未创建session,此时创建一个新的session,否则通过Event中保存的SessionID获取session。在session在创建过程中,包括随机生成session的id并依据session生成Controller和Subscriber实例赋给session。
然后创建一个Command对象,调用Controller中的doCommand执行。Command为对request,response,event等信息的一个封装。
doCommand中执行如下:
更新Session的存活时间。重新设置为默认值。
设置 Session中的address为客户端请求的IP地址。
根据事件类型执行相应的方法。
Refresh: 在Command的响应事件中加入一个”refresh-ack”刷新确认事件。
Subscribe: 订阅
调用Controller中的doSubscribe(),当事件中存在主题Subject时,首先获取初始化时创建的订阅者Subscriber对象,然后调用Subscriber中的addSubscriber创建一订阅信息,添加到一HashMap的全局变量subscribers中,同时返回此订阅。订阅创建即为创建一个subscription对象,在其中设置其属性subject,subjects,label其中subjects为一数组变量,存储subject中包含的所有主题。最后设置相关信息到responseEvent响应事件中,将相应事件封装到Command中。
Unsubscrib: 退订主题,将Map类型的subscribers中的订阅移除。
Join: 将请求的此session加入到SessionManager中的Map类型的sessions中,并设置session的formate等信息。
Listen: 主要是确定订阅信息的转送方式mode,包含三种stream\pull\poll。Stream是发送一连串的事件(http长链接),即记录是不停止的。Pull和poll(客户端定时刷新服务端产即推送)是一个完整的记录返回的。配置文件中的listen.force.pull.all设置是否全部使用pull。
Join-listen: 即join和listen的集合。
Leave: 停止session,也即移除所有的订阅信息。
Hb: heartbear心跳,只是设置心跳事件给客户端。
Publish: 发布事件,若请求中有p_to(session的id),即发送目标,将进行单播,否则为多播。单播时,将请求事件进行浅克隆,将克隆事件发送给目标事件的订阅者。onEvent()中若请求事件已激活、未超时且事件队列未满则加入事件队列,否则停止session。广播时调用上面介绍的apply()。
// Send Event to subscriber.
session.getSubscriber().onEvent((Event) event.clone());
无论是哪种事件类型,最后都是将信息封装在responseEvent中,然后封装在Command中。
若请求事件为listen和refresh则只要存在连接就不断循环从队列中获取事件推送到客户端,不同的mode下进行不同的处理,添加心跳包到队列中等操作。在推送时应用到adapter模式,在command中有方法createClientAdapter,内容为根据开始设置的session中的format值来实例化不同的适配器类。包含js、java序列化对象、xml等。其它的事件类型直接通过适配器推送。
pushlet服务端的代码其实就是一个中间过渡的过程,客户端传入请求事件及参数,服务端根据请求事件做一定的处理,再返回一个事件及传入的参数,客户端再根据返回的事件选择相应的回调函数获取参数进行操作,回调函数就由自己去实现。
相关资料:
http://blog.csdn.net/yxw246/article/details/2418255这个分析的很透彻
http://blog.csdn.net/anghlq/article/details/5869233
分享到:
相关推荐
pushlet_2.0.3_源码分析_服务器端__ java 服务器推技术简单实现!
pushlet源码demo,提供有需要的同学学习,如有更好的实现或建议,欢迎提出
基于pushlet web 实时聊天系统 项目没有任何问题,使用pushlet源码实现服务器推技术 实时通信 导入myeclipse 即可运行!
服务器推 pushlet 服务器推 pushlet 服务器推 pushlet 服务器推 pushlet
实现通过pushlet,进行点对点聊天功能。 包括: 在线用户上线的通知 ...pushlet-2.0.4.zip pushlet 源码 pushlet 扩展工程 聊天时需要使用,两台电脑,或者 分别使用 IE 与FF 浏览器测试,来模拟多个用户
实现通过pushlet,进行点对点聊天功能。 包括: 在线用户上线的通知 ...pushlet-2.0.4.zip pushlet 源码 pushlet 扩展工程 聊天时需要使用,两台电脑,或者 分别使用 IE 与FF 浏览器测试,来模拟多个用户
pushlet开发与应用
pushlet白皮书pushlet白皮书pushlet白皮书pushlet白皮书pushlet白皮书
pushlet配置应用实例,看了之后就能简单应用pushlet实时推送数据
pushlet 所需夹包 和配置文件 ajax-pushlet-client.js pushlet-sessionid.jar sources.properties pushlet.properties
pushlet例子,Pushlet 是一个开源的 Comet 框架,Pushlet 使用了观察者模式:客户端发送请求,订阅感兴趣的事件;服务器端为每个客户端分配一个会话 ID 作为标记,事件源会把新产生的事件以多播的方式发送到订阅者的...
Pushlet简单示例测试
容易上手的简单pushlet例子,供大家学习,代码不错,值得下载。
使用开源框架pushlet实现的http长连接技术(服务器推技术),里面有pushlet框架的源码以及使用文档(服务器推、一对多、点对点)
pushlet文档和项目
个人制作整理的pushlet白皮书中文参考文档。 目录: 1.介绍(Introduction) 3 2.动机(Motivation) 4 3.通知解决(Notification Solutions) 5 3.1轮询(Polling) 6 3.2服务器端回调(Server-side callbacks) 6 3.3 ...
Pushlet的Ajax-pushlet-client.js分析 服务器推技术前台js分析 希望能帮到大家
pushlet 和comet 资料介绍 介绍如何从服器端push subscrite 给浏览器
对comet实现中的pushlet框架进行说明,可以参照该说明进行框架使用
pushlet简单demo,导入即可运行 Java 推送 长连接 轮询 pushlet comet4j