RocketMQ简介
笔者使用的是Apache RocketMQ,官网是http://rocketmq.apache.org/。RocketMQ是Alibaba开源的一个分布式消息队列,可以通过http://rocketmq.apache.org/dowloading/releases/下载当前最新的版本。下载后解压缩,然后通过bin/mqnamesrv
启动一个Name Server,它默认监听在9876端口。然后需要通过bin/mqbroker -n localhost:9876
启动一个Broker,并把它注册到刚刚启动的那个Name Server上,Broker默认监听在端口10911上。生产者和消费者都是跟Broker打交道,但是它们不会直接指定Broker的地址,而是通过Name Server来间接的获取Broker的地址。这样做的好处是可以动态的增加Broker,多个Broker之间可以组成一个集群。应用中使用RocketMQ时需要添加RocketMQ Client依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.0</version>
</dependency>
然后可以通过DefaultMQProducer进行消息的发送,每一个生产者必须指定一个Group,下面代码就指定了Group为group1。相同处理逻辑的生产者必须定义相同的Group。这个Group只对于发送事务消息的生产者有用。然后需要通过setNamesrvAddr()
指定Name Server的地址。在发送消息前必须调用其start()
。发送的消息是通过org.apache.rocketmq.common.message.Message
对象表示的。它必须要指定一个Topic,RocketMQ是通过抽象的Topic来管理一组队列的,这个Topic必须在Broker中进行创建。可以通过bin/mqadmin updateTopic -b localhost:10911 -t topic1
在本地刚刚启动的Broker上创建名为topic1的Topic。它默认拥有8个读队列,8个写队列。下面的代码指定了消息都将发送到名为topic1的Topic。通过其send()
进行消息发送,它是同步发送的,发送完后会返回一个SendResult。其SendStatus为SEND_OK时表示发送成功。下面的代码一共发送了10条消息到topic1,消息内容分别是hello0..hello9。
@Test
public void testSend() throws Exception {
//指定Producer的Group为group1
DefaultMQProducer producer = new DefaultMQProducer("group1");
//指定需要连接的Name Server
producer.setNamesrvAddr(nameServer);
//发送消息前必须调用start(),其内部会进行一些初始化工作。
producer.start();
for (int i = 0; i < 10; i++) {
//指定消息发送的Topic是topic1。
Message message = new Message("topic1", ("hello" + i).getBytes());
//同步发送,发送成功后才会返回
SendResult sendResult = producer.send(message);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功:" + sendResult);
} else {
System.out.println("消息发送失败:" + sendResult);
}
}
//使用完毕后需要把Producer关闭,以释放相应的资源
producer.shutdown();
}
消息的消费者可以通过DefaultMQPushConsumer进行消费。DefaultMQPushConsumer是进行推模式消费的,它也需要指定一个Group。默认情况下相同Group的消费者将对同一个队列中的消息进行集群消费,即同一条消息只会被一个Consumer实例进行消费。DefaultMQPushConsumer也需要通过setNamesrvAddr()
指定需要连接的Name Server。通过subscribe()
指定需要消费的Topic和对应的Tag。下面指定了需要消费的Topic是topic1,通过*
指定将消费所有的Tag。Tag是用来对消息进行分类标记的,需要在发送消息的时候指定。通过registerMessageListener()
注册消息监听器,当收到消息后会回调它。下面代码注册的是一个MessageListenerConcurrently类型的监听器。消息正常消费后需要返回CONSUME_SUCCESS,如果消费失败可以返回RECONSUME_LATER,这样可以先跳过这一条消息的消费,Broker会过一段时间再投递这一条消息。Consumer也是需要通过start()
进行启动。这样消费者就可以开始进行消息消费了,默认只有它启动之后发送的消息才能收到。
@Test
public void testConsumer() throws Exception {
//创建Consumer并指定消费者组。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
//指定需要连接的Name Server
consumer.setNamesrvAddr(nameServer);
//订阅topic1上的所有Tag。
consumer.subscribe("topic1", "*");
//注册一个消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + "收到了消息,数量是:" + msgs.size());
AtomicInteger counter = new AtomicInteger();
msgs.forEach(msg -> System.out.println(counter.incrementAndGet() + ".消息内容是:" + new String(msg.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
//为了确保Junit线程不立即死掉。
TimeUnit.SECONDS.sleep(120);
}
(注:本文是基于Apache RocketMQ4.5.0所写)
相关推荐
所以在实际产环境中,个Topic会设置成多分区的模式,来持多个消费者,参照下图:在互联企业的实际产环境中,Topic数量和分区都会较多,这就要求消息中间件在多T
万亿级数据洪峰下的消息引擎——Apache RocketMQ--阿里.pdf
cd /opt/rocketmq-all-4.3.0-bin-release # nohup sh bin/mqnamesrv & #启动每个服务器的nameserver # tail -f nohup.out The Name Server boot success #输出此类信息,说明启动成功 启动broker 服务器Namserver1...
技术文档分享,免费获取请私信博主。
1 . IoT终端消息分析 2. 领域模型设计 3. RocketMQ融合架构——RocketMQ-MQTT
rocketmq可视化jar包.zip
基于spring-cloud-alibaba套件的微服务架构的商场停车场实战案例 关键技术点应用: ... 服务发现——nacos ...异步消息处理——rocketmq 分布式缓存——redis 客户端负载均衡——openfeign RPC调用——dubbo
Java全能学习面试手册——Java面试题库.zip 01 7道消息队列ActiveMQ面试题!.pdf 02 10道Java高级必备的Netty面试题!.pdf 03 10道Java面试必备的设计模式面试题!.pdf 04 10个Java经典的List面试题!.pdf 05 10个...
包含层的概念和应用。还包含了一些例子及源文件,希望对大家有帮助!!
helm部署应用到k8s集群(helm+k8s)——详细文档
1. 概述 1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 2. RocketMQ / MyCAT / Shard
1. 概述 1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 2. RocketMQ / MyCAT / Shard
消息中间件本质上就是一种很简单的数据结构——队列,但是一条队列肯定是当不成中间件的,你必须要考虑性能、容灾、可靠性等等因素。这也给我的写作提供了一些思路,我将从队列开始,给你演示一条队列是如何进化成一...
incubator-technology 简介 为了更好的促进团队的学习技术氛围,更好的同步记录更新学习成果,为新同学提供优质的学习资料,我们决定维护一份优质的技术学习资料。... |—— RocketMQ: 消息中间件 |
2020云原生微服务大会PPT汇总,共21份。 一、主论坛 Capability Oriented Architecture for cloud and edge 云原生时代的微服务架构 ...下一代高性能云原生消息队列-Apache RocketMQ) 在 Dubbo 生态下的微服务架构实践
数据库设计: 共6张表——item(商品信息表)、item_stock(商品库存表)、promo(秒杀商品表)、stock_log(订单流水状态表)、order_info(订单表)、user_info(用户信息表) 实现和优化细节 1、实现分布式Session 在秒杀抢购...
NoSlowQ(no-slow-query)——一个能够轻松发现新增SQL语句,并能自动做分析和通知的系统,由 和 组成。 (系统开发的原因和灵感可以查看 ) 系统架构图如下: 整体效果 1、登录页面 技术栈 后端技术栈 1.SpringBoot 2...
RocketMQ5.0,生于云、长于云的新一代&消息、事件、流&融合处理平台.pdf SenseMARS 火星混合现实平台及应用开发的实现与挑战.pdf TDSQL-C PostgreSQL 在云原生架构下的新活力.pdf Toward Software Performance ...
UMPAY——编码规范 日志规范 异常规范 网络 协议 TCP/IP HTTP hession file HTTPS 负载均衡 容器 JBOSS tomcat resin jetty 容灾 日志框架 开源框架 slf4j 框架实现 log4j logback commong ...