1.背景
spark streaming有状态计算(如UV)通常采用DStream.updateStateByKey(实际是PairDStreamFunctions增强到DStream的),具体实现网上讲的很多。spark streaming是持续计算,有状态时不能通过简单的DAG/lineage容错,所以必须设置checkpoint(否则Job启动会报错)
checkpoint会持久化当批次RDD的快照、未完成的Task状态等。SparkContext通过checkpoint可以重建DStream,即使Driver宕机,重启后仍可用SparkContext.getOrElse从checkpoint恢复之前的状态。如果上游不丢数据(如kafka),那么宕机重启后原则上可以实现续传
事情似乎是很完美,但是拿到实际环境中还是会有问题
2.过压时的表现
首先来看下计算量过载以后发生的事情。这个不是Spark的问题,但分析一下有助于理解spark streaming有状态计算的原理
手动向spark灌超量数据(数据规模大至一个Duration内无法消化当批次数据),最终报错如下
java.lang.StackOverflowError
at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242)
..........
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
..........(很长的重复的堆栈,栈溢出你)
出错是正常的,因为目的就是为观察压垮以后的情况,但为什么是StackOverflow(而不是通常预期的OOM)?为此研究了一下相关的源码:
首先就是PairDStreamFunctions.updateStateByKey,这里没什么特殊,就是说嘛使用的实现类是StateDStream
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],partitioner: Partitioner,rememberPartitioner: Boolean,initialRDD: RDD[(K, S)]): DStream[(K, S)] =
ssc.withScope {
new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,rememberPartitioner, Some(initialRDD))
}
然后来看StateDStream.getOrcompute,这是RDD实际生成的方法。这里带参数time就是有状态计算的专有逻辑
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// 先从缓存拿,没有就计算
generatedRDDs.get(time).orElse {
if (isTimeValid(time)) {
// 这里createRDDWithLocalProperties和disableOutputSpecValidatio是做一些配置相关的预处理,这里不罗列代码了。主要是调用compute方法
val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
compute(time)
}
}
......
再来看compute方法
override def compute(validTime: Time): Option[RDD[(K, S)]] = {
getOrCompute(validTime - slideDuration) match {
case Some(prevStateRDD) => {
........
这里关键就是递归去拿上一时间窗口的RDD,这就是有状态计算的实现方法。相当于把每个时间窗口的RDD串在一起。于是当计算跟不上数据的时候,会导致每次计算落后于上次的时间越来越大。而slideDuration是固定的,于是递归层数就越来越多,最终导致栈溢出
3.探讨
如果服务宕机很长时间(比如周末挂了),重启的时候会递归很多层来恢复数据,造成栈溢出。所以这个checkpoint机制在有状态机制下实际效果是有限的
有状态实时计算比无状态复杂很多,Spark Streaming虽然提供了理论上可行的方案,但是在数据恢复方面还是有限制的。这一点目前没有想到太完善的解决方案
a) 能无状态尽量无状态计算
b) 如果需要维护的状态不是特别复杂(比如少数几个当前的累加数),可以自己单独维护状态和checkpoint(比如记录在redis或者Accumulator,在启动和shutdown的时候自己实现状态记录和恢复),这样就不需要用Spark streaming的机制
c) Spark streaming的递归机制相当于把各时间点的DAG串联成一个大DAG,从而把问题归化为无状态。这种设计还是很精妙的,但是也带来一点副作用(DAG可能变得很庞大)。事实上长时间宕机期间都是没有数据的,完全没必要逐个interval去递归。如果能够动态调整interval,也许可以解决栈溢出的问题。期待spark streaming在这方面进行优化
相关推荐
一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、...基于Spark Streaming的大数据实时流计算平台和框架(包括:调度平台,开发框架,开发demo),并且是基于运行在yarn模式运行的spark streaming
java的sparkstreaming连接kafka的例子,kafka生产者生产消息,消费者读取消息,sparkstreaming读取kafka小区并进行存储iotdb数据库。
sparkStreaming消费数据不丢失,sparkStreaming消费数据不丢失
spark Streaming和structed streaming分析,理解整个 Spark Streaming 的模块划分和代码逻辑。
基于 Flume+ Kafka+ Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码。 基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: ...
spark streaming spark流式计算 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据
1.理解Spark Streaming的工作流程。 2.理解Spark Streaming的工作原理。 3.学会使用Spark Streaming处理流式数据。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 三、实验内容 (一)...
(1)利用SparkStreaming从文件目录读入日志信息,日志内容包含: ”日志级别、函数名、日志内容“ 三个字段,字段之间以空格拆分。请看数据源的文件。 (2)对读入都日志信息流进行指定筛选出日志级别为error或warn...
1.Spark Streaming整合Flume需要的安装包. 2. Spark Streaming拉取Flume数据的flume配置文件.conf 3. Flume向Spark Streaming推数据的flume配置文件.conf
Spark核心概念简介: Spark使用maven进行打包(减少jar包大小): Spark中的(弹性分布式数据集)简称RDD: ...SparkStreaming中的正常操作(每读2秒就计算一次): Spark中的local[2]: Spark中的处理流程图像:
写的非常好,早了好久才找到。SparkStreaming预研报告
Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面。 Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql)。 Spark零基础思维导图(内含spark-core ,spark-streaming,...
spark Streaming的原理介绍和与storm的对比
spark之sparkStreaming 理解,总结了自己的理解,欢迎大家下载观看!
Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。
包含kafka消息中间件的使用和Spark Streaming的示例。
06Spark Streaming原理和实践
Spark Streaming Programming Guide 翻译+个人学习笔记整理
讲述Storm与sparkStreaming分别用法与区别,在操作流程等。