Everything starts with the storm command, which launches a topology as follows:
storm jar storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.WordCountTopology
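The storm command is implemented in Python. Its jar function is simple: it calls exec_storm_class with jvmtype="-client".
exec_storm_class in turn just assembles a java command line and runs it with os.system(command). Why is it written in Python? Perhaps because that is simple and lets users run the storm command directly.
Here klass is the topology class, so the java command simply invokes the main function of the topology class.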
def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]

    Runs the main method of class with the specified arguments.
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.
    """
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        # StormSubmitter reads storm.jar to find the jar it has to upload
        childopts="-Dstorm.jar=" + jarfile)

def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=[], args=[]):
    nativepath = confvalue("java.library.path", extrajars)
    # wrap every argument in double quotes
    args_str = " ".join(map(lambda s: "\"" + s + "\"", args))
    # the spaces after "java" and around "-cp" are required for a valid command
    command = "java " + jvmtype + " -Dstorm.home=" + STORM_DIR + " " + get_config_opts() + " -Djava.library.path=" + nativepath + " " + childopts + " -cp " + get_classpath(extrajars) + " " + klass + " " + args_str
    print "Running: " + command
    os.system(command)
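To make the shape of the resulting command concrete, here is a minimal Python 3 sketch of the same idea. It is not the storm script itself: build_storm_command, the /opt/storm home directory, and the simplified classpath handling are assumptions made for illustration, and it uses shlex.quote rather than manual double quotes.

```python
import shlex

def build_storm_command(klass, jarfile, args=(), jvmtype="-client",
                        storm_home="/opt/storm", classpath_entries=()):
    """Hypothetical helper: assemble a java command line similar in shape
    to the one exec_storm_class builds (heavily simplified)."""
    # The topology jar must be on the classpath so that klass can be loaded.
    classpath = ":".join([*classpath_entries, jarfile])
    parts = [
        "java", jvmtype,
        "-Dstorm.home=" + storm_home,
        "-Dstorm.jar=" + jarfile,   # tells the submitter which jar to upload
        "-cp", classpath,
        klass,                      # the JVM runs this class's main method
        *args,                      # passed through to main(String[] args)
    ]
    # Quote each part so arguments containing spaces survive the shell.
    return " ".join(shlex.quote(p) for p in parts)

command = build_storm_command(
    "storm.starter.WordCountTopology",
    "storm-starter-0.0.1-SNAPSHOT-standalone.jar",
    args=["word-count"],
)
print(command)
```

Building the command as a list and quoting each element avoids the missing-space and quoting mistakes that plain string concatenation invites.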
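So what does the main function of the WordCountTopology example actually do?
Besides defining the topology, when a topology name is passed on the command line it ends by calling StormSubmitter.submitTopology(args[0], conf, builder.createTopology()) to submit the topology; without arguments it runs the topology in a LocalCluster instead.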
public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new RandomSentenceSpout(), 5);

    builder.setBolt("split", new SplitSentence(), 8)
           .shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12)
           .fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if(args!=null && args.length > 0) {
        conf.setNumWorkers(3);

        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
        conf.setMaxTaskParallelism(3);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());

        Thread.sleep(10000);

        cluster.shutdown();
    }
}
StormSubmitter
Let's look at submitTopology directly.
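1. Configuration parameters
The command-line options are put into stormConf, the configuration from conf/storm.yaml is read into conf, and stormConf is then put into conf as well, so command-line options take precedence.
stormConf is then serialized to JSON, because this configuration has to be sent to the server.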
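The precedence described above can be sketched with plain dictionaries. This is only an illustration in Python, not Storm code: merge_conf and the sample keys and values are assumptions. It mirrors the order of the putAll calls in submitTopology, where later writes win.

```python
import json

def merge_conf(yaml_defaults, topology_conf, command_line_opts):
    """Hypothetical helper mirroring the merge order in submitTopology."""
    # stormConf = topology conf, then command-line options on top
    submitted = dict(topology_conf)
    submitted.update(command_line_opts)
    # conf = storm.yaml settings, then stormConf on top
    effective = dict(yaml_defaults)
    effective.update(submitted)
    # Only stormConf is serialized and sent to the server
    serialized = json.dumps(submitted, sort_keys=True)
    return submitted, serialized, effective

submitted, serialized, effective = merge_conf(
    yaml_defaults={"nimbus.host": "localhost", "topology.workers": 1},
    topology_conf={"topology.workers": 3, "topology.debug": True},
    command_line_opts={"topology.workers": 5},
)
print(serialized)
print(effective["topology.workers"])
```

In this sketch the command-line value for topology.workers overrides both the topology's own setting and the storm.yaml value, while nimbus.host only appears in the effective client-side configuration.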
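2. Submit Jar
StormSubmitter is essentially a Thrift client and Nimbus is the Thrift server, so every operation goes through Thrift RPC. For background on Thrift, see Thrift and the companion article "Storm-源码分析- Thrift的使用" (Storm source code analysis: using Thrift).
It first calls topologyNameExists, which uses the Thrift client to fetch the topologies currently running and checks whether the name is already taken.
It then submits the jar in the following three steps: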
client.getClient().beginFileUpload();
client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
client.getClient().finishFileUpload(uploadLocation);
The data is sent over RPC; how it is actually stored is a matter for Nimbus's own logic.
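The three-step upload protocol can be sketched as follows. This is a Python illustration against an in-memory stand-in, not the Thrift client: FakeNimbus, submit_jar, the snake_case method names, and the tiny chunk size are all assumptions, and the loop that sends the jar chunk by chunk is how such an upload is assumed to work.

```python
import io

class FakeNimbus:
    """Hypothetical in-memory stand-in for the Nimbus upload RPCs."""
    def __init__(self):
        self.uploads = {}
        self.finished = set()

    def begin_file_upload(self):
        # The server picks the upload location and returns it to the client.
        location = "upload-%d" % len(self.uploads)
        self.uploads[location] = bytearray()
        return location

    def upload_chunk(self, location, chunk):
        self.uploads[location].extend(chunk)

    def finish_file_upload(self, location):
        self.finished.add(location)

def submit_jar(client, jar_stream, chunk_size=4):
    """Send a jar in chunks: begin, upload each chunk, finish."""
    location = client.begin_file_upload()
    while True:
        chunk = jar_stream.read(chunk_size)
        if not chunk:          # empty read means end of file
            break
        client.upload_chunk(location, chunk)
    client.finish_file_upload(location)
    return location

nimbus = FakeNimbus()
jar_bytes = b"pretend this is a jar file"
location = submit_jar(nimbus, io.BytesIO(jar_bytes))
print(location, len(nimbus.uploads[location]))
```

The returned location plays the role of submittedJar in the later submit call: the client only hands back the server-side path, never the jar contents again.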
3. Submit Topology
This step is simple: it is just a single RPC call.
client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
/**
 * Submits a topology to run on the cluster. A topology runs forever or until
 * explicitly killed.
 *
 *
 * @param name the name of the storm.
 * @param stormConf the topology-specific configuration. See {@link Config}.
 * @param topology the processing to execute.
 * @param options to manipulate the starting of the topology
 * @throws AlreadyAliveException if a topology with this name is already running
 * @throws InvalidTopologyException if an invalid topology was submitted
 */
public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
        throws AlreadyAliveException, InvalidTopologyException {
    if(!Utils.isValidConf(stormConf)) {
        throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
    }
    stormConf = new HashMap(stormConf);
    stormConf.putAll(Utils.readCommandLineOpts());
    Map conf = Utils.readStormConfig();
    conf.putAll(stormConf);
    try {
        String serConf = JSONValue.toJSONString(stormConf);
        if(localNimbus!=null) {
            LOG.info("Submitting topology " + name + " in local mode");
            localNimbus.submitTopology(name, null, serConf, topology);
        } else {
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            if(topologyNameExists(conf, name)) {
                throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
            }
            submitJar(conf);
            try {
                LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
                if(opts!=null) {
                    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    client.getClient().submitTopology(name, submittedJar, serConf, topology);
                }
            } catch(InvalidTopologyException e) {
                LOG.warn("Topology submission exception", e);
                throw e;
            } catch(AlreadyAliveException e) {
                LOG.warn("Topology already alive exception", e);
                throw e;
            } finally {
                client.close();
            }
        }
        LOG.info("Finished submitting topology: " + name);
    } catch(TException e) {
        throw new RuntimeException(e);
    }
}