package inok.storm.kafka.sample; import java.io.FileInputStream; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class MyKafkaTopology { public static class KafkaWordSplitter extends BaseRichBolt { private static final Log LOG = LogFactory .getLog(KafkaWordSplitter.class); private static final long serialVersionUID = 886149197481637894L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { String line = input.getString(0); LOG.info("RECV[kafka -> splitter] " + line); String[] words = line.split("\\s+"); for (String word : words) { LOG.info("EMIT[splitter -> counter] " + word); collector.emit(input, new Values(word, 1)); } collector.ack(input); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static class WordCounter extends BaseRichBolt { private static final Log LOG = LogFactory.getLog(WordCounter.class); private static final long serialVersionUID = 886149197481637894L; private OutputCollector collector; private Map<String, AtomicInteger> counterMap; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.counterMap = new HashMap<String, AtomicInteger>(); } public void execute(Tuple input) { String word = input.getString(0); int count = input.getInteger(1); LOG.info("RECV[splitter -> counter] " + word + " : " + count); AtomicInteger ai = this.counterMap.get(word); if (ai == null) { ai = new AtomicInteger(); this.counterMap.put(word, ai); } ai.addAndGet(count); collector.ack(input); LOG.info("CHECK statistics map: " + this.counterMap); } @Override public void cleanup() { LOG.info("The final result:"); Iterator<Entry<String, AtomicInteger>> iter = this.counterMap .entrySet().iterator(); while (iter.hasNext()) { Entry<String, AtomicInteger> entry = iter.next(); LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get()); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException, IOException { String path = System.getProperty("user.dir") + "/kafkaSpout.properties"; FileInputStream is = new FileInputStream(path); Properties props = new Properties(); props.load(is); String zks = props.getProperty("zks"); String topic = props.getProperty("topic"); String zkRoot = props.getProperty("zkRoot"); // String zks = "localhost:2181"; // String topic = "inoktext"; // String zkRoot = "/storm"; String id = "word"; BrokerHosts brokerHosts = new ZkHosts(zks); SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id); spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); spoutConf.forceFromStart = true; spoutConf.zkServers = Arrays.asList(new String[] { "localhost" }); spoutConf.zkPort = 2181; TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5 builder.setBolt("word-splitter", new KafkaWordSplitter(), 2) .shuffleGrouping("kafka-reader"); builder.setBolt("word-counter", new WordCounter()).fieldsGrouping( "word-splitter", new Fields("word")); Config conf = new Config(); String name = MyKafkaTopology.class.getSimpleName(); if (args != null && args.length > 0) { // Nimbus host name passed from command line conf.put(Config.NIMBUS_HOST, args[0]); conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(name, conf, builder.createTopology()); Thread.sleep(60000); cluster.shutdown(); } } }
References
http://www.michael-noll.com/blog/2014/05/27/kafka-storm-integration-example-tutorial/
https://github.com/apache/storm/tree/master/external/storm-kafka
https://storm.apache.org/2014/06/25/storm092-released.html
http://www.tuicool.com/articles/NzyqAn
https://github.com/stealthly/dropwizard-kafka-http
相关推荐
Storm的Spout test,订阅Kafka的producer,数据在bolt中处理完成之后再次发送到Kafka中 1.启动zookeeper ./bin/zkServer.sh start 2.启动Kafka nohup ./bin/kafka-server-start.sh ./config/server.properties & 这...
14.项目1-地区销售额-Spout融合Kafka Consumer及线程安全测试 15.项目1-地区销售额-Bolt业务逻辑处理一 16.项目1-地区销售额-优化Bolt支持重启及结果数据核查 17.项目1-地区销售额-HighCharts图表开发一及Web端架构...
该Storm拓扑使用Kafka Spout读取来自Kafka的消息,并使用Bolt将从Kafka读取的传入消息解析为JSON消息。 然后将已解析的JSON消息加载到Elastic搜索中以使用Kibana进行仪表板和分析 该项目的前提条件:Zookeeper安装...
Flume+kafka+Storm整合 示例简介: 以下为三个组建整合,这里只做操作也演示结果,原理性方面大家...流程顺序是flume获取telnet数据,将接收到的数据发送至kafak,kafka作为Storm的spout,Storm进行有向无环分析数据。
此项目不再维护有关最新信息,请参阅 。Storm-kafka-hdfs-starter 提供使用 KafkaSpout 和 HdfsBolt 的示例
Storm spout 实现从 kafka 主题读取消息,并将这些消息作为单字段元组发送到风暴拓扑中。 可以在上找到文档。 发展 该实现由荷兰法医研究所创建,仍在开发中。 欢迎投稿,请阅读。 执照 这项工作是根据 Apache 许可...
水槽卡夫卡风暴流口水示例 这个项目使用来自kafka的storm spout消费者日志消息并使用drools定义他们的规则,最后将有用的数据保存到redis。 你可以在看到更多信息
积分java源码Kafka-Spark-Consumer 的 README 文件 注意:此 Kafka Spark Consumer 代码取自 Apache Storm 项目 () 的 Kafka spout,该项目最初由 wurstmeister () 创建。 原始 Storm Kafka Spout 代码已被修改以与 ...
14.项目1-地区销售额-Spout融合Kafka Consumer及线程安全测试 15.项目1-地区销售额-Bolt业务逻辑处理一 16.项目1-地区销售额-优化Bolt支持重启及结果数据核查 17.项目1-地区销售额-HighCharts图表开发一及Web端架构...
使用 Avro 数据序列化的 Kafka-Storm 概念验证。 Java 客户端连接到 Schema-Repo 以获取模式并构建序列化消息。 该消息随后发布在 kafka 的 avrotopic 队列中。 已经创建了一个带有 Spout 的拓扑,负责读取发布在...
Storm Kafka Integration架包,包含storm.kafka.KafkaSpout、import、storm.kafka.SpoutConfig、import storm.kafka.StringScheme、import storm.kafka.ZkHosts等
集群版(cluster branch) : 项目基于Kafka storm的实时nginx日志监控,将nginx的日志文件access.log读取并放入Kafka队列中,Storm的Spout来对接Kafka消息队列,来收集nginx服务器的状态,并在一定时间内,统计访问ip...
14.项目1-地区销售额-Spout融合Kafka Consumer及线程安全测试 15.项目1-地区销售额-Bolt业务逻辑处理一 16.项目1-地区销售额-优化Bolt支持重启及结果数据核查 17.项目1-地区销售额-HighCharts图表开发一及Web端架构...
1、kafka consumer spout 单独的线程消费,写入队列 nextTuple,每次都是判断队列有没有数据,有的话再去获取并发射出去,不能阻塞 2、日志解析bolt 3、商品访问次数统计bolt 基于LRUMap完成统计 三、基于...
Kafka出口是基于JVM的,并在src/clj/fintank/spouts/tick_spout.clj 。 该喷口发出的元组是JSON格式的字符串,因此Python螺栓可以处理反序列化JSON。 从技术上讲,这不是必需的,因为Kafka喷口完全可以使用自定义...
14.项目1-地区销售额-Spout融合Kafka Consumer及线程安全测试 15.项目1-地区销售额-Bolt业务逻辑处理一 16.项目1-地区销售额-优化Bolt支持重启及结果数据核查 17.项目1-地区销售额-HighCharts图表开发一及Web端架构...
14.项目1-地区销售额-Spout融合Kafka Consumer及线程安全测试 15.项目1-地区销售额-Bolt业务逻辑处理一 16.项目1-地区销售额-优化Bolt支持重启及结果数据核查 17.项目1-地区销售额-HighCharts图表开发一及Web端架构...
14.项目1-地区销售额-Spout融合Kafka Consumer及线程安全测试 15.项目1-地区销售额-Bolt业务逻辑处理一 16.项目1-地区销售额-优化Bolt支持重启及结果数据核查 17.项目1-地区销售额-HighCharts图表开发一及Web端架构...
14.项目1-地区销售额-Spout融合Kafka Consumer及线程安全测试 15.项目1-地区销售额-Bolt业务逻辑处理一 16.项目1-地区销售额-优化Bolt支持重启及结果数据核查 17.项目1-地区销售额-HighCharts图表开发一及Web端架构...