- 浏览: 54079 次
- 性别:
- 来自: 北京
文章分类
最新评论
一、PV统计思考
方案需要考虑分析多线程下,注意线程安全问题。
线程安全:多线程处理的结果和单线程一致
如下是否可行?
不可行方案:
定义static long pv, Synchronized 控制累计操作。Synchronized 和 Lock在单个JVM下有效,但在多JVM下无效。
可行方案两个方案:
1、shuffleGrouping下,pv * Executer并发数
2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总
二、实现
注意:多线程下每一个bolt中的execute方法都会执行多次,类似一个while循环。
1、bolt1进行多并发(局部)汇总处理类
2、bolt2单线程进行全局汇总处理类
3、topology运行main类
-------------------------------其它辅助类---------------------------
4、数据读取spout处理类
5、pom文件引用前几篇文章
6、处理结果
方案需要考虑分析多线程下,注意线程安全问题。
线程安全:多线程处理的结果和单线程一致
如下是否可行?
不可行方案:
定义static long pv, Synchronized 控制累计操作。Synchronized 和 Lock在单个JVM下有效,但在多JVM下无效。
可行方案两个方案:
1、shuffleGrouping下,pv * Executer并发数
2、bolt1进行多并发局部汇总,bolt2单线程进行全局汇总
二、实现
注意:多线程下每一个bolt中的execute方法都会执行多次,类似一个while循环。
1、bolt1进行多并发(局部)汇总处理类
public class PVBolt1 implements IRichBolt{ /** * bolt1进行多并发(局部)汇总 */ OutputCollector collector = null; private static final long serialVersionUID = 1L; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } String logString; String session_id; long pv = 0; public void execute(Tuple input) { logString = input.getString(0); session_id = logString.split("\t")[1]; if(session_id !=null){ pv ++; } collector.emit(new Values(Thread.currentThread().getId(),pv)); System.err.println("threadId = "+ Thread.currentThread().getId()+"; pv="+pv); } public void cleanup() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("threadId", "count")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
2、bolt2单线程进行全局汇总处理类
public class PVBolt2 implements IRichBolt{ /** * bolt2单线程进行全局汇总 */ private static final long serialVersionUID = 1L; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method stub } Map<Long,Long>counts = new HashMap<Long,Long>(); public void execute(Tuple input) { Long thread_id = input.getLong(0); Long pv = input.getLong(1); counts.put(thread_id,pv); System.err.println(" threadId="+thread_id+"-------------pv="+pv); long word_sum = 0; //获取总数,遍历counts 的values,进行sum Iterator<Long> i = counts.values().iterator() ; while(i.hasNext()) { word_sum += i.next(); } System.err.println("PVBolt2-------------pv="+word_sum+"\r"); } public void cleanup() { // TODO Auto-generated method stub } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
3、topology运行main类
public class Main { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new MySpout(), 1); builder.setBolt("bolt1", new PVBolt1(),4).shuffleGrouping("spout"); builder.setBolt("bolt2", new PVBolt2(),1).shuffleGrouping("bolt1"); Map conf = new HashMap(); conf.put(Config.TOPOLOGY_WORKERS, 4); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); }catch (AuthorizationException e) { e.printStackTrace(); } }else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }
-------------------------------其它辅助类---------------------------
4、数据读取spout处理类
public class MySpout implements IRichSpout{ /** * 数据读取spout处理类 */ private static final long serialVersionUID = 1L; FileInputStream fis; InputStreamReader isr; BufferedReader br; SpoutOutputCollector collector = null; String str = null; public void nextTuple() { try { while ((str = this.br.readLine()) != null) { // 过滤动作 collector.emit(new Values(str)); // Thread.sleep(3000); //to do } } catch (Exception e) { // TODO: handle exception } } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.collector = collector; this.fis = new FileInputStream("track.log"); this.isr = new InputStreamReader(fis, "UTF-8"); this.br = new BufferedReader(isr); } catch (Exception e) { e.printStackTrace(); } // 打开文件 } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 发射数据格式,与bolt接收数据一致 declarer.declare(new Fields("log")); } public Map<String, Object> getComponentConfiguration() { // 与ope方法中的map对应 return null; } public void ack(Object msgId) { // TODO Auto-generated method stub } public void activate() { // TODO Auto-generated method stub } public void close() { // TODO Auto-generated method stub } public void deactivate() { // TODO Auto-generated method stub } public void fail(Object msgId) { // TODO Auto-generated method stub } }
5、pom文件引用前几篇文章
6、处理结果
引用
threadId=156-------------pv=44
PVBolt2-------------pv=44
threadId=156-------------pv=45
PVBolt2-------------pv=45
threadId=156-------------pv=46
PVBolt2-------------pv=46
threadId=156-------------pv=47
PVBolt2-------------pv=47
threadId=152-------------pv=1
PVBolt2-------------pv=48
threadId=215-------------pv=1
PVBolt2-------------pv=49
9234 [Thread-62-bolt1-executor[5 5]]
threadId = 227; pv=1
threadId=227-------------pv=1
PVBolt2-------------pv=50
PVBolt2-------------pv=44
threadId=156-------------pv=45
PVBolt2-------------pv=45
threadId=156-------------pv=46
PVBolt2-------------pv=46
threadId=156-------------pv=47
PVBolt2-------------pv=47
threadId=152-------------pv=1
PVBolt2-------------pv=48
threadId=215-------------pv=1
PVBolt2-------------pv=49
9234 [Thread-62-bolt1-executor[5 5]]
threadId = 227; pv=1
threadId=227-------------pv=1
PVBolt2-------------pv=50
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1001一、ITridentSpout 基于事务 static int ... -
Trident实战之计算网站PV
2017-05-24 13:24 6221、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 708一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 481英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 389一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6511、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5581.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4501、普通事务Spout /** * 普通事务Spou ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 7931、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 583Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2064事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4181、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1087统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 869汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10401、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 673一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 557并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5051、数据源读取,字符发射spout类 /** * 字符 ... -
Storm 本地模式
2017-04-09 22:25 376本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 641一、安装Storm wget ...
相关推荐
此案例使用的是IDEA开发工具,项目属于maven项目 该词频统计案例中,数据源是自动产生的(java程序自定义生成的),针对自定义生成的数据完成词频统计,完成后打包上传到storm程序中执行
Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...
Storm Trident实战之计算网站PV.rar
这样的话,storm才能去消费kafka中的实时的访问日志,然后去进行缓存热数据的统计 用得技术方案非常简单,从lua脚本直接创建一个kafka producer,发送数据到kafka ``` wget ...
一个非常实用的storm入门demo,从生产数据源,后端到前端展示一应俱全,只需安装一个kafka和hbase环境即可,有readme说明!
本demo根据《learning-storm》这本书籍中的实例,改写的。对于初次学习storm的朋友,是理解storm工作流程的很好入门实例
storm利用ack保证数据的可靠性,发送失败时进行重发,保证数据不丢失。
主分支: ##包裹包战 mvn clean package -DskipTests=true -Dwarcp ./target/storm-ui.war $TOMCAT_HOME/webapps/包装罐 mvn clean package -DskipTests=truecp ./target/storm-ui-*.jar $STORM_HOME/external/...
storm的入门,东西很不错!看完就算是基本入门啦!!还等什么?
第4章和第5章阐述了Storm的并发度、可靠处理的特性;第6章~第8章详细而系统地讲解了几个高级特性:事务、DRPC和Trident;第9章以实例的方式讲解了Storm在实际业务场景中的应用;第10章总结了几个在大数据场景应用...
Storm 安装配置快速上手 Storm 是一个分布式实时计算系统,用于处理大量数据流。下面是 Storm 安装配置的快速上手指南。 安装准备 在安装 Storm 之前,需要准备好 Linux 机器和相关环境。这里选择使用 VMware ...
01-storm简介 02-storm部署-1 03-storm部署-2 04-storm部署概念 05-streamgrouping 06-storm组件生命周期 07-storm可靠性1 08-storm可靠性2
基于Storm实时统计CallLog实现【可运行】 使用Maven创建项目,引入Storm依赖,编写Storm组件,调试运行 本地集群搭建
分析storm的相关框架,底层实现原理。
Apache Storm(apache-storm-2.3.0.tar.gz) 是一个免费的开源分布式实时计算系统。Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与任何编程语言...
细细品味Storm_Storm简介及安装
作为分布式系统架构,Hadoop具有高可靠性、高扩展性、高效性、高容错性和低成本的优点。然而随着数据体积越来越大,实时处理能力成为了许多机构需要面对的首要挑战。Hadoop是一个批处理系统,在实时计算处理方面显得...
Java高级互联网架构师系统培训班课程Java高级互联网架构师系统培训班课程Java高级互联网架构师系统培训班课程