`
文章列表
1.分区事务spout public class MyPtTxSpout implements IPartitionedTransactionalSpout<MyMata>{ /** * 分区事务spout */ private static final long serialVersionUID = 1L; public static int BATCH_NUM = 10 ; public Map<Integer, Map<Long,String>> PT_DATA_MP = new HashMap<Intege ...
1、普通事务Spout /** * 普通事务Spout */ public class MyTxSpout implements ITransactionalSpout<MyMata>{ private static final long serialVersionUID = 1L; /** * 数据源 */ Map<Long, String> dbMap = null; public MyTxSpout() { Random random = new Random(); dbMap = ...
1、普通事务Spout /** * 普通事务Spout */ public class MyTxSpout implements ITransactionalSpout<MyMata>{ private static final long serialVersionUID = 1L; /** * 数据源 */ Map<Long, String> dbMap = null; public MyTxSpout() { Random random = new Random(); dbMap = ...

Storm事务API

Spout ITransactionalSpout<T>,同BaseTransactionalSpout<T>,普通事务Spout IPartitionedTransactionalSpout<T>,同BasePartitionedTransactionalSpout<T>,分区事务Spout IOpaquePartitionedTransactionalSpout<T>:同BaseOpaquePartitionedTransactionalSpout<T> ...
事务:Storm容错机制通过一个系统级别的组件acker,结合xor校验机制判断一个tuple是否发送成功,进而spout可以重发该tuple ,保证一个tuple在出错的情况下至少被重发一次。 在需要精确统计tuple的数量如销售金额场景时,希望每个tu ...
1、cd 2、ls -al 显示隐藏目录 3、rm -rf .ssh  删除各节点的.ssh目录 4、ssh master/slave 验证是否需要密码   exit退出登陆 一、设置ssh无密码访问slave节点 1、各节点(master/slave)执行ssh-keygen -t rsa      建立ssh目录,一路敲回车,生成的密钥对id_rsa,id_rsa.pub,默认存储在~/.ssh目录下。 赋予.ssh文件755权限:chmod 755 .ssh 查看.ssh文件: cd .ssh #ls –l id_rsa  id_rsa.pub 2、在master工作节点上生成 ...

storm高并发UV统计

统计高并发UV可行的方案(类似WordCount的计算去重word总数): bolt1通过fieldGrouping 进行多线程局部汇总,下一级blot2进行单线程保存session_id和count数到Map且进行遍历,可以得到: Pv、UV、访问深度(按每个session_id 的浏览数) 2014-05-01     UV数(按日期统计) 既然去重,必须持久化。两种持久化数据: 1、内存(适用中小型数据) 数据结构Map 2、no-sql 分布式数据库,如Hbase(适用大型数据) 1、数据源 public class SourceSpout implements I ...
汇总型方案: 1、shuffleGrouping下,pv(单线程结果) * Executer并发数 一个Executer默认一个task,如果设置Task数大于1,公式应该是: pv(单线程结果) * Task 数 , 同一个Executer下task的线程ID相同,taskId不同 优点:简单、计算量小 缺点:稍有误差,但绝大多数场景能接受 优化: 案例PVBolt中每个Task都会输出一个汇总值,实际只需要一个Task输出汇总值, 利用Zookeeper锁来做到只一个Task输出汇总值,而且每5S输出一次 1、pom.xml增加zk 引用pom.xml中增加ZK: <depen ...

storm高并发PV统计

一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 线程安全:多线程处理的结果和单线程一致 如下是否可行? 不可行方案:   定义static long pv, Synchronized 控制累计操作。Synchronized 和 Lock在单个JVM下有效,但在多JVM下无效。 可行方案两个方案: 1、shuffleGrouping下,pv * Executer并发数 2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总 二、实现 注意:多线程下每一个bolt中的execute方法都会执行多次,类似一个while循环。 1、bolt1进行多并发(局部)汇总处 ...
1、创建发射所有字符串统计总个数及去重个数处理类 public class SumBolt implements IBasicBolt { /** * 对发射所有字符串统计总个数及去重个数 */ private static final long serialVersionUID = 1L; Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple input, BasicOutputCollector col ...

storm分组策略介绍

一、storm数据来源 Spout的数据源: MQ:直接流数据源 Db:只能读配置文件 文件:只能学习用,其他无用。 问题:1、分布式应用无法读;2、spout开并发会重复读 Log文件增量数据:1、读出内容写入MQ,2、Storm处理 二、分组策略 stream grouping就是用来定义一个stream应该如果分配给Bolts上面的多个 Executors(多线程,并发度) 注:不是一个spout或bolt emit到多个bolt(广播方式)。 storm里面有6种类型的stream grouping。 单线程下均等同于All Grouping 1.Shuffle Group ...

Storm高并发介绍

并发度:   worker:指的是component (spout或bolt),并行的跑在不同的machine上的topology子集。   一个Topology可以包含一个或多个worker,worker process就是执行一个topology的子集, 并且worker只能属于一个topology。 设置worker数量 Config conf = new Config(); conf.setNumWorkers(2);   executor:worker的执行线程。   一个worker可用包含一个或多个executor, 每个component (spout或bolt) ...

Storm 字符统计Demo

1、数据源读取,字符发射spout类 /** * 字符发射spout类 */ public class RandomSentenceSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; SpoutOutputCollector _collector; Random _rand; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { ...
一、Eclipse配置Maven 1、window-->perferences-->Maven-->installations配置目录  选择本地路径:   2、设置maven配置文件及本地库     二、创建项目 1、 启动 Eclipse, 点击 File->New->Maven Project   2、 在上面的屏幕上,保留 ‘Use default workspace location’ 选择创建这个项目在目前的工作空间。点击 Next。 向下滚动,Artifact Id 选择选项为 maven-archetype-webapp. ...

Storm 本地模式

  本地模式,是在eclipse等编译器编写strom运行文件,在于模拟storm在集群运行的结果,便于代码的编写和调试。 一、下载开发环境的zip文件,将storm相关jar包导入编辑器。注意是zip文件,不是gz运行linux文件。       http://storm.apache.org/downloads.html 二、创建数据文件      storm有Tail属性,只适合文本源,会对文本文件内容进行监听。 public class GetData { /** * 生成数据 * @param args */ public static vo ...
Global site tag (gtag.js) - Google Analytics