Tuple接口有很多方法可以读取从上游组件发送过来的数据,这些方法可以分为2类。
- 根据下标获取数据
- 根据字段名获取数据
读取数据方法
public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, IMeta, Tuple { private List<Object> values; private int taskId; private String streamId; private GeneralTopologyContext context; private MessageId id; private IPersistentMap _meta = null; public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) { this.values = values; this.taskId = taskId; this.streamId = streamId; this.id = id; this.context = context; String componentId = context.getComponentId(taskId); Fields schema = context.getComponentOutputFields(componentId, streamId); if(values.size()!=schema.size()) { throw new IllegalArgumentException( "Tuple created with wrong number of fields. " + "Expected " + schema.size() + " fields but got " + values.size() + " fields"); } } public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) { this(context, values, taskId, streamId, MessageId.makeUnanchored()); } Long _processSampleStartTime = null; Long _executeSampleStartTime = null; //从任务的上下文【任务创建时定义好的】里获取Tuple定义的Fields public Fields getFields() { return context.getComponentOutputFields(getSourceComponent(), getSourceStreamId()); } //获取到field的下标,直接在values【List】中获取对应的数据 public Object getValueByField(String field) { return values.get(fieldIndex(field)); } public String getStringByField(String field) { return (String) values.get(fieldIndex(field)); } //根据下标直接在Values[List]中获取数据 public Object getValue(int i) { return values.get(i); } //....... }
任务上下文
public class GeneralTopologyContext implements JSONAware { private StormTopology _topology; private Map<Integer, String> _taskToComponent; private Map<String, List<Integer>> _componentToTasks; private Map<String, Map<String, Fields>> _componentToStreamToFields; private String _stormId; protected Map _stormConf; // pass in componentToSortedTasks for the case of running tons of tasks in single executor public GeneralTopologyContext(StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId) { _topology = topology; _stormConf = stormConf; _taskToComponent = taskToComponent; _stormId = stormId; _componentToTasks = componentToSortedTasks; _componentToStreamToFields = componentToStreamToFields; } /** * Gets the declared output fields for the specified component/stream. */ public Fields getComponentOutputFields(String componentId, String streamId) { Fields ret = _componentToStreamToFields.get(componentId).get(streamId); if(ret==null) { throw new IllegalArgumentException("No output fields defined for component:stream " + componentId + ":" + streamId); } return ret; } //...... }
相关推荐
Storm中spout和bolt之间发送和接收数据的java源代码实例
import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class WordNormalizer extends BaseBasicBolt { public void cleanup() {} /** * The bolt will receive the line from the ...
Spout已经成功读取文件并把每一行作为一个tuple(在Storm数据以tuple的形式传递)发射过来,我们这里需要创建两个bolt分别来负责解析每一行和对单词计数。 Bolt中最重要的是execute方法,每当一个tuple传过来时它便...
storm允许用户在spout中发射一个新的源tuple时为其指定一个message id, 这个message id可以是任意的object对象。多个源tuple可以共用一个message id,表示这多个源 tuple对用户来说是同一个消息单元。storm中记录级...
Storm Cassandra集成通过提供一个通用且可配置的backtype.storm.Bolt实现将Storm和Cassandra集成, backtype.storm.Bolt实现将Storm Tuple对象写入Cassandra Column Family。 如何将Storm Tuple数据写入Cassandra是...
16.项目1-地区销售额-优化Bolt支持重启及结果数据核查 17.项目1-地区销售额-HighCharts图表开发一及Web端架构设计 18.项目1-地区销售额-HTTP长链接实现实时推送 19.项目1-地区销售额-HighCharts图表开发二及jquery...
第一个是处理消息重放的Spout 部分,第二个是管理主要处理中间状态的Bolt 部分。 ####脱粒机####卡夫卡用作喷口的数据源。 这使得重播消息变得容易和方便。 并且使用 kafka 不需要(拓扑的)喷口自己跟踪消息。 ...
该平台使用ARIMA建立数据清洗模型,利用Storm中的Spout组件实时读取测点数据,将数据根据设定的样本容量进行封装并传递给Bolt组件,Bolt组件则完成噪声点判定、平稳化处理以及模型选参等具体的数据清洗工作。...
Tuple是Storm的数据模型,如['jdon',12346]多个Tuple组成事件流:Spout是读取需要分析处理的数据源,然后转为Tuples,这些数据源可以是Web日志、 API调用、数据库等等。Spout相当于事件流的生产者。
存档螺栓可重复使用的Storm螺栓,用于将数据存档到文件。 当前支持存储到s3。变更记录v0.1.10 将文件写入S3时,将内容类型设置为application/json 。 v0.1.8 将对象写入S3不再使用临时文件。 v0.1.7 暴风雨版本升至...
一个拓扑中包括spout和bolt两种角色,其中spout发送消息,负责将数据流以tuple元组的形式发送出去;而bolt则负责转换这些数据流,在bolt中可以完成计算、过滤等操作,bolt自身也可以随机将数据发送给其他bolt。由...
这些系统中有的拥有内建数据存储层,这是Storm所没有的,如果需要持久化,可以使用一个类似于Cassandra或Riak这样的外部数据库。 入门的最佳途径是阅读GitHub上的官方《Storm Tutorial》。 其中讨论了多种Storm概念...
Storm Tuple数据写入 Cassandra 的方式是动态配置的——您提供“确定”列族、行键和列名称/值的类,Bolt 会将数据写入 Cassandra 集群。 项目地点 Storm-cassandra 的主要开发将发生在: : 点/稳定(非快照)发布...
这些术语的字面意义翻译如下,由于这个工具的名字叫Storm,这些术语一律按照气象名词解释Storm 暴风雨spout 龙卷,读取原始数据为bolt提供数据bolt 雷电,从spout或其它bolt接收数据,并处理数据,处理结果可作为...
最新官方Bolt_2_0_0a10,可视化编程工具,比PlayMaker更兼容Unity3d,操作更人性化。无需写代码
螺栓笨拙的CSRF扫描仪重要的Bolt处于测试的Beta阶段,这意味着可能存在错误。 不鼓励使用此工具。 欢迎提出请求和问题。 如果您对此仓库感兴趣,我也建议您将它放在监视中。工作流程爬行Bolt将目标网站爬网到指定的...
WIN版 罗技新版BOLT 配对驱动
这是一个第三方开发的Unity可视化编程插件的最新版本Bolt 1.4.4,如果你想做2019版Unity上安装Bolt,劝你还是选择这个1.4.4版本,之前的版本(测试过Bolt1.4.0)可能都会出现错误提示,如:Library\PackageCache\...
photon bolt网络插件