- 浏览: 230818 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
lwb314:
你的这个是创建的临时的hive表,数据也是通过文件录入进去的, ...
Spark SQL操作Hive数据库 -
yixiaoqi2010:
你好 我的提交上去 总是报错,找不到hive表,可能是哪里 ...
Spark SQL操作Hive数据库 -
bo_hai:
target jvm版本也要选择正确。不能选择太高。2.10对 ...
eclipse开发spark程序配置本地运行
测试Spark Streaming 统计单词的例子
1.准备
事先在hdfs上创建两个目录:
保存上传数据的目录:hdfs://master1:9000/library/SparkStreaming/data
checkpoint的目录:hdfs://master1:9000/library/SparkStreaming/CheckPoint_data
2.源码
3.调度脚本run.sh
4.测试
执行脚本后,试着通过hdfs dfs -put命令上传文件到hdfs://master1:9000/library/SparkStreaming/data目录中,可以查看到隔10秒钟会处理一次。因为我设置的间隔时间是:Durations.seconds(10)
1.准备
事先在hdfs上创建两个目录:
保存上传数据的目录:hdfs://master1:9000/library/SparkStreaming/data
checkpoint的目录:hdfs://master1:9000/library/SparkStreaming/CheckPoint_data
2.源码
package com.imf.spark.SparkApps.sparkstreaming; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.api.java.JavaStreamingContextFactory; import scala.Tuple2; /** * * @Description:用Spark Streaming统计单词个数 * @Author: lujinyong168 * @Date: 2016年5月21日 下午6:14:56 */ public class SparkStreamingOnHDFS { public static void main(String[] args) { /* * 第一步:配置SparkConf * 1.至少2条线程:因为Spark Streaming应用程序在运行的时候,至少有一条线程用于不断的循环接收数据, * 并且至少有一条线程用于处理接收数据(否则无法有线程用于处理数据,随着时间的推移,内存和磁盘都会不堪重负) * 2.对于集群而言,每个Executor一般肯定不止一个Thread,那对于处理spark streaming的应用程序而言,每个Executor * 一般分配多少个Core比较合适?根据经验,5个左右的Core是最佳的; */ final SparkConf conf = new SparkConf().setMaster("spark://master1:7077") .setAppName("SparkStreamingOnHDFS"); /* * 第二步:创建SparkStreamingContext,这个是SparkStreaming应用程序所有功能的起始点和程序调度核心 * 1.SparkStreamingContext的构建可以基于SparkConf参数,也可以基于持久化的SparkStreamingContext的内容 * 来恢复过来(典型的场景是Driver崩溃后重新启动,由于Spark Streaming具有连续7*24小时不间断运行的特征, * 所以需要在Driver重启后继续上一次的状态,此时的状态恢复需要基于曾经的Checkpoint); * 2.在一个Spark Streaming应用程序中可以创建若干个SparkStreamingContext对象,使用一个SparkStream之前需要把前面 * 运行的SparkStreamingContext对象关闭掉,由此,我们获得一个重大的启发SparkStreaming也只是Spark Core上的一个应用 * 程序而已,只不过Spark Streaming框架运行的话需要Spark工程师写业务逻辑处理代码; */ final String checkpointDirectory = "hdfs://master1:9000/library/SparkStreaming/CheckPoint_data"; JavaStreamingContextFactory factory = new JavaStreamingContextFactory(){ @Override public JavaStreamingContext create() { // TODO Auto-generated method stub return createContext(checkpointDirectory,conf); } }; // JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5)); /** * 可以从失败中恢复Driver,不过还需要指定Driver这个进程运行在Cluster,并且提交应用程序的时候指定--supervise */ JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory); /* * 第三步:创建Spark Streaming输入数据来源input Stream * 1.数据输入来源可以基于File、HDFS、Flume、Kafka、Socket等; * 2.在这里我们指定数据来源于网络Socket端口,Spark Streaming连接上该端口并在运行的时候,一直监听该端口的数据 * (当然该端口服务首先必须存在),并且在后续会根据业务需要不断的有数据产生(当然对于Spark Streaming应用程序 * 的运行而言,有无数其处理流程都有一样); * 3.如果经常在每间隔5秒钟没有数据的话,会不断的启动空的Job其实是会造成调度资源的浪费,因为并没有数据需要发生计算,所以 * 实际的企业级生产环境的代码在具体提交Job前会判断是否有数据,如果没有的话就不再提交Job; * 4.此处没有Receiver,SparkStreaming应用程序只是按照时间间隔监控目录下每个Batch新增的内容(把新增的)作为RDD的数据来源生成原始RDD */ JavaDStream<String> lines = jsc.textFileStream("hdfs://master1:9000/library/SparkStreaming/data");//此处的是hdfs文件 /* * 第四步:接下来就像对于RDD编程一样基于DStream进行编程!!!原因是DStream是RDD产生的模板(或者说类),在Spark Streaming具体 * 发生计算前,其实质是把每个Batch的DStream的操作翻译成为对RDD的操作!!! *对初始的DStream进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算 * 第4.1步:将每一行的字符串拆分成单个的单词 */ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { //如果是Scala,由于SAM转换,所以可以写成val words = lines.flatMap { line => line.split(" ")} @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); /* * 第四步:对初始的DStream进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算 * 第4.2步:在单词拆分的基础上对每个单词实例计数为1,也就是word => (word, 1) */ JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { /** * */ private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); /* * 第四步:对初始的DStream进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算 * 第4.3步:在每个单词实例计数为1基础之上统计每个单词在文件中出现的总次数 */ JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce) /** * */ private static final long serialVersionUID = 1L; public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); /* * 此处的print并不会直接出发Job的执行,因为现在的一切都是在Spark Streaming框架的控制之下的,对于Spark Streaming * 而言具体是否触发真正的Job运行是基于设置的Duration时间间隔的 * * 诸位一定要注意的是Spark Streaming应用程序要想执行具体的Job,对Dtream就必须有output Stream操作, * output Stream有很多类型的函数触发,类print、saveAsTextFile、saveAsHadoopFiles等,最为重要的一个 * 方法是foraeachRDD,因为Spark Streaming处理的结果一般都会放在Redis、DB、DashBoard等上面,foreachRDD * 主要就是用来完成这些功能的,而且可以随意的自定义具体数据到底放在哪里!!! * */ wordsCount.print(); /* * Spark Streaming执行引擎也就是Driver开始运行,Driver启动的时候是位于一条新的线程中的,当然其内部有消息循环体,用于 * 接受应用程序本身或者Executor中的消息; */ jsc.start(); jsc.awaitTermination(); jsc.close(); } /** * * @Description:工厂模式创建JavaStreamingContext * @Author: lujinyong168 * @Date: 2016年5月12日 下午10:01:40 */ private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf sc){ System.out.println("Creating new context"); JavaStreamingContext ssc = new JavaStreamingContext(sc,Durations.seconds(10)); ssc.checkpoint(checkpointDirectory); return ssc; } }
3.调度脚本run.sh
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \ --class com.imf.spark.SparkApps.sparkstreaming.SparkStreamingOnHDFS \ --files /usr/local/hive/apache-hive-1.2.1-bin/conf/hive-site.xml \ --master spark://master1:7077 \ /usr/local/sparkApps/SparkStreamingOnHDFS/SparkStreamingOnHDFS.jar
4.测试
执行脚本后,试着通过hdfs dfs -put命令上传文件到hdfs://master1:9000/library/SparkStreaming/data目录中,可以查看到隔10秒钟会处理一次。因为我设置的间隔时间是:Durations.seconds(10)
发表评论
-
SparkStreaming pull data from Flume
2016-06-19 17:29 1196Spark Streaming + Flume Integra ... -
Flume push数据到SparkStreaming
2016-06-19 15:16 1895上节http://kevin12.iteye.com/blog ... -
Spark Streaming 统计单词的例
2016-06-19 14:55 3测试Spark Streaming 统计单词的例子 1.准 ... -
Spark SQL窗口函数
2016-04-22 07:18 2512窗口函数又叫着窗口分析函数,Spark 1.4版本SparkS ... -
Spark SQL内置函数应用
2016-04-22 07:00 8571简单说明 使用Spark SQL中的内置函数对数据进行 ... -
Spark SQL操作Hive数据库
2016-04-13 22:37 17546本次例子通过scala编程实现Spark SQL操作Hive数 ... -
Spark SQL on hive配置和实战
2016-03-26 18:40 5492spark sql 官网:http://spark ... -
eclipse开发hadoop环境搭建
2016-02-13 14:54 1351Hadoop2.6.0集群搭建完毕后,下面介绍一下eclips ... -
Spark RDD弹性表现和来源
2016-02-09 20:12 3820hadoop 的MapReduce是基于数 ... -
Spark内核架构
2016-02-07 12:24 9781.在将spark内核架构前,先了解一下Hadoop的MR,H ... -
spark集群HA搭建
2016-01-31 08:50 4448spark集群的HA图: 搭建spark的HA需要安装z ... -
Spark集群中WordCount运行原理
2016-01-31 07:05 2478以数据流动的视角解释一下wordcount运行的原理 pa ... -
eclipse开发spark程序配置在集群上运行
2016-01-27 08:08 9297这篇bolg讲一下,IDE开发的spark程序如何提交到集群上 ... -
eclipse开发spark程序配置本地运行
2016-01-27 07:58 12313今天简单讲一下在local模式下用eclipse开发一个简单的 ... -
spark1.6.0搭建(基于hadoop2.6.0分布式)
2016-01-24 10:11 5926本文是基于hadoop2.6.0的分布式环境搭建spark1. ... -
hadoop2.6.0集群的搭建方法
2016-01-23 22:37 33131.集群环境的安装 1.1工具软件版本说明(软件尽量去官网下载 ... -
Hadoop Shuffle(洗牌)过程
2014-03-25 14:26 999博客来源:http://www.wnt.c ... -
hadoop2.2运行wordcount例子
2014-03-10 11:46 2577转载请注明出处:http://kevin12.iteye.co ...
相关推荐
一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、kafka偏移量管理,web后台管理,web api启动、停止spark streaming,宕机告警、自动重启等等功能支持,用户只需要关心业务代码,无需关注繁琐的...
java的sparkstreaming连接kafka的例子,kafka生产者生产消息,消费者读取消息,sparkstreaming读取kafka小区并进行存储iotdb数据库。
sparkStreaming消费数据不丢失,sparkStreaming消费数据不丢失
spark Streaming和structed streaming分析,理解整个 Spark Streaming 的模块划分和代码逻辑。
基于kafka和spark streaming和hbase的日志统计分析系统.zip基于kafka和spark streaming和hbase的日志统计分析系统.zip基于kafka和spark streaming和hbase的日志统计分析系统.zip基于kafka和spark streaming和hbase的...
1.理解Spark Streaming的工作流程。 2.理解Spark Streaming的工作原理。 3.学会使用Spark Streaming处理流式数据。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 三、实验内容 (一)...
包含kafka消息中间件的使用和Spark Streaming的示例。
(1)利用SparkStreaming从文件目录读入日志信息,日志内容包含: ”日志级别、函数名、日志内容“ 三个字段,字段之间以空格拆分。请看数据源的文件。 (2)对读入都日志信息流进行指定筛选出日志级别为error或warn...
1.Spark Streaming整合Flume需要的安装包. 2. Spark Streaming拉取Flume数据的flume配置文件.conf 3. Flume向Spark Streaming推数据的flume配置文件.conf
基于 Flume+ Kafka+ Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码。 基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: ...
Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。
spark Streaming的原理介绍和与storm的对比
写的非常好,早了好久才找到。SparkStreaming预研报告
spark streaming streaming
本文SparkStream从磁盘文件、HDFS、KAFKA获取数据源,以单词频次统计作为入门案例,介绍了SparkStream模块API的使用。同时介绍了SparkStream的特点
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
基于spark streaming和kafka,hbase的日志统计分析系统 仅用于学习和参考
Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面。 Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql)。 Spark零基础思维导图(内含spark-core ,spark-streaming,...
spark之sparkStreaming 理解,总结了自己的理解,欢迎大家下载观看!
讲述Storm与sparkStreaming分别用法与区别,在操作流程等。