- 浏览: 198626 次
- 性别:
- 来自: 哈尔滨
文章分类
- 全部博客 (267)
- java.lang (8)
- 问题汇总 (21)
- 异常记录 (20)
- 功能实现 (19)
- 面试总结 (25)
- 技巧总结 (8)
- 常用代码 (4)
- 编程习惯 (3)
- 编码规则 (3)
- java.util (10)
- java.io (1)
- JavaWeb (9)
- MySQL (16)
- SVN (3)
- MyBatis (11)
- Velocity (7)
- 其他知识 (10)
- 人生哲理 (1)
- 人生故事 (1)
- 自我感悟 (1)
- shiro (3)
- 基础知识 (0)
- 问题总结 (1)
- Spring 标签 (1)
- Spring (3)
- 点滴生活 (1)
- DOS (1)
- CAS (4)
- Linux (9)
- Storm (6)
- Shell (1)
- regex (1)
- Collection (4)
- poi (1)
- 经典语句 (1)
- NIO (5)
- concurrent (14)
- RPC (1)
- zookeeper (3)
- 待整理 (2)
- Hadoop (9)
- RabbitMq (2)
- flume (1)
- hive (7)
- hbase (4)
- kafka (1)
- scala (1)
- GC (0)
- java.util.concurrent.atomic (1)
- java.lang.ref (6)
- JVM (2)
- algorithm (1)
- conception (1)
- java key word (1)
- sun.misc (1)
最新评论
Trident
一、Storm 保证性
1.数据一定会发送
通过 ack / fail 方法确认,若失败,则提供重新发送的机制
2.数据一定只会统计一次
数据发送后有一个唯一性的标识,通过判断此标识,若存在,则不处理
3.数据一定会按照顺序进行处理
数据发送后有一个唯一性的标识,按照标识编号的顺序进行处理
二、Storm 保证性实现
1.逐个发送,逐个处理
如果这样处理,则原有的并行处理会变成穿行处理,不可取
2.批量发送,批量处理
如果这样处理,则如果当前这批数据处理完毕但未发送,则无法处理下一批数据,且这一批数据之间的处理顺序是并发的在进行的
3.分成两个步骤
一个处理数据,一个发送数据;
数据处理完毕,则继续处理下一批数据;数据是否发送到下一个缓解,由发送数据的步骤决定
采用此方式
三、Trident
1.Spout
package com.study.storm.trident.wordcount; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * @description * 数据来源 * 模拟批量数据发送 * <br/> * @remark * Storm 的保证及实现 * 1.数据一定被发送 * 通过 ack() 、 fail() 的确认机制,若发送失败,则重新发送 * 2.数据只被处理一次 * 数据发送时带有唯一的编号,判断此编号是否被处理过,若是,则忽略,不处理 * 3.数据被按照一定的顺序处理 * 数据发送时带有唯一的编号,按照编号的顺序进行处理,若数据不是按照顺序到达,则等待 * * <br/> * * Trident 处理批量数据 * */ public class SentenceSpout extends BaseRichSpout { /** * */ private static final long serialVersionUID = 2122598284858356171L; private SpoutOutputCollector collector = null ; /** * 模拟批量数据发送 * key : name * value : sentence */ private Values [] valuesArray = new Values[] { new Values("a","111111111111"), new Values("b","222222222222"), new Values("c","333333333333"), new Values("d","444444444444"), new Values("e","555555555555"), new Values("f","666666666666"), new Values("g","777777777777"), new Values("h","888888888888") }; @SuppressWarnings("rawtypes") @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector ; } // 发送的顺序,即数据组合的下标,标识数据发送到哪个位置 private int index = 0 ; @Override public void nextTuple() { if(index >= valuesArray.length){ return ; } index = index == valuesArray.length ? 0 : index++ ; this.collector.emit(valuesArray[index]); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("name","sentence")); } }
简化实现
package com.study.storm.trident.wordcount; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import storm.trident.Stream; import storm.trident.TridentTopology; import storm.trident.testing.FixedBatchSpout; public class TridentTopologyDemo { public static void main(String[] args) { // 相当于原有的 Spout 实现 @SuppressWarnings("unchecked") FixedBatchSpout tridentSpout = new FixedBatchSpout(new Fields("name","sentence"), 1, new Values("a","111111111111"), new Values("b","222222222222"), new Values("c","333333333333"), new Values("d","444444444444"), new Values("e","555555555555"), new Values("f","666666666666"), new Values("g","777777777777"), new Values("h","888888888888")); // 是否循环发送,false 不 tridentSpout.setCycle(false); TridentTopology topology = new TridentTopology(); /** * 1.本地过滤器设置 */ // 设置数据源 Stream initStream = topology.newStream("tridentSpout", tridentSpout); // 设置过滤器 -- 过滤name : d 的数据 initStream = initStream.each(new Fields("name"),new RemovePartDataFilter()); // 添加函数,输出字母对应的位置 initStream = initStream.each(new Fields("name"),new NameIndexFunction() ,new Fields("indexNum")); // 设置过滤器 -- 拦截数据并打印 Stream filterPrintStream = initStream.each(new Fields("name","sentence"), new PrintFilter()); //--提交Topology给集群运行 Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("MyTopology", conf, topology.build()); //--运行10秒钟后杀死Topology关闭集群 Utils.sleep(1000 * 10); cluster.killTopology("MyTopology"); cluster.shutdown(); } }
package com.study.storm.trident.wordcount; import java.util.Iterator; import backtype.storm.tuple.Fields; import storm.trident.operation.BaseFilter; import storm.trident.tuple.TridentTuple; /** * @description * 打印:key 与 value ,fields 与 fields 对应传输的内容 */ public class PrintFilter extends BaseFilter { /** * */ private static final long serialVersionUID = 4393484291178519442L; @Override public boolean isKeep(TridentTuple tuple) { Fields fields = tuple.getFields(); Iterator<String> iterator = fields.iterator(); while(iterator.hasNext()){ String key = iterator.next(); Object valueByField = tuple.getValueByField(key); System.out.println("fields : "+ key + " values : "+valueByField); } return true; } }
package com.study.storm.trident.wordcount; import storm.trident.operation.BaseFilter; import storm.trident.tuple.TridentTuple; /** * 过滤name = d 的数据 * return false 过滤 * return true 继续传递 */ public class RemovePartDataFilter extends BaseFilter { /** * */ private static final long serialVersionUID = 8639858690618579558L; @Override public boolean isKeep(TridentTuple tuple) { String stringByField = tuple.getStringByField("name"); return !stringByField.equals("d"); } }
package com.study.storm.trident.wordcount; import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Values; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.tuple.TridentTuple; public class NameIndexFunction extends BaseFunction { /** * */ private static final long serialVersionUID = 9085021905838331812L; static Map<String,Integer> indexMap = new HashMap<String,Integer>(); static { indexMap.put("a", 1); indexMap.put("b", 2); indexMap.put("c", 3); indexMap.put("d", 4); indexMap.put("e", 5); indexMap.put("f", 6); indexMap.put("g", 7); indexMap.put("h", 8); indexMap.put("i", 9); } @Override public void execute(TridentTuple tuple, TridentCollector collector) { String name = tuple.getStringByField("name"); collector.emit(new Values(indexMap.get(name))); } }
相关推荐
该库提供了核心storm bolt,并在Elasticsearch 之上实现了Trident 状态。 它支持非事务性、事务性和不透明状态类型。 Maven 依赖 < groupId>com.github.fhuss</ groupId> < artifactId>storm-elasticsearch ...
这个项目是 Storm's Trident 的游乐场。 在这个项目中,您可以找到我用于柏林的 Trident hackaton @ Big Data ... 包含 hackaton 会话内容的博客文章: ://www.datasalt.com/2013/04/an-storms-trident-api-overview/
三叉戟《风暴蓝图:分布式实时计算模式》一书的源码和翻译=============(已完成,待校对)(未开始)(已完成,待校对)(已完成,待校对)(未开始)(未开始)(进行中)(未开始)(未开始)(未开始)
风暴三叉戟示例 Clojure 库旨在......好吧,那部分取决于您。 用法 整我 执照 版权所有 :copyright: 2014 FIXME 根据 Eclipse 公共许可证分发 1.0 版或(由您选择)任何更高版本。
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
###必读把大数进行分片,根据数据中某个字段分组Origin_Stream.partitionAggregate(new Fields("a","b") , new Test(),new Fields("A1","B1")).partitionPersist(new LocationDBFactory(), new Fields("A1","B1"), ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
storm_Trident例子
Storm Trident实战之计算网站PV.rar
用于 OpenTSDB 的 Storm 连接器这个用于 Apache Storm 的连接器使用 OpenTSDB java 库,使用 AsyncHBase 客户端将原始数据和 Trident 状态直接持久化到 HBase。 因为每个应用程序应该只有一个 AsyncHBase 客户端,...
storm_trident_state storm trident自定义state的实现,实现了3种插入数据库的持久化方法,事务,不透明事务,不透明分区事务 下载下面,运行:mvn eclipse:eclipse即可在eclipse使用此项目。
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...