`

阿里开源消息中间件RocketMQ QuickStart

阅读更多

近期学习了阿里的分布式消息中间件RocketMQ,对它的进行了基本的使用,写一篇博客记录一下:

 

1. 资料获取

RocketMQ相关资料基本都在RocketMQ在github上的主页:

https://github.com/alibaba/RocketMQ

 

相关软件、客户端包括源码的下载可以到:

https://github.com/alibaba/RocketMQ/releases

目前最新:v3.2.6【alibaba-rocketmq-3.2.6.tar.gz】

 

用户开发手册需要按照要求回复后,手册会发送到邮箱:

https://github.com/alibaba/RocketMQ/issues/1

【该手册对RocketMQ进行了一些介绍,但是并不是特别详细】

 

提供一个快速入门,可以到博客查看:

http://blog.csdn.net/a19881029/article/details/34446629

 

2. 部署RocketMQ

RocketMQ需要部署Name Server服务器和broker服务器,而broker服务器由有多种部署方式【master-slave】,启动都需要JDK以及JAVA_HOME环境变量,由于实验室机器有限,有两台机器:

172.13.206.165 部署Name Server和一个master broker

172.13.206.38   部署一个slave broker

 

 (1) 首先部署Name Server【个人感觉类似JNDI,主要管理broker的注册信息】

 

A. 拷贝一份alibaba-rocketmq-3.2.6.tar.gz到机器172.13.206.16上并解压

B. cd 到bin目录,可以打开README.md开一下,里面简单介绍了Name Server和broker的启动命令

C. Name Server 的启动命令如下:

 

 

nohup sh mqnamesrv &
 

 

D. 启动前可以先看一下mqnamesrv的脚本,发现它实际是执行runserver.sh脚本让它去执行com.alibaba.rocketmq.namesrv.NamesrvStartup的main函数来启动Name Server,在runserver.sh脚本中可以看到JVM的启动参数配置:

 

 

JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
 

 

由于我的机器内存只有2g,所以需要修改一下JVM的启动参数【用户根据机器情况配置自己的启动参数】,我的修改:

 

 

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=256m"
 

 

E. Name Server 的默认监听端口号为9876,所以我的Name Server地址为:172.13.206.165:9876 【Name Server是无状态的,可以很便利的进行水平扩展】

 

F. Name Server的关闭命令

 

 

sh mqshutdown namesrv
 

 

(2) 部署broker【消息中转角色,负责存储消息,转发消息】

broker集群有多种配置的策略,根据用户手册,大致有四种部署策略:

(1)单个Master

这种配置简单,但是风险比较大,一旦broker宕机会导致整个服务不可用。【实际中不会用这种方式】

(2)多Master

集群中无slave,有多个是master,单个broker宕机不会对应用有影响,性能最高;但是单台机器宕机期间,这台机器上未消费的消息在机器恢复之前不可订阅,消息实时性受影响。

(3)多Master多Slave+异步复制

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级;性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预;Master宕机或磁盘损坏时会有少量消息丢失。

(4)多Master多Slave+同步双写

每个Master配一个Slave,有多对Master-Slave,HA采用同步双写方式,主备都写成功,向应用返回成功;服务可用性与数据可用性非常高;性能比异步HA略低。

在RocketMQ_HOME/conf目录下提供了四种配置的broker启动配置的示例文件,学习时可以按照这些配置

 

启动broker:

A. 生成配置文件

如果想对broker的启动进行更详细的掌控,可以使用以下命令生成配置文件模板:

 

 

sh mqbroker -m > broker.p
 

 

生成的broker.p文件如下,可以对配置进行修改:

 

namesrvAddr=
brokerIP1=172.13.206.165
brokerName=issme-System-Product-Name
brokerClusterName=DefaultCluster
brokerId=0
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
rejectTransactionMessage=false
fetchNamesrvAddrByAddressServer=false
storePathRootDir=/home/issme/store
storePathCommitLog=/home/issme/store/commitlog
flushIntervalCommitLog=1000
flushCommitLogTimed=false
deleteWhen=04
fileReservedTime=72
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInMemory=32
maxTransferBytesOnMessageInDisk=65536
maxTransferCountOnMessageInDisk=8
accessMessageInMemoryMaxRatio=40
messageIndexEnable=true
messageIndexSafe=false
haMasterAddress=
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
cleanFileForciblyEnable=true
 

 

B. 启动broker【启动一个Master和一个Slave,HA采用异步复制】

这里为了简化,采用conf中给出的配置文件对broker进行启动,broker的启动命令:

 

 

nohup sh mqbroker -n "172.13.206.165:9876" -c ../conf/2m-2s-async/broker-a.properties &

nohup sh mqbroker -n "172.13.206.165:9876" -c ../conf/2m-2s-async/broker-a-s.properties & 
 

 

这里启动了一个master和一个slave,它们是通过brokerName来进行配对,master的brokerId必须为0,而slave的brokerId不为0【一个master可以配置多个slave】; -n 指定Name Server地址,-c 指定配置文件的地址,这样broker启动完毕,默认端口号为10911:

master地址:172.13.206.165:10911

slave地址: 172.13.206.38:10911

 

C. 类似的我们可以看一下mqbroker的启动脚本,发现它实际是执行runbroker.sh脚本让它去执行com.alibaba.rocketmq.broker.BrokerStartup的main函数来启动broker,在runbroker.sh脚本中同样可以修改JVM的启动参数

 

D. broker关闭命令

 

sh mqshutdown broker

 

3. 客户端程序编写

这里编写了三个简单的客户端代码【具体根据业务调整】

引用RocketMQ客户端jar文件:

 

<dependency>
		<groupId>com.alibaba.rocketmq</groupId>
		<artifactId>rocketmq-client</artifactId>
		<version>3.2.6</version>
</dependency>

 

(1) producer

public class Producer {

	public static void main(String[] args) {
		DefaultMQProducer producer = new DefaultMQProducer("Producer");
		producer.setNamesrvAddr("172.13.206.165:9876");

		try {
			
			producer.start();
			
			
			for(int i=0; i<20; i++) {
				
				Message msg = new Message("TestTopicA", "Push", "test1",
						"Test Msg 1".getBytes(Charset.forName("utf-8")));
	
				SendResult result = producer.send(msg);
				System.out.println("id:" + result.getMsgId() +  
	                    " result:" + result.getSendStatus());
				
				msg = new Message("TestTopicA", "Pull", "test2",
						"Test Msg 2".getBytes(Charset.forName("utf-8")));
				
				
				
				result = producer.send(msg);
				
				System.out.println("id:" + result.getMsgId() +  
	                    " result:" + result.getSendStatus());
			}
		} catch (MQClientException e) {
			e.printStackTrace();
		} catch (RemotingException e) {
			e.printStackTrace();
		} catch (MQBrokerException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			producer.shutdown();
		}
	}
}

 

这里需要注意的是producer.shutdown(),它应用退出时,要调用来清理资源,关闭网络连接,从MetaQ服务器上注销自己,一般建议写在tomcat或jboss的退出钩子。

 

(2)PushConsumer

Push方式消费消息【根据RocketMQ开发手册,Push方式是以Pull长轮询的方式实现的】

 

public class PushConsumer {
	public static void main(String[] args) {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
		consumer.setNamesrvAddr("172.13.206.165:9876");
		
		try {
			consumer.subscribe("TestTopicA", "*");
			consumer.setMessageModel(MessageModel.BROADCASTING);
			consumer.registerMessageListener(new MessageListenerConcurrently() {
				
				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
						ConsumeConcurrentlyContext context) {
					Message msg = msgs.get(0);
					System.out.println(msg.toString());
					
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
			consumer.start();
			
		} catch (MQClientException e) {
			e.printStackTrace();
		}
	}
}

 

(3)PullConsumer

以Pull方式消费消息:

 

public class PullConsumer {

	public static void main(String[] args) {
		DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(
				"PullConsumer");
		consumer.setNamesrvAddr("172.13.206.165:9876");

		try {
			consumer.start();
			
			Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TestTopicA");
			for(MessageQueue mq : mqs) {
				System.out.println("Consumer From the queue:" + mq);
				long offset = 0;
				PullResult result = consumer.pullBlockIfNotFound(mq, null, offset, 32);
				List<MessageExt> msgs = result.getMsgFoundList();
				if(msgs!=null && msgs.size() != 0) {
					for(MessageExt msg : msgs) {
						System.out.println(new String(msg.getBody(), Charset.forName("utf-8")));
					}
				}
				offset = result.getNextBeginOffset();
				System.out.println(result.getPullStatus());
			}
			

		} catch (MQClientException e) {
			e.printStackTrace();
		} catch (RemotingException e) {
			e.printStackTrace();
		} catch (MQBrokerException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics