`
liyonghui160com
  • 浏览: 761799 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Stormstarter-RollingTopWords

阅读更多

 

实现了滑动窗口计数和TopN排序, 比较有意思, 具体分析一下代码
Topology

这是一个稍微复杂些的topology, 主要体现在使用不同的grouping方式, fieldsGrouping和globalGrouping

String spoutId = "wordGenerator";
String counterId = "counter";
String intermediateRankerId = "intermediateRanker";
String totalRankerId = "finalRanker";
builder.setSpout(spoutId, new TestWordSpout(), 5);
builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj"));
builder.setBolt(totalRankerId, new TotalRankingsBolt TOP_N)).globalGrouping(intermediateRankerId);

 
 
RollingCountBolt

首先使用RollingCountBolt, 并且此处是按照word进行fieldsGrouping的, 所以相同的word会被发送到同一个bolt, 这个field id是在上一级的declareOutputFields时指定的

RollingCountBolt, 用于基于时间窗口的counting, 所以需要两个参数, the length of the sliding window in seconds和the emit frequency in seconds

    new RollingCountBolt(9, 3), 意味着output the latest 9 minutes sliding window every 3 minutes

1. 创建SlidingWindowCounter(SlidingWindowCounter和SlotBasedCounter参考下面)
counter = new SlidingWindowCounter(this.windowLengthInSeconds / this.windowUpdateFrequencyInSeconds);
如何定义slot数? 对于9 min的时间窗口, 每3 min emit一次数据, 那么就需要9/3=3个slot
那么在3 min以内, 不停的调用countObjAndAck(tuple)来递增所有对象该slot上的计数
每3分钟会触发调用emitCurrentWindowCounts, 用于滑动窗口(通过getCountsThenAdvanceWindow), 并emit (Map<obj, 窗口内的计数和>, 实际使用时间)
因为实际emit触发时间, 不可能刚好是3 min, 会有误差, 所以需要给出实际使用时间

 

2. TupleHelpers.isTickTuple(tuple), TickTuple

前面没有说的一点是, 如何触发emit? 这是比较值得说明的一点, 因为其使用Storm的TickTuple特性.
这个功能挺有用, 比如数据库批量存储, 或者这里的时间窗口的统计等应用
"__system" component会定时往task发送 "__tick" stream的tuple
发送频率由TOPOLOGY_TICK_TUPLE_FREQ_SECS来配置, 可以在default.ymal里面配置
也可以在代码里面通过getComponentConfiguration()来进行配置,

public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
return conf;

 

配置完成后, storm就会定期的往task发送ticktuple
只需要通过isTickTuple来判断是否为tickTuple, 就可以完成定时触发的功能

public static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) \\ SYSTEM_COMPONENT_ID == "__system"
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); \\ SYSTEM_TICK_STREAM_ID == "__tick"
}

 
最终, 这个blot的输出为, collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
obj, count(窗口内的计数和), 实际使用时间

 
SlotBasedCounter

基于slot的counter, 模板类, 可以指定被计数对象的类型T
这个类其实很简单, 实现计数对象和一组slot(用long数组实现)的map, 并可以对任意slot做increment或reset等操作

关键结构为Map<T, long[]> objToCounts, 为每个obj都对应于一个大小为numSlots的long数组, 所以对每个obj可以计numSlots个数
incrementCount, 递增某个obj的某个slot, 如果是第一次需要创建counts数组
getCount, getCounts, 获取某obj的某slot值, 或某obj的所有slot值的和
wipeSlot, resetSlotCountToZero, reset所有对象的某solt为0, reset某obj的某slot为0
wipeZeros, 删除所有total count为0的obj, 以释放空间

public final class SlotBasedCounter<T> implements Serializable {
private static final long serialVersionUID = 4858185737378394432L;
private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
private final int numSlots;
public SlotBasedCounter(int numSlots) {
if (numSlots <= 0) {
throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots
+ ")");
}
this.numSlots = numSlots;
}
public void incrementCount(T obj, int slot) {
long[] counts = objToCounts.get(obj);
if (counts == null) {
counts = new long[this.numSlots];
objToCounts.put(obj, counts);
}
counts[slot]++;
}
public long getCount(T obj, int slot) {
long[] counts = objToCounts.get(obj);
if (counts == null) {
return 0;
}
else {
return counts[slot];
}
}
public Map<T, Long> getCounts() {
Map<T, Long> result = new HashMap<T, Long>();
for (T obj : objToCounts.keySet()) {
result.put(obj, computeTotalCount(obj));
}
return result;
}
private long computeTotalCount(T obj) {
long[] curr = objToCounts.get(obj);
long total = 0;
for (long l : curr) {
total += l;
}
return total;
}
/**
* Reset the slot count of any tracked objects to zero for the given slot.
*
* @param slot
*/
public void wipeSlot(int slot) {
for (T obj : objToCounts.keySet()) {
resetSlotCountToZero(obj, slot);
}
}
private void resetSlotCountToZero(T obj, int slot) {
long[] counts = objToCounts.get(obj);
counts[slot] = 0;
}
private boolean shouldBeRemovedFromCounter(T obj) {
return computeTotalCount(obj) == 0;
}
/**
* Remove any object from the counter whose total count is zero (to free up memory).
*/
public void wipeZeros() {
Set<T> objToBeRemoved = new HashSet<T>();
for (T obj : objToCounts.keySet()) {
if (shouldBeRemovedFromCounter(obj)) {
objToBeRemoved.add(obj);
}
}
for (T obj : objToBeRemoved) {
objToCounts.remove(obj);
}
}
}

 


SlidingWindowCounter

SlidingWindowCounter只是对SlotBasedCounter做了进一步的封装, 通过headSlot和tailSlot提供sliding window的概念

incrementCount, 只能对headSlot进行increment, 其他slot作为窗口中的历史数据

核心的操作为, getCountsThenAdvanceWindow
1. 取出Map<T, Long> counts, 对象和窗口内所有slots求和值的map
2. 调用wipeZeros, 删除已经不被使用的obj, 释放空间
3. 最重要的一步, 清除tailSlot, 并advanceHead, 以实现滑动窗口
    advanceHead的实现, 如何在数组实现循环的滑动窗口

public final class SlidingWindowCounter<T> implements Serializable {
private static final long serialVersionUID = -2645063988768785810L;
private SlotBasedCounter<T> objCounter;
private int headSlot;
private int tailSlot;
private int windowLengthInSlots;
public SlidingWindowCounter(int windowLengthInSlots) {
if (windowLengthInSlots < 2) {
throw new IllegalArgumentException("Window length in slots must be at least two (you requested "
+ windowLengthInSlots + ")");
}
this.windowLengthInSlots = windowLengthInSlots;
this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);
this.headSlot = 0;
this.tailSlot = slotAfter(headSlot);
}
public void incrementCount(T obj) {
objCounter.incrementCount(obj, headSlot);
}
/**
* Return the current (total) counts of all tracked objects, then advance the window.
*
* Whenever this method is called, we consider the counts of the current sliding window to be available to and
* successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
* objects within the next "chunk" of the sliding window.
*
* @return
*/
public Map<T, Long> getCountsThenAdvanceWindow() {
Map<T, Long> counts = objCounter.getCounts();
objCounter.wipeZeros();
objCounter.wipeSlot(tailSlot);
advanceHead();
return counts;
}
private void advanceHead() {
headSlot = tailSlot;
tailSlot = slotAfter(tailSlot);
}
private int slotAfter(int slot) {
return (slot + 1) % windowLengthInSlots;
}
}

 

IntermediateRankingsBolt

这个bolt作用就是对于中间结果的排序, 为什么要增加这步, 应为数据量比较大, 如果直接全放到一个节点上排序, 会负载太重
所以先通过IntermediateRankingsBolt, 过滤掉一些
这里仍然使用, 对于obj进行fieldsGrouping, 保证对于同一个obj, 不同时间段emit的统计数据会被发送到同一个task

IntermediateRankingsBolt继承自AbstractRankerBolt(参考下面)
并实现了updateRankingsWithTuple,

void updateRankingsWithTuple(Tuple tuple) {
Rankable rankable = RankableObjectWithFields.from(tuple);
super.getRankings().updateWith(rankable);
}

 

逻辑很简单, 将Tuple转化Rankable, 并更新Rankings列表

参考AbstractRankerBolt, 该bolt会定时将Ranking列表emit出去


Rankable

Rankable除了继承Comparable接口, 还增加getObject()和getCount()接口

public interface Rankable extends Comparable<Rankable> {
Object getObject();
long getCount();
}

 

RankableObjectWithFields

RankableObjectWithFields实现Rankable接口
1. 提供将Tuple转化为RankableObject
Tuple由若干field组成, 第一个field作为obj, 第二个field作为count, 其余的都放到List<Object> otherFields中

2. 实现Rankable定义的getObject()和getCount()接口

3. 实现Comparable接口, 包含compareTo, equals

public class RankableObjectWithFields implements Rankable

public static RankableObjectWithFields from(Tuple tuple) {
List<Object> otherFields = Lists.newArrayList(tuple.getValues());
Object obj = otherFields.remove(0);
Long count = (Long) otherFields.remove(0);
return new RankableObjectWithFields(obj, count, otherFields.toArray());
}

 

Rankings

Rankings维护需要排序的List, 并提供对List相应的操作

核心的数据结构如下, 用来存储rankable对象的list
List<Rankable> rankedItems = Lists.newArrayList();

提供一些简单的操作, 比如设置maxsize(list size), getRankings(返回rankedItems, 排序列表)

核心的操作是,

public void updateWith(Rankable r) {
addOrReplace(r);
rerank();
shrinkRankingsIfNeeded();
}

 

上一级的blot会定期的发送某个时间窗口的(obj, count), 所以obj之间的排序是在不断变化的
1. 替换已有的, 或新增rankable对象(包含obj, count)
2. 从新排序(Collections.sort)
3. 由于只需要topN, 所以大于maxsize的需要删除
AbstractRankerBolt

首先以TopN为参数, 创建Rankings对象

private final Rankings rankings;
public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
count = topN;
this.emitFrequencyInSeconds = emitFrequencyInSeconds;
rankings = new Rankings(count);
}

 

在execute中, 也是定时触发emit, 同样是通过emitFrequencyInSeconds来配置tickTuple
一般情况, 只是使用updateRankingsWithTuple不断更新Rankings
这里updateRankingsWithTuple是abstract函数, 需要子类重写具体的update逻辑

public final void execute(Tuple tuple, BasicOutputCollector collector) {
if (TupleHelpers.isTickTuple(tuple)) {
emitRankings(collector);
}
else {
updateRankingsWithTuple(tuple);
}
}

 

最终将整个rankings列表emit出去

private void emitRankings(BasicOutputCollector collector) {
collector.emit(new Values(rankings));
getLogger().info("Rankings: " + rankings);
}

 


TotalRankingsBolt

该bolt会使用globalGrouping, 意味着所有的数据都会被发送到同一个task进行最终的排序.
TotalRankingsBolt同样继承自AbstractRankerBolt

void updateRankingsWithTuple(Tuple tuple) {
Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
super.getRankings().updateWith(rankingsToBeMerged);
}

 

唯一的不同是, 这里updateWith的参数是个rankable列表, 在Rankings里面的实现一样, 只是多了遍历

最终可以得到, 全局的TopN的Rankings列表

 

 

 

分享到:
评论

相关推荐

    图解迪杰斯特拉(Dijkstra)最短路径算法.docx

    一、最短路径的概念及应用 在介绍最短路径之前我们首先要明白两个概念:什么是源点,什么是终点?在一条路径中,起始的第 一个节点叫做源点;终点:在一条路径中,最后一个的节点叫做终点;注意!源点和终点都只是相对 于一条路径而言,每一条路径都会有相同或者不相同的源点和终点。 而最短路径这个词不用过多解释,就是其字面意思: 在图中,对于非带权无向图而言, 从源点到终点 边最少的路径(也就是 BFS 广度优先的方法); 而对于带权图而言, 从源点到终点权值之和最少的 路径叫最短路径; 最短路径应用:道路规划; 我们最关心的就是如何用代码去实现寻找最短路径, 通过实现最短路径有两种算法:Dijkstra 迪杰斯 特拉算法和 Floyd 弗洛伊德算法, 接下来我会详细讲解 Dijkstra 迪杰斯特拉算法;

    基于faster-rcnn实现的行人检测算法python源码+项目说明+详细注释.zip

    基于faster-rcnn实现的行人检测算法python源码+项目说明+详细注释.zip 使用方法: 1.编译安装faster-rcnn的python接口,代码在:https://github.com/rbgirshick/py 2.下载训练好的caffe模型,百度云链接为:https://pan.baidu.com/s/1w479QUUAwLBS2AJbc-eXIA,将下载的模型文件放到faster-rcnn文件夹的data/faster_rcnn_models文件夹中 3.将本项目中的文件夹替换安装好的faster-rcnn源码中的文件夹 4.使用tools文件夹下的测试脚本运行demo:python person_detect.py

    jsp基于Web的可维护的数据库浏览器(源代码+论文+答辩PPT).zip

    jsp基于Web的可维护的数据库浏览器(源代码+论文+答辩PPT)

    node-v12.7.0-linux-ppc64le.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    unet + pytorch 数据科学碗2018-python源码.zip

    unet + pytorch 数据科学碗2018-python源码.zip

    1999-2020年各地级市实际利用外资数据.xlsx

    数据预览链接:https://pan.baidu.com/s/17y5tiSmC5U4d1Mben250gg 提取码:u1da 更多介绍:https://blog.csdn.net/m0_71334485/article/details/138400336

    基于Torch Hub的渐进式GAN架构-python源码.zip

    基于Torch Hub的渐进式GAN架构-python源码.zip

    JSP基于Iptables图形管理工具的设计与实现(源代码+论文).zip

    JSP基于Iptables图形管理工具的设计与实现(源代码+论文)

    使用Keras+TensorFlow+FCN分割KITTI数据集-python源码.zip

    使用Keras+TensorFlow+FCN分割KITTI数据集-python源码.zip

    基于RRT采样对六轴机械臂进行路径规划Matlab完整源码+代码注释+项目说明.zip

    基于RRT采样对六轴机械臂进行路径规划Matlab完整源码+代码注释+项目说明.zip

    深蹲姿势分析-python源码.zip

    深蹲姿势分析-python源码.zip

    基于python实现的交通网络中的流量调控使用复杂网络中的级联失效模型.rar

    基于python实现的交通网络中的流量调控使用复杂网络中的级联失效模型.rar

    node-v4.4.1.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    node-v4.8.7.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    深度残差网络ResNet-python源码.zip

    深度残差网络ResNet-python源码.zip

    基于oecms内核蓝色经典大方手机wap企业网站源码.zip

    触屏版自适应手机wap软件网站模板 触屏版自适应手机wap软件网站模板

    2007-2022商业银行绿色信贷数据绿色信贷余额贷款总额绿色信贷比率不良贷款率

    2007-2022商业银行绿色信贷数据绿色信贷余额贷款总额绿色信贷比率不良贷款率 在一定的客观缺失。 1、数据来源:公司年报、可持续发展报告、社会责任报告 2、数 据范围: 36家上市银行 北京银行、常熟银行、成都银行、工商银行、光大银行、贵阳 银行、杭州银行、华夏银行、建设银行、江苏银行、江阴银行、交通银行、民生银行、南京 银行、宁波银行、农业银行、平安银行、浦发银行、青岛银行、青农商行、上海银行、苏农 银行、苏州银行、无锡农村商业银行、西安银行、兴业银行、邮储银行、渝农商行、张家港 行、长沙银行、招商银行、浙商银行、郑州银行、中国银行、中信银行、紫金银行

    基于优化设计的储油罐变位识别与罐容表标定的研究.doc

    本文档是课题研究的研究报告内含调研以及源码设计以及结果分析

    更新全球夜间灯光数据大全(包含各省、地级市及县区)1992-2022年

    全球夜间灯光数据 参考文献 [1]徐康宁, 陈丰龙, and 刘修岩. "中国经 济增长的真实性:基于全球夜间灯光数据的检验." 经济研究 (2015). [2] 王贤彬等. "中国地区经济差距动态趋势重估——基于卫星灯光数据的考察." 经济学 (季刊) 16.2(2017):20. 基于美国国家海洋与大气管理局(NOAA) 的DMSP/OLS影像数据和VIIRS/DNB影像数据开发而成的,目前主要反映中 国各省、地级市及县区夜间灯光数据和一带一路沿线国家的夜间灯光数据情况。DMSP/ OLS影像灯光数据目前已经应用于经济类研究,并且已有较多文献发表在诸多高水平杂志 上,因此逐渐受到学者关注。VIIRS/DNB影像灯光数据在国内文献中还未被广泛应 用,但基于其具有许多优点因此逐渐受到学者关注。由于学者通常只能获得NOAA上面的 原始图片资料,将这些图像资料转化为可用数据需要用到较为复杂的计算机和编程技术。 数据包含 核心指标:[DN均值]-计算所得的DN总值/栅格数 项目 字段内容 数 据起始时间 DMSP中国各省份灯光数据(校正后) 省份名称、年度、DN均值 1992~20

    Dijkstra算法Java实现示例

    Dijkstra算法是一种用于在加权图中找到单个源点到所有其他顶点的最短路径的算法。以下是Java语言实现Dijkstra算法的一个简单示例,这个示例假设你有一个图的邻接矩阵表示,并且所有边的权重都是正数。 代码定义了一个DijkstraExample类,其中包含了Dijkstra算法的实现。dijkstra方法接受一个图的邻接矩阵和源顶点作为输入,计算从源顶点到图中所有其他顶点的最短路径。minDistance方法用于找到未被考虑的顶点中距离最小的顶点。printSolution方法用于打印最终的最短路径结果。 在main方法中,我们创建了一个图的邻接矩阵,并调用dijkstra方法来计算从顶点0到其他所有顶点的最短路径。 请注意,这个示例仅适用于没有负权边的图,因为Dijkstra算法不能处理负权边。如果你的图有负权边,你可能需要使用Bellman-Ford算法。

Global site tag (gtag.js) - Google Analytics