
[spark-src-core] 2.2 job submitted flow for local mode-part II

 

  In this section, we will verify how Spark passes data from the previous stage to the next stage (the ResultTask).

 



 
  Figure: the flow after a ShuffleMapTask finishes its computation (i.e. post-processing). Note: the last method, 'reviveOffers()', is redundant in this mode, because step 13 already sets up the next stage (ResultTask).

 

 

    Figure: preparation for the ResultTask (pre-processing on the reduce side)

 



   Figure: running the ResultTask (reduce)

 

=====

 

-ResultTask core method

override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    //-this class does not extend Logging, so print to the console
    println("-func %s in this task %s,serializer %s,rdd %s".format(func.getClass,toString,ser,rdd))
    metrics = Some(context.taskMetrics)
    //-e.g. func comes from ScalaWordCount#collect() > SparkContext#runJob() (p1769), i.e. iterator.toArray()
    func(context, rdd.iterator(partition, context))
  }
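
To make the last line of runTask() concrete, here is a minimal sketch with no Spark dependency. It assumes, as the comment above notes, that for collect() the function is essentially iterator.toArray. The names MiniResultTask and runJob, and the sample partition data, are hypothetical and exist only for illustration; they are not Spark's implementation.

```scala
// Spark-free sketch. MiniResultTask and runJob are hypothetical stand-ins.
object ResultTaskSketch {
  // Stand-in for ResultTask: applies the job function to one partition's iterator.
  final class MiniResultTask[T, U](partitionId: Int, data: Seq[T], func: Iterator[T] => U) {
    def runTask(): (Int, U) = (partitionId, func(data.iterator))
  }

  // Stand-in for the driver side: run one task per partition and
  // store each result at the index of its partition.
  def runJob[T, U: scala.reflect.ClassTag](partitions: Seq[Seq[T]],
                                           func: Iterator[T] => U): Array[U] = {
    val results = new Array[U](partitions.size)
    partitions.zipWithIndex.foreach { case (data, idx) =>
      val (pid, value) = new MiniResultTask(idx, data, func).runTask()
      results(pid) = value
    }
    results
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq("a" -> 2, "b" -> 1), Seq("c" -> 3))
    // collect(): each task turns its iterator into an array,
    // and the driver concatenates the per-partition arrays.
    val perPartition = runJob(partitions, (it: Iterator[(String, Int)]) => it.toArray)
    val collected = perPartition.toSeq.flatMap(_.toSeq)
    println(collected.mkString(", "))
  }
}
```

The point of the sketch is that the task itself knows nothing about the job: it only hands the partition iterator to whatever function was shipped to it, and the driver decides how to combine the per-partition results.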

 

*redundant code path:
  LocalEndpoint#receive()
case StatusUpdate(taskId, state, serializedData) =>
      scheduler.statusUpdate(taskId, state, serializedData) //-async op
      if (TaskState.isFinished(state)) {
        freeCores += scheduler.CPUS_PER_TASK //-release the CPU cores assigned in reviveOffers()
        logInfo("**startup next offers..")
        //-the behavior is the same if the call below is commented out
        reviveOffers()  //-TBD redundant code path
      }

  No tasks are issued along the code path below:
def reviveOffers() {
    //-offer a single executor with several cores, so tasks can be issued concurrently
    val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) //-offer one executor
    logInfo(s"-launching tasks,executor size ${offers.size}")
    for (task <- scheduler.resourceOffers(offers).flatten) {
      logInfo("-task:" + task + ",freeCores:" + freeCores)
      freeCores -= scheduler.CPUS_PER_TASK  //-when running locally, no limit is enforced
      //-the processed result will be written back via Executor.TaskRunner#run()
      executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
        task.name, task.serializedTask) //-note: different modes deliver tasks differently
    }
    logInfo(s"-finished launching tasks!")
}  
detailed logs==
2016-08-10 16:21:03,016 INFO  [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - **startup next offers..
2016-08-10 16:21:03,017 INFO  [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - -launching tasks,executor size 1
2016-08-10 16:21:03,017 INFO  [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - parentName: , name: TaskSet_0, runningTasks: 1
2016-08-10 16:21:03,018 INFO  [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - -max locality NO_PREF,launchedTask false,taskset org.apache.spark.scheduler.TaskSetManager@432cf6ba
2016-08-10 16:21:03,019 INFO  [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - -max locality ANY,launchedTask false,taskset org.apache.spark.scheduler.TaskSetManager@432cf6ba
2016-08-10 16:21:03,020 INFO  [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - -finished launching tasks!
2016-08-10 16:21:03,021 INFO  [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSetManager (Logging.scala:logInfo(59)) - -removing task id 0,parent org.apache.spark.scheduler.Pool@2061b97c
2016-08-10 16:21:03,021 INFO  [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - **startup next offers..
2016-08-10 16:21:03,022 INFO  [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - -launching tasks,executor size 1
2016-08-10 16:21:03,022 INFO  [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - parentName: , name: TaskSet_0, runningTasks: 0
2016-08-10 16:21:03,021 INFO  [task-result-getter-0] scheduler.TaskResultGetter (Logging.scala:logInfo(59)) - -deserialized result from tid 1
2016-08-10 16:21:03,022 INFO  [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - -max locality NO_PREF,launchedTask false,taskset org.apache.spark.scheduler.TaskSetManager@432cf6ba
2016-08-10 16:21:03,023 INFO  [sparkDriver-akka.actor.default-dispatcher-3] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - -max locality ANY,launchedTask false,taskset org.apache.spark.scheduler.TaskSetManager@432cf6ba
2016-08-10 16:21:03,023 INFO  [sparkDriver-akka.actor.default-dispatcher-3] local.LocalEndpoint (Logging.scala:logInfo(59)) - -finished launching tasks!
2016-08-10 16:21:03,023 INFO  [task-result-getter-1] scheduler.TaskResultGetter (Logging.scala:logInfo(59)) - -deserialized result from tid 0	//-note: the previous task's result is only deserialized now, in TaskResultGetter#enqueueSuccessfulTask()
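
The log sequence above can be reproduced in miniature. The sketch below is a simplified model, not Spark's code: MiniLocalEndpoint, its pending queue, and the one-core-per-task rule are assumptions standing in for LocalEndpoint, TaskSchedulerImpl#resourceOffers(), and CPUS_PER_TASK. It illustrates that once every task of the current task set has already been launched, a reviveOffers() triggered by a finished task launches nothing; the cores are simply returned.

```scala
import scala.collection.mutable

// Simplified model of LocalEndpoint. All names here are hypothetical.
object ReviveOffersSketch {
  final class MiniLocalEndpoint(totalCores: Int, tasks: Seq[String]) {
    private val cpusPerTask = 1   // stands in for scheduler.CPUS_PER_TASK
    private var freeCores = totalCores
    private val pending = mutable.Queue.empty[String] ++= tasks   // tasks not yet launched

    // Launch as many pending tasks as the free cores allow and return them.
    def reviveOffers(): List[String] = {
      val launched = mutable.ListBuffer.empty[String]
      while (freeCores >= cpusPerTask && pending.nonEmpty) {
        freeCores -= cpusPerTask
        launched += pending.dequeue()
      }
      launched.toList
    }

    // A finished task returns its cores and triggers another round of offers.
    def statusUpdateFinished(): List[String] = {
      freeCores += cpusPerTask
      reviveOffers()
    }

    def cores: Int = freeCores
  }

  def main(args: Array[String]): Unit = {
    val endpoint = new MiniLocalEndpoint(totalCores = 2, tasks = Seq("task-0", "task-1"))
    println(endpoint.reviveOffers())          // both tasks fit into the first round
    println(endpoint.statusUpdateFinished())  // nothing is left to launch
    println(endpoint.statusUpdateFinished())  // still nothing to launch
    println(endpoint.cores)                   // all cores have been returned
  }
}
```

In this model the two calls made after the tasks finish correspond to the two "launchedTask false" rounds in the logs: the offer is made, but the task set has no unlaunched task to accept it.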

 

ref:

[spark-src-core] 2.2 job submitted flow for local mode-part I
