`
mengyang
  • 浏览: 263629 次
  • 性别: Icon_minigender_1
  • 来自: 福州
社区版块
存档分类
最新评论

JGroups简介和例子

阅读更多
   JGroups是一个组播通信工具,它可以:
  • 创建和删除一个组
  • 加入和离开某个组
  • 管理组成员关系,当有新的成员进入或存在的成员离开的时候会通知组内其它成员
  • 侦测和移除出现故障的组成员
  • 发送单播消息(unicast,point-to-point)
  • 发送广播消息(multicast,point-to-multipoint)

    JGroups的强大之处在于它有一个很灵活的协议栈,可以根据你的需要,随意的添加或删除某些功能。比如说,刚开始你使用IP广播发送你的消息,过了一会程序开始要求无损的消息传输,你可以添加NAKACK协议,它能确保接收方一定能收到你发送的消息。但是此时接收方收到的消息的顺序是不固定的,为了让接收顺序和发送顺序保持一致,你可以选择添加FIFO协议来确保一对收发者之间发送和接收的顺序。如果要确保组里所有成员的收发顺序,你可以添加TOTAL协议。再接下来,你可以添加GMS和FLUSH协议来维护组成员间的关系;FD协议可以进行故障检测;STATE_TRANSFER协议可以让新加入的组成员从已存在的成员中获取一致的状态;最后你还可以使用CRYPT协议来加密你发送的消息。

    下面开始演示一个聊天组的程序。我们建立一个聊天组,分别发送单播消息和广播消息,当组成员发生变化的时候,所有组成员自动获得新的组成员视图,每当聊天组中加入一个新的成员的时候,新成员先和已存在的组成员进行状态同步(获取聊天记录)。
public class GroupChat extends ExtendedReceiverAdapter{
	
	private JChannel channel;
	private List<String> msgList = new ArrayList<String>();	//模拟状态对象,保存的是本节点的收到的消息
	
	/**
	 * 在有节点向本节点请求状态的时候被调用
	 */
	@Override
	public byte[] getState() {
		byte[] state = null;
		synchronized(msgList){
			try {
				state = Util.objectToByteBuffer(msgList);
			} catch (Exception e) {
				e.printStackTrace();
			}
			return state;
		}
	}

	/**
	 * 在其他节点返回状态给本节点的时候被调用
	 */
	@Override
	public void setState(byte[] state) {
		synchronized(msgList){
			try {
				List<String> tmpList = (List<String>)Util.objectFromByteBuffer(state);
				msgList.clear();
				msgList.addAll(tmpList);
				System.err.println("===receive state:[");
				for(String msg : msgList){
					System.out.println(msg);
				}
				System.out.println("]");
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * 当有消息进来的时候被调用
	 */
	@Override
	public void receive(Message msg) {
		System.out.println(">>>new message receive from "+msg.getSrc()+":"+msg.getObject());
		msgList.add(msg.getSrc()+" send : "+ msg.getObject());
	}

	/**
	 * 当组成员发生变化的时候被调用
	 */
	@Override
	public void viewAccepted(View new_view) {
		System.out.println("***new view receiver:"+new_view);
	}
	
	public void start() throws ChannelException{
		//打开channel并指定配置文件
		channel = new JChannel("udp.xml");
		//指定组名,就可以创建或连接到广播组
		channel.connect("ChatGroup");
		//注册回调接口,使用"推"模式来接受广播信息
		channel.setReceiver(this);
		//查看当前组成员
		System.out.println("---current view:"+channel.getView());
		//状态同步,第一个参数为null则向协调者节点获取信息
		channel.getState(null, 1000);
	}
	
	public void close(){
		channel.close();
	}
	
	public void loopSendMessage(){
		BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
		try {
			while(true){
				System.out.println("please input a string");
				String line = br.readLine();
				System.out.println("you put:"+line);
				if(line.equals("exit")){
					break;
				}else{
					String[] array = line.split(",");
					
					Address des = null;  	//接收方地址,为null代表发送广播消息
					Address src = null;		//发送方地址,为null代表自己的地址
					String msg = line;		//发送内容
					
					if(array.length == 3){
						des = new IpAddress(Integer.parseInt(array[0]));
						src = new IpAddress(Integer.parseInt(array[1]));
						msg = array[2];
					}else if(array.length == 2){
						des = new IpAddress(Integer.parseInt(array[0]));
						msg = array[1];
					}
					
					Message message = new Message(des, src, msg);
					//发送消息
					channel.send(message);
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ChannelNotConnectedException e) {
			e.printStackTrace();
		} catch (ChannelClosedException e) {
			e.printStackTrace();
		}
	}
	
	
	public static void main(String[] args) {
		GroupChat chat = new GroupChat();
		try {
			chat.start();
			chat.loopSendMessage();
			chat.close();
		} catch (ChannelException e) {
			e.printStackTrace();
		}
		
	}

}

  可以看到JGroups提供的API屏蔽了底层的通信机制,对于开发人员来说是完全透明的,要关注的只是消息的接受处理就可以了。
  对应的协议栈配置:
<config>
    <UDP
         mcast_group_addr="${jgroups.udp.mcast_addr:228.10.10.10}"
         mcast_port="${jgroups.udp.mcast_port:45588}"
         tos="8"
         ucast_recv_buf_size="20000000"
         ucast_send_buf_size="640000"
         mcast_recv_buf_size="25000000"
         mcast_send_buf_size="640000"
         loopback="false"
         discard_incompatible_packets="true"
         max_bundle_size="64000"
         max_bundle_timeout="30"
         use_incoming_packet_handler="true"
         ip_ttl="${jgroups.udp.ip_ttl:2}"
         enable_bundling="true"
         enable_diagnostics="true"
         thread_naming_pattern="cl"

         use_concurrent_stack="true"

         thread_pool.enabled="true"
         thread_pool.min_threads="1"
         thread_pool.max_threads="25"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="false"
         thread_pool.queue_max_size="100"
         thread_pool.rejection_policy="Run"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="Run"/>

    <PING timeout="2000"
            num_initial_members="3"/>
    <MERGE2 max_interval="30000"
            min_interval="10000"/>
    <FD_SOCK/>
    <FD timeout="10000" max_tries="5"   shun="true"/>
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK max_xmit_size="60000"
                   use_mcast_xmit="false" gc_lag="0"
                   retransmit_timeout="300,600,1200,2400,4800"
                   discard_delivered_msgs="true"/>
    <UNICAST timeout="300,600,1200,2400,3600"/>
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                   max_bytes="400000"/>
    <VIEW_SYNC avg_send_interval="60000"   />
    <pbcast.GMS print_local_addr="true" join_timeout="3000"
                join_retry_timeout="2000" shun="false"
                view_bundling="true"/>
    <FC max_credits="20000000"
                    min_threshold="0.10"/>
    <FRAG2 frag_size="60000"  />
    <!--pbcast.STREAMING_STATE_TRANSFER /-->
    <pbcast.STATE_TRANSFER  />
    <!-- pbcast.FLUSH  /-->
</config>
1
0
分享到:
评论

相关推荐

    JavaEE源代码 jgroups-2.2.8

    JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 ...

    jgroups-2.2.7.jar

    jgroups-2.2.7.jar jgroups-2.2.7.jar

    JGroups的Raft实现jgroups-raft.zip

    jgroups-raft 项目是 JGroups 框架对 Raft 的实现。Maven:&lt;groupId&gt;org.jgroups &lt;artifactId&gt;jgroups-raft &lt;version&gt;0.2&lt;/version&gt;Raft 是一个容易理解的共识算法。在容错和性能方面它相当于 Paxos(Google 的一致...

    jgroups.part1

    jgroups.part1

    jgroups-3.0.2

    JGroups使用灵活的协议栈,这也是JGroups最强大(the most powerful)的功能,它允许开发人员配置协议栈来适用于他们自己的应用需求和网络特征。这样做的好处在于,开发人员只需要关注他们使用到的协议。通过组合和...

    JGroups_集群.pdf

    JGroups_集群.pdf

    jgroups-3.2

    JGroups是一个开源的纯java编写的可靠的群组通讯工具。其是一个可靠的组播通讯工具集(需要说明的是,这并不是说必须要使用IP Multicast,JGroups也可以使用TCP来实现)。其工作模式基于IP多播,但可以在可靠性和群组...

    Ehcache通过Jgroups做集群

    Ehcache通过使用Jgroups做集群配置,更改每一个不同的jgroups.xml文件的端口号和IP,如果一台机器就使用127.0.0.1即可。配置好之后,把每台机器起来,就可以测试了。

    jgroups.part3

    jgroups.part3

    jgroups源代码

    jgroups源代码,想要学习jgroups开源框架的童鞋可以看看

    Jgroups 教程

    JGROUPs 的重要用法全部都在里面了

    Jgroups中的UNICAST3协议中文翻译

    Jgroups是一款组播工具,基于IP多播的可靠的组播中间件

    JGroups(Java多播通讯框架) v4.0.0.CR1.zip

    JGroups(Java多播通讯框架)简介 JGroups是一个可靠的群组通讯Java工具包。它基于IP组播(IP multicast),但在可靠性,组成员管理上对它作了扩展。 JGroups的可靠性体现在: 1,对所有接收者的消息的无丢失传输...

    基于JGroups的共享电子白板系统的研究与实现

    基于JGroups的共享电子白板系统的研究与实现

    jgroups-2.6.8.GA.jar

    jgroups-2.6.8.GA.jar jgroups-2.6.8.GA.jar

    Android代码-jgroups-android

    JGroups - A Framework for Group Communication in Java ======================================================== March 3, 1998 Bela Ban 4114 Upson Hall Cornell University Ithaca, NY 14853 bba@...

    Java多播通讯框架 JGroups

    Java多播通讯框架 JGroups

    JGroups-2.6.2.bin.zip

    介绍部分摘自XMPP Jabber即时通讯开发实践 ...JGroups 简介 JGroups是一个基于Java语言的提供可靠多播(组播)的开发工具包...在IP Multicast基础上提供可靠服务,也可以构建在TCP或者WAN上。主要是由Bela Ban开发,...

    jgroups的jar

    JGroups是一个开源的纯java编写的可靠的群组通讯工具。其是一个可靠的组播通讯工具集

    jgroups

    JGroups 2.5 tutorial 相当不错

Global site tag (gtag.js) - Google Analytics