`
woodding2008
  • 浏览: 284962 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Storm Bolt中读取Tuple数据

阅读更多

 

 

Tuple接口有很多方法可以读取从上游组件发送过来的数据,这些方法可以分为2类。

  • 根据下标获取数据
  • 根据字段名获取数据

读取数据方法

public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, IMeta, Tuple {
    private List<Object> values;
    private int taskId;
    private String streamId;
    private GeneralTopologyContext context;
    private MessageId id;
    private IPersistentMap _meta = null;
    
    public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
        this.values = values;
        this.taskId = taskId;
        this.streamId = streamId;
        this.id = id;
        this.context = context;
        
        String componentId = context.getComponentId(taskId);
        Fields schema = context.getComponentOutputFields(componentId, streamId);
        if(values.size()!=schema.size()) {
            throw new IllegalArgumentException(
                    "Tuple created with wrong number of fields. " +
                    "Expected " + schema.size() + " fields but got " +
                    values.size() + " fields");
        }
    }

    public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) {
        this(context, values, taskId, streamId, MessageId.makeUnanchored());
    }    
    
    Long _processSampleStartTime = null;
    Long _executeSampleStartTime = null;
	
	//从任务的上下文【任务创建时定义好的】里获取Tuple定义的Fields
    public Fields getFields() {
        return context.getComponentOutputFields(getSourceComponent(), getSourceStreamId());
    }
	
	//获取到field的下标,直接在values【List】中获取对应的数据
	public Object getValueByField(String field) {
        return values.get(fieldIndex(field));
    }
	
	public String getStringByField(String field) {
        return (String) values.get(fieldIndex(field));
    }
	
	//根据下标直接在Values[List]中获取数据
	public Object getValue(int i) {
        return values.get(i);
    }
     
    //.......	 
	
}

 

任务上下文

public class GeneralTopologyContext implements JSONAware {
    private StormTopology _topology;
    private Map<Integer, String> _taskToComponent;
    private Map<String, List<Integer>> _componentToTasks;
    private Map<String, Map<String, Fields>> _componentToStreamToFields;
    private String _stormId;
    protected Map _stormConf;
    
    // pass in componentToSortedTasks for the case of running tons of tasks in single executor
    public GeneralTopologyContext(StormTopology topology, Map stormConf,
            Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
            Map<String, Map<String, Fields>> componentToStreamToFields, String stormId) {
        _topology = topology;
        _stormConf = stormConf;
        _taskToComponent = taskToComponent;
        _stormId = stormId;
        _componentToTasks = componentToSortedTasks;
        _componentToStreamToFields = componentToStreamToFields;
    }
	    /**
     * Gets the declared output fields for the specified component/stream.
     */
    public Fields getComponentOutputFields(String componentId, String streamId) {
        Fields ret = _componentToStreamToFields.get(componentId).get(streamId);
        if(ret==null) {
            throw new IllegalArgumentException("No output fields defined for component:stream " + componentId + ":" + streamId);
        }
        return ret;
    }
	
     //......	
	
}

 

分享到:
评论

相关推荐

    Storm中spout和bolt之间发送和接收数据的java源代码实例

    Storm中spout和bolt之间发送和接收数据的java源代码实例

    storm 示例demo

    import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class WordNormalizer extends BaseBasicBolt { public void cleanup() {} /** * The bolt will receive the line from the ...

    第一个Storm应用

    Spout已经成功读取文件并把每一行作为一个tuple(在Storm数据以tuple的形式传递)发射过来,我们这里需要创建两个bolt分别来负责解析每一行和对单词计数。 Bolt中最重要的是execute方法,每当一个tuple传过来时它便...

    storm记录级容错.docx

    storm允许用户在spout中发射一个新的源tuple时为其指定一个message id, 这个message id可以是任意的object对象。多个源tuple可以共用一个message id,表示这多个源 tuple对用户来说是同一个消息单元。storm中记录级...

    storm-cassandra:Storm Cassandra集成

    Storm Cassandra集成通过提供一个通用且可配置的backtype.storm.Bolt实现将Storm和Cassandra集成, backtype.storm.Bolt实现将Storm Tuple对象写入Cassandra Column Family。 如何将Storm Tuple数据写入Cassandra是...

    Storm流计算项目:1号店电商实时数据分析系统-16.项目1-地区销售额-优化Bolt支持重启及结果数据核查.pptx

    16.项目1-地区销售额-优化Bolt支持重启及结果数据核查 17.项目1-地区销售额-HighCharts图表开发一及Web端架构设计 18.项目1-地区销售额-HTTP长链接实现实时推送 19.项目1-地区销售额-HighCharts图表开发二及jquery...

    storm-redis:Storm Bolt 状态管理

    第一个是处理消息重放的Spout 部分,第二个是管理主要处理中间状态的Bolt 部分。 ####脱粒机####卡夫卡用作喷口的数据源。 这使得重播消息变得容易和方便。 并且使用 kafka 不需要(拓扑的)喷口自己跟踪消息。 ...

    采煤机运行状态数据实时清洗技术研究

    该平台使用ARIMA建立数据清洗模型,利用Storm中的Spout组件实时读取测点数据,将数据根据设定的样本容量进行封装并传递给Bolt组件,Bolt组件则完成噪声点判定、平稳化处理以及模型选参等具体的数据清洗工作。...

    Storm大数据实时处理

    Tuple是Storm的数据模型,如['jdon',12346]多个Tuple组成事件流:Spout是读取需要分析处理的数据源,然后转为Tuples,这些数据源可以是Web日志、 API调用、数据库等等。Spout相当于事件流的生产者。

    archive-bolt:可重复使用的Storm螺栓,用于将内容存储到s3

    存档螺栓可重复使用的Storm螺栓,用于将数据存档到文件。 当前支持存储到s3。变更记录v0.1.10 将文件写入S3时,将内容类型设置为application/json 。 v0.1.8 将对象写入S3不再使用临时文件。 v0.1.7 暴风雨版本升至...

    流式大数据处理的三种框架:Storm,Spark和Samza

    一个拓扑中包括spout和bolt两种角色,其中spout发送消息,负责将数据流以tuple元组的形式发送出去;而bolt则负责转换这些数据流,在bolt中可以完成计算、过滤等操作,bolt自身也可以随机将数据发送给其他bolt。由...

    漫谈大数据第四期-storm

    这些系统中有的拥有内建数据存储层,这是Storm所没有的,如果需要持久化,可以使用一个类似于Cassandra或Riak这样的外部数据库。 入门的最佳途径是阅读GitHub上的官方《Storm Tutorial》。 其中讨论了多种Storm概念...

    Storm---Cassandra

    Storm Tuple数据写入 Cassandra 的方式是动态配置的——您提供“确定”列族、行键和列名称/值的类,Bolt 会将数据写入 Cassandra 集群。 项目地点 Storm-cassandra 的主要开发将发生在: : 点/稳定(非快照)发布...

    GettingStartedWithStorm-cn:翻译Storm 入门

    这些术语的字面意义翻译如下,由于这个工具的名字叫Storm,这些术语一律按照气象名词解释Storm 暴风雨spout 龙卷,读取原始数据为bolt提供数据bolt 雷电,从spout或其它bolt接收数据,并处理数据,处理结果可作为...

    Bolt_2_0_0a10.unitypackage

    最新官方Bolt_2_0_0a10,可视化编程工具,比PlayMaker更兼容Unity3d,操作更人性化。无需写代码

    Bolt:CSRF扫描仪-源码

    螺栓笨拙的CSRF扫描仪重要的Bolt处于测试的Beta阶段,这意味着可能存在错误。 不鼓励使用此工具。 欢迎提出请求和问题。 如果您对此仓库感兴趣,我也建议您将它放在监视中。工作流程爬行Bolt将目标网站爬网到指定的...

    WIN版 罗技新版BOLT 配对驱动

    WIN版 罗技新版BOLT 配对驱动

    Bolt 1.4.4.unitypackage

    这是一个第三方开发的Unity可视化编程插件的最新版本Bolt 1.4.4,如果你想做2019版Unity上安装Bolt,劝你还是选择这个1.4.4版本,之前的版本(测试过Bolt1.4.0)可能都会出现错误提示,如:Library\PackageCache\...

    photon bolt网络插件

    photon bolt网络插件

Global site tag (gtag.js) - Google Analytics