storm集群和hadoop集群类似,在hadoop上运行mapreduce任务,而在storm上称为topologies任务,两种任务之间有区别,典型的一点是mapreduce任务最后会结束,而topologies不会结束。
一个storm集群包含两种节点,master节点和worker节点,其中master节点运行一个daemon进程("Nimbus"),在hadoop中称为JobTracker,Nimbus负责向集群分发代码,分配任务,处理错误。
每个worker节点运行一个daemon进程("Supervisor"),它负责监听分配给自己的任务并开启工作进程处理。nimbus和supervisors之间通过zookeeper协调。
stream是Storm里抽象的核心概念,stream是一组没有边界的数组(tuples)
spout是stream的来源(可能有多个spout源头),比如spout从Kestrel队列中读取数据发送成一个流;bolt负责处理输入流,发送新的数据流到别的bolt,Bolts能通过运行函数做很多事情,过滤tuples,聚集和拼装数据流,和操作数据库等。spout和bolt的网络结构也会被打包到"topology"中,数据流会按照网络结构流下去。_一个topology会永远运行知道我们kill它,storm会重分配失败的任务,此外它保证不因为机器挂掉丢失正在处理的数据
下面的代码统计各个数字0~100之间的计数结果
package storm.starter; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.testing.FeederSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import java.util.HashMap; import java.util.Map; import java.util.Random; /** * This is a basic example of a Storm topology. */ public class CounterTopology { public static class CounterBolt extends BaseRichBolt { OutputCollector _collector; HashMap<Integer,Integer> hm; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; hm = new HashMap<Integer,Integer>(); } @Override public void execute(Tuple tuple) { int id = tuple.getInteger(0); int count = tuple.getInteger(1); Object oldcount = hm.get(id); if(oldcount != null){ count += (Integer)oldcount; } hm.put(id, count); _collector.emit(tuple, new Values(id,count)); _collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id","count")); } } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); FeederSpout cs = new FeederSpout(new Fields("id","count")); builder.setSpout("countspout", cs, 10); builder.setBolt("countbolt1", new CounterBolt(), 3) .shuffleGrouping("countspout"); // builder.setBolt("countbolt2", new CounterBolt(), 2) // .shuffleGrouping("countbolt1"); Config conf = new Config(); conf.setDebug(true); if(args!=null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("counter", conf, builder.createTopology()); // // Utils.sleep(10000); // cluster.killTopology("counter"); // cluster.shutdown(); } while(true){ Utils.sleep(100); Random rand = new Random(); int id = rand.nextInt(100); int count = rand.nextInt(100); cs.feed(new Values(id,count)); } } }
运行方式如下:
javac -cp storm-0.8.2.jar storm/starter/CounterTopology.java jar cvf CT.jar storm bin/storm jar CT.jar storm.starter.CounterTopology
生产环境使用storm
1,定义一个topology任务(java语言使用TopologyBuilder)
2,使用StormSubmitter提交任务到集群,三个参数分别是(任务名称,配置项,任务)
Config conf = new Config(); conf.setNumWorkers(20); conf.setMaxSpoutPending(5000); StormSubmitter.submitTopology("mytopology", conf, topology);
3,创建代码jar文件以及提交到集群
storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3
针对topology的常用配置如下:
Config.TOPOLOGY_WORKERS:集群中处理任务的进程数,如果配置为25,并发数设置为150,则每个进程会开启6个线程处理任务 Config.TOPOLOGY_ACKERS:用来检测任务是否正确处理的进程数目 Config.TOPOLOGY_MAX_SPOUT_PENDING:配置spout上最多可以保存为未处理和未失败的任务数,建议设置以免queue爆满 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS:默认30s Config.TOPOLOGY_SERIALIZATIONS:注册自定义serilaizer
终止topology: storm kill {stormname}
更新运行中的topology:暂时只能杀掉重启,计划设计storm swap命令实现此功能
监控topology:使用storm ui
bolt的格式如下:
public class DoubleAndTripleBolt extends BaseRichBolt { private OutputCollectorBase _collector; @Override //prepare函数里面也可以通过collector发送数据流 public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; } @Override //处理数据,发送数据流,记着ack保证数据不丢失 public void execute(Tuple input) { int val = input.getInteger(0); _collector.emit(input, new Values(val*2, val*3)); _collector.ack(input); } @Override //需要声明输出数据流的格式 public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("double", "triple")); } }
参照网页
https://github.com/nathanmarz/storm/wiki/Tutorial
https://github.com/nathanmarz/storm-starter
http://xumingming.sinaapp.com/category/storm/
http://www.oschina.net/p/twitter-storm
Trident用来做实时分析不错
https://github.com/nathanmarz/storm/wiki/Trident-tutorial
http://www.ibm.com/developerworks/cn/opensource/os-twitterstorm/index.html
相关推荐
简单和明了,Storm让大数据分析变得轻松加愉快。当今世界,公司的日常运营经常会生成TB级别的数据。数据来源囊括了互联网装置可以捕获的任何类型数据,网站、社交媒体、交易型商业数据以及其它商业环境中创建的数据...
实时流系统是在大规模数据分析的基础上实现系统性的分析。另外,它还需要:每分钟处理数十亿事件的能力、有秒级延迟,和行为可预见;在故障时保证数据的准确性,在达到流量峰值时是弹性的,并且易于调试和在共享的...
要点 数据分析流程、方法论(PEST、5W2H、逻辑树)、基础数据分析方法、数据分析师能力层级、数据的度量、探索、抽样、原理及实际操作,结合SPSS工具使用 第2周 数据挖掘基础 要点(数据挖掘概念、流程、重要环节、...
• 如何入门和学习安全数据分析 ..... 如何学习安全数据分析? 1、先学习基本的算法原理,补充数学知识——Coursera 上的机器学习课程 2、学习Python的几个机器学习工具——pandas,numpy, seaborn,sklearn 3、去Kaggle...
linux实验环境,storm搭建完毕后的开发。eclipse开发环境,大数据界hello world——wordcount详解,bolt、分组机制、storm DRPC详解
⼤数据产业包括IT基础设施层、数据源层、数据管理层、数据分析层、数据平台层和数据应⽤层。 ⼋,⼤数据与云计算、物联⽹: ⼀)云计算: 1)云计算概念:云计算实现了通过⽹络提供可伸缩的、廉价的分布式计算能⼒...
常驻空间,时效性,有序性,数据量,数据速率,是否可重现,移动对象,数据精确度 Storm:任务拓扑=有向⽆环图(Spout、Bolt)Spout读取数据(元组)——》Blot。节点:Nimbus Supervisor。特征:编程模型简单 ...
LOGO M.94275.CN 1 大数据、数据挖掘与智慧运营综述 1.7 现有数据挖掘的主要分析软件与系统 1.7.1 Hadoop 01 1.7.2 Storm 02 1.7.5 SAS 05 1.7.4 SPASS(SPSS) 04 1.7.3 Spark 03 大数据、数据挖掘与智慧...
数据分析与报表 预测 数据挖掘与BI 机器学习与Google大 脑 起源与目标 大数据与Hadoop 应用模式 大数据技术IT人员的挑战——DevOps DevOps Development和Operations的 组合,是一组过程、方法与 系统的统称,用于...
Java数据压缩与传输实例,可以学习一下实例化套按字、得到文件输入流、压缩输入流、文件输出流、实例化缓冲区、写入数据到文件、关闭输入流、关闭套接字关闭输出流、输出错误信息等Java编程小技巧。 Java数组倒置...
此时此景,笔者只专注Android、Iphone等移动平台开发,看着这些源码心中有万分感慨,写此文章纪念那时那景! Java 源码包 Applet钢琴模拟程序java源码 2个目标文件,提供基本的音乐编辑功能。编辑音乐软件的朋友,这...