- 浏览: 72424 次
前文: http://humingminghz.iteye.com/blog/2314269
前面先看到了从action入口到如何切分stage, 随后submit stage的过程, 那么既然stage被submit了, 接下来就应该是cluster manager去分配各个任务到prefer location的executor上面去执行了.
submitstage的方法, 最终会把当前stage相关的所有祖先stage都提交(isActive=false),并把当前stage放到waiting的stage里面, 等所有前部stage执行完后, 再执行当前stage。 每个stage都有前后关系, 这也是为什么任意一个stage失败后, spark只需重新执行fail的stage, 而不需要执行所有的stage的原因。
好了, 我们看看submitstage里面做了什么:
getMissingParentStages这个方法就是要去看是不是前部的stage都已经存在了, 如果没有的话, 会把当前stage放到waitingStages里面, 然后继续通过submitStage(parent)去submit所有的未执行的前部stage。
spark里面会找到所有的前部stage, 先执行有依赖关系的stage, 当当前stage没有未执行的前部stage的时候就通过submitMissingTasks 去提交当前stage的task。
submitMissingTasks 很长, 我就截取其中比较重要的几个部分:
首先会把当前stage加入到runningstage里面, 防止被重复提交。
然后取得preferred location 存到taskIdToLocations里面。 这里面主要是通过getPreferredLocs(stage.rdd, p)获取。
获取到了task可以执行的worknode地址后, 创建taskBinary, 这个是用来序列化当前task, 序列化了后就可以分发到每台机器上面去反序列化再执行。 主要包含了 stage的rdd以及对这个rdd的func, 比如前面文章说的WriteToFile这个
接着创建tasks变量, 这个是实际的task, 我们前面是ResultStage, 所以这里创建了一堆ResultStage (根据partition数量来)
最后根据tasks来创建taskset以及submittask:
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
那么一步一步看, 先看怎么拿到preferredLocation的:
看一下getPreferredLocsInternal:
首先如果这个partition被cache过, 那么就返回这个cache的location, 这样就可以直接用这个partition, 减少重复计算。 这里调用了getCacheLocs来获取:
看到如果StorageLevel.NONE 那么就不从blockmanager去拿location了, 只有选择了memory或者disk的时候才会去跑else里面的代码。 在else里面是跟去rdd去计算出blockid,然后再从blockmanagermaster里面拿回TaskLocation。
如果这个partition没有被cache过, 那么就会去执行rdd.preferredLocations去看当前rdd有没有preferredLocation, 如果有直接就返回了, 如果没有, 那么就继续去rdd.dependencies里面看窄连接的RDD里面有没有preferred的location。 如果都没有, 那么就返回空。
可以看到找preferredLocation的先后顺序是:
1.Cache
2.当前RDD有没有preferredLocation
3.窄连接RDD中有没有preferredLocation
那么拿到preferredLocation过后就会去序列化stage.rdd及stage.func最后做成一个广播变量(所有的spark节点都能接收到) taskBinary。
再之后就通过stage, taskBinary, preferredLocation来创建一堆task的集合, 按照我们这条线下来, 就是ResultTask的集合。
再接下来就是把这个task集合创建成taskset, 再由taskScheduler (TaskSchedulerImpl) 去submit:
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
看一下taskSchedulerImpl里面的submitTasks方法:
看上去代码很复杂, 其实这里就是做了创建tasksetManager, 然后把taskSetManager放到schedulebleBuilder里面, 最终调用backend的reviveOffers方法。
backend是在SparkContext里面创建的, 我们的local模式的话就是LocalBackend:
那么看一下LocalBackend里面的reviveOffers方法:
可以看到这里直接通过localEndpoint发送了一个message。 到这里得提一下Akka了, 我不是很了解, 随便查了下资料, 貌似akka有actor和actorRef, 这个类LocalBackend应该就是actor了, 当actor send一个message后, actorRef会receive这个message, 然后执行相关方法。 这里面就是通过在LocalBackend的start方法里面localEndpoint = rpcEnv.setupEndpoint 关联了actorRef: LocalBackendEndpoint, 然后执行LocalBackendEndpoint中的receive方法, 我们看一下LocalBackendEndpoint里面的reveive方法:
看到receive里面会调用reviveOffers, 在reviveOffers有两个重要的方法:
1.scheduler.resourceOffers
2. executor.launchTask
先看第一个:
也蛮长的, 主要就是做:
1.Random.shuffle(offers) 打乱, 防止一个worker上面被执行太多的task
2.通过resourceOfferSingleTaskSet 做了为task分配executor的动作
到这里为止, 我们基本可以看到以下两点都已经被定义完了:
1.去哪台机器上面跑task
2.task在哪个executor里面执行
这样任务的调度就完成了
然后再看executor的launchTask代码:
其实就是在线程池里面起一个线程去跑tr (taskrunner)里面的run方法, 看一下TaskRunner: 很长, 也截取重要的部分
首先执行execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
这样就会触发在LocalEndpoint的statusUpdate:
最终执行的是scheduler.statusUpdate, 里面只是做一些map里面存的内容的删减。
接下来run方法里面回去执行task.run 方法, 看一下这个方法是做什么的:
里面主要返回和执行了:(runTask(context), context.collectAccumulators())
这个runTask是在Task类的子类 ResultTask里面的, 我们看一下:
看到了吗, 里面是一个反序列化动作, 把我们之前序列化的rdd和func拿出来。 现在已经在runtask了, 所以我们已经在目标机器的spark节点上面的executor里面跑task, 我们会把之前的广播变量taskBinary获取然后反序列化回来 value 和func, 这样就可以本地执行了。 最后调用func方法, 把就是我们的writeToFile的计算结果返回。
接下来再把这个value 结果序列化, 为传回Driver做准备:
val valueBytes = resultSer.serialize(value)
根据valueBytes创建 DirectTaskResult并序列化生成serializedDirectResult 对象
再然后根据各种情况, 生成并IndirectTaskResult或者直接返回DirectTaskResult
然后继续调用Localbackend的statusUpdate方法
在localbackend里面的statusUpdate:
这样就会调用endpoint里面的receive方法:
看到了, 实际就是调用TaskSchedulerImpl的statusUpdate, 传参TaskState.FINISHED
:
可以看到如果是Finished的状态那么就会通过taskResultGetter来把计算结果从spark节点 (executor)上面拿回Driver上(sparkcontext就是Driver, scheduler实在SC上创建的, 当然这里也是Driver了)。
那么我们看一下拿回来后做了什么:
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
可以看到直接在Driver上面起了一个线程拿数据, 数据如果是DirectTaskResult那么就不用做什么了, 把value直接返回就可以了, 如果是IndirectTaskResult, 那么就要从blockmanager通过remote的去取回序列化的对象:
然后再反序列化, 返回结果:
好了 到现在为止我们可以明白spark是在分割完stage后, 如何找到相应的spark节点上相应的executor去跑这些task, 然后跑完后结果是怎么返回到Driver端的。
其他模式(不是local)其实也差不多, 基本上就是actor和actorRef不一样, 基本上按照这个顺序看是没有问题的, 就不一一写了
先到这里把, 有什么不对的 烦请指正
前面先看到了从action入口到如何切分stage, 随后submit stage的过程, 那么既然stage被submit了, 接下来就应该是cluster manager去分配各个任务到prefer location的executor上面去执行了.
submitstage的方法, 最终会把当前stage相关的所有祖先stage都提交(isActive=false),并把当前stage放到waiting的stage里面, 等所有前部stage执行完后, 再执行当前stage。 每个stage都有前后关系, 这也是为什么任意一个stage失败后, spark只需重新执行fail的stage, 而不需要执行所有的stage的原因。
好了, 我们看看submitstage里面做了什么:
private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing.isEmpty) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { for (parent <- missing) { submitStage(parent) } waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id, None) } }
getMissingParentStages这个方法就是要去看是不是前部的stage都已经存在了, 如果没有的话, 会把当前stage放到waitingStages里面, 然后继续通过submitStage(parent)去submit所有的未执行的前部stage。
spark里面会找到所有的前部stage, 先执行有依赖关系的stage, 当当前stage没有未执行的前部stage的时候就通过submitMissingTasks 去提交当前stage的task。
submitMissingTasks 很长, 我就截取其中比较重要的几个部分:
private def submitMissingTasks(stage: Stage, jobId: Int) { ... runningStages += stage ... val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { stage match { case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap case s: ResultStage => val job = s.activeJob.get partitionsToCompute.map { id => val p = s.partitions(id) (id, getPreferredLocs(stage.rdd, p)) }.toMap } } catch { case NonFatal(e) => stage.makeNewStageAttempt(partitionsToCompute.size) listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e)) runningStages -= stage return } ... var taskBinary: Broadcast[Array[Byte]] = null try { // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). // For ResultTask, serialize and broadcast (rdd, func). val taskBinaryBytes: Array[Byte] = stage match { case stage: ShuffleMapStage => closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array() case stage: ResultStage => closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array() } taskBinary = sc.broadcast(taskBinaryBytes) } ... val tasks: Seq[Task[_]] = try { stage match { case stage: ShuffleMapStage => partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, stage.internalAccumulators) } case stage: ResultStage => val job = stage.activeJob.get partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id, stage.internalAccumulators) } } } ... if (tasks.size > 0) { logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") stage.pendingPartitions ++= tasks.map(_.partitionId) logDebug("New pending partitions: " + stage.pendingPartitions) taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } ... }
首先会把当前stage加入到runningstage里面, 防止被重复提交。
然后取得preferred location 存到taskIdToLocations里面。 这里面主要是通过getPreferredLocs(stage.rdd, p)获取。
获取到了task可以执行的worknode地址后, 创建taskBinary, 这个是用来序列化当前task, 序列化了后就可以分发到每台机器上面去反序列化再执行。 主要包含了 stage的rdd以及对这个rdd的func, 比如前面文章说的WriteToFile这个
接着创建tasks变量, 这个是实际的task, 我们前面是ResultStage, 所以这里创建了一堆ResultStage (根据partition数量来)
最后根据tasks来创建taskset以及submittask:
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
那么一步一步看, 先看怎么拿到preferredLocation的:
private[spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = { getPreferredLocsInternal(rdd, partition, new HashSet) }
看一下getPreferredLocsInternal:
private def getPreferredLocsInternal( rdd: RDD[_], partition: Int, visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = { // If the partition has already been visited, no need to re-visit. // This avoids exponential path exploration. SPARK-695 if (!visited.add((rdd, partition))) { // Nil has already been returned for previously visited partitions. return Nil } // If the partition is cached, return the cache locations val cached = getCacheLocs(rdd)(partition) if (cached.nonEmpty) { return cached } // If the RDD has some placement preferences (as is the case for input RDDs), get those val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList if (rddPrefs.nonEmpty) { return rddPrefs.map(TaskLocation(_)) } // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency // that has any placement preferences. Ideally we would choose based on transfer sizes, // but this will do for now. rdd.dependencies.foreach { case n: NarrowDependency[_] => for (inPart <- n.getParents(partition)) { val locs = getPreferredLocsInternal(n.rdd, inPart, visited) if (locs != Nil) { return locs } } case _ => } Nil }
首先如果这个partition被cache过, 那么就返回这个cache的location, 这样就可以直接用这个partition, 减少重复计算。 这里调用了getCacheLocs来获取:
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized { // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times if (!cacheLocs.contains(rdd.id)) { // Note: if the storage level is NONE, we don't need to get locations from block manager. val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) { IndexedSeq.fill(rdd.partitions.length)(Nil) } else { val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] blockManagerMaster.getLocations(blockIds).map { bms => bms.map(bm => TaskLocation(bm.host, bm.executorId)) } } cacheLocs(rdd.id) = locs } cacheLocs(rdd.id) }
看到如果StorageLevel.NONE 那么就不从blockmanager去拿location了, 只有选择了memory或者disk的时候才会去跑else里面的代码。 在else里面是跟去rdd去计算出blockid,然后再从blockmanagermaster里面拿回TaskLocation。
如果这个partition没有被cache过, 那么就会去执行rdd.preferredLocations去看当前rdd有没有preferredLocation, 如果有直接就返回了, 如果没有, 那么就继续去rdd.dependencies里面看窄连接的RDD里面有没有preferred的location。 如果都没有, 那么就返回空。
可以看到找preferredLocation的先后顺序是:
1.Cache
2.当前RDD有没有preferredLocation
3.窄连接RDD中有没有preferredLocation
那么拿到preferredLocation过后就会去序列化stage.rdd及stage.func最后做成一个广播变量(所有的spark节点都能接收到) taskBinary。
再之后就通过stage, taskBinary, preferredLocation来创建一堆task的集合, 按照我们这条线下来, 就是ResultTask的集合。
再接下来就是把这个task集合创建成taskset, 再由taskScheduler (TaskSchedulerImpl) 去submit:
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
看一下taskSchedulerImpl里面的submitTasks方法:
override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) stageTaskSets(taskSet.stageAttemptId) = manager val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie } if (conflictingTaskSet) { throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { if (!hasLaunchedTask) { logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient resources") } else { this.cancel() } } }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true } backend.reviveOffers() }
看上去代码很复杂, 其实这里就是做了创建tasksetManager, 然后把taskSetManager放到schedulebleBuilder里面, 最终调用backend的reviveOffers方法。
backend是在SparkContext里面创建的, 我们的local模式的话就是LocalBackend:
case "local" => val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalBackend(sc.getConf, scheduler, 1) scheduler.initialize(backend) (backend, scheduler)
那么看一下LocalBackend里面的reviveOffers方法:
override def start() { val rpcEnv = SparkEnv.get.rpcEnv val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores) localEndpoint = rpcEnv.setupEndpoint("LocalBackendEndpoint", executorEndpoint) listenerBus.post(SparkListenerExecutorAdded( System.currentTimeMillis, executorEndpoint.localExecutorId, new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty))) launcherBackend.setAppId(appId) launcherBackend.setState(SparkAppHandle.State.RUNNING) } override def reviveOffers() { localEndpoint.send(ReviveOffers) }
可以看到这里直接通过localEndpoint发送了一个message。 到这里得提一下Akka了, 我不是很了解, 随便查了下资料, 貌似akka有actor和actorRef, 这个类LocalBackend应该就是actor了, 当actor send一个message后, actorRef会receive这个message, 然后执行相关方法。 这里面就是通过在LocalBackend的start方法里面localEndpoint = rpcEnv.setupEndpoint 关联了actorRef: LocalBackendEndpoint, 然后执行LocalBackendEndpoint中的receive方法, 我们看一下LocalBackendEndpoint里面的reveive方法:
private val executor = new Executor( localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true) override def receive: PartialFunction[Any, Unit] = { case ReviveOffers => reviveOffers() case StatusUpdate(taskId, state, serializedData) => scheduler.statusUpdate(taskId, state, serializedData) if (TaskState.isFinished(state)) { freeCores += scheduler.CPUS_PER_TASK reviveOffers() } case KillTask(taskId, interruptThread) => executor.killTask(taskId, interruptThread) } def reviveOffers() { val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) for (task <- scheduler.resourceOffers(offers).flatten) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber, task.name, task.serializedTask) } }
看到receive里面会调用reviveOffers, 在reviveOffers有两个重要的方法:
1.scheduler.resourceOffers
2. executor.launchTask
先看第一个:
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host executorIdToTaskCount.getOrElseUpdate(o.executorId, 0) if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) newExecAvail = true } for (rack <- getRackForHost(o.host)) { hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host } } // Randomly shuffle offers to avoid always placing tasks on the same set of workers. val shuffledOffers = Random.shuffle(offers) // Build a list of tasks to assign to each worker. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = shuffledOffers.map(o => o.cores).toArray val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) if (newExecAvail) { taskSet.executorAdded() } } // Take each TaskSet in our scheduling order, and then offer it each node in increasing order // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY var launchedTask = false for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) { do { launchedTask = resourceOfferSingleTaskSet( taskSet, maxLocality, shuffledOffers, availableCpus, tasks) } while (launchedTask) } if (tasks.size > 0) { hasLaunchedTask = true } return tasks }
也蛮长的, 主要就是做:
1.Random.shuffle(offers) 打乱, 防止一个worker上面被执行太多的task
2.通过resourceOfferSingleTaskSet 做了为task分配executor的动作
到这里为止, 我们基本可以看到以下两点都已经被定义完了:
1.去哪台机器上面跑task
2.task在哪个executor里面执行
这样任务的调度就完成了
然后再看executor的launchTask代码:
def launchTask( context: ExecutorBackend, taskId: Long, attemptNumber: Int, taskName: String, serializedTask: ByteBuffer): Unit = { val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask) runningTasks.put(taskId, tr) threadPool.execute(tr) }
其实就是在线程池里面起一个线程去跑tr (taskrunner)里面的run方法, 看一下TaskRunner: 很长, 也截取重要的部分
class TaskRunner( execBackend: ExecutorBackend, val taskId: Long, val attemptNumber: Int, taskName: String, serializedTask: ByteBuffer) extends Runnable { execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) ... val (value, accumUpdates) = try { val res = task.run( taskAttemptId = taskId, attemptNumber = attemptNumber, metricsSystem = env.metricsSystem) threwException = false res } ... val valueBytes = resultSer.serialize(value) ... val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull) val serializedDirectResult = ser.serialize(directResult) val resultSize = serializedDirectResult.limit ... val serializedResult: ByteBuffer = { if (maxResultSize > 0 && resultSize > maxResultSize) { logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " + s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " + s"dropping it.") ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize)) } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) logInfo( s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)") ser.serialize(new IndirectTaskResult[Any](blockId, resultSize)) } else { logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver") serializedDirectResult } } execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) ... }
首先执行execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
这样就会触发在LocalEndpoint的statusUpdate:
case StatusUpdate(taskId, state, serializedData) => scheduler.statusUpdate(taskId, state, serializedData) if (TaskState.isFinished(state)) { freeCores += scheduler.CPUS_PER_TASK reviveOffers() }
最终执行的是scheduler.statusUpdate, 里面只是做一些map里面存的内容的删减。
接下来run方法里面回去执行task.run 方法, 看一下这个方法是做什么的:
final def run( taskAttemptId: Long, attemptNumber: Int, metricsSystem: MetricsSystem) : (T, AccumulatorUpdates) = { context = new TaskContextImpl( stageId, partitionId, taskAttemptId, attemptNumber, taskMemoryManager, metricsSystem, internalAccumulators, runningLocally = false) TaskContext.setTaskContext(context) context.taskMetrics.setHostname(Utils.localHostName()) context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators) taskThread = Thread.currentThread() if (_killed) { kill(interruptThread = false) } try { (runTask(context), context.collectAccumulators()) } catch { case e: Throwable => // Catch all errors; run task failure callbacks, and rethrow the exception. try { context.markTaskFailed(e) } catch { case t: Throwable => e.addSuppressed(t) } throw e } finally { // Call the task completion callbacks. context.markTaskCompleted() try { Utils.tryLogNonFatalError { // Release memory used by this thread for unrolling blocks SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask() // Notify any tasks waiting for execution memory to be freed to wake up and try to // acquire memory again. This makes impossible the scenario where a task sleeps forever // because there are no other tasks left to notify it. Since this is safe to do but may // not be strictly necessary, we should revisit whether we can remove this in the future. val memoryManager = SparkEnv.get.memoryManager memoryManager.synchronized { memoryManager.notifyAll() } } } finally { TaskContext.unset() } } }
里面主要返回和执行了:(runTask(context), context.collectAccumulators())
这个runTask是在Task类的子类 ResultTask里面的, 我们看一下:
override def runTask(context: TaskContext): U = { // Deserialize the RDD and the func using the broadcast variables. val deserializeStartTime = System.currentTimeMillis() val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime metrics = Some(context.taskMetrics) func(context, rdd.iterator(partition, context)) }
看到了吗, 里面是一个反序列化动作, 把我们之前序列化的rdd和func拿出来。 现在已经在runtask了, 所以我们已经在目标机器的spark节点上面的executor里面跑task, 我们会把之前的广播变量taskBinary获取然后反序列化回来 value 和func, 这样就可以本地执行了。 最后调用func方法, 把就是我们的writeToFile的计算结果返回。
接下来再把这个value 结果序列化, 为传回Driver做准备:
val valueBytes = resultSer.serialize(value)
根据valueBytes创建 DirectTaskResult并序列化生成serializedDirectResult 对象
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull) val serializedDirectResult = ser.serialize(directResult)
再然后根据各种情况, 生成并IndirectTaskResult或者直接返回DirectTaskResult
val serializedResult: ByteBuffer = { if (maxResultSize > 0 && resultSize > maxResultSize) { logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " + s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " + s"dropping it.") ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize)) } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) logInfo( s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)") ser.serialize(new IndirectTaskResult[Any](blockId, resultSize)) } else { logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver") serializedDirectResult } }
然后继续调用Localbackend的statusUpdate方法
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
在localbackend里面的statusUpdate:
override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) { localEndpoint.send(StatusUpdate(taskId, state, serializedData)) }
这样就会调用endpoint里面的receive方法:
override def receive: PartialFunction[Any, Unit] = { case ReviveOffers => reviveOffers() case StatusUpdate(taskId, state, serializedData) => scheduler.statusUpdate(taskId, state, serializedData) if (TaskState.isFinished(state)) { freeCores += scheduler.CPUS_PER_TASK reviveOffers() } case KillTask(taskId, interruptThread) => executor.killTask(taskId, interruptThread) }
看到了, 实际就是调用TaskSchedulerImpl的statusUpdate, 传参TaskState.FINISHED
:
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { ... if (state == TaskState.FINISHED) { taskSet.removeRunningTask(tid) taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { taskSet.removeRunningTask(tid) taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) } ... }
可以看到如果是Finished的状态那么就会通过taskResultGetter来把计算结果从spark节点 (executor)上面拿回Driver上(sparkcontext就是Driver, scheduler实在SC上创建的, 当然这里也是Driver了)。
那么我们看一下拿回来后做了什么:
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
def enqueueSuccessfulTask( taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) { getTaskResultExecutor.execute(new Runnable { override def run(): Unit = Utils.logUncaughtExceptions { try { val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match { case directResult: DirectTaskResult[_] => if (!taskSetManager.canFetchMoreResults(serializedData.limit())) { return } // deserialize "value" without holding any lock so that it won't block other threads. // We should call it here, so that when it's called again in // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value. directResult.value() (directResult, serializedData.limit()) case IndirectTaskResult(blockId, size) => if (!taskSetManager.canFetchMoreResults(size)) { // dropped by executor if size is larger than maxResultSize sparkEnv.blockManager.master.removeBlock(blockId) return } logDebug("Fetching indirect task result for TID %s".format(tid)) scheduler.handleTaskGettingResult(taskSetManager, tid) val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId) if (!serializedTaskResult.isDefined) { /* We won't be able to get the task result if the machine that ran the task failed * between when the task ended and when we tried to fetch the result, or if the * block manager had to flush the result. */ scheduler.handleFailedTask( taskSetManager, tid, TaskState.FINISHED, TaskResultLost) return } val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]]( serializedTaskResult.get) sparkEnv.blockManager.master.removeBlock(blockId) (deserializedResult, size) } result.metrics.setResultSize(size) scheduler.handleSuccessfulTask(taskSetManager, tid, result) } catch { case cnf: ClassNotFoundException => val loader = Thread.currentThread.getContextClassLoader taskSetManager.abort("ClassNotFound with classloader: " + loader) // Matching NonFatal so we don't catch the ControlThrowable from the "return" above. case NonFatal(ex) => logError("Exception while getting task result", ex) taskSetManager.abort("Exception while getting task result: %s".format(ex)) } } }) }
可以看到直接在Driver上面起了一个线程拿数据, 数据如果是DirectTaskResult那么就不用做什么了, 把value直接返回就可以了, 如果是IndirectTaskResult, 那么就要从blockmanager通过remote的去取回序列化的对象:
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
然后再反序列化, 返回结果:
val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]]( serializedTaskResult.get) sparkEnv.blockManager.master.removeBlock(blockId) (deserializedResult, size)
好了 到现在为止我们可以明白spark是在分割完stage后, 如何找到相应的spark节点上相应的executor去跑这些task, 然后跑完后结果是怎么返回到Driver端的。
其他模式(不是local)其实也差不多, 基本上就是actor和actorRef不一样, 基本上按照这个顺序看是没有问题的, 就不一一写了
先到这里把, 有什么不对的 烦请指正
发表评论
-
kafka + flume + hdfs + zookeeper + spark 测试环境搭建
2017-07-20 11:28 1044最近由于项目需要, 搭建了一个类似线上环境的处理流数据的环境 ... -
源码跟踪executor如何写数据到blockmanager, 以及如何从blockmanager读数据
2016-08-10 19:41 1356之前看了Job怎么submit 以 ... -
Spark中Blockmanager相关代码解析
2016-08-04 19:47 1788前一段时间看了如何划分stage以及如何提交Job, 最后把结 ... -
Spark中saveAsTextFile至stage划分和job提交的源代码分析
2016-07-29 14:20 3287之前看了Spark Streaming和Spark SQL, ... -
SparkSQL DF.agg 执行过程解析
2016-07-19 10:21 4042在上一篇文章前, 我一直没看懂为什么下面的代码就能得到max或 ... -
SparkSQL SQL语句解析过程源代码浅析
2016-07-15 19:34 6558前两天一直在忙本职工 ... -
SparkSQL SQL语句解析过程浅析
2016-07-15 19:06 0前两天一直在忙本职工 ... -
SparkStreaming从启动Receiver到收取数据生成RDD的代码浅析
2016-07-08 17:54 2191前面一片文章介绍了SocketTextStream 是如何从b ... -
Sparkstreaming是如何获取数据组成Dstream的源码浅析
2016-07-08 11:23 1412前面一篇文章介绍了SparkStreaming是如何不停的循环 ... -
SparkSQL 使用SQLContext读取csv文件 分析数据 (含部分数据)
2016-07-06 11:24 10083前两天开始研究SparkSQL, 其主要分为HiveConte ... -
SparkStreaming是如何完成不停的循环处理的代码浅析
2016-07-02 12:26 4581一直很好奇Sparkstreaming的ssc.start是怎 ... -
SparkStreaming 对Window的reduce的方法解析
2016-06-30 11:57 4659在sparkstreaming中对窗口 ... -
Sparkstreaming reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 的源码/原理解析
2016-06-29 19:50 8697最近在玩spark streaming, 感觉到了他的强大。 ... -
关于Eclipse开发环境下 Spark+Kafka 获取topic的时候连接出错
2016-06-28 17:20 7353林林总总玩了Spark快一个月了, 打算试一下kafka的消息 ...
相关推荐
clustermanager for AS3安装手册
通过这种方式,它能够将适当的资源分配到正确的工作负载,合并分散的集群基础架构及多个工作负载调度程序,从而提升资源利用率,还有助于满足或超出服务级别协议所需的级别,同时降低基础架构和管理成本。...
spark官方版本的driver-class-path不支持hdfs路径,只支持本地路径。本资源解决了这个问题,driver-class-path在cluster模式时可以支持hdfs路径,解决了cluster模式driver有大量jar依赖的问题。
spark学习 Spark: Cluster Computing withWorking Sets Matei
ClusterManager:在Standalone模式中即为Master(主节点),控制整个集群,监控Worker。在YARN模式中为资源管理器。 Worker:从节点,负责控制计算节点,启动Executor。在YARN模式中为NodeManager,负责计算节点的...
Spark: Big Data Cluster Computing in Production English | 2016 | ISBN: 1119254019 | 216 pages | PDF | 5 MB Production-targeted Spark guidance with real-world use cases Spark: Big Data Cluster ...
MySQL_Cluster_Manager
Spark Cluster Computing with Data Set
chap05 Cluster Manager.pdf
藏经阁-Running Apache Spark on a High Performance Cluster Using RDM
分布并行算法,讲解详细,深入浅出,内容丰富,cluster计算 分布并行
windows中使用yarn-cluster模式提交spark任务,百度找不着的啦,看我这里。另外spark的版本要使用正确哦 更简单的方式参考: https://blog.csdn.net/u013314600/article/details/96313579
本文提出了CMP—CLUSTER混合并行计算环境概念模型的特征,提出针对 于该混合并行计算环境的并行编程模型,该模型 分为五层:任务分配模式层、任务计算模式层、设计模式并行编程框架层、中间 框架代码层、任务部署与...
spark初始化源码阅读sparkonyarn的client和cluster区别
MySQL Cluster Manager simplifies the creation and management of the MySQL Cluster database by automating common management tasks. As a result, Database Administrators (DBAs) and Systems Administrator ...
Platform Computing应用程序旨在加快计算密集型和数据密集型应用程序获取结果的速度,可以为众多行业的产品开发、关键业务决策和科学突破提供巨大动力。目前使用Platform Computing解决方案的客户已超过2500家,其中...
Ansible-ansible-spark-cluster.zip,安装spark独立集群(hdfs/spark/jupyter笔记本)或基于ambari的spark集群的ansible角色此存储库定义了多个ansible角色以帮助部署spark集群的不同模式,ansible是一个简单而强大的...
Recently updated for Spark 1.3, this book introduces Apache Spark, the open source cluster computing system that makes data analytics fast to write and fast to run. With Spark, you can tackle big ...