- 浏览: 444480 次
- 性别:
- 来自: 西安
文章分类
最新评论
-
进退取舍:
谢谢,这个用上了!!
Java 一个线程池的示例 -
pb_water:
感谢楼主,打算买楼主的书,支持一下,楼主功德无量
JavaScript内核系列第0版整理稿下载 -
lancezhcj:
有图会直观的多呢,再摸索摸索
有限自动机与建模 -
hsmsyy:
这里应该是原创了吧,楼主我觉得闭包的作用:实现面向对象。有待商 ...
JavaScript内核系列 第7章 闭包 -
wll52:
在应用退出之前,需要释放连接 con.disconnect() ...
使用smack与GTalk通信
前言
一直以来,都对异步事件很感兴趣,比如一个应用在运行一个耗时的过程时,最好的方式是提交这个耗时的过程给一个专门的工作线程,然后立即返回到主线程上,进行其他的任务,而工作线程完成耗时任务后,异步的通知主线程,这个过程本身是很有意思的。传统的事件-监听器模型可以较好的解决这个问题,不过事件和监听器两者的耦合往往略显紧密,所以需要另一种实现,使得这两者的耦合尽量小,那样模块可以比较通用。
总线模式
前几天跟同事讨论了下Swing中的消息机制,同事给我讲了下总线模式的消息机制,感觉很有意思,于是周末就自己实现了下。具体的思路是这样的:
- 系统中存在一个消息服务(Message Service),即总线
- 监听器对象,通过实现一个可被通知的对象的接口,将自己注册在消息服务上
- 可被通知的对象可以向消息总线上post消息,就这个对象而言,它对其他注册在总线上的对象是一无所知的
- 消息服务进行消息的调度和转发,将消息(事件)发送给指定的对象,从而传递这个异步事件
这个思路最大的好处是,事件被抽象成消息(Message),具有统一的格式,便于传递。挂在总线上的监听器互相不知道对方的存在,监听器可以指定自己感兴趣的消息类型,消息可以是广播的形式,也可以是点对点的。(后来参看了下JMS,其中有pub/sub的模式(即订阅模式),不过,对于异步消息的传递来说,这个可以不必实现)
消息服务
消息服务可以将一大堆分布在不同物理机上的应用整合起来,进行通信,可以将一些小的应用整合为一个大的,可用的应用系统。
用一个例子来说吧:
public class Test{ public static void main(String[] args) throws RemoteException{ /* * 创建一个可被通知的对象(监听器), 这个监听器关注这样几个事件 * TIMEOUT, CLOSE, and READY */ Configuration config = new RMIServerConfiguration(null, 0); CommonNotifiableEntry entry1 = new CommonNotifiableEntry(config, "client1", MessageTypes.MESSAGE_TIMEOUT | MessageTypes.MESSAGE_CLOSE | MessageTypes.MESSAGE_READY); /* * 创建另一个监听器, 这个监听器关注这样几个事件 * OPEN, CLOSE, and TIMEOUT. */ CommonNotifiableEntry entry2 = new CommonNotifiableEntry(config, "client2", MessageTypes.MESSAGE_OPEN | MessageTypes.MESSAGE_CLOSE | MessageTypes.MESSAGE_TIMEOUT); // 将监听器挂在BUS上 entry1.register(); entry2.register(); // 创建一个新的消息, MESSAGE_OPEN类型. Message msg = new CommonMessage( entry1.getId(), entry2.getId(), MessageTypes.MESSAGE_OPEN, "busying now"); // 传递给entry2 entry1.post(msg); // 创建一个MESSAGE_CLICKED类型的消息, entry2 // 不关注这个类型的消息,所以此消息不会被传递 Message msgCannotBeReceived = new CommonMessage( entry1.getId(), entry2.getId(), MessageTypes.MESSAGE_CLICKED, "cliked evnet"); entry1.post(msgCannotBeReceived); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // re use the message object to send another message entry msg.setSource(entry2.getId()); msg.setTarget(entry1.getId()); msg.setType(MessageTypes.MESSAGE_READY); msg.setBody("okay now"); entry2.post(msg); // 卸载这些监听器,当程序退出,或者 // 或者监听器不在关注事件发生的时候 entry1.unregister(); entry2.unregister(); } }
当前,这个系统可以支持远程的消息传递(通过java的RMI机制),不过对于寻址方面还没有做进一步的处理,有时间再来完善吧。
消息服务的实现
下面我把消息服务的主要实现部分贴出来分析一下:
/** * * @author Abruzzi * */ public class MessageBus extends UnicastRemoteObject implements Bus{ private static MessageBus instance; private List<NotifiableEntry> listeners; private List<Message> messages; private Thread daemonThread = null; public static MessageBus getInstance() throws RemoteException{ if(instance == null){ instance = new MessageBus(); } return instance; } private MessageBus() throws RemoteException{ listeners = new LinkedList<NotifiableEntry>(); messages = new LinkedList<Message>(); Daemon daemon = new Daemon(); daemonThread = new Thread(daemon); daemonThread.setPriority(Thread.NORM_PRIORITY + 3); daemonThread.setDaemon(true); daemonThread.start(); while(!daemonThread.isAlive()); } /** * mount notifiable object to listener list */ public void mount(NotifiableEntry entry) throws RemoteException{ synchronized(listeners){ listeners.add(entry); listeners.notifyAll(); } } /** * unmount the special notifiable object from listener */ public void unmount(NotifiableEntry entry) throws RemoteException{ synchronized(listeners){ listeners.remove(entry); listeners.notifyAll(); } } /** * post a new message into the bus * @param message */ public void post(Message message) throws RemoteException{ synchronized(messages){ messages.add(message); messages.notifyAll(); } } /** * * @author Abruzzi * worker thread, dispatch message to appropriate listener * */ private class Daemon implements Runnable{ private boolean loop = true; public void run(){ while(loop){ if(messages.size() == 0){ synchronized(messages){ try {messages.wait();} catch (InterruptedException e) { e.printStackTrace(); } } } processIncomingMessage(); } } } /** * process the incoming message, remove the first message from * queue, and then check all listeners to see whether should * deliver the message to or not. */ private void processIncomingMessage(){ Message msg; synchronized(messages){ msg = messages.remove(0); } String target = null; int type = 0; int mask = 0; try { target = msg.getTarget(); type = msg.getType(); if(target == MessageTypes.SENDTOALL){ for(NotifiableEntry entry : listeners){ mask = entry.getSense(); if((mask & type) == type){entry.update(msg);} } }else{ for(NotifiableEntry entry : listeners){ mask = entry.getSense(); if(entry.getId().equals(target) && (mask & type) == type){ entry.update(msg); } } } } catch (RemoteException e) { e.printStackTrace(); } } }
消息总线是一个RMI对象,其中mount(), unmout(), post()等方法可以被远程调用。MessageBus维护两个列表,一个消息列表,一个监听器列表。当消息被post到总线上后,post会立即返回,然后工作线程启动,取出消息并将其分发到合适的监听器上。
可能,对同步的处理上考虑不够周全,下来再继续修改。
P.S.我将这个项目托管在google code上了,叫BBMS(Bus Based Message Service),感兴趣的可以去看看:http://code.google.com/p/bbms/。
发表评论
-
JavaScript内核系列 第15章 服务器端的JavaScript
2012-02-12 21:39 2206第15章已经在icodeit上发布,这一章分为上/下两篇,请朋 ... -
使用vim开发python及graphviz绘图
2011-12-23 14:49 6329基本需求 使用vim中的autocmd命令可以很容易的将正在 ... -
Java脚本技术应用实例
2011-01-22 11:24 4075前言 一直以来都很喜欢可以自由扩展的软件,这一点应该已经在很 ... -
可编程计算器(phoc)的设计与实现
2011-01-17 11:34 1855前言 借助JavaScript脚本 ... -
函数式编程(javascirpt)
2009-04-18 22:18 1210前言 Javascript,有人称 ... -
C和指针
2009-05-21 23:15 1052前言 指针是C的灵魂,正是指针使得C存在了这么多年,而且将长 ... -
C和指针(续)
2009-05-25 23:41 1308前言 上一篇《C和指针》可能对关于C和指针的有些内容没有说透 ... -
有限自动机与建模
2009-06-06 10:48 1641前言 在学校学程序设计语言的时候,能接触到的所有例子没有一个 ... -
事件和监听器
2009-06-21 22:06 1360前言 事件监听器是经 ... -
基于总线的消息服务(BBMS)的设计与实现
2009-07-25 22:19 1305前言 异步事件的通知机制在比较有规模的软件设计中必然会有涉及 ... -
JavaScript内核系列 第9章 函数式的Javascript
2010-05-13 19:20 3703第九章 函数式的Javascript 要说Ja ... -
JavaScript内核系列 第8章 面向对象的JavaScript(下)
2010-05-06 09:40 3588接上篇:JavaScript内核系列 第8章 面向对象的Jav ... -
JavaScript内核系列 第8章 面向对象的JavaScript(上)
2010-05-06 09:26 2842第八章 面向对象的 Javascript ... -
JavaScript内核系列 第7章 闭包
2010-05-04 08:48 3768第七章 闭包 闭包向来给包括JavaScript程序 ... -
JavaScript内核系列 第6章 正则表达式
2010-04-27 19:44 3931第六章 正则表达式 正则表达式是对字符串的结构 ... -
JavaScript内核系列 第5章 数组
2010-04-24 15:17 4355第五章 数组 JavaScript的数组也是一个比较 ... -
Swing小应用(Todo-List)之三
2010-04-22 20:47 2058前言 去年9月份开发的那个小工具sTodo,只是做到了能用, ... -
JavaScript内核系列 第4章 函数
2010-04-18 17:31 4947第四章 函数 函数,在C语言之类的过程式语言中 ... -
JavaScript内核系列 第3章 对象与JSON
2010-04-12 09:12 6008第三章 对象与JSON JavaScript对象与传 ... -
JavaScript内核系列 第2章 基本概念
2010-04-03 19:44 5497第二章 基本概念 ...
相关推荐
基于CANoe的can总线通信模拟研究.docx基于CANoe的can总线通信模拟研究.docx基于CANoe的can总线通信模拟研究.docx基于CANoe的can总线通信模拟研究.docx基于CANoe的can总线通信模拟研究.docx基于CANoe的can总线通信...
此文是学士毕业论文,题目为基于can总线的电梯设计。主要是在basic can模式下的研究。对初学can的同学有一定帮助。
Mule是一个企业服务总线(ESB)消息框架.它的主要特性包括: 1.基于J2EE1.4的企业消息总线(ESB)和消息代理(broker). 2.可插入的连接性:比如Jms,jdbc,tcp,udp,multicast,http,servlet,smtp,pop3, file,xmpp等. 3.支持...
当主机收到单片机发出的纯数据包后,主机需要向单片机发出确认信号,为了减少总线 的负载量,确认信息只包含一个字(8位)的信息量,当接收到的数据出现传输错误时,单 片机发送的数据索引号为错误数据的索引,...
DeFiBus=RPC+MQ,是基于开源消息中间件打造的安全可控的分布式金融级消息总线。DeFiBus不仅提供了RPC同步调用,还提供了MQ的异步事件通知、事件组播和广播等常用服务调用和消息模式,同时增加了应用多中心多活、服务...
系统采用最新的SOA架构思想、B/S结构模式,依托企业服务总线实现各业务系统数据交换及共享。数据交换与管理模块采用Web Service接口方式实现业务数据的同步,为建设权威、一致的船舶及港航企业主数据库提供数据基础。...
基于FPGA的I2C SLAVE模式总线的设计.pdf
详细介绍了PCI9052接口器件的功能,结构和使用方法;并结合实际给出了基于PCI9052器件开发PCI总线接口卡的应用实例。
电信设备-基于FIFO模式的串行通信总线数据智能纠错方法.zip
参考资料-基于C8051F040单片机的CAN总线测试模式研究.zip
使用STM32F103RCT6解释OBD ISO协议,基于ISO 15765-4STDCAN总线模式 经过调试验证确认代码可用
基于DSP6713B的1553B(DDC61580)的通信协议
阐述了基于业务基础平台的开发模式;设计了CBBF的体系结构;分析了CBBF实现的关键设计,包括XML总线集成的内核引擎、构件分类、构件模型的形式化描述和构件组装;给出了CBBF的实现细节。实践表明CBBF可以简化软件...
如今,组织内及跨组织的集成领域面临着一个共同的挑战,这就是发布Web服务以向外部使用者提供信息,这通常伴随着提交和操纵信息的操作。这些信息通常存储在OracleDatabase中。一直以来,此数据库可通过Forms应用程序...
基于对CAN 总线控制器的功能分析, 并应用Verilog语言进行软件设计, 从而实现CAN节点之间的通信功能。 0 引言 CAN 总线允许高达1M bit /s通讯速率, 支持多主通讯模式, 有高抗电磁干扰性而且能够检测出通信...
MBassador 是基于订阅发布模式的易用高性能的事件总线,是专门的数据结构,可最大限度地减小锁竞争(lock contention)。 特性:基于注解的监听机制,同步或者 异步的事件发布,弱引用,消息过滤。
Android消息总线,基于LiveData,具有生命周期感知能力使用方法Fork本项目或者直接拷贝源码:LiveDataBus.java依赖依赖Android Architecture Components,具体可参见gradle文件build.gradle示例及Demo订阅消息...
ESB的出现改变了传统的软件架构,可以提供比...从功能上看,ESB提供了事件驱动和文档导向的处理模式,以及分布式的运行管理机制,它支持基于内容的路由和过滤,具备了复杂数据的传输能力,并可以提供一系列的标准接口。