原创文章,转载请注明出处:http://jameswxx.iteye.com/blog/2096446
这里以消费者为例说明。一组消费者要消费某个topic,得先知道该topic分布在哪些broker上,某个broker上的topic分布可能会变化,一旦变化,生产者和消费者应该都能被通知到。通知模式有推和拉两种,客户端都是采取拉的模式,所以broker如有变化,通知都是有延迟的。
一 什么时候启动topic路由获取任务
两个地方:
1 首先是DefaultMQPushConsumerImpl启动时,见DefaultMQPushConsumerImpl的start方法里的this .updateTopicSubscribeInfoWhenSubscriptionChanged();
2 另外DefaultMQPushConsumerImpl的start方法也启动了MQClientInstance,MQClientInstance的start方法里调用了startScheduledTask()方法,该方法启动了获取路由的定时任务。
// 定时从Name Server获取Topic路由信息
this.scheduledExecutorService .scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this .updateTopicRouteInfoFromNameServer();
}
catch (Exception e) {
log.error( "ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig .getPollNameServerInteval(), TimeUnit.MILLISECONDS );
|
二 每隔多久获取一次
很简单,看定时任务每隔多久执行一次就知道了,这里的间隔参数是this.clientConfig .getPollNameServerInteval()。
ClientConfig的pollNameServerInteval 定义如下:
private int pollNameServerInteval = 1000 * 30;
DefaultMQPushConsumer继承了ClientConfig,pollNameServerInteval 默认是30秒,显然,这个时间是可以自己定义的,通过DefaultMQPushConsumer的setPollNameServerInteval()方法。
三 获取路由过程
看MQClientInstance的updateTopicRouteInfoFromNameServer()方法,该方法最终会调用下面这个方法,需要注意,对于消费者而言,isDefault参数永远是false。
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv .tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS )) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
//此处省略不必要的信息,对于消费者,分支不会走到这里来,因为isDefault为false,且生产者肯定为空
}
else {
topicRouteData =
this.mQClientAPIImpl .getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
//此处省略无关语句
}
catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX )
&& !topic.equals(MixAll.DEFAULT_TOPIC )) {
log.warn("updateTopicRouteInfoFromNameServer Exception" , e);
}
}
finally {
this.lockNamesrv .unlock();
}
}
else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LockTimeoutMillis);
}
}
catch (InterruptedException e) {
log.warn( "updateTopicRouteInfoFromNameServer Exception", e);
}
return false ;
}
|
其实最终都是通过this .mQClientAPIImpl .getTopicRouteInfoFromNameServer(topic, 1000 * 3);得到的。
四 客户端与nameserver的连接关系
broker与所有nameserver都是长连接,如有变化,则向所有nameserver都发送消息。但是生产者和消费者只是跟某一台nameserver保持联系。设定一个场景,如果某个broker的topic配置发生了变化,它向所有nameserver发布通知,但是此时如果某一台nameserver推送失败(超时或者挂掉了),则nameserver集群之间的信息是不完整的,因为挂掉的那台nameserver没有得到最新变化。
由此衍生三个问题:
1 如果该nameserver不是挂掉,只是那一瞬间没有响应,那么待可正常服务时,刚才那个borker发生的变化应该能生效,不应该被丢弃,否则nameserver之间的数据是不同步的。
解决方案:broker是定时向所有nameserver发送自己的注册信息的,如果当时某台nameserver挂掉重启或者超时,没关系,下次仍然会接受到上次没接收到的broker信息
2 如果真的挂掉了,但是很快又恢复了,因为borker和nameserver保持的是长连接,显然挂掉重新启动后,broker与nameserver的长连接无效了,应该能自动重连
见getAndCreateChannel方法分析
3 只要某个nameserver不可用,消费者应该能failover,每次应该都检查长连接是否还有效,若无效则自动连接其他nameserver。
见getAndCreateNameserverChannel()方法分析
带着这个疑问,看看this .mQClientAPIImpl .getTopicRouteInfoFromNameServer(topic, 1000 * 3)方法。这个方法向nameserver发起调用,获取路由结果
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode. GET_ALL_TOPIC_LIST_FROM_NAMESERVER , null);
RemotingCommand response = this .remotingClient .invokeSync( null, request, timeoutMillis);
|
重点在于remotingClient .invokeSync方法,如下
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException {
//这里获取连接,该方法里面会做连接的检查和恢复
final Channel channel = this .getAndCreateChannel(addr);
//最后如果还是不是有效连接,则关闭连接,抛出异常
if (channel != null && channel.isActive()) {
try {
if (this .rpcHook != null) {
this .rpcHook .doBeforeRequest(addr, request);
}
RemotingCommand response = this .invokeSyncImpl(channel, request, timeoutMillis);
if (this .rpcHook != null) {
this .rpcHook .doAfterResponse(request, response);
}
return response;
}
catch (RemotingSendRequestException e) {
log .warn("invokeSync: send request exception, so close the channel[{}]", addr);
this .closeChannel(addr, channel);
throw e;
}
catch (RemotingTimeoutException e) {
log .warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
// 超时异常如果关闭连接可能会产生连锁反应
// this.closeChannel( addr, channel);
throw e;
}
}
else {
this .closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
|
这个方法大体分为两步,第一步获取连接,第二步通过连接发送请求,获取连接当然是getAndCreateChannel方法了,getAndCreateChannel方法非常重要,它包含了客户端对nameserver的failover,也包含了自动重连功能,对于客户端,传入的addr参数都是null,所以一直会走到getAndCreateNameserverChannel()方法。
private Channel getAndCreateChannel( final String addr) throws InterruptedException {
//无论是producer还是consumer,传进来的addr参数都是null
if (null == addr)
return getAndCreateNameserverChannel();
//因为客户端传入的addr是null,所以客户端不会走到这里来,只有broker才会走到这里来,因为broker传入的addr不为null
ChannelWrapper cw = this .channelTables .get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
//注意,如果和某个addr的连接不OK了,则再向该nameserver发起重连
return this .createChannel(addr);
}
|
createChannel方法很简单,无非就是创建连接嘛,就不细看了,分析下getAndCreateNameserverChannel(),以下是该方法大致过程:
因为客户端都是与某一台nameserver长连接,因此长连接一旦选定,后面不会变化,除非nameserver挂掉,所以已建立的长连接要保存起来。下面这段逻辑就是如此。
String addr = this .namesrvAddrChoosed .get();
if (addr != null) {
ChannelWrapper cw = this .channelTables .get(addr);
//注意这里,虽然长连接已经建立了,但是每次调用时,仍然要通过“cw != null && cw.isOK()”检查连接是否OK。
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
}
|
如果连接没有建立或连接已经断开,则继续往下,真正创建连接时需要加锁的
if ( this.lockNamesrvChannel .tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS ))
下面的代码都是在这个if块里面
这里又执行了一边上面的获取连接并检测的代码,可以连接,因为有时候连接只是偶尔不OK的
addr = this. namesrvAddrChoosed .get();
if (addr != null) {
ChannelWrapper cw = this .channelTables .get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
}
|
接着往下,这段代码非常重要
namesrvIndex指示了当前跟哪个nameserver发生连接,初始值是个随机数,跟nameserver数量取模,走到这一步,要么是首次发起调用,之前连接还未创建现在要创建了,或者是已创建的连接无效了要连接下一个nameserver,就是“cw.isOK()”为false。
if (addrList != null && !addrList.isEmpty()) {
for (int i = 0; i < addrList.size(); i++) {
int index = this .namesrvIndex .incrementAndGet();
index = Math. abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);
this .namesrvAddrChoosed.set(newAddr);
Channel channelNew = this .createChannel(newAddr);
if (channelNew != null)
return channelNew;
}
}
|
相关推荐
所以在实际产环境中,个Topic会设置成多分区的模式,来持多个消费者,参照下图:在互联企业的实际产环境中,Topic数量和分区都会较多,这就要求消息中间件在多T
1.更改本地topic配置缓存topicConfigTable 2.将缓存topicConfigTable配置信息写入磁盘 3.向NameServer上报变更信
主要介绍了Java RocketMQ 路由注册与删除的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
本文档旨在描述 本文档旨在描述 本文档旨在描述 本文档旨在描述 本文档旨在描述 本文档旨在描述 本文档旨在描述 RocketMQ RocketMQ RocketMQRocketMQRocketMQRocketMQ的多个关键特性实现 的多个关键特性实现 的多个...
下的测试对图,是来评测汗宝马和蒸汽机车谁快的组竞速曲线:图1 汗宝马和蒸汽车的速度稳定性对上图的横轴表测试时间,纵轴表车和马的速度,可以看到,马的加速和最速度均
activemq 虚拟topic配置,可以将一个 topic转发为多个队列和多个topic或者将一个队列转发为多个topic和多个队列
种元数据的管理(Topic 路由信息等) 2. 高效的 I/O 存储,RocketMQ 追求消息发送的高吞吐量,RocketMQ 的消息存储设计成文件组的概念,组内单个文件固定大小,引入了内存映射机 制,所有主题的消息存储基于顺序读写...
RocketMQ分布式消息队列
2、讲解NameServer的启动、注册Broker、客户端查询Topic的路由信息等功能; 3、讲解Broker的启动、注册、处理Producer发送消息、处理Consumer拉取消息、事务消息的处理等功能; 4、讲解Producer端的启动、发送普通...
SpringBoot(36) 整合 RocketMQ
rocketmq控制台 安装文档 http://blog.csdn.net/liu_zhaoming/article/details/77837881 [-] 安装环境 安装 编译克隆 配置jvm内存 配置启动mqnamesrv 配置启动broker 关闭服务器 配置windows控制台github 创建bat...
RocketMQ-Console是RocketMQ项目的扩展插件,是一个图形化管理控制台,提供Broker集群状态查看,Topic管理,Producer、Consumer状态展示,消息查询等常用功能,这个功能在安装好RocketMQ后需要额外单独安装、运行。...
Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。 ...
2. 定时从namesrv 更新topic 路由信息, Producer 与broker 间的心跳 Producer 定时发送心跳将producer 信息(其实就是procduer 的group)定时发送到, brokerAddrTable 集合中列出的broker 上去 Producer 发送消息...
• 支持 Topic 与 Queue 两种模式 • 亿级消息堆积能力 • 比较友好的分布式特性 • 同时支持 Push 与 Pull 方式消费消息 • 历经多次天猫双十一海量消息考验 目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,对比...
RocketMQ-Console是RocketMQ项目的扩展插件,是一个图形化管理控制台,提供Broker集群状态查看,Topic管理,Producer、Consumer状态展示,消息查询等常用功能,这个功能在安装好RocketMQ后需要额外单独安装、运行 ...
测评开始时,测评程序会启动10~20个Producer,每个Producer在一条线程中,然后每个Producer随机生产某个Topic或者附属于Queue的消息并发送到消息引擎; Topic: 消息主题。 Queue: 队列。抽象的概念,消息可以发送到...
Apache RocketMQ 是一个分布式消息传递和流媒体平台,具有低延迟,高性能和可靠性,万亿级容量和灵活的可伸缩性。 它具有多种功能: 消息模式,包括发布/订阅,请求/答复和流式传输 财务级交易消息 基于内置容错...
rocketmq监控 查看rocketmq.namesrv对应下的broker、topic、consuemr/producer等
RocketMQ-Console是RocketMQ项目的扩展插件,是一个图形化管理控制台,提供Broker集群状态查看,Topic管理,Producer、Consumer状态展示,消息查询等常用功能,这个功能在安装好RocketMQ后需要额外单独安装、运行。