`
Technoboy
  • 浏览: 153957 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

RocketMQ原理解析-Producer

阅读更多
1.启动
    producer通过配置的namesrv列表,启动时随机选择一个进行相连。首先引出,设置namesrv的几种方式,优先级依次由高到低:
  • 第一种:代码中指定namesrv地址
  •       producer.setNamesrvAddr(namesrvAddr);
          consumer.setNamesrvAddr(namesrvAddr);
  • 第二种:Java启动参数中指定:
  •       -Drocketmq.namesrv.addr=192.168.0.1:9876
  • 第三种:环境变量
  •       NAMESRV_ADDR
  • 第四种为http方式获取。
  •       如果启动前未配置namesrv地址,那么每2分钟从http://jmenv.tbsite.net:8080/rocketmq/nsaddr以http的方式获取namesrv地址。Namesrv寻址可通过hosts文件从定向或者通过设置系统属性进行更改。
    原地址为http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP。
    WS_DOMAIN_NAME对应启动参数rocketmq.namesrv.domain
    WS_DOMAIN_SUBGROUP对应启动参数rocketmq.namesrv.domain.subgroup。
        在3.2.4官方文档中,作者释迦也比较推荐通过http方式获取namesrv地址。好处就是客户端部署简单,并且namesrv可以热升级。

  producer启动后,定时每30s从namesrv更新topic的路由信息。设么意思?假如某个topic的队列从4个增加到8个,或者新增了broker且包含此topic,那么可以重新拿到topic的路由信息。Topic的路由信息有brokerName,queueId组成。定时30s清理下线broker及发送心跳和订阅关系。Producer的启动首先会和namesrv建立连接,然后拿到topic的路由信息后,当在发送消息时会和broker建立连接并将broker信息缓存本地。这里的清理下线broker指检查本地broker列表信息,如果此broker没有topic的路由信息,即从本地列表移除。

2.发送接口
public List<MessageQueue> fetchPublishMessageQueues(String topic)获取某个topic下的队列信息。
public SendResult send(Message msg)同步发送消息。
public void send(Message msg, SendCallback sendCallback)异步发送消息
public void sendOneway(Message msg)发送消息,无返回结果。
public SendResult send(Message msg, MessageQueue mq)同步发送消息到指定的队列
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)异步发送消息到指定队列。
public void sendOneway(Message msg, MessageQueue mq)发送消息到指定队列,无返回值
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)通过指定的队列选择器同步发送消息,arg参数会回传给队列选择器。
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)通过指定的队列选择器异步发送消息,arg参数会回传给队列选择器。
public long searchOffset(MessageQueue mq, long timestamp)根据时间获取某队列的offset
public long maxOffset(MessageQueue mq)获取队列的最大offset
public long minOffset(MessageQueue mq)获取队列的最小offset
public MessageExt viewMessage(String msgId)根据id获取消息信息
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)根据消息key获取消息。

    viewMessage和queryMessage有两点说明:
  • MQ会根据storehostaddress和offset来生成msgId,所以在集群下,可以通过msgId查询到消息。
  • MQ会根据topic和uniqKey以及topic和keys进行消息的索引构建,所以可以通过索引查询消息。

     
3.负载均衡
  默认,producer采用轮询的策略发送消息。Producer从namesrv更新到topic的路由信息后,根据queueId和brokerName组成发送列表。假如,名为test的topic有8个队列,0-7,那么和broker-a组成的发送列表为broker-a-0,broker-a1...broker-a7,然后依次轮训列表进行发送。

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }


4.发送的一些其他说明
 
  • 默认发送超时为3s。
  • 消息超过4k,即启用消息的压缩。
  • 发送失败,默认重发2次。
  • 消息最大限制为4M,即超过4M会提示发送失败。

5.注意
    在MQ内部,发送者是没有group的概念的。Group只是业务上的划分。Producer在启动时,会选择一个namesrv相连,通过topic关系找到broker,并和存有topic的所有master broker相连,也就是说,消息只会发到master的broker上去。
  • 大小: 27.2 KB
分享到:
评论

相关推荐

    rocketmq-console-ng-1.0.1.jar

    RocketMQ-Console是RocketMQ项目的扩展插件...java -jar rocketmq-console-ng-1.0.1.jar --server.port=9999 --rocketmq.config.namesrvAddr=192.168.207.128:9876;192.168.207.129:9876 (ip替换为RocketMQ集群节点ip)

    rocketmq-all-4.7.1-bin-release.zip

    RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息...

    rocketmq监控 rocketmq-console-SNAPSHOT-1.0

    rocketmq监控 查看rocketmq.namesrv对应下的broker、topic、consuemr/producer等

    rocketmq-client-nodejs:Apache RocketMQ Node.js客户端

    const { Producer , PushConsumer } = require ( "apache-rocketmq" ) ; 制片人 建设者 new Producer ( groupId [ , instanceName ] [ , options ] ) ; Producer的构造函数接收三个参数: groupId :生产

    rocketmq-flume:用于RocketMQ与Flume-ng之间的消息接收和投递

    HOME/lib目录中(具体包会在后面描述)SinkSink配置说明配置项必填默认值说明namesrvAddr必填nullName Server地址,遵循RocketMQ配置方式producerGroup可选DEFAULT_PRODUCERProducer分组topic必填nullTopic名称tags可...

    JAVA-ACE-架构师系列视频教程在线观看地址- RocketMQ(订单实战上下全集)

    11011_RocketMQ_Producer_顺序消费机制详解 12012_RocketMQ_Producer_事务消息机制详解 13013_RocketMQ_Consumer_Push和Pull模式及使用详解 14014_RocketMQ_Consumer_配置参数详解 15015_RocketMQ_Consumer_重试策略...

    spring-boot-activemq-producer

    spring-boot-activemq-producer 源码

    rocketmq教程两套

    011-011_RocketMQ_Producer_顺序消费机制详解 012-012_RocketMQ_Producer_事务消息机制详解 013-013_RocketMQ_Consumer_Push和Pull模式及使用详解 014-014_RocketMQ_Consumer_配置参数详解 015-015_RocketMQ_...

    RocketMq学习视频

    011-011_RocketMQ_Producer_顺序消费机制详解 012-012_RocketMQ_Producer_事务消息机制详解 013-013_RocketMQ_Consumer_Push和Pull模式及使用详解 014-014_RocketMQ_Consumer_配置参数详解 015-015_RocketMQ_...

    2017年最新JAVA-ACE-架构师系列视频课程- RocketMQ(上下集)

    11011_RocketMQ_Producer_顺序消费机制详解24:37 12012_RocketMQ_Producer_事务消息机制详解38:15 13013_RocketMQ_Consumer_Push和Pull模式及使用详解28:21 14014_RocketMQ_Consumer_配置参数详解06:45 15015_...

    rocketmq_for_csharp

    商用的rocketmq支持C#客户端,开源的不具备这个接口,由于项目需要开发了支持c#的接口,目前接口只支持consumer的订阅读取,producer的发送。 作者也是个小白级别,怕下载不实用可以先咨询作者,KOUKOU:1335075189

    pentaho-kafka-producer.zip

    kettle kafka 生产者插件,在plugins 下新建steps文件夹,把zip文件解压放到里面。

    rocketmq-flume-master:flume收集日志发送到rocketmq

    rocketmq-flume Source&Sink 该项目用于与之间的消息接收和投递。 首先请确定您已经对和有了基本的了解 确保本地maven库中已经存在,或者下载RocketMQ源码自行编译 在rocketmq-flume项目根目录执行mvn clean install...

    RocketMQ实战与原理

    Producer: 生产者,负责生产消息并发送到消息引擎。测评开始时,测评程序会启动10~20个Producer,每个Producer在一条线程中,然后每个Producer随机生产某个Topic或者附属于Queue的消息并发送到消息引擎; Topic: ...

    rocketmq管理界面

    RocketMQ-Console是RocketMQ项目的扩展插件,是一个图形化管理控制台,提供Broker集群状态查看,Topic管理,Producer、Consumer状态展示,消息查询等常用功能,这个功能在安装好RocketMQ后需要额外单独安装、运行 ...

    Laravel开发-producer

    Laravel开发-producer 基于规则的简单类解析

    common-rocketmq:rocketmq在实际项目开发中做了简单的封装

    |---producer----------RocketMQProducer(生产者接口) | | | |---RocketMQProducerImpl(生产者接口实现) |---consumer------RocketMQConsumer(消费者) | |---...

    spring-kafka-producer.xml

    spring-kafka-producer.xml

    前端开源库-tap-producer-macbre

    前端开源库-tap-producer-macbretap producer macbre,用于产生tap输出的模块

    amazon-kinesis-video-streams-producer-c

    Amazon Kinesis Video Streams C制作人Amazon Kinesis视频流| 安全的视频摄取以...建造下载要下载,请运行以下命令: git clone --recursive https://github.com/awslabs/amazon-kinesis-video-streams-producer-c.g

Global site tag (gtag.js) - Google Analytics