一、Consumer 批量消费
可以通过
- consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条
这里需要分为2种情况1、Consumer端先启动 2、Consumer端后启动. 正常情况下:应该是Consumer需要先启动
1、Consumer端先启动
Consumer代码如下
- package quickstart;
- import java.util.List;
- import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import com.alibaba.rocketmq.client.exception.MQClientException;
- import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
- import com.alibaba.rocketmq.common.message.MessageExt;
- /**
- * Consumer,订阅消息
- */
- public class Consumer {
- public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
- consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
- consumer.setConsumeMessageBatchMaxSize(10);
- /**
- * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
- * 如果非第一次启动,那么按照上次消费的位置继续消费
- */
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- consumer.subscribe("TopicTest", "*");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- try {
- System.out.println("msgs的长度" + msgs.size());
- System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
- } catch (Exception e) {
- e.printStackTrace();
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.out.println("Consumer Started.");
- }
- }
由于这里是Consumer先启动,所以他回去轮询MQ上是否有订阅队列的消息,由于每次producer插入一条,Consumer就拿一条所以测试结果如下(每次size都是1):
2、Consumer端后启动,也就是Producer先启动
由于这里是Consumer后启动,所以MQ上也就堆积了一堆数据,Consumer的
consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条
所以这段代码就生效了测试结果如下(每次size最多是10):
二、消息重试机制:消息重试分为2种1、Producer端重试 2、Consumer端重试
1、Producer端重试
也就是Producer往MQ上发消息没有发送成功,我们可以设置发送失败重试的次数
- package quickstart;
- import com.alibaba.rocketmq.client.exception.MQClientException;
- import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
- import com.alibaba.rocketmq.client.producer.SendResult;
- import com.alibaba.rocketmq.common.message.Message;
- /**
- * Producer,发送消息
- *
- */
- public class Producer {
- public static void main(String[] args) throws MQClientException, InterruptedException {
- DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
- producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
- producer.setRetryTimesWhenSendFailed(10);//失败的 情况发送10次
- producer.start();
- for (int i = 0; i < 1000; i++) {
- try {
- Message msg = new Message("TopicTest",// topic
- "TagA",// tag
- ("Hello RocketMQ " + i).getBytes()// body
- );
- SendResult sendResult = producer.send(msg);
- System.out.println(sendResult);
- } catch (Exception e) {
- e.printStackTrace();
- Thread.sleep(1000);
- }
- }
- producer.shutdown();
- }
- }
2、Consumer端重试
2.1、exception的情况,一般重复16次 10s、30s、1分钟、2分钟、3分钟等等
上面的代码中消费异常的情况返回
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
正常则返回:
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
- package quickstart;
- import java.util.List;
- import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import com.alibaba.rocketmq.client.exception.MQClientException;
- import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
- import com.alibaba.rocketmq.common.message.MessageExt;
- /**
- * Consumer,订阅消息
- */
- public class Consumer {
- public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
- consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
- consumer.setConsumeMessageBatchMaxSize(10);
- /**
- * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
- * 如果非第一次启动,那么按照上次消费的位置继续消费
- */
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- consumer.subscribe("TopicTest", "*");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- try {
- // System.out.println("msgs的长度" + msgs.size());
- System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
- for (MessageExt msg : msgs) {
- String msgbody = new String(msg.getBody(), "utf-8");
- if (msgbody.equals("Hello RocketMQ 4")) {
- System.out.println("======错误=======");
- int a = 1 / 0;
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- if(msgs.get(0).getReconsumeTimes()==3){
- //记录日志
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
- }else{
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
- }
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
- }
- });
- consumer.start();
- System.out.println("Consumer Started.");
- }
- }
打印结果:
假如超过了多少次之后我们可以让他不再重试记录 日志。
if(msgs.get(0).getReconsumeTimes()==3){
//记录日志
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
2.2超时的情况,这种情况MQ会无限制的发送给消费端。
就是由于网络的情况,MQ发送数据之后,Consumer端并没有收到导致超时。也就是消费端没有给我返回return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;这样的就认为没有到达Consumer端。
这里模拟Producer只发送一条数据。consumer端暂停1分钟并且不发送接收状态给MQ
- package model;
- import java.util.List;
- import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import com.alibaba.rocketmq.client.exception.MQClientException;
- import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
- import com.alibaba.rocketmq.common.message.MessageExt;
- /**
- * Consumer,订阅消息
- */
- public class Consumer {
- public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
- consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
- consumer.setConsumeMessageBatchMaxSize(10);
- /**
- * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
- * 如果非第一次启动,那么按照上次消费的位置继续消费
- */
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- consumer.subscribe("TopicTest", "*");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- try {
- // 表示业务处理时间
- System.out.println("=========开始暂停===============");
- Thread.sleep(60000);
- for (MessageExt msg : msgs) {
- System.out.println(" Receive New Messages: " + msg);
- }
- } catch (Exception e) {
- e.printStackTrace();
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
- }
- });
- consumer.start();
- System.out.println("Consumer Started.");
- }
- }
三、消费模式
广播消费:rocketMQ默认是集群消费,我们可以通过在Consumer来支持广播消费
consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费
- package model;
- import java.util.List;
- import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import com.alibaba.rocketmq.client.exception.MQClientException;
- import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
- import com.alibaba.rocketmq.common.message.MessageExt;
- import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
- /**
- * Consumer,订阅消息
- */
- public class Consumer2 {
- public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
- consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
- consumer.setConsumeMessageBatchMaxSize(10);
- consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- consumer.subscribe("TopicTest", "*");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- try {
- for (MessageExt msg : msgs) {
- System.out.println(" Receive New Messages: " + msg);
- }
- } catch (Exception e) {
- e.printStackTrace();
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
- }
- });
- consumer.start();
- System.out.println("Consumer Started.");
- }
- }
如果我们有2台节点,Producerw往MQ上写入20条数据 其中Consumer1中拉取了12条 。Consumer2中拉取了8 条,这种情况下,加入Consumer1宕机,那么我们消费数据的时候,只能消费到Consumer2中的8条,Consumer1中的12条已经持久化到中。需要Consumer1回复之后这12条数据才能继续被消费。其实这种先启动producer往MQ上写数据,然后再启动Consumer的情况本来就是违规操作,正确的情况应该是先启动Consumer后再启动producer。
异步复制和同步双写主要是主和从的关系。消息需要实时消费的,就需要采用主从模式部署
异步复制:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就算从producer端发送成功了,然后通过异步复制的方法将数据复制到从节点
同步双写:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就并不算从producer端发送成功了,需要通过同步双写的方法将数据同步到从节点后, 才算数据发送成功。
四、刷盘方式
同步刷盘:在消息到达MQ后,RocketMQ需要将数据持久化,同步刷盘是指数据到达内存之后,必须刷到commitlog日志之后才算成功,然后返回producer数据已经发送成功。
异步刷盘:,同步刷盘是指数据到达内存之后,返回producer说数据已经发送成功。,然后再写入commitlog日志。
commitlog:
commitlog就是来存储所有的元信息,包含消息体,类似于Mysql、Oracle的redolog,所以主要有CommitLog在,Consume Queue即使数据丢失,仍然可以恢复出来。
consumequeue:记录数据的位置,以便Consume快速通过consumequeue找到commitlog中的数据
http://blog.csdn.net/u010634288/article/details/56049305
相关推荐
RocketMQ消息丢失解决方案:同步刷盘+手动提交.docx
内含多种命令行参数(例如消费位点的调整,消息体大小调整,并发数调整,JVM 参数调优,延迟等级调整等)实现压测的精细化控制。毫不夸张的说,RocketMQ 覆盖的功能基本都能测到。 3 简单易用:我将每个压测场景都...
springboot Rabbit死信队列实现,rocketMq重试消息实现 基于springboot2.15版本,最新rabbit和rocktMq 中间件实例,亲测可用
伴随着互联网的飞速发展, 特别是在近几年中, 移动... 本文介绍了MQTT 协议与RocketMQ 的这种开源项目的应用, 并通过RocketMQ 与Mosquitto 相结合的方式, 实现了一种基于RocketMQ 的MQTT 消息推送服务器及其分布式部署.
RocketMQ生产消费者模型实现
rocketmq推消息模式分享,讲述了基本原理,消息与消息队列,长轮询,offset存储机制,消息异常重推机制的总体概述
《RocketMQ高级原理:深入剖析消息系统的核心机制》的关键内容,涵盖RocketMQ的基础组件、消息存储机制、刷盘方式、主从复制、负载均衡、消息重试、死信队列和幂等性等核心概念。RocketMQ作为一款高性能、高可用的...
3.1.2消费消息 3.2 顺序消息 3.3 ⼴播消息 3.4 延时消息 3.4.1 延时消息介绍 3.4.2 RocketMQ中的延迟消息 3.5 批量消息 3.6 过滤消息 3.6.1 TAG模式过滤 3.6.2 SQL表达式过滤 3.6.3 类过滤模式(基于4.2.0版本) 3.7...
springboot1.5.10.RELEASE集成rocketmq4.3.1消息服务demo,多个消费者多监听
具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。 存储选型 顺序消息 局部顺序:适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照 。 延时消息 RabbitMQ不支持延迟消息,Active...
RocketMQ 开发指南,包含基本概念以及生产者和消费者的使用等
015-015_RocketMQ_Consumer_重试策略详解 016-016_RocketMQ_Consumer_幂等去重策略详解 017-017_RocketMQ_消息模式及使用讲解 018-018_RocketMQ_双主双从集群环境搭建与使用详解 019-019_RocketMQ_FilterServer机制...
3.1.2消费消息 3.2 顺序消息 3.3 ⼴播消息 3.4 延时消息 3.4.1 延时消息介绍 3.4.2 RocketMQ中的延迟消息 3.5 批量消息 3.6 过滤消息 3.6.1 TAG模式过滤 3.6.2 SQL表达式过滤 3.6.3 类过滤模式(基于4.2.0版本) 3.7...
首先,消息丢失问题可以通过多个策略来解决,包括生产者使用事务消息机制,配置RocketMQ同步刷盘和Dledger主从架构,以及避免在消费端使用异步消费机制。这些策略确保了在不同环节中消息的完整性和不丢失。其次,...
rocketmq测试用例
RocketMQ消息丢失解决方案:事务消息.docx
rocketmq事务消息, 使用场景, 最终一致性(而不是强一致性)
该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。
shell脚本:自动部署rocketmq阿里巴巴消息队列阿帕奇消息队列shell脚本:自动部署rocketmq阿里巴巴消息队列阿帕奇消息队列