`
assertmyself
  • 浏览: 28640 次
  • 性别: Icon_minigender_1
  • 来自: 南京
文章分类
社区版块
存档分类
最新评论

基于zbus消息队列的 生产者和消费者模型

阅读更多
zubs是一个消息队列;; ZBUS = MQ + RPC + PROXY  支持消息队列, 发布订阅, RPC, 代理(TCP/HTTP/DMZ)

消费者
ZConsumer.java
package com.gbcom.frame.zbus;

import java.io.IOException;

import org.zbus.broker.Broker;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.mq.Consumer.ConsumerHandler;
import org.zbus.mq.server.MqServer;
import org.zbus.mq.server.MqServerConfig;
import org.zbus.net.http.Message;

/**
 * 消费者:嵌入zbus服务器,订阅消息处理器
 * @author SYZ
 * @date 2016-6-14 下午02:58:50
 * @version 1.0.0
 * @see com.gbcom.frame.zbus.ZConsumer
 */
public class ZConsumer {

	/**   : (ZConsumer.main)
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		try {
			start();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private static void start() throws Exception{
		
		//嵌入zbus消息服务器。
		MqServerConfig config = new MqServerConfig();
		config.serverPort = 15555;
		config.storePath = "./store";
		final MqServer server = new MqServer(config);
		server.start(); 
		
		Broker broker = new ZbusBroker("127.0.0.1:15555"); //SingleBroker
		Consumer consumer = new Consumer(broker, "MyMQ");  
		consumer.start(new ConsumerHandler() { 
		    @Override
		    public void handle(Message msg, Consumer consumer) throws IOException { 
		        //消息回调处理
		        System.out.println(msg);
		    }
		}); 
	}

}




生产者

ZProduct.java
package com.gbcom.frame.zbus;

import java.io.IOException;

import org.zbus.broker.Broker;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;
/**
 * 需要开启 zbus服务器。。消息中间件都是这样。
 * 
 * 如果不开启zbus 需要嵌入到服务器中个,例如  zconsumer.java中 
 * 
 * @author SYZ
 * @date 2016-8-12 下午05:38:35
 * @version 1.0.0
 * @see com.gbcom.frame.zbus.ZProduct
 */
public class ZProduct {

	/**   : (ZClient.main)
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		try {
			start();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	private static void start() throws IOException, InterruptedException{
		Broker broker = new ZbusBroker("127.0.0.1:15555"); //SingleBroker
//		Broker broker = new ZbusBroker("127.0.0.1:16666;127.0.0.1:16667"); //HaBroker
//		Broker broker = new ZbusBroker("jvm"); //JvmBroker
		
		Producer producer = new Producer(broker, "MyMQ");
		producer.createMQ();//确定为创建消息队列需要显示调用



		for (int i = 0; i < 10; i++) {
			Message msg = new Message();
			msg.setBody("hello world-"+i);
			Message res = producer.sendSync(msg, 1000);
			System.out.println(res);
			}
		
		broker.close();
		
	}

}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics