Analysis of the Task Execution Process

Task execution is carried out by the Executor instance created when the Worker starts:

 

case RegisteredExecutor(sparkProperties) =>
  logInfo("Successfully registered with driver")
  // Make this host instead of hostPort ?
  executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)

 

 

 

The executor instance's launchTask method then starts the task:

 

 

 

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId, serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}

 

 

 

A TaskRunner is created, taking the task and the executorBackend started by the current Worker (in yarn mode this is CoarseGrainedExecutorBackend), and the TaskRunner is then executed on the threadPool thread pool.
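As a minimal standalone sketch of this pattern (the class and object names below are illustrative, not Spark's), a runner registered in a running-task map and handed to a thread pool looks roughly like this:

import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, Executors}

// Illustrative only: a task runner tracked in a running-task map and executed on a pool.
class DemoTaskRunner(taskId: Long, serializedTask: ByteBuffer) extends Runnable {
  override def run(): Unit = {
    // deserialize serializedTask, run it, report status back to the scheduler ...
    println(s"running task $taskId")
  }
}

object DemoExecutor {
  private val runningTasks = new ConcurrentHashMap[Long, DemoTaskRunner]
  private val threadPool = Executors.newCachedThreadPool()

  def launchTask(taskId: Long, serializedTask: ByteBuffer): Unit = {
    val tr = new DemoTaskRunner(taskId, serializedTask)
    runningTasks.put(taskId, tr)   // remember the runner so it can be killed or cleaned up later
    threadPool.execute(tr)         // a pool thread invokes tr.run()
  }
}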

 

 

 

TaskRunner.run, the thread body that executes the task:

 

override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
  val startTime = System.currentTimeMillis()
  // SparkEnv is analyzed later.
  SparkEnv.set(env)
  Thread.currentThread.setContextClassLoader(replClassLoader)
  val ser = SparkEnv.get.closureSerializer.newInstance()
  logInfo("Running task ID " + taskId)
  // Update this task's state through execBackend: set it to RUNNING and send a StatusUpdate
  // event to the master.
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var attemptedTask: Option[Task[Any]] = None
  var taskStart: Long = 0
  def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
  val startGCTime = gcTime

  try {
    SparkEnv.set(env)
    Accumulators.clear()
    // Extract the task's resources: the jars to run, other files, and the task definition.
    val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
    // Update the dependencies and add the task's jars to the current thread's class loader.
    updateDependencies(taskFiles, taskJars)
    // Deserialize the task definition with the serializer configured in SparkEnv to get a
    // Task instance. The concrete Task implementation is either ShuffleMapTask or ResultTask.
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

    // If killed is true, do not run the task; fall through to the catch block instead.
    // If this task has been killed before we deserialized it, let's quit now. Otherwise,
    // continue executing the task.
    if (killed) {
      // Throw an exception rather than returning, because returning within a try{} block
      // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
      // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
      // for the task.
      throw TaskKilledException
    }

    attemptedTask = Some(task)
    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
    env.mapOutputTracker.updateEpoch(task.epoch)

    // Create a TaskContext and run the task via Task.runTask, waiting for it to complete.
    // Run the actual task and measure its runtime.
    taskStart = System.currentTimeMillis()
    val value = task.run(taskId.toInt)
    val taskFinish = System.currentTimeMillis()

    // The task has finished; if it was killed in the meantime, go to the catch block.
    // If the task has been killed, let's fail it.
    if (task.killed) {
      throw TaskKilledException
    }

    // Serialize the task's return value.
    val resultSer = SparkEnv.get.serializer.newInstance()
    val beforeSerialization = System.currentTimeMillis()
    val valueBytes = resultSer.serialize(value)
    val afterSerialization = System.currentTimeMillis()

    // Report metrics.
    for (m <- task.metrics) {
      m.hostname = Utils.localHostName()
      m.executorDeserializeTime = (taskStart - startTime).toInt
      m.executorRunTime = (taskFinish - taskStart).toInt
      m.jvmGCTime = gcTime - startGCTime
      m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
    }

    val accumUpdates = Accumulators.values
    // Wrap the task's return value in a DirectTaskResult and serialize it.
    val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
    val serializedDirectResult = ser.serialize(directResult)
    logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)

    // Check whether the task result exceeds the akka frame size. If it does, hand the result
    // to the BlockManager with storage level MEMORY_AND_DISK_SER and send an IndirectTaskResult;
    // otherwise the result fits in an actor message and the DirectTaskResult is used as-is.
    // Either way the result reaches the scheduler through the StatusUpdate event below.
    val serializedResult = {
      if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
        logInfo("Storing result for " + taskId + " in local BlockManager")
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(
          blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
        ser.serialize(new IndirectTaskResult[Any](blockId))
      } else {
        logInfo("Sending result for " + taskId + " directly to driver")
        serializedDirectResult
      }
    }

    // Update this task's state through execBackend: set it to FINISHED and send a StatusUpdate
    // event to the master.
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    logInfo("Finished task ID " + taskId)
  } catch {
    // On an exception, send a FAILED status update.
    case ffe: FetchFailedException => {
      val reason = ffe.toTaskEndReason
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
    }

    case TaskKilledException => {
      logInfo("Executor killed task " + taskId)
      execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
    }

    case t: Throwable => {
      val serviceTime = (System.currentTimeMillis() - taskStart).toInt
      val metrics = attemptedTask.flatMap(t => t.metrics)
      for (m <- metrics) {
        m.executorRunTime = serviceTime
        m.jvmGCTime = gcTime - startGCTime
      }
      val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      // TODO: Should we exit the whole executor here? On the one hand, the failed task may
      // have left some weird state around depending on when the exception was thrown, but on
      // the other hand, maybe we could detect that when future tasks fail and exit then.
      logError("Exception in task ID " + taskId, t)
      //System.exit(1)
    }
  } finally {
    // Remove this thread's shuffle memory from shuffleMemoryMap.
    // TODO: Unregister shuffle memory only for ResultTask
    val shuffleMemoryMap = env.shuffleMemoryMap
    shuffleMemoryMap.synchronized {
      shuffleMemoryMap.remove(Thread.currentThread().getId)
    }
    // Remove this task from runningTasks.
    runningTasks.remove(taskId)
  }
}

 

 

 

Status updates during task execution

ExecutorBackend.statusUpdate

In yarn mode the implementation class is CoarseGrainedExecutorBackend, which sends a StatusUpdate event to the master actor.

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  driver ! StatusUpdate(executorId, taskId, state, data)
}

 

 

 

The ExecutorBackend counterpart on the master handles the status update.

Implementation class: CoarseGrainedSchedulerBackend.DriverActor

case StatusUpdate(executorId, taskId, state, data) =>
  // Delegate the status update to TaskSchedulerImpl.statusUpdate.
  scheduler.statusUpdate(taskId, state, data.value)
  // If the task reached a terminal state (FINISHED, FAILED, KILLED or LOST):
  if (TaskState.isFinished(state)) {
    if (executorActor.contains(executorId)) {
      // Each task occupies one CPU core; now that the task is done, give the core back.
      freeCores(executorId) += 1
      // Keep running the remaining tasks on this executor; see the scheduler analysis for details.
      makeOffers(executorId)
    } else {
      // Ignoring the update since we don't know about the executor.
      val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
      logWarning(msg.format(taskId, state, sender, executorId))
    }
  }
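To make the bookkeeping in this case clause concrete, here is a toy sketch (illustrative names, not the CoarseGrainedSchedulerBackend API) of returning a core when a task finishes and immediately offering more work to that executor:

import scala.collection.mutable

// Illustrative only: per-executor free-core accounting with a re-offer on task completion.
object DemoDriver {
  private val freeCores = mutable.Map("exec-1" -> 0, "exec-2" -> 1)
  private val pendingTaskIds = mutable.Queue(3L, 4L)

  def onStatusUpdate(executorId: String, taskId: Long, finished: Boolean): Unit = {
    if (finished && freeCores.contains(executorId)) {
      freeCores(executorId) += 1   // the finished task hands its core back
      makeOffers(executorId)       // try to place more pending tasks on this executor
    }
  }

  private def makeOffers(executorId: String): Unit = {
    while (freeCores(executorId) > 0 && pendingTaskIds.nonEmpty) {
      val tid = pendingTaskIds.dequeue()
      freeCores(executorId) -= 1   // launching a task consumes one core
      println(s"launching task $tid on $executorId")
    }
  }
}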

 

 

 

TaskSchedulerImpl.statusUpdate processing flow

 

 

 

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  synchronized {
    try {
      // If the task state is LOST and the task still maps to an executor:
      if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
        // Get the executor id of the worker the task ran on.
        // We lost this entire executor, so remember that it's gone
        val execId = taskIdToExecutorId(tid)
        // If this executor is an active executor, run the scheduler's removeExecutor operation,
        // which in turn calls TaskSetManager.executorLost, and remember the failed executor for
        // use at the end of this function.
        if (activeExecutorIds.contains(execId)) {
          removeExecutor(execId)
          failedExecutor = Some(execId)
        }
      }
      taskIdToTaskSetId.get(tid) match {
        case Some(taskSetId) =>
          // If the task is in a terminal (non-RUNNING) state, remove it from the bookkeeping maps.
          if (TaskState.isFinished(state)) {
            taskIdToTaskSetId.remove(tid)
            if (taskSetTaskIds.contains(taskSetId)) {
              taskSetTaskIds(taskSetId) -= tid
            }
            taskIdToExecutorId.remove(tid)
          }
          activeTaskSets.get(taskSetId).foreach { taskSet =>
            // If the task finished successfully, remove it from the TaskSet and fetch its result
            // through TaskResultGetter.
            if (state == TaskState.FINISHED) {
              taskSet.removeRunningTask(tid)
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              // Handling for a failed task:
              taskSet.removeRunningTask(tid)
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
        case None =>
          logInfo("Ignoring update with state %s from TID %s because its task set is gone"
            .format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
  // If a worker failed, let the DAGScheduler handle that executor.
  // Update the DAGScheduler without holding a lock on this, since that can deadlock
  if (failedExecutor != None) {
    dagScheduler.executorLost(failedExecutor.get)
    // Trigger a new round of task offers and execution.
    backend.reviveOffers()
  }
}

 

 

 

The TaskState.LOST state, with the executor in activeExecutorIds

When the TaskState is LOST and the executor is an active executor (that is, it has run tasks before):

private def removeExecutor(executorId: String) {
  // Remove this executor from activeExecutorIds.
  activeExecutorIds -= executorId
  // Get the host of the worker this executor belongs to.
  val host = executorIdToHost(executorId)
  // Get all executors on that host and remove the current executor from the set.
  val execs = executorsByHost.getOrElse(host, new HashSet)
  execs -= executorId
  if (execs.isEmpty) {
    executorsByHost -= host
  }
  // Remove this executor from the executor-to-host map.
  executorIdToHost -= executorId
  // This mainly ends up calling TaskSetManager.executorLost.
  rootPool.executorLost(executorId, host)
}

 

 

 

TaskSetManager.executorLost:

This function handles tasks lost because of a lost executor: the tasks that ran on that executor are re-added to the pending task lists.

override def executorLost(execId: String, host: String) {
  logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)

  // Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
  // task that used to have locations on only this host might now go to the no-prefs list. Note
  // that it's okay if we add a task to the same queue twice (if it had multiple preferred
  // locations), because findTaskFromList will skip already-running tasks.
  // Regenerate this TaskSet's pending queues, since the current executor has been removed.
  for (index <- getPendingTasksForExecutor(execId)) {
    addPendingTask(index, readding=true)
  }
  for (index <- getPendingTasksForHost(host)) {
    addPendingTask(index, readding=true)
  }

  // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
  // i.e., if the current stage's tasks are ShuffleMapTasks:
  if (tasks(0).isInstanceOf[ShuffleMapTask]) {
    for ((tid, info) <- taskInfos if info.executorId == execId) {
      val index = taskInfos(tid).index
      if (successful(index)) {
        successful(index) = false
        copiesRunning(index) -= 1
        tasksSuccessful -= 1
        addPendingTask(index)
        // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
        // stage finishes when a total of tasks.size tasks finish.
        // This sends a CompletionEvent with reason Resubmitted through the DAGScheduler.
        sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null)
      }
    }
  }
  // Also re-enqueue any tasks that were running on the node: for each task still running on the
  // lost executor, mark it as failed, remove it from the running list and re-add it to the
  // pending queue.
  for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
    handleFailedTask(tid, TaskState.FAILED, None)
  }
}

 

 

 

DAGScheduler handling of the CompletionEvent:

...........................
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
  listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
  handleTaskCompletion(completion)
.........................
case Resubmitted =>
  logInfo("Resubmitted " + task + ", so marking it as still running")
  pendingTasks(stage) += task

The (TaskState.FAILED, TaskState.KILLED, TaskState.LOST) states:

.........................
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
  // Remove the task from the running set.
  taskSet.removeRunningTask(tid)
  // This call extracts the failure information and handles the exception via
  // TaskSchedulerImpl.handleFailedTask.
  taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}

 

 

 

 

 

TaskSchedulerImpl.handleFailedTask:

def handleFailedTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskState: TaskState,
    reason: Option[TaskEndReason]) = synchronized {
  taskSetManager.handleFailedTask(tid, taskState, reason)
  // If the task was not KILLED, trigger another round of task offers and execution.
  if (taskState != TaskState.KILLED) {
    // Need to revive offers again now that the task set manager state has been updated to
    // reflect failed tasks that need to be re-run.
    backend.reviveOffers()
  }
}

 

 

 

TaskSetManager.handleFailedTask flow

TaskSetManager.handleFailedTask handles the exception information from a failed task.

def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) {
  val info = taskInfos(tid)
  if (info.failed) {
    return
  }
  removeRunningTask(tid)
  val index = info.index
  info.markFailed()
  var failureReason = "unknown"
  if (!successful(index)) {
    logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
    copiesRunning(index) -= 1
    // If this call came from TaskSetManager.executorLost (TaskState.LOST), the case block below
    // is skipped because reason is None; otherwise the task failed with an exception, i.e. the
    // status update carried a state other than TaskState.LOST.
    // Check if the problem is a map output fetch failure. In that case, this
    // task will never succeed on any node, so tell the scheduler about it.
    reason.foreach {
      case fetchFailed: FetchFailed =>
        // A fetch failure: remove all of this taskset's running tasks and remove the taskset
        // from the scheduler; do not continue with the rest of this function.
        logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
        sched.dagScheduler.taskEnded(tasks(index), fetchFailed, null, null, info, null)
        successful(index) = true
        tasksSuccessful += 1
        sched.taskSetFinished(this)
        removeAllRunningTasks()
        return

      case TaskKilled =>
        // The task was killed: remove it and do not continue with the rest of this function.
        logWarning("Task %d was killed.".format(tid))
        sched.dagScheduler.taskEnded(tasks(index), reason.get, null, null, info, null)
        return

      case ef: ExceptionFailure =>
        sched.dagScheduler.taskEnded(
          tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
        if (ef.className == classOf[NotSerializableException].getName()) {
          // If the task result wasn't serializable, there's no point in trying to re-execute it.
          logError("Task %s:%s had a not serializable result: %s; not retrying".format(
            taskSet.id, index, ef.description))
          abort("Task %s:%s had a not serializable result: %s".format(
            taskSet.id, index, ef.description))
          return
        }
        val key = ef.description
        failureReason = "Exception failure: %s".format(ef.description)
        val now = clock.getTime()
        val (printFull, dupCount) = {
          if (recentExceptions.contains(key)) {
            val (dupCount, printTime) = recentExceptions(key)
            if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
              recentExceptions(key) = (0, now)
              (true, 0)
            } else {
              recentExceptions(key) = (dupCount + 1, printTime)
              (false, dupCount + 1)
            }
          } else {
            recentExceptions(key) = (0, now)
            (true, 0)
          }
        }
        if (printFull) {
          val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
          logWarning("Loss was due to %s\n%s\n%s".format(
            ef.className, ef.description, locs.mkString("\n")))
        } else {
          logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
        }

      case TaskResultLost =>
        failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
        logWarning(failureReason)
        sched.dagScheduler.taskEnded(tasks(index), TaskResultLost, null, null, info, null)

      case _ => {}
    }
    // Re-add the task to the pending queue; if the state is not KILLED, increment the failure
    // count and check it against the maximum number of retries.
    // On non-fetch failures, re-enqueue the task as pending for a max number of retries
    addPendingTask(index)
    if (state != TaskState.KILLED) {
      numFailures(index) += 1
      if (numFailures(index) >= maxTaskFailures) {
        logError("Task %s:%d failed %d times; aborting job".format(
          taskSet.id, index, maxTaskFailures))
        abort("Task %s:%d failed %d times (most recent failure: %s)".format(
          taskSet.id, index, maxTaskFailures, failureReason))
      }
    }
  } else {
    logInfo("Ignoring task-lost event for TID " + tid +
      " because task " + index + " is already finished")
  }
}
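The recentExceptions bookkeeping above is a small print-throttling pattern in its own right; a standalone sketch of it (illustrative class name, same decision rule as the code above) could be:

import scala.collection.mutable

// Illustrative only: print a full stack trace at most once per interval for the same failure,
// mirroring the recentExceptions handling above.
class DemoExceptionThrottle(printIntervalMs: Long) {
  private val recent = mutable.Map[String, (Int, Long)]()  // description -> (dup count, last full print)

  // Returns (printFull, duplicateCount) for a failure description observed at time `now`.
  def shouldPrint(description: String, now: Long): (Boolean, Int) = recent.get(description) match {
    case Some((dupCount, printTime)) if now - printTime <= printIntervalMs =>
      recent(description) = (dupCount + 1, printTime)
      (false, dupCount + 1)
    case _ =>
      recent(description) = (0, now)
      (true, 0)
  }
}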

 

 

 

DAGScheduler taskEnded flow:

def taskEnded(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Map[Long, Any],
    taskInfo: TaskInfo,
    taskMetrics: TaskMetrics) {
  eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
}

Handling the CompletionEvent:

case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
  listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
  handleTaskCompletion(completion)

DAGScheduler.handleTaskCompletion

The fetch-failure case:

case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
  // Mark the stage that the reducer was in as unrunnable
  val failedStage = stageIdToStage(task.stageId)
  running -= failedStage
  failed += failedStage
  ..............................
  // Mark the map whose fetch failed as broken in the map stage
  val mapStage = shuffleToMapStage(shuffleId)
  if (mapId != -1) {
    mapStage.removeOutputLoc(mapId, bmAddress)
    mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
  }
  ...........................
  failed += mapStage
  // Remember that a fetch failed now; this is used to resubmit the broken
  // stages later, after a small wait (to give other tasks the chance to fail)
  lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
  // TODO: mark the executor as failed only if there were lots of fetch failures on it
  if (bmAddress != null) {
    // Remove this executor's locations from the output locations of the stage's partitions.
    handleExecutorLost(bmAddress.executorId, Some(task.epoch))
  }

case ExceptionFailure(className, description, stackTrace, metrics) =>
  // Do nothing here, left up to the TaskScheduler to decide how to handle user failures

case TaskResultLost =>
  // Do nothing here; the TaskScheduler handles these failures and resubmits the task.

 

 

 

 

 

The TaskState.FINISHED state

This state means the task completed normally.

if (state == TaskState.FINISHED) {
  // Remove the task from the taskSet's running queue.
  taskSet.removeRunningTask(tid)
  // Fetch the task's result data.
  taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)

TaskResultGetter.enqueueSuccessfulTask:

def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
  getTaskResultExecutor.execute(new Runnable {
    override def run() {
      try {
        // Deserialize the response to obtain the result data.
        val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          // If the result was smaller than the akka frame size, the task's result comes back directly.
          case directResult: DirectTaskResult[_] => directResult
          case IndirectTaskResult(blockId) =>
            // Otherwise the result was too large and is managed by the BlockManager; fetch the
            // result data through the blockManager.
            logDebug("Fetching indirect task result for TID %s".format(tid))
            // Send a GettingResultEvent through the DAGScheduler;
            // see TaskSchedulerImpl.handleTaskGettingResult below.
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            // Fetch the task's result.
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
            // The task finished but fetching its result failed; see the TaskResultLost handling
            // in the failure path above.
            if (!serializedTaskResult.isDefined) {
              /* We won't be able to get the task result if the machine that ran the task failed
               * between when the task ended and when we tried to fetch the result, or if the
               * block manager had to flush the result. */
              scheduler.handleFailedTask(
                taskSetManager, tid, TaskState.FINISHED, Some(TaskResultLost))
              return
            }
            // Deserialize the task's result.
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get)
            // The result has been fetched, so remove the corresponding block.
            sparkEnv.blockManager.master.removeBlock(blockId)
            deserializedResult
        }
        result.metrics.resultSize = serializedData.limit()
        // See TaskSchedulerImpl.handleSuccessfulTask below.
        scheduler.handleSuccessfulTask(taskSetManager, tid, result)
      } catch {
        case cnf: ClassNotFoundException =>
          val loader = Thread.currentThread.getContextClassLoader
          taskSetManager.abort("ClassNotFound with classloader: " + loader)
        case ex: Throwable =>
          taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
      }
    }
  })
}
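Putting the two ends together, the direct-versus-indirect result hand-off can be sketched as follows (illustrative types, with a map standing in for the BlockManager; this is not Spark's API):

import java.nio.ByteBuffer
import scala.collection.mutable

// Illustrative only: send the result inline if it fits in a frame, otherwise park it in a
// block store and send a handle; the receiver resolves either form back to the raw bytes.
object DemoResultChannel {
  private val blockStore = mutable.Map[Long, ByteBuffer]()  // stand-in for the BlockManager

  sealed trait DemoResult
  case class Direct(bytes: ByteBuffer) extends DemoResult
  case class Indirect(blockId: Long) extends DemoResult

  // Executor side: choose the route based on the frame size, as TaskRunner.run does above.
  def send(taskId: Long, bytes: ByteBuffer, frameSize: Int): DemoResult =
    if (bytes.limit() >= frameSize - 1024) { blockStore(taskId) = bytes; Indirect(taskId) }
    else Direct(bytes)

  // Driver side: resolve either form; None here corresponds to the TaskResultLost case.
  def receive(result: DemoResult): Option[ByteBuffer] = result match {
    case Direct(bytes)     => Some(bytes)
    case Indirect(blockId) => blockStore.remove(blockId)
  }
}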

 

 

 

TaskSchedulerImpl.handleTaskGettingResult:

def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
  taskSetManager.handleTaskGettingResult(tid)
}

In the TaskSetManager:

def handleTaskGettingResult(tid: Long) = {
  val info = taskInfos(tid)
  info.markGettingResult()
  sched.dagScheduler.taskGettingResult(tasks(info.index), info)
}

The DAGScheduler then fires a GettingResultEvent:

def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
  eventProcessActor ! GettingResultEvent(task, taskInfo)
}

Handling the GettingResultEvent amounts to no more than posting a listener event; there is no real processing here:

case GettingResultEvent(task, taskInfo) =>
  listenerBus.post(SparkListenerTaskGettingResult(task, taskInfo))

 

 

 

 

 

TaskSchedulerImpl.handleSuccessfulTask:

def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]) = synchronized {
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}

In the TaskSetManager:

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
  val info = taskInfos(tid)
  val index = info.index
  info.markSuccessful()
  // Remove the task from the running queue.
  removeRunningTask(tid)
  if (!successful(index)) {
    logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
      tid, info.duration, info.host, tasksSuccessful, numTasks))
    // Send a Success message to the dagScheduler.
    sched.dagScheduler.taskEnded(
      tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)

    // Increment the number of successfully finished tasks and mark this task as successful
    // in the successful array.
    // Mark successful and stop if all the tasks have succeeded.
    tasksSuccessful += 1
    successful(index) = true

    // If the number of finished tasks reaches the total, finish this taskset, which amounts to
    // finishing the stage of the RDD it belongs to.
    if (tasksSuccessful == numTasks) {
      sched.taskSetFinished(this)
    }
  } else {
    logInfo("Ignorning task-finished event for TID " + tid + " because task " +
      index + " has already completed successfully")
  }
}

 

 

 

DAGScheduler handling of a CompletionEvent with reason Success:

case Success =>
  logInfo("Completed " + task)
  if (event.accumUpdates != null) {
    Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
  }
  // Remove this task from the pending queue.
  pendingTasks(stage) -= task
  stageToInfos(stage).taskInfos += event.taskInfo -> event.taskMetrics
  // Handle the two task types according to which kind of task ran.
  task match {
    // A ResultTask, i.e. no shuffle is needed:
    case rt: ResultTask[_, _] =>
      resultStageToJob.get(stage) match {
        case Some(job) =>
          // If the finished flag for this task's partition in the ActiveJob is still false:
          if (!job.finished(rt.outputId)) {
            // Mark the task as finished.
            job.finished(rt.outputId) = true
            // Increment the number of finished tasks in the job and check whether all tasks are
            // done; if so, remove this job and its stage from the relevant containers.
            job.numFinished += 1
            // If the whole job has finished, remove it
            if (job.numFinished == job.numPartitions) {
              idToActiveJob -= stage.jobId
              activeJobs -= job
              resultStageToJob -= stage
              markStageAsFinished(stage)
              jobIdToStageIdsRemove(job.jobId)
              listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
            }
            // Call JobWaiter.taskSucceeded (the job's listener) to mark this task as done and
            // hand over the result for output handling.
            job.listener.taskSucceeded(rt.outputId, event.result)
          }
        case None =>
          logInfo("Ignoring result from " + rt + " because its job has finished")
      }
    // Completion handling for a shuffle task:
    case smt: ShuffleMapTask =>
      val status = event.result.asInstanceOf[MapStatus]
      val execId = status.location.executorId
      logDebug("ShuffleMapTask finished on " + execId)
      if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
        logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
      } else {
        // Write the shuffle result (MapStatus) into the stage's outputLocs. Each added location
        // increments numAvailableOutputs; when numAvailableOutputs == numPartitions the shuffle
        // map stage is complete.
        stage.addOutputLoc(smt.partitionId, status)
      }
      // If this stage is still running and all of its pendingTasks have been processed:
      if (running.contains(stage) && pendingTasks(stage).isEmpty) {
        // Update the stage's status.
        markStageAsFinished(stage)
        .......................................

        // At this point the shuffle stage is complete; register the shuffleId and the stage's
        // outputLocs with the mapOutputTracker: for every shuffle task, the executor/host it ran
        // on and the size of its output. Each task's shuffle writer carries the shuffleId, and
        // once registered, the next stage reads its input from the mapOutputTracker using this
        // shuffleId.
        mapOutputTracker.registerMapOutputs(
          stage.shuffleDep.get.shuffleId,
          stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
          changeEpoch = true)
      }
      clearCacheLocs()
      // Each partition's outputLoc in the stage defaults to Nil; if any partition's value is
      // still Nil, some task failed, so resubmit the stage, which re-runs the unfinished tasks.
      if (stage.outputLocs.exists(_ == Nil)) {
        .........................................
        submitStage(stage)
      } else {
        val newlyRunnable = new ArrayBuffer[Stage]
        for (stage <- waiting) {
          logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
        }
        // Look through the remaining waiting stages and collect those whose parent shuffle
        // dependencies are now all satisfied (getMissingParentStages returns Nil).
        for (stage <- waiting if getMissingParentStages(stage) == Nil) {
          newlyRunnable += stage
        }
        // Run all now-runnable stages: remove them from the waiting set,
        waiting --= newlyRunnable
        // and add them to the running queue.
        running ++= newlyRunnable
        for {
          stage <- newlyRunnable.sortBy(_.id)
          jobId <- activeJobForStage(stage)
        } {
          // Submit task allocation and execution for the next stage.
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
          submitMissingTasks(stage, jobId)
        }
      }
  }
}
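The outputLocs/numAvailableOutputs accounting that decides whether a shuffle stage is complete can be sketched in isolation like this (an illustrative class, not the Stage implementation):

// Illustrative only: a shuffle map stage is complete once every partition has at least one
// output location; an empty slot means a task must be re-run (cf. outputLocs.exists(_ == Nil)).
class DemoStage(numPartitions: Int) {
  private val outputLocs = Array.fill(numPartitions)(List.empty[String])
  private var numAvailableOutputs = 0

  def addOutputLoc(partition: Int, location: String): Unit = {
    if (outputLocs(partition).isEmpty) numAvailableOutputs += 1  // first copy for this partition
    outputLocs(partition) = location :: outputLocs(partition)
  }

  def removeOutputLoc(partition: Int, location: String): Unit = {
    val remaining = outputLocs(partition).filterNot(_ == location)
    if (remaining.isEmpty && outputLocs(partition).nonEmpty) numAvailableOutputs -= 1
    outputLocs(partition) = remaining
  }

  def isAvailable: Boolean = numAvailableOutputs == numPartitions
  def missingPartitions: Seq[Int] = outputLocs.indices.filter(outputLocs(_).isEmpty)
}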

 

 

 

JobWaiter.taskSucceeded, the handler invoked after a task completes:

override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
  if (_jobFinished) {
    throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
  }
  // Process the result through the resultHandler function that was passed in when the
  // JobWaiter was created.
  resultHandler(index, result.asInstanceOf[T])
  // Count one more finished task.
  finishedTasks += 1
  if (finishedTasks == totalTasks) {
    // When the number of finished tasks equals the total number of tasks, mark the job as
    // finished and set the result to JobSucceeded. Once _jobFinished is true the job is
    // complete and the thread waiting on this JobWaiter stops waiting.
    _jobFinished = true
    jobResult = JobSucceeded
    this.notifyAll()
  }
}
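A minimal standalone sketch of this wait/notify pattern (illustrative names; resultHandler here plays the role of the handler passed in when the JobWaiter is created):

// Illustrative only: collect per-task results and release the blocked caller once all tasks report in.
class DemoJobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
  private var finishedTasks = 0
  private var jobFinished = false

  def taskSucceeded(index: Int, result: T): Unit = synchronized {
    resultHandler(index, result)        // hand the result to the caller-supplied handler
    finishedTasks += 1
    if (finishedTasks == totalTasks) {  // all tasks done: wake the waiting thread
      jobFinished = true
      notifyAll()
    }
  }

  def awaitResult(): Unit = synchronized {
    while (!jobFinished) { wait() }     // block until taskSucceeded marks the job finished
  }
}

// Usage: collect results by partition index, then wait for the whole job.
object DemoJobWaiterApp {
  def main(args: Array[String]): Unit = {
    val results = new Array[String](2)
    val waiter = new DemoJobWaiter[String](2, (i, r) => results(i) = r)
    new Thread(new Runnable {
      override def run(): Unit = { waiter.taskSucceeded(0, "a"); waiter.taskSucceeded(1, "b") }
    }).start()
    waiter.awaitResult()
    println(results.mkString(","))
  }
}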

 

 

 

 

 

Task.runTask implementations

There are two kinds of Task implementations:

ShuffleMapTask, which performs a shuffle, and
ResultTask, which does not.
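A simplified sketch of this two-implementation shape (illustrative types, not Spark's real Task hierarchy):

// Illustrative only: one base task type with a result-producing variant and a shuffle variant
// that returns metadata about where its output was written.
trait DemoTaskContext { def stageId: Int }

abstract class DemoTask[T] {
  def runTask(context: DemoTaskContext): T
}

// Result-style task: applies the job's function to the partition data and returns its value.
class DemoResultTask[T, U](data: Iterator[T], func: Iterator[T] => U) extends DemoTask[U] {
  override def runTask(context: DemoTaskContext): U = func(data)
}

// Shuffle-style task: buckets the partition by key and reports per-bucket output sizes.
case class DemoMapStatus(location: String, sizes: Array[Long])

class DemoShuffleMapTask[T](data: Iterator[T], bucket: T => Int, numBuckets: Int)
    extends DemoTask[DemoMapStatus] {
  override def runTask(context: DemoTaskContext): DemoMapStatus = {
    val sizes = new Array[Long](numBuckets)
    data.foreach(elem => sizes(bucket(elem)) += 1L)  // stand-in for writing to per-bucket files
    DemoMapStatus("localhost", sizes)
  }
}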

 

 

 

ResultTask.runTask

override def runTask(context: TaskContext): U = {
  metrics = Some(context.taskMetrics)
  try {
    // func is the function supplied when the task was created, i.e. the one passed to
    // DAGScheduler.runJob, for example the writeToFile function defined in
    // PairRDDFunction.saveAsHadoopDataset.
    // rdd.iterator dispatches to the concrete RDD's compute implementation, and compute in
    // turn runs the user-defined functions passed to operators such as map.
    func(context, rdd.iterator(split, context))
  } finally {
    context.executeOnCompleteCallbacks()
  }
}

 

 

 

ShuffleMapTask.runTask

override def runTask(context: TaskContext): MapStatus = {
  val numOutputSplits = dep.partitioner.numPartitions
  metrics = Some(context.taskMetrics)

  val blockManager = SparkEnv.get.blockManager
  val shuffleBlockManager = blockManager.shuffleBlockManager
  var shuffle: ShuffleWriterGroup = null
  var success = false

  try {
    // Obtain a group of shuffle writers for this shuffleId.
    // Obtain all the block writers for shuffle blocks.
    val ser = SparkEnv.get.serializerManager.get(dep.serializerClass, SparkEnv.get.conf)
    shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)

    // Run rdd.iterator to produce pairs (Product2) and shuffle them into different files by key.
    // Once all shuffle tasks have finished, the stage is registered with the mapOutputTracker,
    // and the next stage reads its data from there. Each finished shuffle task produces a
    // MapStatus instance carrying the executor/host it ran on and the size of its output.
    // See the later shuffle analysis for how the shuffle data is read.
    // Write the map output to its associated buckets.
    for (elem <- rdd.iterator(split, context)) {
      val pair = elem.asInstanceOf[Product2[Any, Any]]
      val bucketId = dep.partitioner.getPartition(pair._1)
      shuffle.writers(bucketId).write(pair)
    }

    // Commit the writes. Get the size of each bucket block (total block size).
    var totalBytes = 0L
    var totalTime = 0L
    val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
      writer.commit()
      writer.close()
      val size = writer.fileSegment().length
      totalBytes += size
      totalTime += writer.timeWriting()
      MapOutputTracker.compressSize(size)
    }

    // Update shuffle metrics.
    val shuffleMetrics = new ShuffleWriteMetrics
    shuffleMetrics.shuffleBytesWritten = totalBytes
    shuffleMetrics.shuffleWriteTime = totalTime
    metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

    success = true
    new MapStatus(blockManager.blockManagerId, compressedSizes)
  } catch { case e: Exception =>
    // If there is an exception from running the task, revert the partial writes
    // and throw the exception upstream to Spark.
    if (shuffle != null && shuffle.writers != null) {
      for (writer <- shuffle.writers) {
        writer.revertPartialWrites()
        writer.close()
      }
    }
    throw e
  } finally {
    // Release the writers back to the shuffle block manager.
    if (shuffle != null && shuffle.writers != null) {
      shuffle.releaseWriters(success)
    }
    // Execute the callbacks on task completion.
    context.executeOnCompleteCallbacks()
  }
}

 
