一、简介
Storm是开源的分布式容错实时计算系统,目前被托管在GitHub上,遵循 Eclipse Public License 1.0。最初由BackType开发,现在已被Twitter收入麾下。Storm最新版本是Storm 0.9,核心采用Clojure实现。Storm为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息;Storm也可被用于“连续计算”(continuous computation),对数据流做连续处理,在计算时就将结果以流的形式输出给用户;它还可被用于“分布式RPC”,以并行的方式执行运算。
Storm主要特点如下:
0、简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了实时处理的复杂性。
1、语言无关。Storm的消息处理组件可以用任何语言来定义。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
2、容错性。如果在消息处理过程中出了一些异常,Storm会重新调度出问题的处理逻辑。Storm保证一个处理单元永远运行,除非显式杀掉。
3、可伸缩性。Storm的可伸缩性可以使其每秒处理的消息量达到很高。为了扩展一个实时计算任务,需要做的就是增加节点并且提高计算任务的并行度设置(parallelism setting)。Storm应用在10个节点的集群上每秒可以处理高达1000000个消息,包括每秒一百多次的数据库调用[5]。同时Storm使用ZooKeeper来协调集群内的各种配置使得Storm的集群可以很容易扩展。
4、保证无数据丢失。实时系统必须保证所有的数据被成功的处理。 那些会丢失数据的系统的适用场景非常窄,而Storm保证每一条消息都会被处理。
5、适用场景广泛。消息流处理、持续计算、分布式方法调用等是Storm适用场景广泛的基础,Storm的这些基础原语可以满足大量的场景。
虽然Storm具备诸多优势,但也存在不足:
0、Storm目前还存在Nimbus SPOF的问题;
1、存在雪崩问题;
2、资源粒度较粗;
3、Clojure实现引入了学习成本;
为此,阿里巴巴中间件团队用Java重新实现了类Storm的JStorm,同样被托管在GitHub上,遵循 Eclipse Public License 1.0,目前版本0.9.3。相关资料显示,阿里巴巴内部已经大规模部署了Storm/JStorm集群。
JStorm继承了Storm的所有优点,同时与Storm相比JStorm所特有的如下特点:
0、兼容Storm接口。开发者在Storm上运行的程序无需任何修改即可运行在JStorm上。
1、Nimbus HA。解决了Storm的Nimbus单点问题,支持自动热备切换Nimbus。
2、更细粒度的资源划分。JStorm从CPU、MEMORY、DISK和NET四个维度进行任务调度,同时不存在任务抢占问题。
3、可定制的任务调度机制。(Storm的任务调度目前也可定制)
4、更好的性能。通过底层ZeroMQ和Netty使JStorm具有更好的性能,同时具有更好的稳定性。
5、解决了Storm的雪崩问题。通过Netty和disruptor机制实现RPC保证可以匹配的数据发送和接收速度避免雪崩问题。
此外,JStorm通过减少对zookeeper的访问量、增加反序列化线程、优化ACK、增加监控内容及JAVA本身优势等各个方面优化了Storm的性能和稳定性。总之,JStorm比Storm更强大、更稳定、性能更好。
(本文后面所述关于JStorm的部分内容同样适用Storm)
二、数据模型
JStorm通过一系列基本元素实现实时计算的目标,其中包括了Topology、Stream、Spout、Bolt等等。JStorm在模型上和MapReduce有很多相似的地方,下表从不同维度对JStorm和MapReduce进行了比较。
实时计算任务需要打包成Topology提交,和MapReduce Job相似,不同的是,MapReduce Job在计算完成后结束,而JStorm的Topology任务一旦提交永远不会结束,除非显式停止。
计算任务Topology是由不同的Spout和Bolt通过Stream连接起来的DAG图。下面是一个典型Topology的结构示意图:
其中:
Spout:JStorm的消息源。用于生产消息,一般是从外部数据源(如MQ/RDBMS/NoSQL/RTLog等)不间断读取数据并向下游发送消息。
Bolt:JStorm的消息处理者。用于为Topology进行消息处理,Bolt可以执行查询、过滤、聚合及各种复杂运算操作,Bolt的消息处理结果可以作为下游Bolt的输入不断迭代。
Stream:JStorm中对数据进行的抽象,它是时间上无界的Tuple元组序列。在Topology中Spout是Stream的源头,负责从特定数据源发射Stream;Bolt可以接收任意多个Stream输入然后进行数据的加工处理,如果需要Bolt还可以发射出新Stream给下游Bolt。
Tuple:JStorm使用Tuple作为数据模型,存在于任意两个有数据交互的组件(Spout/Bolt)之间。每个Tuple是一组具有各自名称的值,值可以是任何类型,JStorm支持所有的基本类型、字符串以及字节数组,也可以使用自定义类型(需实现对应序列化器)作为值类型。简单来说,Tuple就是一组实现了序列化器带有名称的Java对象集合。
从整个Topology上看,Spout/Bolt可以看作DAG的节点,Stream是连接不同节点之间的有向边,Tuple则是流过Stream的数据集合。
下面是一个Topology内部Spout和Bolt之间的数据流关系:
Topology中每一个计算组件(Spout和Bolt)都有一个并行度,在创建Topology时指定(默认为1),JStorm在集群内分配对应个数的线程Task并行。
如上图示,既然对于Spout/Bolt都会有多个线程来并行执行,那么如何在两个组件(Spout和Bolt)之间发送Tuple会成为新的问题。
JStorm通过定义Topology时为每个Bolt指定输入Stream以及指定提供的若干种数据流分发(Stream Grouping)策略用来解决这一问题。
JStorm提供了以下几种Stream Grouping策略:
0) Shuffle Grouping:随机分组,随机派发Stream里面的Tuple,保证每个Bolt接收到的Tuple数目大致相同,通过轮询随机的方式使得下游Bolt之间接收到的Tuple数目差值不超过1。
1) Fields Grouping:按字段分组,具有同样字段值的Tuple会被分到相同Bolt里的Task,不同字段值则会被分配到不同Task。
2) All Grouping:广播分组,每一个Tuple,所有的Bolt都会收到。
3) Global Grouping:全局分组,Tuple被分配到Bolt中ID值最低的的一个Task。
4) Non Grouping:不分组,Tuple会按照完全随机的方式分发到下游Bolt。
5) Direct Grouping:直接分组,Tuple需要指定由Bolt的哪个Task接收。 只有被声明为Direct Stream的消息流可以声明这种分组方法。
6) Local or Shuffle Grouping:基本同Shuffle Grouping。
7) Custom Grouping:用户自定义分组策略,CustomStreamGrouping是自定义分组策略时用户需要实现的接口。
三、系统架构
JStorm与Hadoop相似,保持了Master/Slave的简洁优雅架构。与Hadoop不同,JStorm的M/S之间不是直接通过RPC交换心跳信息,而是借助ZK来实现,这样的设计虽然引入了第三方依赖,但是简化了Nimbus/Supervisor的设计,同时也极大提高了系统的容错能力。
整个JStorm系统中共存三类不同的Daemon进程,分别是Nimbus,Supervisor和Worker。
Nimbus:JStorm中的主控节点,Nimbus类似于MR的JT,负责接收和验证客户端提交的Topology,分配任务,向ZK写入任务相关的元信息,此外,Nimbus还负责通过ZK来监控节点和任务健康情况,当有Supervisor节点变化或者Worker进程出现问题时及时进行任务重新分配。Nimbus分配任务的结果不是直接下发给Supervisor,也是通过ZK维护分配数据进行过渡。特别地,JStorm 0.9.0领先Apache Storm实现了Nimbus HA,由于Nimbus是Stateless节点,所有的状态信息都交由ZK托管,所以HA相对比较简单,热备Nimbus subscribe ZK关于Master活跃状态数据,一旦发现Master出现问题即从ZK里恢复数据后可以立即接管。
Supervisor:JStorm中的工作节点,Supervisor类似于MR的TT,subscribe ZK分配到该节点的任务数据,根据Nimbus的任务分配情况启动/停止工作进程Worker。Supervisor需要定期向ZK写入活跃端口信息以便Nimbus及时监控。Supervisor不执行具体的数据处理工作,所有的数据处理工作都交给Worker完成。
Worker:JStorm中任务执行者,Worker类似于MR的Task,所有实际的数据处理工作最后都在Worker内执行完成。Worker需要定期向Supervsior汇报心跳,由于在同一节点,同时为保持节点的无状态,Worker定期将状态信息写入本地磁盘,Supervisor通过读本地磁盘状态信息完成心跳交互过程。Worker绑定一个独立端口,Worker内所有单元共享Worker的通信能力。
Nimbus、Supervisor和Worker均为Stateless节点,支持Fail-Fast,这为JStorm的扩展性和容错能力提供了很好的保障。
还剩一个问题是Topology的各个计算组件(Spout/Bolt)如何映射到计算资源上。梳理这个问题前需要先明确Worker/Executor/Task之间的关系:
0、Worker:完整的Topology任务是由分布在多个Supervisor节点上的Worker进程(JVM)来执行,每个Worker都执行且仅执行Topology任务的一个子集。
1、Executor:Worker内部会有一个或多个Executor,每个Executor对应一个线程。Executor包括SpoutExecutor和BoltExecutor,同一个Worker里所有的*Executor只能属于某一个Topology里的执行单元。
2、Task:执行具体数据处理实体,也就是用户实现的Spout/Blot实例。一个Executor可以对应多个Task,定义Topology时指定,默认Executor和Task一一对应。这就是说,系统中Executor数量一定是小于等于Task数量(#Executor≤#Task)。
下图给出了一个简单的例子,上半部分描述的是Topology结构及相关说明,其中定义了整个Topology的worker=2,DAG关系,各个计算组件的并行度;下半部分描述了Topology的Task在Supervisor节点的分布情况。从中可以看出Topology到Executor之间的关系。
0、Worker数在提交Topology时在配置文件中指定;
例:#Worker=2
1、执行线程/Executor数在定义Topology的各计算组件并行度时决定,可以不指定,默认为1。其中各个计算组件的并行度之和即为该Topology执行线程总数。
例:#Executor=sum(#parallelism hint)=2+2+6=10
2、Task数目也在定义Toplogy时确定,若不指定默认每个Executor线程对应一个Task,若指定Task数目会在指定数目的线程里平均分配。
例:#Task=sum(#task)=2+4+6=12,其中Executor4={Task0,Task1}
四、 关键流程
0、Topology提交
JStorm为用户提供了StormSubmitter. submitTopology用来向集群提交Topology,整个提交流程:
Client端:
0)客户端简单验证;
1)检查是否已经存在同名Topology;
2)提交jar包;
3)向Nimbus提交Topology;
Nimbus端:
0)Nimbus端简单合法性检查;
1)生成Topology Name;
2)序列化配置文件和Topology Code;
3)Nimbus本地准备运行时所需数据;
4)向ZK注册Topology和Task;
5)将Task压入分配队列等待TopologyAssign分配;
1、任务调度策略
从0.9.0开始,JStorm提供非常强大的调度功能,基本上可以满足大部分的需求,同时支持自定义任务调度策略。JStorm的资源不再仅是Worker的端口,而从CPU/Memory/Disk/Net等四个维度综合考虑。
Nimbus任务调度算法[2]如下:
0)优先使用自定义任务分配算法,当资源无法满足需求时,该任务放到下一级任务分配算法;
1)使用历史任务分配算法(如果打开使用历史任务属性),当资源无法满足需求时,该任务放到下一级任务分配算法;
2)使用默认资源平衡算法,计算每个Supervisor上剩余资源权值,取权值最高的Supervisor分配任务。
2、Acker机制
为保证无数据丢失,Storm/JStorm使用了非常漂亮的可靠性处理机制,如图当定义Topology时指定Acker,JStorm除了Topology本身任务外,还会启动一组称为Acker的特殊任务,负责跟踪Topolgogy DAG中的每个消息。每当发现一个DAG被成功处理完成,Acker就向创建根消息的Spout任务发送一个Ack信号。Topology中Acker任务的并行度默认parallelism hint=1,当系统中有大量的消息时,应该适当提高Acker任务的并行度。
Acker按照Tuple Tree的方式跟踪消息。当Spout发送一个消息的时候,它就通知对应的Acker一个新的根消息产生了,这时Acker就会创建一个新的Tuple Tree。当Acker发现这棵树被完全处理之后,他就会通知对应的Spout任务。
Acker任务保存了数据结构Map<MessageID,Map< TaskID, Value>>,
其中MessageID是Spout根消息ID,TaskID是Spout任务ID,Value表示一个64bit的长整型数字,是树中所有消息的随机ID的异或结果。通过TaskID,Acker知道当消息树处理完成后通知哪个Spout任务,通过MessageID,Acker知道属于Spout任务的哪个消息被成功处理完成。Value表示了整棵树的的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的时候都会有相同的MessageID发送过来做异或。当Acker发现一棵树的Value值为0的时候,表明这棵树已经被成功处理完成。
例如,对于前面Topology中消息树,Acker数据的变化过程:
Step0.A发送T0给B后:
R0=r0
<id0,<taskA,R0>>
Step1.B接收到T0并成功处理后向C发送T1,向D发送T2:
R1=R0^r1^r2=r0^r1^r2
<id0,<taskA,R0^R1>>
=<id0,<taskA,r0^r0^r1^r2>>
=<id0,<taskA,r1^r2>>
Step2.C接收到T1并成功处理后:
R2=r1
<id0,<taskA,r1^r2^R2>>
=<id0,<taskA,r1^r2^r1>>
=<id0,<taskA,r2>>
Step3.D接收到T2并成功处理后:
R3=r2
<id0,<taskA,r2^R3>>
=<id0,<taskA,r2^r2>>
=<id0,<taskA,0>>
当结果为0时Acker可以通知taskA根消息id0的消息树已被成功处理完成。
需要指出的是,Acker并不是必须的,当实际业务可以容忍数据丢失情况下可以不用Acker,对数据丢失零容忍的业务必须打开Acker,另外当系统的消息规模较大是可适当增加Acker的并行度。
3、故障恢复
0)节点故障
Nimbus故障。Nimbus本身无状态,所以Nimbus故障不会影响正在正常运行任务,另外Nimbus HA保证Nimbus故障后可以及时被备份Nimbus接管。
Supervisors节点故障。Supervisor故障后,Nimbus会将故障节点上的任务迁移到其他可用节点上继续运行,但是Supervisor故障需要外部监控并及时手动重启。
Worker故障。Worker健康状况监控由Supervisor负责,当Woker出现故障时,Supervisor会及时在本机重试重启。
Zookeeper节点故障。Zookeeper本身具有很好的故障恢复机制,能保证至少半数以上节点在线就可正常运行,及时修复故障节点即可。
1)任务失败
Spout失败。消息不能被及时被Pull到系统中,造成外部大量消息不能被及时处理,而外部大量计算资源空闲。
Bolt失败。消息不能被处理,Acker持有的所有与该Bolt相关的消息反馈值都不能回归到0,最后因为超时最终Spout的fail将被调用。
Acker失败。Acker持有的所有反馈信息不管成功与否都不能及时反馈到Spout,最后同样因为超时Spout的fail将被调用。
任务失败后,需要Nimbus及时监控到并重新分配失败任务。
五、基础接口
这里把几个基础接口中注释摘出来说明其的作用:
0、ISpout: ISpout is the core interface for implementing spouts. A Spout is responsible for feeding messages into the topology for processing. For every tuple emitted by a spout, Storm will track the (potentially very large) DAG of tuples generated based on a tuple emitted by the spout. When Storm detects that every tuple in that DAG has been successfully processed, it will send an ack message to the Spout.
1、IBolt: IBolt represents a component that takes tuples as input and produces tuples as output. An IBolt can do everything from filtering to joining to functions to aggregations. It does not have to process a tuple immediately and may hold onto tuples to process later.
2、TopologyBuilder: TopologyBuilder exposes the Java API for specifying a topology for Storm to execute.
3、StormSubmitter: Use this class to submit topologies to run on the Storm cluster.
特别声明该文章系他人所作,忘记源地址!!!
相关推荐
jstorm框架介绍,包含架构图、jstorm安装部署以及配置、如何在jstorm框架里写业务代码。
【大纲】 现状 Jstorm概叙 & 流式计算 为什么开发Jstorm 特性 Question and Answer.
该pdf 介绍了阿里巴巴自研 jstorm的特性,和使用方法。
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答
【课程大纲】 01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建...40.JStorm介绍 41.会员问题收集和解答