`
swingboat
  • 浏览: 60083 次
  • 性别: Icon_minigender_1
  • 来自: 合肥
社区版块
存档分类
最新评论

利用JGroups同步两台server之间的cache。

    博客分类:
  • java
阅读更多

一、需求
前段时间做了一个项目,在后台有很多的数据都放入到了cache中了,而且还会对cache中的数据进行更新。如果只有一台server没有任何问题,但是如果考虑到集群负载平衡,连接多个server的时候,就有问题出现了,怎么样才能保证多个server之间cache的同步呢?请看下面的部署图。

二、引入JGroups
JGroups是一个可靠的组间通讯工具,进程可以加入一个通讯组,给组内所有的成员或单独的成员发送消息,同样,也可以从组中的成员处接收消息。
系统会记录组的每一个成员,在新成员加入或是现有的成员离开或是崩溃时,会通知组内的其他成员。

当我们更新一台server上的cache的时候,利用JGroups进行广播,其他的server接收到广播,根据接收到的信息来更新自己的cache,这样达到了
每个server的cache同步。

三、实现
1、定义一个接口BaseCache规定出对cache类操作的方法

 1public interface BaseCache {
 2    
 3    public void put(String key, Object ob);
 4    public Object get(String key);
 5    
 6    public void delete(String key);
 7    public void batchDelete(String[] list);
 8    public void batchDelete(List list);
 9    public void deleteAll();
10    
11}


2、定义一个同步器(CacheSynchronizer),这个类利用JGroups进行发送广播和接收广播

 1public class CacheSynchronizer {
 2    private static String protocolStackString=
 3        "UDP(mcast_addr=235.11.17.19;mcast_port=32767;ip_ttl=3;"+
 4        "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"+
 5        "PING(timeout=2000;num_initial_members=3):"+
 6        "MERGE2(min_interval=5000;max_interval=10000):"+
 7        "FD_SOCK:"+
 8        "VERIFY_SUSPECT(timeout=1500):"+
 9        "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"+
10        "pbcast.STABLE(desired_avg_gossip=20000):"+
11        "UNICAST(timeout=2500,5000):"+
12        "FRAG:"+
13        "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=false)";
14    private static String groupName="YHPTEST";
15    
16    private Channel jgroupsChannel=null;
17    
18    //inner class ,定义接收广播,已经对接收到的广播进行处理
19    private class ReceiveCallback extends ExtendedReceiverAdapter{
20        private BaseCache cache=null;
21        public void setCache(BaseCache baseCache){//设置cache类
22            cache=baseCache;
23        }

24        public void receive(Message msg) {
25            if(cache==nullreturn ;
26            String strMsg = (String) msg.getObject();
27            if(strMsg!=null&&(!"".equals(strMsg))){
28                cache.put(strMsg, strMsg);    //根据接收到的广播,同步cache
29            }

30        }

31    }

32    
33    private ReceiveCallback recvCallback = null;
34    
35    public CacheSynchronizer(BaseCache cache) throws Exception{
36        jgroupsChannel = new JChannel(protocolStackString);
37        recvCallback = new ReceiveCallback();
38        recvCallback.setCache(cache);
39        jgroupsChannel.setReceiver(recvCallback);
40        jgroupsChannel.connect(groupName);
41    }

42    
43    /** *//**
44     * 发送广播信息,我们可以自定义广播的格式。
45     * 这里简单起见,仅仅发送一个字符串
46     * @param sendMsg
47     * @throws Exception
48     */

49    public void sendCacheFlushMessage(String sendMsg)throws Exception {
50        jgroupsChannel.send(nullnull, sendMsg); //发送广播
51        
52    }

53}


3、定义cache类,调用同步器同步cache

 1public class TestDataCache implements BaseCache {
 2    private Map dataCache=null;//保持cache数据
 3    private CacheSynchronizer cacheSyncer = null//同步器
 4    
 5    //inner class for thread safe.
 6    private static final class TestDataCacheHold{
 7        private static TestDataCache  theSingleton=new TestDataCache();        
 8        public static TestDataCache getSingleton(){
 9            return theSingleton;
10        }

11        private TestDataCacheHold(){}
12    }

13    
14    //Prevents to inherit
15    private TestDataCache(){
16        dataCache=new HashMap();
17        createSynchronizer();
18    }

19    
20    public static TestDataCache getInstance(){
21        return TestDataCacheHold.getSingleton();
22    }

23    
24    public CacheSynchronizer getSynchronizer(){
25        return cacheSyncer;
26    }

27    
28    public int getCacheLength(){
29        return dataCache.size();
30    }

31    
32    public void createSynchronizer(){
33        try{
34            cacheSyncer=new CacheSynchronizer(this);
35        }
catch(Exception e){
36            e.printStackTrace();
37        }

38    }

39    
40    public void batchDelete(String[] list) {
41        if(list!=nullreturn ;
42        synchronized (dataCache){
43            for(int i=0;i<list.length;i++){
44                if(list[i].length()>0){
45                    dataCache.remove(list[i]);
46                }

47            }

48        }

49
50    }

51
52    public void batchDelete(List list) {
53        synchronized (dataCache){
54            Iterator itor=list.iterator();
55            while(itor.hasNext()){
56                String tmpKey=(String)itor.next();
57                if(tmpKey.length()>0){
58                    dataCache.remove(tmpKey);
59                }

60            }

61        }

62    }

63
64    public void delete(String key) {
65        synchronized (dataCache) {
66            dataCache.remove(key);
67        }

68    }

69
70    public void deleteAll() {
71        synchronized (dataCache){
72            dataCache.clear();
73        }

74
75    }

76
77    public Object get(String key) {
78        Object theObj=null;
79        synchronized (dataCache) Codehighlighter1_1600_1650_O
分享到:
评论

相关推荐

    JavaEE源代码 jgroups-2.2.8

    JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 ...

    jgroups-2.2.7.jar

    jgroups-2.2.7.jar jgroups-2.2.7.jar

    jgroups.part1

    jgroups.part1

    JGroups_集群.pdf

    JGroups_集群.pdf

    JGroups的Raft实现jgroups-raft.zip

    jgroups-raft 项目是 JGroups 框架对 Raft 的实现。Maven:&lt;groupId&gt;org.jgroups &lt;artifactId&gt;jgroups-raft &lt;version&gt;0.2&lt;/version&gt;Raft 是一个容易理解的共识算法。在容错和性能方面它相当于 Paxos(Google 的一致...

    jgroups-3.0.2

    JGroups是一个开源的纯java编写的可靠的群组通讯工具。其是一个可靠的组播通讯工具集(需要说明的是,这并不是说...JGroups 适合使用场合服务器集群cluster、多服务器通讯、服务器replication(复制)、分布式cache缓存等

    Ehcache通过Jgroups做集群

    Ehcache通过使用Jgroups做集群配置,更改每一个不同的jgroups.xml文件的端口号和IP,如果一台机器就使用127.0.0.1即可。配置好之后,把每台机器起来,就可以测试了。

    Jgroups 教程

    JGROUPs 的重要用法全部都在里面了

    jgroups.part3

    jgroups.part3

    jgroups源代码

    jgroups源代码,想要学习jgroups开源框架的童鞋可以看看

    Jgroups中的UNICAST3协议中文翻译

    Jgroups是一款组播工具,基于IP多播的可靠的组播中间件

    cache要用到的包

    jgroups.jar ;jboss-cache.jar;

    jgroups-3.2

    JGroups是一个开源的纯java编写的可靠的群组通讯工具。其是一个可靠的组播通讯工具集(需要说明的是,这并不是说必须要使用IP Multicast,JGroups也可以使用TCP来实现)。其工作模式基于IP多播,但可以在可靠性和群组...

    基于JGroups的共享电子白板系统的研究与实现

    基于JGroups的共享电子白板系统的研究与实现

    jgroups-2.6.8.GA.jar

    jgroups-2.6.8.GA.jar jgroups-2.6.8.GA.jar

    Jgroups-all.jar

    JGroup最大的特点就是支持协议栈的可配置性,它本是实现了基本的Java的协议栈实现,也就是最基本的消息广播的基础,同时支持附加协议栈的配置,消息的传递就是在这些协议栈之间相互传递,封装,检查,丢弃,重发。...

    Java多播通讯框架 JGroups

    Java多播通讯框架 JGroups

    Android代码-jgroups-android

    JGroups - A Framework for Group Communication in Java ======================================================== March 3, 1998 Bela Ban 4114 Upson Hall Cornell University Ithaca, NY 14853 bba@...

    jgroups

    JGroups 2.5 tutorial 相当不错

    jgroups的jar

    JGroups是一个开源的纯java编写的可靠的群组通讯工具。其是一个可靠的组播通讯工具集

Global site tag (gtag.js) - Google Analytics