一、介绍
Storm的开发语言主要是Java和Clojure,其中Java定义骨架,而Clojure编写核心逻辑。源码统计结果:
177 unique files.
7 files ignored.
http://cloc.sourceforge.net v 1.55 T=1.0 s (171.0 files/s, 46869.0 lines/s)
-------------------------------------------------------------------------------
Language files blank comment code
-------------------------------------------------------------------------------
Java 125 5010 2414 25661
Lisp 33 732 283 4871
Python 7 742 433 4675
CSS 1 12 45 1837
ruby 2 22 0 104
Bourne Shell 1 0 0 6
Javascript 2 1 15 6
-------------------------------------------------------------------------------
SUM: 171 6519 3190 37160
-------------------------------------------------------------------------------
Java代码25000多行,而Clojure(Lisp)只有4871行,说语言不重要再次证明是扯淡。
二、Topology和Nimbus
Topology是storm的核心理念,将spout和bolt组织成一个topology,运行在storm集群里,完成实时分析和计算的任务。这里我主要想介绍下topology部署到storm集群的大概过程。提交一个topology任务到Storm集群是通过StormSubmitter.submitTopology方法提交:
我们将topology打成jar包后,利用bin/storm这个python脚本,执行如下命令:
将jar包提交给storm集群。storm脚本会启动JVM执行Topology的main方法,执行submitTopology的过程。而submitTopology会将jar文件上传到nimbus,上传是通过socket传输。在storm这个python脚本的jar方法里可以看到:
exec_storm_class(
klass,
jvmtype="-client",
extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],
args=args,
prefix="export STORM_JAR=" + jarfile + ";")
将jar文件的地址设置为环境变量STORM_JAR,这个环境变量在执行submitTopology的时候用到:
private static void submitJar(Map conf) {
if(submittedJar==null) {
LOG.info("Jar not uploaded to master yet. Submitting jar");
String localJar = System.getenv("STORM_JAR");
submittedJar = submitJar(conf, localJar);
} else {
LOG.info("Jar already uploaded to master. Not submitting jar.");
}
}
通过环境变量找到jar包的地址,然后上传。利用环境变量传参是个小技巧。
其次,nimbus在接收到jar文件后,存放到数据目录的inbox目录,nimbus数据目录的结构:
-inbox
-stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar
-stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar
-stormdist
-storm-id
-stormjar.jar
-stormconf.ser
-stormcode.ser
其中inbox用于存放提交的jar文件,每个jar文件都重命名为stormjar加上一个32位的UUID。而stormdist存放的是启动topology后生成的文件,每个topology都分配一个唯一的id,ID的规则是“name-计数-时间戳”。启动后的topology的jar文件名命名为storm.jar ,而它的配置经过java序列化后存放在stormconf.ser文件,而stormcode.ser是将topology本身序列化后存放的文件。这些文件在部署的时候,supervisor会从这个目录下载这些文件,然后在supervisor本地执行这些代码。
进入重点,topology任务的分配过程(zookeeper路径说明忽略root):
1.在zookeeper上创建/taskheartbeats/{storm id} 路径,用于任务的心跳检测。storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。task将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。
2.从topology中获取bolts,spouts设置的并行数目以及全局配置的最大并行数,然后产生task id列表,如[1 2 3 4]
3.在zookeeper上创建/tasks/{strom id}/{task id}路径,并存储task信息
4.开始分配任务(内部称为assignment), 具体步骤:
(1)从zk上获得已有的assignment(新的toplogy当然没有了)
(2)查找所有可用的slot,所谓slot就是可用的worker,在所有supervisor上配置的多个worker的端口。
(3)将任务均匀地分配给可用的worker,这里有两种情况:
(a)task数目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最终是这样分配
3 : [host1:port1] 4 : [host2:port1]}
,可以看到任务平均地分配在两个worker上。
(b)如果task数目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先会将woker排序,将不同host间隔排列,保证task不会全部分配到同一个worker上,也就是将worker排列成
,然后分配任务为
(4)记录启动时间
(5)判断现有的assignment是否跟重新分配的assignment相同,如果相同,不需要变更,否则更新assignment到zookeeper的/assignments/{storm id}上。
5.启动topology,所谓启动,只是将zookeeper上/storms/{storm id}对应的数据里的active设置为true。
6.nimbus会检查task的心跳,如果发现task心跳超过超时时间,那么会重新跳到第4步做re-assignment。
相关推荐
storm提交topology的过程共1页.pdf.zip
大家都知道,要提交StormTopology到Cluster,需要运行如下命令:bin目录下storm是一个Python文件,我们可以看一下Python脚本的main方法首先解析args参数,解析完了之后,把所有的参数传递给COMMANDS,由COMMANDS调用...
STORM的TOPOLOGY在线上运行时,随着数据量的增加,在一定的服务器性能及集群规模下,会渐渐达到一个极限,到达极限后,服务器的load、io、cpu、mem等可能会出现耗尽,系统很卡,storm吞吐量骤降的情况。本文档中截图...
Topology-Scanner是WeOps团队免费开放的一个网络拓扑自动扫描模块,可以自动发现网络设备的类型、网络设备之间的互联
Storm分布式实时计算模式由Apache Storm 项目核心贡献者吉奥兹、奥尼尔亲笔撰 写,融合了作者丰富的Storm实战经验,通过大量示例,全面而系统地讲解使用Storm进行分布式实 时计算的核心概念及应用,并针对不同的应用...
Chapter 3 Topology design Chapter 4 Creating robust topologies Chapter 5 Moving from local to remote topologies Chapter 6 Tuning in Storm Chapter 7 Resource contention Chapter 8 Storm internals
压缩包里有一个topology类的文件,主要是kafkaspout类的配置和调用方法,另包含需要的kafkaspouot的jar包。
Storm作为开源的分布式实时计算系统在业界得到了广泛应用,针对Storm自带调度策略忽略了Topology组件任务间的逻辑耦合性,从而引起大量tuple传输产生较大网络时延问题,结合进程代数将Topology等效简化为具有明显...
2.远程模式(Remote Mode):在这个模式,我们把我们的Topology提交到集群,在这个模式中,Storm的所有组件都是线程安全的, 因为它们都会运行在不同的Jvm或物理机器上,这个模式就是正式的生产模式。 二、写一个...
import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import ...
32.项目3-非跳出UV-Storm topology开发一 33.项目3-非跳出UV-Storm topology开发二 34.项目3-非跳出UV-Web端Servlet开发 35.项目3-非跳出UV-Web端Highcharts图表开发 36.项目3-非跳出UV-项目效果调试 37.项目3-非...
• Storm 数据模型(topology) • Storm ack和fail • Storm 批处理 • Storm TOPN • Storm 流程聚合 • Storm DRPC • Storm executor、worker、task之间的关系和调优 • Storm异常解决
java开发的基于kafka、xlog的web日志实时分析storm topology.zip
系统讲解Storm的基础知识和实时数据处理的最佳实践方法,内容涵盖Storm本地开发环境搭建、日志流数据处理、Trident、分布式远程过程调用、Topology在不同编程语言中的实现方法、Storm与Hadoop的集成方法、实时机器...
风暴拓扑示例 概述: 该项目提供了有关使用各种Apache Storm拓扑的示例集合... cd /tmp/storm-topology-examples && bash -x bin/install_mongodb.sh 如果使用HiveBolt,则创建表(您可能要修改ddl) cd /tmp/storm
系统讲解Storm的基础知识和实时数据处理的最佳实践方法,内容涵盖Storm本地开发环境搭建、日志流数据处理、Trident、分布式远程过程调用、Topology在不同编程语言中的实现方法、Storm与Hadoop的集成方法、实时机器...
利用Trident topology实现预测疾病暴发的实例完整实例源码,具体详情参见博文:http://blog.csdn.net/l1028386804/article/details/79120204
01.Storm基础知识02.Storm集群安装-1-new .avi.baiduyun.p05.Storm配置文件配置项讲解07.Storm基本API介绍08.Storm Topology的并发度09.Strom消息机制原理讲解10.Storm DRPC实战讲解
本文所述程序实例主要实现在后端应用服务器上实时获取STORM集群的运行信息和topology相关的提交和控制。对此,通过对STORM UI和CMD源码的分析,得出可以通过其thrift接口调用实现这些功能。先下载一个thrift库进行...