`
bit1129
  • 浏览: 1051460 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark100】Spark Streaming Checkpoint的一个坑

 
阅读更多

Spark Streaming UI这块是本篇额外的内容,与主题无关,只是把它记录下来

Spark Streaming UI上一组统计数字的含义

 

Streaming

  • Started at: 1433563238275(Spark Streaming开始运行的时间)
  • Time since start: 3 minutes 51 seconds(Spark Streaming已经运行了多长时间)
  • Network receivers: 2(Receiver个数)
  • Batch interval: 1 second(每个Batch的时间间隔,即接收多长时间的数据就生成一个Batch,或者说是RDD)
  • Processed batches: 231 (已经处理的Batch个数,不管Batch中是否有数据,都会计算在内,)
  • Waiting batches: 0 (等待处理的Batch数据,如果这个值很大,表明Spark的处理速度较数据接收的速度慢,需要增加计算能力或者降低接收速度)
  • Received records: 66 (已经接收到的数据,每读取一次,读取到的所有数据称为一个record)
  • Processed records: 66 (已经处理的record)

 

(Processed batches + Waiting batches) * Batch Interval = Time Since Start

 

 

Spark Streaming Checkpoint的一个坑

 

 源代码:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


object SparkStreamingCheckpointEnabledTest {
  def main(args: Array[String]) {

    val checkpointDirectory = "file:///d:/data/chk_streaming"
    def funcToCreateSSC(): StreamingContext = {
      val conf = new SparkConf().setAppName("NetCatWordCount")
      conf.setMaster("local[3]")
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint(checkpointDirectory)
      ssc
    }
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, funcToCreateSSC)
    val numStreams = 2
    val streams = (1 to numStreams).map(i => ssc.socketTextStream("localhost", 9999))
    val lines = ssc.union(streams)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

 

 以上代码是错误的,因为停掉Driver后再次重启,将无法启动,解决办法是将streams的操作放到funcToCreateSSC函数里,ssc返回前

 

object SparkStreamingCheckpointEnabledTest {
  def process(streams: Seq[DStream[String]], ssc: StreamingContext) {
    val lines = ssc.union(streams)
    lines.print
  }

  def main(args: Array[String]) {
    val checkpointDirectory = "file:///d:/data/chk_streaming"
    def funcToCreateSSC(): StreamingContext = {
      val conf = new SparkConf().setAppName("NetCatWordCount")
      conf.setMaster("local[3]")
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint(checkpointDirectory)
      val numStreams = 2
      val streams = (1 to numStreams).map(i => ssc.socketTextStream("localhost", 9999))
      process(streams, ssc)
      ssc
    }
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, funcToCreateSSC)
    ssc.start()
    ssc.awaitTermination()
  }
}

 

 

 

 

  • 大小: 54 KB
分享到:
评论

相关推荐

    Spark从入门到精通

    3、覆盖Spark所有功能点(Spark RDD、Spark SQL、Spark Streaming,初级功能到高级特性,一个不少); 4、Scala全程案例实战讲解(近百个趣味性案例); 5、Spark案例实战的代码,几乎都提供了Java和Scala两个版本和...

    基于Scala的Spark_Core、Spark_SQL和Spark_Streaming设计源码

    本项目基于Scala开发,包含148个文件,包括Scala源代码、CRC校验文件、TXT文本文件、以及多个checkpoint和ck文件。系统实现了基于Scala的Spark_Core、Spark_SQL和Spark_Streaming功能,界面友好,功能完善,适合用于...

    Spark checkPoint Demo

    import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream object UpadateDemo { def main(args: Array[String]): Unit = { val conf =...

    Spark-2.3.1源码解读

    Spark-2.3.1源码解读。...Spark Streaming源码阅读 动态发现新增分区 Dstream join 操作和 RDD join 操作的区别 PIDController源码赏析及 back pressure 实现思路 Streaming Context重点摘要 checkpoint 必知必会

    Spark分布式内存计算框架视频教程

    9.SparkStreaming Checkpoint 10.消费Kafka偏移量管理 第六章、StructuredStreaming模块 1.StructuredStreaming 概述(核心设计和编程模型) 2.入门案例:WordCount 3.输入源InputSources 4.Streaming Query 设置 5....

    基于Apache Spark的Scala大数据处理设计源码

    本源码为基于Apache Spark的Scala大数据处理设计,共包含191个文件,其中crc文件56个,class文件39个,scala文件20个,bk文件10个,xml文件8个,txt文件2个,properties文件2个,idea/$PRODUCT_WORKSPACE_FILE$文件1...

    sparkStreaming-offset-to-zk:手动管理spark streaming集成kafka的数据偏移量到zookeeper中

    下面是使用过程中记录的一些心得和博客,感兴趣的朋友可以了解下:项目简介该项目提供了一个在使用spark streaming2.3+kafka1.3的版本集成时,手动存储偏移量到zookeeper中,因为自带的checkpoint弊端太多,不利于...

    streaming-offset-to-zk:一个手动管理spark streaming集成kafka时的偏移量到zookeeper中的小项目

    公司有一个比较核心的实时业务用的是spark streaming2.1.0+kafka0.9.0.0的流式技术来开发的,存储用的hbase+elasticsearch+redis,这中间趟过很多坑,解决了一些bug和问题,在这里我把它做成了一个骨架项目并开源...

    Fire Framework / fire

    Fire框架  Fire框架是由中通大数据自主研发并开源的、专门用于进行Spark和Flink任务开发的...@Streaming(interval = 100, unaligned = true) // 100s做一次checkpoint,开启非对齐checkpoint @Kafka(brokers = "local

    demo-kafka-sparkstream-hbase

    通过 SparkStreaming 从 Kafka 加载数据到 HBase 表的演示。 以分钟为基础计算 MIN、MAX、AVG(SUM、CNT)。 Kafka 主题:demo-stream-topic HBase 表:演示日志HBase 家族:demo-ts-metrics 输入数据示例(准确...

    亚信18年java笔试题-mix_rate:按照指定的不同维度,统计mix各类日志文件中所要字段的合计值,以及各相同维度下各日志文件在某一时间

    亚信18年java笔试题 1、程序的入口函数类: ...本机调试运行: 直接运行 ... spark yarn运行: ./bin/spark-submit ..../lib/streaming-log-0.0.1-...checkpoint路径 日志类型 本类型日志的个性化处理 存储日志HDFS路径 日

    一文弄懂Flink基础理论

    文章目录Flink概述Flink生态为什么选择Flink?系统架构JobManager运行...CheckpointFlink部署与运行Yarn运行Flink作业Flink YARN SessionRun a single Flink job on YARN(推荐)Standalone部署Storm、Spark-Streaming

Global site tag (gtag.js) - Google Analytics