`

Trident实战之计算网站PV

 
阅读更多
1、Trident实战之计算网站PV

/**
 * Trident实战之计算网站PV
 */
public class TridentPVTopo {

	public static StormTopology buildTopology(LocalDRPC drpc) {

		Random random = new Random();
		String[] hosts = { "www.taobao.com" };
		String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34",
				"BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
		String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53",
				"2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };

		FixedBatchSpout spout = new FixedBatchSpout(new Fields("eachLog"), 3, // 第一个参数表示输出类型,与topo的输入类型对应,第二个参数表示以三行作为一个批次
				new Values(hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)]),
				new Values(hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)]),
				new Values(hosts[0] + "\t" + session_id[random.nextInt(5)] + "\t" + time[random.nextInt(8)]));
		spout.setCycle(false);

		/**
		 * topo处理数据,存入中间存储
		 */
		TridentTopology topology = new TridentTopology();
		TridentState wordCounts = topology.newStream("spout1", spout)// 获取数据源
				.each(new Fields("eachLog"), new Mysplit("\t"), new Fields("date", "session_id"))// 第一参数是输入数据类型,第二参数是实现“分割”功能,第三个参数是输出数据类型
				.groupBy(new Fields("date"))// 按日期分组

				// 持久化到内存,传入session_id分组,输出pv数据类型
				.persistentAggregate(new MemoryMapState.Factory(), new Fields("session_id"), new Count(),
						new Fields("pv"));

		        // .parallelismHint(16);

		/**
		 * 读取中间存储数据
		 */
		topology.newDRPCStream("GetPV", drpc)// 输入函数名称
				.each(new Fields("args"), new Split(" "), new Fields("date"))// 对传入参数进行“分割”处理,
				.groupBy(new Fields("date"))// 用日期进行查询
				.stateQuery(wordCounts, new Fields("date"), new MapGet(), new Fields("PV"))// 第一个参数是中间存储。第二个参数是查询参数,可以不输入默认是传入流的值。
				.each(new Fields("PV"), new FilterNull());// 查询结果过滤
		return topology.build();
	}

	public static void main(String[] args) throws Exception {
		// 客户端调用topo
		Config conf = new Config();
		conf.setMaxSpoutPending(20);
		if (args.length == 0) {
			LocalDRPC drpc = new LocalDRPC();
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
			for (int i = 0; i < 100; i++) {
				System.err.println("DRPC RESULT: " + drpc.execute("GetPV", "2014-01-07 2014-01-08"));
				Thread.sleep(1000);
			}
		} else {
			conf.setNumWorkers(3);
			StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
		}
	}
}



2、自定义分割数据

public class Mysplit extends BaseFunction {

	/**
	 * 自定义分割数据
	 */
	private static final long serialVersionUID = 1L;

	String patton = null;

	public Mysplit(String patton) {
		this.patton = patton;
	}

	public void execute(TridentTuple tuple, TridentCollector collector) {
		String log = tuple.getString(0);
		String logArr[] = log.split(patton);
		if (logArr.length == 3) {
			collector.emit(new Values(DateFmt.getCountDate(logArr[2], DateFmt.date_short), logArr[1]));
		}

	}

}




public class Split extends BaseFunction {
    /**
	 * 分割数据
	 */
	private static final long serialVersionUID = 1L;

	String patton = null;

	public Split(String patton)
	{
		this.patton = patton;
	}
	
	
	public void execute(TridentTuple tuple, TridentCollector collector) {
      String sentence = tuple.getString(0);
      for (String word : sentence.split(patton)) {
        collector.emit(new Values(word));
      }
    }
  }



3、日期处理类

public class DateFmt {
	/*
	 * 日期处理类
	 */
	public static final String date_long = "yyyy-MM-dd HH:mm:ss" ;
	public static final String date_short = "yyyy-MM-dd" ;
	
	public static SimpleDateFormat sdf = new SimpleDateFormat(date_short);
	
	public static String getCountDate(String date,String patton)
	{
		SimpleDateFormat sdf = new SimpleDateFormat(patton);
		Calendar cal = Calendar.getInstance(); 
		if (date != null) {
			try {
				cal.setTime(sdf.parse(date)) ;
			} catch (ParseException e) {
				e.printStackTrace();
			}
		}
		return sdf.format(cal.getTime());
	}
	
	public static Date parseDate(String dateStr) throws Exception
	{
		return sdf.parse(dateStr);
	}
	
	public static void main(String[] args) throws Exception{

//		System.out.println(DateFmt.getCountDate("2014-03-01 12:13:14", DateFmt.date_short));
		System.out.println(parseDate("2014-05-02").after(parseDate("2014-05-01")));
	}

}



4、pom文件
引用

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.test</groupId>
  <artifactId>StormMavenProject</artifactId>
  <packaging>jar</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>StormMavenProject</name>
  <url>http://maven.apache.org</url>
  <dependencies>
   
   <dependency>
    <groupId>org.ow2.asm</groupId>
    <artifactId>asm</artifactId>
    <version>5.0.3</version>
   </dependency>
<dependency>
    <groupId>org.clojure</groupId>
    <artifactId>clojure</artifactId>
    <version>1.7.0</version>
</dependency>
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.2</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo</artifactId>
    <version>3.0.3</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.6.6</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>2.8</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>minlog</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>org.objenesis</groupId>
    <artifactId>objenesis</artifactId>
    <version>2.1</version>
</dependency>
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>reflectasm</artifactId>
    <version>1.10.1</version>
</dependency>

<dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>servlet-api</artifactId>
    <version>2.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.21</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-rename-hack</artifactId>
    <version>1.1.0</version>
</dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

<dependency>
    <groupId>ring-cors</groupId>
    <artifactId>ring-cors</artifactId>
    <version>0.1.5</version>
</dependency>


<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
</dependency>



  </dependencies>
  <build>
    <finalName>StormMavenProject</finalName>
  </build>
</project>



5、测试结果
引用

DRPC RESULT: [["2014-01-07 2014-01-08","2014-01-07",3]]
DRPC RESULT: [["2014-01-07 2014-01-08","2014-01-07",3]]
DRPC RESULT: [["2014-01-07 2014-01-08","2014-01-07",3]]












分享到:
评论

相关推荐

    Storm Trident实战之计算网站PV.rar

    Storm Trident实战之计算网站PV.rar

    Storm实战:构建大数据实时计算

     《Storm实战:构建大数据实时计算 》一共分为10章:第1章全面介绍了Storm的特性、能解决什么问题,以及和其他流计算系统的对比;第2章通过实际运行一个简单的例子,以及介绍本地环境和集群环境的搭建,让读者对...

    Storm入门教程 之Storm原理和概念详解

    Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群、Zookeeper集群等,涉及项目:网站PV、UV案例实战、其他案例; Storm视频教程亮点: 1、Storm...

    Trident数据手册.pdf

    Trident数据手册pdf,主要介绍Trident特点及技术规格,Trident 择是一款 3U 机架抽取式 KVM 切换器,配有一体化抽取式键盘和 3X17 英寸 LCD 显示屏,使用了高对比度的显示器(50:1),可以折叠放入 3U 机架内。

    基于Storm流计算天猫双十一作战室项目实战

    1、课程中完整开发3个Storm项目,均为企业实际项目,其中一个是完全由Storm Trident开发。 项目源码均可以直接运行,也可直接用于商用或企业。 2、Storm全面、系统、深入讲解 3、注重实践,对较抽象难懂的技术点如...

    Storm_Trident

    storm_Trident例子

    Trident8493_NVR.tar.gz

    建议选择4G或8G的小U盘,U盘的格式为FAT32,在U盘中新建一个T16的文件夹,将附件Trident8493_NVR.tar直接拷贝到T16文件夹中升级,插入U盘后点击系统维护升级。附件不要解压作直接拷贝。不要使用制作过U盘启动的U盘,...

    substance.jar和trident.jar

    Java界面GUI设计难看,所以用换肤所需的两个包substance.jar和trident.jar,方便换肤,怎样使用百度一下就可以

    trident-7.0.jar

    java swing用户交互界面的美观开发工具包,便于界面开发。

    基于Storm流计算天猫双十一作战室项目实战.docx

    3、掌握Storm Trident项目开发模式; 4、掌握Storm集成Kafka开发及项目实战; 5、掌握HighCharts各类图表开发和实时无刷新加载数据; 6、掌握Storm+Kafka+HBase的经典组合,完整呈现企业实践项目; 7、可以做到以...

    Storm流计算项目:1号店电商实时数据分析系统-24.项目1-地区销售额-Trident代码开发二.pptx

    02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API 简单开发测试 08.storm-kafka...

    基于Trident构建大规模实时流数据处理系统.pdf

    #资源达人分享计划#

    TridentNet

    本文致力于在深度学习目标检测问题中,提高对小目标的检测率

    Trident:TridentSDK的服务器和API实现

    三叉戟 Trident项目是新一代多线程,高性能和无尘Minecraft服务器的实现。最新发布的获得JAR方法一:自己构建如果您确定我们的分发形式有问题,或者您想要在获取JAR文件之前进行一些修改,则希望直接从源代码进行...

    Storm Trident API 使用详解.docx

    Storm Trident API 使用详解.docx

    Storm流计算之项目开发视频教程

    Storm视频教程通过含3个Storm完整项目,均为企业实际项目,其中一个是完全由Storm Trident开发。本课程每个技术均采用最新稳定版本,学完后可以从Kafka到Storm项目开发及HighCharts图表开发一个人搞定!涨工资?身价...

    trident-tutorial:实用的Storm Trident教程

    三叉戟教程实用的Storm Trident教程本教程以的的出色为基础。 流浪者的设置基于Taylor Goetz的。 Hazelcast状态代码基于wurstmeister的。 看看随附的。本教程的结构浏览Part * .java,了解Trident的基础知识使用...

    substance和trident javaGUI界面美化用到的包

    substance和trident javaGUI界面美化用到的包substance和trident javaGUI界面美化用到的包

    DSP中的Trident Microsystems已获得CEVA的DSP技术授权

    硅产品知识产权(SIP)平台解决方案和数字信号处理器(DSP)内核... CEVA首席执行官Gideon Wertheizer称:“Trident不断扩展其数字家庭娱乐产品线,并将推出一系列全新的创新产品,我们很高兴与之合作并助其一臂之力。CEVA

Global site tag (gtag.js) - Google Analytics