近期学习了阿里的分布式消息中间件RocketMQ,对它的进行了基本的使用,写一篇博客记录一下:
1. 资料获取
RocketMQ相关资料基本都在RocketMQ在github上的主页:
https://github.com/alibaba/RocketMQ
相关软件、客户端包括源码的下载可以到:
https://github.com/alibaba/RocketMQ/releases
目前最新:v3.2.6【alibaba-rocketmq-3.2.6.tar.gz】
用户开发手册需要按照要求回复后,手册会发送到邮箱:
https://github.com/alibaba/RocketMQ/issues/1
【该手册对RocketMQ进行了一些介绍,但是并不是特别详细】
提供一个快速入门,可以到博客查看:
http://blog.csdn.net/a19881029/article/details/34446629
2. 部署RocketMQ
RocketMQ需要部署Name Server服务器和broker服务器,而broker服务器由有多种部署方式【master-slave】,启动都需要JDK以及JAVA_HOME环境变量,由于实验室机器有限,有两台机器:
172.13.206.165 部署Name Server和一个master broker
172.13.206.38 部署一个slave broker
(1) 首先部署Name Server【个人感觉类似JNDI,主要管理broker的注册信息】
A. 拷贝一份alibaba-rocketmq-3.2.6.tar.gz到机器172.13.206.16上并解压
B. cd 到bin目录,可以打开README.md开一下,里面简单介绍了Name Server和broker的启动命令
C. Name Server 的启动命令如下:
nohup sh mqnamesrv &
D. 启动前可以先看一下mqnamesrv的脚本,发现它实际是执行runserver.sh脚本让它去执行com.alibaba.rocketmq.namesrv.NamesrvStartup的main函数来启动Name Server,在runserver.sh脚本中可以看到JVM的启动参数配置:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
由于我的机器内存只有2g,所以需要修改一下JVM的启动参数【用户根据机器情况配置自己的启动参数】,我的修改:
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=256m"
E. Name Server 的默认监听端口号为9876,所以我的Name Server地址为:172.13.206.165:9876 【Name Server是无状态的,可以很便利的进行水平扩展】
F. Name Server的关闭命令
sh mqshutdown namesrv
(2) 部署broker【消息中转角色,负责存储消息,转发消息】
broker集群有多种配置的策略,根据用户手册,大致有四种部署策略:
(1)单个Master
这种配置简单,但是风险比较大,一旦broker宕机会导致整个服务不可用。【实际中不会用这种方式】
(2)多Master
集群中无slave,有多个是master,单个broker宕机不会对应用有影响,性能最高;但是单台机器宕机期间,这台机器上未消费的消息在机器恢复之前不可订阅,消息实时性受影响。
(3)多Master多Slave+异步复制
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级;性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预;Master宕机或磁盘损坏时会有少量消息丢失。
(4)多Master多Slave+同步双写
每个Master配一个Slave,有多对Master-Slave,HA采用同步双写方式,主备都写成功,向应用返回成功;服务可用性与数据可用性非常高;性能比异步HA略低。
在RocketMQ_HOME/conf目录下提供了四种配置的broker启动配置的示例文件,学习时可以按照这些配置
启动broker:
A. 生成配置文件
如果想对broker的启动进行更详细的掌控,可以使用以下命令生成配置文件模板:
sh mqbroker -m > broker.p
生成的broker.p文件如下,可以对配置进行修改:
namesrvAddr= brokerIP1=172.13.206.165 brokerName=issme-System-Product-Name brokerClusterName=DefaultCluster brokerId=0 autoCreateTopicEnable=true autoCreateSubscriptionGroup=true rejectTransactionMessage=false fetchNamesrvAddrByAddressServer=false storePathRootDir=/home/issme/store storePathCommitLog=/home/issme/store/commitlog flushIntervalCommitLog=1000 flushCommitLogTimed=false deleteWhen=04 fileReservedTime=72 maxTransferBytesOnMessageInMemory=262144 maxTransferCountOnMessageInMemory=32 maxTransferBytesOnMessageInDisk=65536 maxTransferCountOnMessageInDisk=8 accessMessageInMemoryMaxRatio=40 messageIndexEnable=true messageIndexSafe=false haMasterAddress= brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH cleanFileForciblyEnable=true
B. 启动broker【启动一个Master和一个Slave,HA采用异步复制】
这里为了简化,采用conf中给出的配置文件对broker进行启动,broker的启动命令:
nohup sh mqbroker -n "172.13.206.165:9876" -c ../conf/2m-2s-async/broker-a.properties & nohup sh mqbroker -n "172.13.206.165:9876" -c ../conf/2m-2s-async/broker-a-s.properties &
这里启动了一个master和一个slave,它们是通过brokerName来进行配对,master的brokerId必须为0,而slave的brokerId不为0【一个master可以配置多个slave】; -n 指定Name Server地址,-c 指定配置文件的地址,这样broker启动完毕,默认端口号为10911:
master地址:172.13.206.165:10911
slave地址: 172.13.206.38:10911
C. 类似的我们可以看一下mqbroker的启动脚本,发现它实际是执行runbroker.sh脚本让它去执行com.alibaba.rocketmq.broker.BrokerStartup的main函数来启动broker,在runbroker.sh脚本中同样可以修改JVM的启动参数
D. broker关闭命令
sh mqshutdown broker
3. 客户端程序编写
这里编写了三个简单的客户端代码【具体根据业务调整】
引用RocketMQ客户端jar文件:
<dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency>
(1) producer
public class Producer { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("Producer"); producer.setNamesrvAddr("172.13.206.165:9876"); try { producer.start(); for(int i=0; i<20; i++) { Message msg = new Message("TestTopicA", "Push", "test1", "Test Msg 1".getBytes(Charset.forName("utf-8"))); SendResult result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("TestTopicA", "Pull", "test2", "Test Msg 2".getBytes(Charset.forName("utf-8"))); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); } } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
这里需要注意的是producer.shutdown(),它应用退出时,要调用来清理资源,关闭网络连接,从MetaQ服务器上注销自己,一般建议写在tomcat或jboss的退出钩子。
(2)PushConsumer
Push方式消费消息【根据RocketMQ开发手册,Push方式是以Pull长轮询的方式实现的】
public class PushConsumer { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer"); consumer.setNamesrvAddr("172.13.206.165:9876"); try { consumer.subscribe("TestTopicA", "*"); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { Message msg = msgs.get(0); System.out.println(msg.toString()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } } }
(3)PullConsumer
以Pull方式消费消息:
public class PullConsumer { public static void main(String[] args) { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer( "PullConsumer"); consumer.setNamesrvAddr("172.13.206.165:9876"); try { consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TestTopicA"); for(MessageQueue mq : mqs) { System.out.println("Consumer From the queue:" + mq); long offset = 0; PullResult result = consumer.pullBlockIfNotFound(mq, null, offset, 32); List<MessageExt> msgs = result.getMsgFoundList(); if(msgs!=null && msgs.size() != 0) { for(MessageExt msg : msgs) { System.out.println(new String(msg.getBody(), Charset.forName("utf-8"))); } } offset = result.getNextBeginOffset(); System.out.println(result.getPullStatus()); } } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
相关推荐
----------------------------rocketmq 消息队列 ---------------------------- brokerIP1=192.168.1.144 ... tools.cmd org.apache.rocketmq.example.quickstart.Consumer 启动生产者(先后输入):set NAMESRV_
Telerik.QuickStart 很有用
越来越发现,用鼠标将浪费我大量的时间,所以,最近我一直拼命的学习windows的快捷按键。现在很多操作都可以使用快捷按键完成。但是我发现我没法用很快的速度执行我选择的软件执行。...还有一个就是使用这个软件的设置...
robotframework-quickstart
quickstart_uagateway
maven-archetype-quickstart-1.1.jar包下载
Laravel开发-laravel-quickstart Laravel框架。
1.maven-archetype-quickstart-1.1.jar 用于搭建maven模块项目 2.打开cmd窗口,执行mvn install:install-file -DgroupId=org.apache.maven.archetypes -DartifactId=maven-archetype-quickstart -Dversion=1.1 -...
非常专业的QuickStart软件源码
JavaScript and AJAX_ Visual QuickStart Guide
解决Unable to create project from archetype [org.apache.maven.archetypes:maven-archetype-quickstart:1.1] 1. 下载maven-archetype-quickstart-1.1.jar 文件地址: 2.cmd窗口执行mvn install:install-file -...
.NET快速入门教程QuickStart 中文版
.NET快速入门教程QuickStart中文版 ASP.NET 快速入门 ASP.NET 是用于生成基于 Web 的应用程序的内容丰富的编程框架。它为开发人员和管理员提供出色的支持,提供改进的易用性、工具支持、可靠性、可缩放性、管理和...
orabpel QuickStart QuickStart
quickstart-ios, Firebase的快速入门示例 面向iOS的 Firebase快速入门快速入门示例的集合,演示iOS上的Firebase api 。 每个示例都包含 objective-c 和 Swift的目标。 有关更多信息,请参见 ...
Android开发的quickstart范例,对你学习android开发可能会有帮助
QNX Quickstart Guide,his guide will help you install and configure the QNX Software Development Platform, which includes the QNX Neutrino RTOS and the QNX Momentics Tool Suite, so you can start ...
【在本地库中装载maven-archetype-quickstart】 1. 下载该文件 2. 打开cmd窗口,执行mvn install:install-file -DgroupId=org.apache.maven.archetypes -DartifactId=maven-archetype-quickstart -Dversion=1.1 -...