这篇文章通过用一个如何处理feed数据的应用场景来说明为什么会出现Storm实时计算框架, 对我们自己的技术选型和系统架构设计非常有参考价值.
原文在
这里
Storm 成为最近开源社区的一个热门, 其作者
Nathan Marz 所在的公司Backtype现在已经被Twitter收购. 该项目的
wiki 非常完善. 从这里对Storm做一个全面了解.
目前的场景是这样的, 通过解析xml feed来生成索引数据. 已经通过搭建hadoop来批量生成全量索引. 但是如果需要实时更新数据增量生成索引该如何处理呢?
1.最简单单机方案
用一台机器持续抓取feed数据, 但是这个会受到单机的处理和网络带宽限制, 系统不具备可伸缩性.
2.可伸缩架构方案
通过多台机器搭建集群, 并根据feedId hash值取模将整个feed数据分散到多台机器. 这种方式缺点非常明显: 无法做到系统自动伸缩, 每加入一台机器, 需要重新调整整个集群的feed处理. 另外也无法实现自动容灾(failover), 一台机器挂了,必须手动启动备机.
3.Queue/Worker方案
通过在master上维护一个消息队列(queue), 然后分发到slave机器上的worker进行处理. 该方案能解决上一种方案中的其中一台机器挂掉造成的局部处理失败的问题.
另外这种方案也是类似场景中的常用架构.
不过这种架构也有一些自己的复杂性: 比如消息队列的选择, 消息队列的运维问题.
4.Hadoop方案
采用Hadoop来更新feed, 这里需要编写Map/Reduce Job, 先将更新的feed导入Map中, 然后将所有数据汇总到Reduce, 这里每个Reduce处理其中一部分feed, 需要直到所有Reduce执行完成才能得到最终的索引结果. 因此实时性会大打折扣.
注:我觉得Hadoop更侧重于批量并行处理海量的数据, 而不是来一条更新数据处理一条.
5.Multi Queue/Worker方案
在原来的基础上再加一层Queue, 第一层N个worker从Work Queue获取feed数据, 然后添加到DB Queue, 最后M个(M<N)DB worker从DB Queue中获取数据更新到数据库或者其他地方.
注:老实说我也没看出来为什么需要搞两层Queue. 但是Strom就是这么干的.
6.Storm方案
Storm对Multi Queue/Worker系统的复杂性进行了抽象. 用户只需要编写Topology而无需关系系统的扩展, 容灾以及进程间的通讯.
Storm集群包括一个master节点(Nimbus)和N个slave节点(Supervisor), 节点之间通过ZooKeeper来协调, 进程间通讯采用ZeroMQ. 而Topology不仅可以使用Java实现, 还支持其他动态语言.
这里我们使用FeedSpout来获取feed url并将其发射(emit)给FetcherBolt, 然后FetcherBolt去下载并解析数据.
下面是FeedSpout代码:
public class FeedSpout extends SimpleSpout {
private static final long serialVersionUID = 1L;
Queue<String> feedQueue = new LinkedList<String>();
String[] feeds;
public FeedSpout(String[] feeds) {
this.feeds = feeds;
}
@Override
public void nextTuple() {
String nextFeed = feedQueue.poll();
if(nextFeed != null) {
collector.emit(new Values(nextFeed), nextFeed);
}
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
super.open(conf, context, collector);
for(String feed: feeds) {
feedQueue.add(feed);
}
}
@Override
public void ack(Object feedId) {
feedQueue.add((String)feedId);
}
@Override
public void fail(Object feedId) {
feedQueue.add((String)feedId);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("feed"));
}
}
这里对例子进行了简化, 在FeedSpout构造器中直接传入feed数据, 然后在open方法中将其添加的内存queue中. 而Storm内部会负责控制nextTuple方法的调用. 这样来保证queue中的数据持续不断的被处理. 如果消息处理成功会调用ack方法, 失败会调用fail方法
下面是FetcherBolt方法:
public class FetcherBolt implements IRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
FeedFetcher feedFetcher = new HttpURLFeedFetcher();
String feedUrl = input.getStringByField("feed");
try {
SyndFeed feed = feedFetcher.retrieveFeed(new URL(feedUrl));
for(Object obj : feed.getEntries()) {
SyndEntry syndEntry = (SyndEntry) obj;
Date entryDate = getDate(syndEntry, feed);
collector.emit(new Values(syndEntry.getLink(), entryDate.getTime(), syndEntry.getDescription().getValue()));
}
collector.ack(input);
} catch(Throwable t) {
t.printStackTrace();
collector.fail(input);
}
}
private Date getDate(SyndEntry syndEntry, SyndFeed feed) {
return syndEntry.getUpdatedDate() == null ? (syndEntry.getPublishedDate() == null ? feed.getPublishedDate() : syndEntry.getPublishedDate()) : syndEntry.getUpdatedDate();
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("link", "date", "description"));
}
}
这里对Bolt也做了简化, 在execute方法中根据feed url下载并解析feed数据, 最后将数据发送出去(这里只发送了"link", "date", "description"字段), 这些数据将被下一个Bolt处理(如果有的话).
在Bolt中如果处理数据成功或失败, 将会调用ack或者fail方法. 在Storm内部会用Acker来跟踪Tuple的执行流程. Strom将发射Tuple(会带一个id)的操作称之为Anchoring, 如果一个Tuple在中间某个Bolt处理失败或者超时(内部默认是30s), Storm会向Spout发送FAIL消息, 这样我们就可以采用相应的策略来保证数据可靠.
最后是编写FeedTopology来封装Spout和Bolt:
package datasalt.storm.feeds;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
/**
* This class builds the topology that needs to be submitted to Storm. It puts {@link FeedSpout}, {@link FetcherBolt}
* and {@link ListingBolt} all together.
*
* @author pere
*
*/
public class FeedTopology {
public static StormTopology buildTopology(String[] feeds) {
TopologyBuilder builder = new TopologyBuilder();
// One single feed spout feeding data
builder.setSpout("feedSpout", new FeedSpout(feeds), 1);
// Various (2) fetcher bolts -> shuffle grouping from feed spout
builder.setBolt("fetcherBolt", new FetcherBolt(), 2).shuffleGrouping("feedSpout");
// One single listing bolt calculating statistics
builder.setBolt("listingBolt", new ListingBolt(), 1).globalGrouping("fetcherBolt");
return builder.createTopology();
}
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
conf.setMaxSpoutPending(1);
StormSubmitter.submitTopology("feedTopology", conf, buildTopology(Constants.FEEDS));
}
}
更详细的例子可以从
这里下载源代码
总结
通过上面的例子来说明Strom是什么, 以及如何使用Storm. 这里我们可以知道, 如果没有Storm, 要实现多层Queue/Worker架构将会是多么复杂. 而有了Storm, 我们只需要在Toplogy中实现我们的业务逻辑然后部署到Storm集群中即可. 而要对系统进行水平扩展, 只需要向Storm集群中添加机器即可.
另外Storm主要专注于实时计算, 而Hadoop则侧重于批量计算.
更多的例子常见
这里
分享到:
相关推荐
Storm实时数据处理
系统讲解Storm的基础知识和实时数据处理的最佳实践方法,内容涵盖Storm本地开发环境搭建、日志流数据处理、Trident、分布式远程过程调用、Topology在不同编程语言中的实现方法、Storm与Hadoop的集成方法、实时机器...
storm实时流处理。欢迎下载学习
使用Storm实现实时大数据分析.doc
基于Storm构建实时热力分布项目实战.txt,视频还行。
系统讲解Storm的基础知识和实时数据处理的最佳实践方法,内容涵盖Storm本地开发环境搭建、日志流数据处理、Trident、分布式远程过程调用、Topology在不同编程语言中的实现方法、Storm与Hadoop的集成方法、实时机器...
基于Storm构建实时热力分布项目实战,欢迎小伙伴们下载哦
Storm实时数据处理.pdf
《Storm实时数据处理》
#资源达人分享计划#
Storm官方网站有段简介 Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop批量处理大数据一样,Storm可以实时处理数据。Storm简单,可以使用任何编程语言。
Storm实时数据处理_PDF电子书下载 带书签目录 完整版
随着大数据实时处理需求的强劲增长,Storm的出现填补了大数据处理生态系统的缺失,并被越来越多的公司所采用。阿里巴巴集团数据平台事业部商家数据业务部正是最早使用Storm的技术团队之一。 《Storm实战:构建...
Storm实时数据处理_中文版Storm实时数据处理_中文版Storm实时数据处理_中文版
Storm实时数据处理中文完整版,带有完整的书签
( Storm实时数据处理.zip )PDF 高清版 个人收集电子书,仅用学习使用,不可用于商业用途,如有版权问题,请联系删除!
本书涵盖搭建基于Storm的开发环境和测试实时系统的许多实用方法与实战用例,以及如何应用交付最佳实践来将系统部署至云端。 通过阅读本书,你将学到如何构建包含统计面板和可视化的实时日志处理系统。通过集成Storm...
Storm分布式实时计算在物联网系统中的应用
Storm实时数据处理-超清文字版.pdf 这个是带完整目录书签的文字版本,文本内容可以复制的哦
storm实时数据处理 带书签目录pdf高清完整版 这个是带完整目录书签的高清扫描版