阅读更多

5顶
2踩

开源软件

第一次真正接触Java消息服务是在2013年底,当时是给中国移动做统一支付平台,当时用的就是著名的Apache ActiveMQ,当时觉得很有趣,一个服务队列竟然可以玩出这么多花样来。当时为了尽快的入门,还把《Java Message Service》给看了一遍,这对于初学者的我收获颇多。我说知道的完全实现JMS规范的MOMActiveMQ/ApolloHornetQ,都是采用Java实现。JMS中说明了Java消息服务的两种消息传送模型,即P2P(点对点)和Pub/Sub(发布订阅),在约定了一些消息服务特性的同时,并提供了一套接口API,是否实现了该API,标志着MOM是否支持JMS规范,JMS规范中定义了消息服务诸多特性,这些特性和他所面对的企业级服务场景相关,当然,这也严重限制了消息服务的吞吐量,完全实现JMS规范的MOM,性能总不会太高,而且JMS规范中没有涉及消息服务的分布式特性,导致大多数实现JMS规范的MOM分布式部署功能比较弱,只适合集群部署。

 

说到高性能消息中间件,第一个想到的肯定是LinkedIn开源的Kafka,虽然最初Kafka是为日志传输而生,但也非常适合互联网公司消息服务的应用场景,他们不要求数据实时的强一致性(事务),更多是希望达到数据的最终一致性。RocketMQMetaQ3.0版本,而MetaQ最初的设计又参考了Kafka。最初的MetaQ 1.x版本由阿里的原作者庄晓丹开发,后面的MetaQ 2.x版本才进行了开源,这里需要注意一点的事,MetaQ 1.xMetaQ 2.x是依赖ZooKeeper的,但RocketMQ(即MetaQ 3.x)却去掉了ZooKeeper依赖,转而采用自己的NameServer

 

ZooKeeper是著名的分布式协作框架,提供了Master选举、分布式锁、数据的发布和订阅等诸多功能,为什么RocketMQ没有选择ZooKeeper,而是自己开发了NameServer,我们来具体看看NameServerRocketMQ集群中的作用就明了了。

 

RocketMQBroker有三种集群部署方式:1.单台Master部署;2.多台Master部署;3.MasterSlave部署;采用第3种部署方式时,MasterSlave可以采用同步复制和异步复制两种方式。下图是第3种部署方式的简单图:

 


 

 

 

图虽然是网上找的,但也足以说明问题,当采用多Master方式时,MasterMaster之间是不需要知道彼此的,这样的设计直接降低了Broker实现的复查性,你可以试想,如果MasterMaster之间需要知道彼此的存在,这会需要在Master之中维护一个网络的Master列表,而且必然设计到Master发现和活跃Master数量变更等诸多状态更新问题,所以最简单也最可靠的做法就是Master只做好自己的事情(比如和Slave进行数据同步)即可,这样,在分布式环境中,某台Master宕机或上线,不会对其他Master造成任何影响。

 

那么怎么才能知道网络中有多少台MasterSlave呢?你会很自然想到用ZooKeeper,每个活跃的MasterSlave都去约定的ZooKeeper节点下注册一个状态节点,但RocketMQ没有使用ZooKeeper,所以这件事就交给了NameServer来做了(看上图)。

 

结论一:NameServer用来保存活跃的broker列表,包括MasterSlave

当然,这个结论百度一查就知道,我们移步到rocketmq-namesrv模块中最重要的一个类:RouteInfoManager,它的主要属性如下:

 

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

 

每个属性通过名字就能清楚的知道是什么意思,之所以能用非线程安全的HashMap,是因为有读写锁lock来对HashMap的修改做保护。我们注意到保存brokerMap有两个,即brokerAddrTable用来保存所有的broker列表和brokerLiveTable用来保存当前活跃的broker列表,而BrokerData用来保存broker的主要新增,而BrokerLiveInfo只用来保存上次更新(心跳)时间,我们可以直接看看RouteInfoManager中扫描非活跃broker的方法:

 

// Broker Channel两分钟过期

private final static long BrokerChannelExpiredTime = 1000 * 60 * 2;

public void scanNotActiveBroker() {

    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();

    while (it.hasNext()) {

        Entry<String, BrokerLiveInfo> next = it.next();

        long last = next.getValue().getLastUpdateTimestamp();

        if ((last + BrokerChannelExpiredTime) < System.currentTimeMillis()) {

            RemotingUtil.closeChannel(next.getValue().getChannel());

            it.remove();

            log.warn("The broker channel expired, {} {}ms", next.getKey(), BrokerChannelExpiredTime);

            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());

        }

    }

}

 

