`
农村外出务工男JAVA
  • 浏览: 104726 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

storm trident实战 filter,function的使用

阅读更多

一、Storm trident filter

      filter通过返回true和false。来判断是否对信息过滤。

     1.1 代码

	public static void main(String[] args) throws InterruptedException,
			AlreadyAliveException, InvalidTopologyException,
			AuthorizationException {
		FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b"),
				1, new Values(1, 2), new Values(4, 1),
				new Values(3, 0));
		spout.setCycle(false);
		TridentTopology topology = new TridentTopology();
		topology.newStream("spout", spout)
				.each(new Fields("a"), new MyFilter())
				.each(new Fields("a", "b"), new PrintFilterBolt(),new Fields(""));
		Config config = new Config();
		config.setNumWorkers(2);
		config.setNumAckers(1);
		config.setDebug(false);
		StormSubmitter.submitTopology("trident_filter", config,
				topology.build());
	}

    MyFilter:

import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

public class MyFilter extends BaseFilter {

    /**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public boolean isKeep(TridentTuple tuple) {
		return tuple.getInteger(0) == 1;
	}
	
}

    PrintFilterBolt:

public class PrintFilterBolt extends BaseFunction {
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		int firstIndex = tuple.getInteger(0);
		int secondIndex = tuple.getInteger(1);
		List<Integer> list = new ArrayList<Integer>();
		list.add(firstIndex);
		list.add(secondIndex);
		System.out.println("after storm filter opertition change is : "
				+ list.toString());
	}

}

   运行结果:

2016-12-22 13:16:09.079 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2016-12-22 13:16:09.088 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2)
2016-12-22 13:16:09.736 STDIO [INFO] after storm filter opertition change is : [1, 2]

二、Storm trident function

       函数的作用是接收一个tuple(需指定接收tuple的哪个字段),输出0个或多个tuples。输出的新字段值会被追加到原始输入tuple的后面, 如果一个function不输出tuple,那就意味这这个tuple被过滤掉了。

    2.1 代码

public static void main(String[] args) throws InterruptedException,
			AlreadyAliveException, InvalidTopologyException,
			AuthorizationException {

		FixedBatchSpout spout = new FixedBatchSpout(new Fields("a", "b", "c"),
				1, new Values(1, 2, 3), new Values(4, 1, 6),
				new Values(3, 0, 8));
		spout.setCycle(false);
		TridentTopology topology = new TridentTopology();
		topology.newStream("spout", spout)
				.each(new Fields("b"), new MyFunction(), new Fields("d"))
				.each(new Fields("a", "b", "c", "d"), new PrintFunctionBolt(),
						new Fields(""));
		Config config = new Config();
		config.setNumWorkers(2);
		config.setNumAckers(1);
		config.setDebug(false);
		StormSubmitter.submitTopology("trident_function", config,
				topology.build());
	}
	

   MyFunction:

public class MyFunction extends BaseFunction {

    /**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	
	public void execute(TridentTuple tuple, TridentCollector collector) {
        for(int i=0; i < tuple.getInteger(0); i++) {
            collector.emit(new Values(i));
        }
    }

}

   PrintFunctionBolt:

public class PrintFunctionBolt extends BaseFunction {
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		int firstIndex = tuple.getInteger(0);
		int secondIndex = tuple.getInteger(1);
		int threeIndex = tuple.getInteger(2);
		int fourIndex = tuple.getInteger(3);
		List<Integer> list = new ArrayList<Integer>();
		list.add(firstIndex);
		list.add(secondIndex);
		list.add(threeIndex);
		list.add(fourIndex);
		System.out.println("after storm function opertition change is : " +list.toString());
	}

}

    运行效果:

2016-12-22 13:22:34.365 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server 192.168.80.130/192.168.80.130:2181, sessionid = 0x159285f1109000c, negotiated timeout = 20000
2016-12-22 13:22:34.366 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2016-12-22 13:22:34.374 o.a.s.d.executor [INFO] Prepared bolt $spoutcoord-spout-spout:(2)
2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 0]
2016-12-22 13:22:34.415 STDIO [INFO] after storm function opertition change is : [1, 2, 3, 1]
2016-12-22 13:22:34.442 STDIO [INFO] after storm function opertition change is : [4, 1, 6, 0]
1
1
分享到:
评论

相关推荐

    Storm Trident实战之计算网站PV.rar

    Storm Trident实战之计算网站PV.rar

    Storm Trident API 使用详解.docx

    Storm Trident API 使用详解.docx

    Storm入门教程 之Storm原理和概念详解

    Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...

    Storm流计算之项目开发视频教程

    Storm视频教程通过含3个Storm完整项目,均为企业实际项目,其中一个是完全由Storm Trident开发。本课程每个技术均采用最新稳定版本,学完后可以从Kafka到Storm项目开发及HighCharts图表开发一个人搞定!涨工资?身价...

    Storm实战:构建大数据实时计算

    阿里巴巴集团数据平台事业部商家数据业务部正是最早使用Storm的技术团队之一。  《Storm实战:构建大数据实时计算》是一本系统并且具有实践指导意义的Storm工具书和参考书,对Storm整个技术体系进行了全面的讲解,...

    基于Storm流计算天猫双十一作战室项目实战

    1、课程中完整开发3个Storm项目,均为企业实际项目,其中一个是完全由Storm Trident开发。 项目源码均可以直接运行,也可直接用于商用或企业。 2、Storm全面、系统、深入讲解 3、注重实践,对较抽象难懂的技术点如...

    Storm_Trident

    storm_Trident例子

    trident-tutorial:实用的Storm Trident教程

    本教程的结构浏览Part * .java,了解Trident的基础知识使用Skeleton.java实现自己的拓扑,或者看看其他示例├── environment ------ Vagrant resources to simulate a Storm cluster locally ├── src └── ...

    trident-examples:用 Storm Trident 编写的一组应用程序

    三叉戟的例子一组用 Storm Trident 编写的应用程序。应用用法建造$ git clone git@github.com:mayconbordin/trident-examples.git$ cd trident-examples/$ mvn -P &lt; profile&gt; package 使用local配置文件以本地模式...

    storm_trident_state

    storm_trident_state storm trident自定义state的实现,实现了3种插入数据库的持久化方法,事务,不透明事务,不透明分区事务 下载下面,运行:mvn eclipse:eclipse即可在eclipse使用此项目。

    基于Storm流计算天猫双十一作战室项目实战.docx

    3、掌握Storm Trident项目开发模式; 4、掌握Storm集成Kafka开发及项目实战; 5、掌握HighCharts各类图表开发和实时无刷新加载数据; 6、掌握Storm+Kafka+HBase的经典组合,完整呈现企业实践项目; 7、可以做到以...

    trident-elasticsearch:ElasticSearch 的 Storm Trident 集成层

    三叉戟弹性搜索ElasticSearch 的 Storm Trident 集成层

    storm-trident:《风暴蓝图》

    三叉戟《风暴蓝图:分布式实时计算模式》一书的源码和翻译=============(已完成,待校对)(未开始)(已完成,待校对)(已完成,待校对)(未开始)(未开始)(进行中)(未开始)(未开始)(未开始)

    细细品味Storm_Storm简介及安装

    Storm分布式实时计算模式由Apache Storm 项目核心贡献者吉奥兹、奥尼尔亲笔撰 写,融合了作者丰富的Storm实战经验,通过大量示例,全面而系统地讲解使用Storm进行分布式实 时计算的核心概念及应用,并针对不同的应用...

    data-pipeline-storm:使用 Apache Storm 和 Trident 将事件中心流转换为大块 blob

    数据管道指南(使用 Apache Storm)该项目侧重于将 Apache Storm/Trident 与 Java 结合使用。 有关在没有Storm 的情况下使用 .NET 的指南,请参阅随附的。概述该项目的两个主要问题是: 促进数据的冷存储以供以后...

    Storm流计算项目

    Storm流计算项目(文档中含有视频下载地址和解压密码),内容包含 storm、trident、kafka、hbase、cdh、hightcharts 等内容

    KafkaStormES:这是从Kafka中读取并在Elastic Search中建立索引的简单Apache Storm Trident拓扑

    卡夫卡风暴这是从Kafka中读取并在Elastic Search中建立索引的简单Apache Storm Trident拓扑。 ##运行此Storm拓扑所需的设置### 1)Zookeeper。 Download from ...

    rabbit-packetstorm:使用Storm Trident处理网络数据包

    Trident用于数据包分析的连续计算和状态处理。 最后,有一个简单的仪表板可以实时观察分析。要求单节点或多节点Storm集群。 也可以在本地群集模式下测试拓扑动物园管理员卡夫卡PostgreSQLJDK 8 Node.js 去

    Storm:使用 Apache Storm 的示例

    此存储库专用于 Apache Storm 项目和代码示例。 关于风暴 要了解有关 Storm 的更多信息,最好阅读 Storm 官方网页上的。 这是一个很好的指南,并且有一些非常好的链接。 它也不长且易于理解。 运行示例 在做任何事情...

Global site tag (gtag.js) - Google Analytics