public class JavaGroupBroadcastingManager implements NotificationBus.Consumer {
/**
* 共享数据:未被激活的时间片信息集合(离线通话信息)
*/
private static Map<String,TimeSlice> voiceMessageMap = new HashMap<String, TimeSlice>();
/**
* 共享数据:短信信息
*/
private static Map<String,SmsCcrParameter> smsMessageIdMap = new HashMap<String,SmsCcrParameter>();
private static final Log log = LogFactory
.getLog(JavaGroupBroadcastingManager.class);
private static final String BUS_NAME = "bus.name";
private static final String CHANNEL_PROPERTIES = "cache.cluster.properties";
private NotificationBus bus;
private static JavaGroupBroadcastingManager manager = null;
public static JavaGroupBroadcastingManager getInstance(){
if(manager == null){
manager = new JavaGroupBroadcastingManager();
try {
manager.initialize();
} catch (Exception e) {
e.printStackTrace();
}
}
return manager;
}
/**
* 初始化成员
*/
public synchronized void initialize()
throws Exception {
Properties properties = new Properties();
String filePath = System.getProperty("com.sntele.surfing.conf") + File.separator + "ocsgroup.properties";
log.info(filePath);
properties.load(new FileInputStream(filePath));
String channelProperties = properties.getProperty(CHANNEL_PROPERTIES);
String busName = properties.getProperty(BUS_NAME);
try {
bus = new NotificationBus(busName, channelProperties);
bus.start();
bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));
bus.setConsumer(this);
log.info("JavaGroups clustering support started successfully");
} catch (Exception e) {
throw new Exception("Initialization failed: " + e);
}
}
/**
* 关闭成员
*/
public synchronized void finialize() throws Exception {
bus.stop();
bus = null;
}
/**
* 发送信息
*/
public void sendNotification(Serializable serializable) {
bus.sendNotification(serializable);
if(serializable instanceof SmsCcrParameter){
SmsCcrParameter smsCcrParameter = (SmsCcrParameter)serializable;
if(smsCcrParameter.getMemoryAction() == 0){
smsMessageIdMap.put(smsCcrParameter.getMessageId(), smsCcrParameter);
}else{
smsMessageIdMap.remove(smsCcrParameter.getMessageId());
}
}else if(serializable instanceof TimeSlice){
TimeSlice timeSlice = (TimeSlice)serializable;
if(timeSlice.getMemoryAction() == 0){
voiceMessageMap.put(timeSlice.getInitCcrParameter().getCallerNumber(),timeSlice);
}else{
TimeSliceControlPool.getInstince().getTimeSliceControl(timeSlice.getInitCcrParameter().getProvinceCode()).removeData(timeSlice.getInitCcrParameter().getCallerNumber());
voiceMessageMap.remove(timeSlice.getInitCcrParameter().getCallerNumber());
}
}
}
/**
* 成员发送信息
*/
public void handleNotification(Serializable serializable) {
if(serializable instanceof SmsCcrParameter){
SmsCcrParameter smsCcrParameter = (SmsCcrParameter)serializable;
if(smsCcrParameter.getMemoryAction() == 0){
smsMessageIdMap.put(smsCcrParameter.getMessageId(), smsCcrParameter);
}else{
smsMessageIdMap.remove(smsCcrParameter.getMessageId());
}
}else if(serializable instanceof TimeSlice){
TimeSlice timeSlice = (TimeSlice)serializable;
if(timeSlice.getMemoryAction() == 0){
voiceMessageMap.put(timeSlice.getInitCcrParameter().getCallerNumber(),timeSlice);
}else{
TimeSliceControlPool.getInstince().getTimeSliceControl(timeSlice.getInitCcrParameter().getProvinceCode()).removeData(timeSlice.getInitCcrParameter().getCallerNumber());
voiceMessageMap.remove(timeSlice.getInitCcrParameter().getCallerNumber());
}
}
}
/**
* 成员地址
*/
public Serializable getCache() {
if (log.isInfoEnabled()) {
log.info("成员本地地址: " + bus.getLocalAddress().toString());
}
return bus.getLocalAddress();
}
/**
* 新成员加入
*/
public void memberJoined(Address address) {
if (log.isInfoEnabled()) {
log.info("新成员加入:" + address);
}
Iterator<?> smsIt = smsMessageIdMap.entrySet().iterator();
log.info("发送缓存中已经存在的短信信息给新成员:" + smsMessageIdMap.size() + "......start ");
while (smsIt.hasNext()) {
Map.Entry entry = (Map.Entry) smsIt.next();
SmsCcrParameter value = (SmsCcrParameter)entry.getValue();
sendNotification(value);
}
Iterator<?> voiceIt = voiceMessageMap.entrySet().iterator();
log.info("发送缓存中已经存在的语音信息给新成员:" + voiceMessageMap.size() + "......start ");
while (voiceIt.hasNext()) {
Map.Entry entry = (Map.Entry) voiceIt.next();
TimeSlice value = (TimeSlice)entry.getValue();
sendNotification(value);
}
}
/**
* 成员离开
*/
public void memberLeft(Address address) {
if (log.isInfoEnabled()) {
log.info("成员离开:" + address);
}
//将检测语音的是否到时的信息放到其他服务器上
Iterator<?> voiceIt = voiceMessageMap.entrySet().iterator();
log.info("添加离开成员的语音到时检索信息到通话时间片控制中:" + voiceMessageMap.size() + "......start ");
while (voiceIt.hasNext()) {
Map.Entry entry = (Map.Entry) voiceIt.next();
TimeSlice value = (TimeSlice)entry.getValue();
TimeSliceControlPool.getInstince().getTimeSliceControl(value.getInitCcrParameter().getProvinceCode()).addTimeSlice(value.getInitCcrParameter().getCallerNumber(),value);
}
}
public Map<String, TimeSlice> getVoiceMessageMap() {
return voiceMessageMap;
}
public Map<String, SmsCcrParameter> getSmsMessageIdMap() {
return smsMessageIdMap;
}
}
测试方法:
JavaGroupBroadcastingManager manager = JavaGroupBroadcastingManager.getInstance();
manager.sendNotification(requestSession);
分享到:
相关推荐
jgroup详细介绍
jgroup代码
Reliable group communication with JGroups 3.x Preface This is the JGroups manual. It provides information about: 1. Installation and configuration 2. Using JGroups (the API) 3. Configuration of the ...
NULL 博文链接:https://8366.iteye.com/blog/921760
EHCAHCE基于JGROUP的集群配置方案,内含相关配置文件,及配置说明
jgroup笔记.
jroup是一个比较优秀的集群通讯开源软件,本实例展示如何用jgroup进行不同机器之间的通讯
找不到对方在调试的时候发现只要是使用了SWT的类的地方会出现线程错误,于是我想是不是出现了线程同步的问题经询问别人后得知在SWT中使用JGroup应该要使线程同步,应该使用Display类的syncExec(Runnable r)方法于是...
最强大得UUP开源组件,用于底层通讯,以被JBOSS采用
jboss jdbc json jgroup.jar
JGroups是一个开源的纯java编写的可靠的群组通讯工具。其工作模式基于IP多播,但可以在可靠性和群组成员管理上进行扩展。其结构上设计灵活,提供了一种灵活兼容多种协议的协议栈。
jgroup 源码 HIBERNATE 二级缓存 集群
jgroup配置[收集].pdf
jgroup配置[归类].pdf
使用JGroup实现分布式数据结构(堆栈和集合) 介绍 [什么是JGroups?]( ) [JGroup入门]( ) JGroups是完全用Java编写的可靠的组通信工具包。 它基于IP多播(也支持TCP),但是有一些特殊功能,例如可靠性和组...
JGroup功能十分强大,通过配置各种参数就可以充分利用它所提供的各项功能。JGroup最大的特点就是支持协议栈的可配置性,它本是实现了基本的Java的协议栈实现,也就是最基本的消息广播的基础,同时支持附加协议栈的...
其实回顾一下集中式的构架,无非两种情况:一是节点均衡的网状(JBoss Tree Cache),利用JGroup的多播通信机制来同步数据;二是Master-Slaves模式(分布式文件系统),由Master来管理Slave,比如如何选择Slave,...
节点均衡的网状(JBoss Tree Cache),利用JGroup的多播通信机制来同步数据。2.Master-Slaves模式(分布式文件系统),由Master来管理Slave,如何选择Slave,如何迁移数据,都是由Master来完成,但是Master本身也...
jgroups官方帮助文档html格式2.X版本