可以看出,如果两分钟内都没收到一个broker的心跳数据,则直接将其从brokerLiveTable中移除,注意,这还会导致该brokerbrokerAddrTable被删除,当然,如果该brokerMaster,则它的所有Slavebroker都将被删除。具体细节可以参看RouteInfoManageronChannelDestroy方法。

 

结论二:NameServer用来保存所有topic和该topic所有队列的列表。

我们注意到,topicQueueTablevalueQueueDataList,我们看看QueueData中的属性:

 

private String brokerName;  // broker的名称

private int readQueueNums;  // 读队列数量

private int writeQueueNums; // 写队列数量

private int perm;           // 读写权限

private int topicSynFlag;   // 同步复制还是异步复制标记

 

所以,你几乎可以在NameServer这里知道topic相关的所有信息,包括topic有哪些队列,这些队列在那些broker上等。

 

结论三:NameServer用来保存所有brokerFilter列表。

关于这一点,讨论broker的时候再细说。

 

DefaultRequestProcessorNameServer的默认请求处理器,他处理了定义在rocketmq-common模块中RequestCode定义的部分请求,比如注册broker、注销broker、获取topic路由、删除topic、获取brokertopic权限、获取NameServer的所有topic等。

 

在源代码中,NettyServerConfig类记录NameServer中的一些默认参数,比如端口、服务端线程数等,列出如下:

private int listenPort = 8888;

private int serverWorkerThreads = 8;

private int serverCallbackExecutorThreads = 0;

private int serverSelectorThreads = 3;

private int serverOnewaySemaphoreValue = 256;

private int serverAsyncSemaphoreValue = 64;

private int serverChannelMaxIdleTimeSeconds = 120;

 

这些都可以通过启动时指定配置文件来进行覆盖修改,具体可以参考NameServer的启动类NamesrvStartup的实现(没想到Apache还有对命令行提供支持的commons-cls的包)。

 

 

现在我们再回过头来看看RocketMQ为什么不使用ZooKeeperZooKeeper可以提供Master选举功能,比如Kafka用来给每个分区选一个broker作为leader,但对于RocketMQ来说,topic的数据在每个Master上是对等的,没有哪个Master上有topic上的全部数据,所以这里选举leader没有意义;RockeqMQ集群中,需要有构件来处理一些通用数据,比如broker列表,broker刷新时间,虽然ZooKeeper也能存放数据,并有一致性保证,但处理数据之间的一些逻辑关系却比较麻烦,而且数据的逻辑解析操作得交给ZooKeeper客户端来做,如果有多种角色的客户端存在,自己解析多级数据确实是个麻烦事情;既然RocketMQ集群中没有用到ZooKeeper的一些重量级的功能,只是使用ZooKeeper的数据一致性和发布订阅的话,与其依赖重量级的ZooKeeper,还不如写个轻量级的NameServerNameServer也可以集群部署,只有一千多行代码的NameServer稳定性肯定高于ZooKeeper,占用的系统资源也可以忽略不计,何乐而不为?当然,这些只是本人的一点理解,具体原因当然得RocketMQ设计和开发者来说。

5
2
评论 共 0 条 请登录后发表评论

发表评论

您还没有登录,请您登录后再发表评论

相关推荐

  • 模拟退火算法

    模拟退火 首先看一下度娘的定义 模拟退火算法(Simulate Anneal,SA)是一种通用概率演算法,用来在一个大的搜寻空间内找寻命题的最优解 模拟退火是一种非常好用的随机化算法,它是爬山算法的改进版 爬山算法的思想就是一个劲的找最优解,如果接下来的任何状态都比当前状态差,那么就停止 但是这样显然是错误的,比如下面这种情况     爬山找到A点之后就GG了,但是模拟退火...

  • rocketmq 初探(一)

    这篇主要是简单介绍下 rocketmq以及idea 本地调试 rocketmq。 项目架构 感兴趣的可以下载源码看下。 https://github.com/apache/rocketmq 项目结构图。 rocketmq-acl: acl 秘钥方式的鉴权,用在broker端。 rocketmq...

  • RocketMQ初探

    常用的消息中间件 1.消息队列的介绍 消息队列本质上来说是一个符合先进先出原则的单向队列:一方发送消息并存入消息队列尾部(生产...rocketmq是阿里巴巴团队使用Java语言开发的一款分布式消息中间件,是一款低延迟,高

  • RocketMQ(一):初探门径

    RocketMQ是阿里开源的消息中间件,目前也已经孵化为Apache顶级项目,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,它对消息的可靠传输及事务性做了优化,...

  • RocketMQ源码(2)—NameServer启动流程源码解析

    详细介绍了RocketMQ的NameServer启动流程源码解析,包括RocketMQ的RPC通信模型。

  • RocketMq初探

    具体请参考官方说明:http://rocketmq.apache.org/docs/quick-start/ 2.启动服务 2.1 启动Name Server nohup sh bin/mqnamesrv &amp; 2.2启动Broker nohup sh bin/mqbroker -n localhost:9876 &amp; 3.创建...

  • RocketMQ 初探

    RocketMQ 是阿里开源的消息中间件,前不久捐献给了 Apache 。正如官网介绍如下:它是一个开源的分布式消息传递和流式数据平台。 特点如下: 产品发展历史 大约经历了三个主要版本迭代 一、Metaq...

  • rocketmq 初探(三)

    基于 rocketmq 4.9 版本。 BrokerStartup#BrokerController 按照代码的先后顺序撸源码: BrokerController.createBrokerController public static BrokerController createBrokerController(String[] args) { // .....

  • rocketmq 初探(二)

    上一篇简单介绍和rocketmq,这一篇看下源码之注册中心。 namesrv 先看两个初始化方法 NamesrvController.initialize() 和 NettyRemotingServer.start(); public boolean initialize() { // 加载配置文件 this....

  • 《RocketMQ技术内幕:RocketMQ架构设计与实现原理》一导读 ...

    本书由RocketMQ社区早期的布道者和技术专家撰写,Apache RocketMQ创始人/Linux OpenMessaging创始人兼主席/Alibaba Messaging开源技术负责人冯嘉对其高度评价并作序推荐。 源码角度:本书对RocketMQ的核心技术架构...

  • RocketMQ 源码初探(一)

    RocketMQ 本章主题: 1、发送及接受消息模型...消息发送:消息业务集群,构成一个消息发送者组,一个消息发送者组可以发送一个主题,或者多个主题,根据业务需求决定。 消息发送的逻辑结构: Producer group Name  

  • RocketMQ(五):namesrv初探

    NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息从机服务器。 Producer: 消息生产者。 Consumer: 消息消费者。 说明: rocke.....

  • RocketMQ(六):namesrv再探

    NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息从机服务器。 Producer: 消息生产者。 Consumer: 消息消费者。 说明: rocke.....

  • RocketMQ(八):消息发送

    NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息从机服务器。 Producer: 消息生产者。 Consumer: 消息消费者。 说明: rocke.....

  • RocketMQ 源码初探(二)

    紧接上一篇 三、消息消费 1. DEMO public class RocketMQConsumerDemo { public static void main(String[] args) throws Exception{ //创建消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer...

  • 家乡信息2

    来源:http://www.stats-ly.gov.cn/tjjzfw/dwml/jaqt24.htm 单位名称 单位地址法人代表主要业务联系电话苍山县卞庄大坊村民委员会卞庄镇大坊李思文村民委员会5213780苍山县卞庄镇宋圩子村民委员会卞庄镇宋圩子左洪才村民委员会5265079

  • 我的藏书

    各位,承蒙厚爱,以下这些书都是自己整理书籍记录下来的。我并没有电子书,很抱歉。但我这里有个观点:书固然重要,真正学到自己脑袋里才是最重要的。 不要象我空有很多书。 我的藏书

  • 大仲村镇概况-我的家乡

    大仲村镇概况?? 大仲村镇位于县境北部,驻地距县城18公里。北靠矿抗乡,南与贾庄乡、小岭乡相连,西接流井乡,东和沂堂乡毗邻。境域东西最大横距23公里,南北最大纵距15公里,总面积8224平方公里。总人口52795人,其中回族162人,非农业人口910人,农业人口51885人,人口密度为每平方公里624人。镇驻地——大仲村,建村于明朝永乐年间,因聚落中有一大家,故名大

  • 中国高校ftp 汇总

    http://campus.learning.sohu.com/6/0304/00/blank219270033.shtml   清华大学 名称 地址 推荐指数 推荐类别 有无HTTP页面 ·清华大学 ftp.tsinghua.edu.cn   软件 有 ·科学馆 166.111.27.140     有 ·工物系 166.111.32.2

Global site tag (gtag.js) - Google Analytics