`
distantlight1
  • 浏览: 43647 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Spark Streaming有状态计算的实际问题

 
阅读更多

1.背景

spark streaming有状态计算(如UV)通常采用DStream.updateStateByKey(实际是PairDStreamFunctions增强到DStream的),具体实现网上讲的很多。spark streaming是持续计算,有状态时不能通过简单的DAG/lineage容错,所以必须设置checkpoint(否则Job启动会报错)

checkpoint会持久化当批次RDD的快照、未完成的Task状态等。SparkContext通过checkpoint可以重建DStream,即使Driver宕机,重启后仍可用SparkContext.getOrElse从checkpoint恢复之前的状态。如果上游不丢数据(如kafka),那么宕机重启后原则上可以实现续传

事情似乎是很完美,但是拿到实际环境中还是会有问题

 

2.过压时的表现

首先来看下计算量过载以后发生的事情。这个不是Spark的问题,但分析一下有助于理解spark streaming有状态计算的原理

 

手动向spark灌超量数据(数据规模大至一个Duration内无法消化当批次数据),最终报错如下

java.lang.StackOverflowError

at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)

 

at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242)

..........

at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)

at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)

at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)

at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)

at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)

at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)

at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)

at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)

at scala.Option.orElse(Option.scala:257)

at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)

..........(很长的重复的堆栈,栈溢出你)

 

出错是正常的,因为目的就是为观察压垮以后的情况,但为什么是StackOverflow(而不是通常预期的OOM)?为此研究了一下相关的源码:

 

首先就是PairDStreamFunctions.updateStateByKey,这里没什么特殊,就是说嘛使用的实现类是StateDStream

  def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],partitioner: Partitioner,rememberPartitioner: Boolean,initialRDD: RDD[(K, S)]): DStream[(K, S)] =    

    ssc.withScope {

     new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,rememberPartitioner, Some(initialRDD))

  }

 

然后来看StateDStream.getOrcompute,这是RDD实际生成的方法。这里带参数time就是有状态计算的专有逻辑

  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {

    // 先从缓存拿,没有就计算

    generatedRDDs.get(time).orElse {

      if (isTimeValid(time)) {

          // 这里createRDDWithLocalProperties和disableOutputSpecValidatio是做一些配置相关的预处理,这里不罗列代码了。主要是调用compute方法

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {

          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {

            compute(time)

          }

 

        }

     ......

 

再来看compute方法

  override def compute(validTime: Time): Option[RDD[(K, S)]] = {

    getOrCompute(validTime - slideDuration) match {

      case Some(prevStateRDD) => { 

     ........

这里关键就是递归去拿上一时间窗口的RDD,这就是有状态计算的实现方法。相当于把每个时间窗口的RDD串在一起。于是当计算跟不上数据的时候,会导致每次计算落后于上次的时间越来越大。而slideDuration是固定的,于是递归层数就越来越多,最终导致栈溢出

 

3.探讨

如果服务宕机很长时间(比如周末挂了),重启的时候会递归很多层来恢复数据,造成栈溢出。所以这个checkpoint机制在有状态机制下实际效果是有限的

有状态实时计算比无状态复杂很多,Spark Streaming虽然提供了理论上可行的方案,但是在数据恢复方面还是有限制的。这一点目前没有想到太完善的解决方案

a) 能无状态尽量无状态计算

b) 如果需要维护的状态不是特别复杂(比如少数几个当前的累加数),可以自己单独维护状态和checkpoint(比如记录在redis或者Accumulator,在启动和shutdown的时候自己实现状态记录和恢复),这样就不需要用Spark streaming的机制

c) Spark streaming的递归机制相当于把各时间点的DAG串联成一个大DAG,从而把问题归化为无状态。这种设计还是很精妙的,但是也带来一点副作用(DAG可能变得很庞大)。事实上长时间宕机期间都是没有数据的,完全没必要逐个interval去递归。如果能够动态调整interval,也许可以解决栈溢出的问题。期待spark streaming在这方面进行优化

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics