A feasible scheme for high-concurrency UV statistics (analogous to WordCount computing the number of distinct words):

bolt1 performs multi-threaded partial aggregation via fieldsGrouping; the downstream single-threaded bolt2 stores each session_id and its count in a Map and iterates over it, which yields:
PV, UV, and visit depth (the number of page views per session_id)
UV per day (e.g. the UV count for 2014-05-01)

Since deduplication is involved, the counts must be persisted. There are two ways to persist the data:
1. In memory (suitable for small and medium data) — data structure: a Map
2. A NoSQL distributed database such as HBase (suitable for large data)
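The two-stage scheme can be sketched outside Storm as a minimal stand-alone Java analogue (class and key names here are illustrative, not taken from the topology below): each "local" map counts hits per date_session_id key, a single merge pass combines them, and one scan of the merged map derives PV (sum of counts) and UV (number of distinct session keys).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class UvSketch {

    // Returns {PV, UV} for a hard-coded demo day.
    static long[] compute() {
        // Stage 1: two "local" maps, as if built by parallel bolt tasks.
        // Fields grouping guarantees each date_session_id key lives in
        // exactly one local map, so the partial maps are disjoint.
        Map<String, Integer> local1 = new HashMap<>();
        local1.put("2014-01-07_A", 2); // session A viewed two pages
        local1.put("2014-01-07_B", 1);
        Map<String, Integer> local2 = new HashMap<>();
        local2.put("2014-01-07_C", 3);

        // Stage 2: the single downstream task merges the partial counts.
        Map<String, Integer> global = new HashMap<>();
        for (Map<String, Integer> local : Arrays.asList(local1, local2)) {
            local.forEach((k, v) -> global.merge(k, v, Integer::sum));
        }

        // PV = total hits for the day; UV = number of distinct session keys.
        long pv = global.values().stream().mapToLong(Integer::intValue).sum();
        long uv = global.keySet().stream()
                        .filter(k -> k.startsWith("2014-01-07")).count();
        return new long[] { pv, uv };
    }

    public static void main(String[] args) {
        long[] r = compute();
        System.out.println("PV=" + r[0] + "; UV=" + r[1]); // PV=6; UV=3
    }
}
```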
1. Data source

public class SourceSpout implements IRichSpout {
    /** Data-source spout: queues 20 synthetic log lines, then emits one every 500 ms. */
    private static final long serialVersionUID = 1L;

    Queue<String> queue = new ConcurrentLinkedQueue<String>();
    SpoutOutputCollector collector = null;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.collector = collector;
            Random random = new Random();
            String[] hosts = { "www.taobao.com" };
            String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34",
                    "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567",
                    "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
            String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52",
                    "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49",
                    "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
            for (int i = 0; i < 20; i++) {
                queue.add(hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)]);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void nextTuple() {
        if (queue.size() > 0) { // was ">= 0", which emitted null values once the queue drained
            collector.emit(new Values(queue.poll()));
        }
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log"));
    }

    public Map<String, Object> getComponentConfiguration() { return null; }
    public void close() { }
    public void activate() { }
    public void deactivate() { }

    public void ack(Object msgId) {
        System.out.println("spout ack:" + msgId.toString());
    }

    public void fail(Object msgId) {
        System.out.println("spout fail:" + msgId.toString());
    }
}
2. Date-formatting bolt

public class FmtLogBolt implements IBasicBolt {
    /** Formats each raw log line into (date, session_id). */
    private static final long serialVersionUID = 1L;

    String eachLog = null;

    public void execute(Tuple input, BasicOutputCollector collector) {
        eachLog = input.getStringByField("log");
        if (eachLog != null && eachLog.length() > 0) {
            String[] fields = eachLog.split("\t"); // host, session_id, time
            collector.emit(new Values(DateFmt.getCountDate(fields[2], DateFmt.date_short), fields[1]));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "session_id"));
    }

    public Map<String, Object> getComponentConfiguration() { return null; }
    public void prepare(Map stormConf, TopologyContext context) { }
    public void cleanup() { }
}
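The transformation this bolt performs can be reproduced stand-alone (an illustrative sketch; the truncation here uses substring directly, whereas the bolt goes through DateFmt/SimpleDateFormat):

```java
public class FmtSketch {
    // Turns one raw "host \t session_id \t time" log line into
    // the (date, session_id) pair the bolt emits downstream.
    static String[] fmt(String log) {
        String[] parts = log.split("\t");        // host, session_id, time
        String date = parts[2].substring(0, 10); // "yyyy-MM-dd HH:mm:ss" -> "yyyy-MM-dd"
        return new String[] { date, parts[1] };
    }

    public static void main(String[] args) {
        String line = "www.taobao.com\tABYH6Y4V4SCVXTG6DPB4VH9U123\t2014-01-07 08:40:50";
        String[] out = fmt(line);
        System.out.println(out[0] + " " + out[1]); // 2014-01-07 ABYH6Y4V4SCVXTG6DPB4VH9U123
    }
}
```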
3. Multi-threaded partial aggregation of visit depth

public class DeepVisitBolt implements IBasicBolt {
    /** Partial aggregation: each task keeps a running count per date_session_id key. */
    private static final long serialVersionUID = 1L;

    Map<String, Integer> counts = new HashMap<String, Integer>();

    public void execute(Tuple input, BasicOutputCollector collector) {
        String dateString = input.getStringByField("date");
        String session_id = input.getStringByField("session_id");
        Integer count = counts.get(dateString + "_" + session_id);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(dateString + "_" + session_id, count);
        collector.emit(new Values(dateString + "_" + session_id, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date_session_id", "count"));
    }

    public Map<String, Object> getComponentConfiguration() { return null; }
    public void prepare(Map stormConf, TopologyContext context) { }
    public void cleanup() { }
}
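This bolt's local Map is only correct because fields grouping routes tuples with equal (date, session_id) values to the same task. A toy routing sketch illustrates the guarantee — Storm's real implementation differs in detail, but it likewise hashes the grouping fields modulo the task count:

```java
import java.util.Arrays;

public class GroupingSketch {
    // Illustrative routing: hash the grouping field values, mod the task count.
    // Equal field values always hash the same way, so they land on the same task.
    static int taskFor(int numTasks, String... fields) {
        return Math.abs(Arrays.hashCode(fields) % numTasks);
    }

    public static void main(String[] args) {
        int t1 = taskFor(4, "2014-01-07", "ABYH6Y4V4SCVXTG6DPB4VH9U123");
        int t2 = taskFor(4, "2014-01-07", "ABYH6Y4V4SCVXTG6DPB4VH9U123");
        System.out.println(t1 == t2); // true: same key, same DeepVisitBolt task
    }
}
```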
4. Single-threaded aggregation

public class UVSumBolt implements IBasicBolt {
    /** Single-task aggregation: keeps the latest count per key and prints PV/UV every two seconds. */
    private static final long serialVersionUID = 1L;

    Map<String, Integer> counts = new HashMap<String, Integer>();
    long beginTime = System.currentTimeMillis();
    long endTime = 0;
    String cur_date = null;

    public void prepare(Map stormConf, TopologyContext context) {
        cur_date = DateFmt.getCountDate("2014-01-07", DateFmt.date_short);
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            endTime = System.currentTimeMillis();
            long PV = 0; // total page views
            long UV = 0; // distinct sessions after deduplication
            String dateSession_id = input.getString(0);
            Integer count = input.getInteger(1);
            // Discard accumulated state when tuples from a newer day arrive
            if (!dateSession_id.startsWith(cur_date)
                    && DateFmt.parseDate(dateSession_id.split("_")[0])
                            .after(DateFmt.parseDate(cur_date))) {
                cur_date = dateSession_id.split("_")[0];
                counts.clear();
            }
            counts.put(dateSession_id, count); // overwrite with the latest running count
            if (endTime - beginTime >= 2000) { // print once every two seconds
                // UV = number of keys for the current day; PV = sum of their counts
                Iterator<String> i2 = counts.keySet().iterator();
                while (i2.hasNext()) {
                    String key = i2.next();
                    if (key != null && key.startsWith(cur_date)) {
                        UV++;
                        PV += counts.get(key);
                    }
                }
                System.err.println("PV=" + PV + "; UV=" + UV);
            }
        } catch (Exception e) {
            throw new FailedException("SumBolt fail!");
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) { }
    public Map<String, Object> getComponentConfiguration() { return null; }
    public void cleanup() { }
}
5. Topology class

public class UVTopo {
    /** Wires the spout and bolts together and submits the topology. */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new SourceSpout(), 1);
        builder.setBolt("FmtLogBolt", new FmtLogBolt(), 4).shuffleGrouping("spout");
        // Fields grouping: tuples with the same (date, session_id) always go to the
        // same DeepVisitBolt task, so each task's local counts stay consistent.
        builder.setBolt("sumBolt", new DeepVisitBolt(), 4)
               .fieldsGrouping("FmtLogBolt", new Fields("date", "session_id"));
        builder.setBolt("UvSum", new UVSumBolt(), 1).shuffleGrouping("sumBolt");

        Config conf = new Config();
        conf.setDebug(true);
        if (args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
        }
    }
}
6. pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>StormMavenProject</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>StormMavenProject</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.0.3</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
</dependencies>
<build>
<finalName>StormMavenProject</finalName>
</build>
</project>
7. Date utility class

public class DateFmt {
    /** Date utilities: truncates timestamps to a given pattern and parses short dates. */
    public static final String date_long = "yyyy-MM-dd HH:mm:ss";
    public static final String date_short = "yyyy-MM-dd";

    // Note: SimpleDateFormat is not thread-safe; sharing this static instance is
    // only acceptable here because parseDate is called from the single UVSumBolt task.
    public static SimpleDateFormat sdf = new SimpleDateFormat(date_short);

    public static String getCountDate(String date, String patton) {
        SimpleDateFormat sdf = new SimpleDateFormat(patton);
        Calendar cal = Calendar.getInstance();
        if (date != null) {
            try {
                cal.setTime(sdf.parse(date));
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        return sdf.format(cal.getTime());
    }

    public static Date parseDate(String dateStr) throws Exception {
        return sdf.parse(dateStr);
    }

    public static void main(String[] args) throws Exception {
        // System.out.println(DateFmt.getCountDate("2014-03-01 12:13:14", DateFmt.date_short));
        System.out.println(parseDate("2014-05-02").after(parseDate("2014-05-01")));
    }
}
8. Test results