Analysis of the Task Execution Process
Tasks are executed by the Executor instance created when the Worker starts up:
case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver")
// Make this host instead of hostPort ?
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
The executor instance's launchTask method starts the execution of a task.
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}
This creates a TaskRunner, passing in the task together with the ExecutorBackend started on the current Worker (CoarseGrainedExecutorBackend in yarn mode), and submits the TaskRunner to the threadPool for execution. A minimal sketch of this pattern follows below.
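As an illustration of this launch pattern (a minimal sketch under assumptions, not the actual Executor source; MyTaskRunner and the field names are hypothetical), a runner can be tracked in a map and handed to a cached thread pool:

import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, Executors}

// Hypothetical sketch: track running tasks so they can be killed later, and
// run each TaskRunner on a pool thread, as Executor.launchTask does.
class MyTaskRunner(taskId: Long, serializedTask: ByteBuffer) extends Runnable {
  override def run(): Unit = {
    // deserialize the task bytes and execute the task here
  }
}

object LaunchSketch {
  private val threadPool = Executors.newCachedThreadPool()
  private val runningTasks = new ConcurrentHashMap[Long, MyTaskRunner]()

  def launchTask(taskId: Long, serializedTask: ByteBuffer): Unit = {
    val tr = new MyTaskRunner(taskId, serializedTask)
    runningTasks.put(taskId, tr)   // remember the runner so it can be looked up and killed
    threadPool.execute(tr)         // execute it asynchronously on the pool
  }
}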
The TaskRunner.run function:
This is the thread body that actually executes the task.
override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
val startTime = System.currentTimeMillis()
SparkEnv is analyzed in a later section.
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = SparkEnv.get.closureSerializer.newInstance()
logInfo("Running task ID " + taskId)
Update this task's status through execBackend: set the task state to RUNNING and send a StatusUpdate event to the master (driver).
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var attemptedTask: Option[Task[Any]] = None
var taskStart: Long = 0
def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
val startGCTime = gcTime
try {
SparkEnv.set(env)
Accumulators.clear()
Extract the task's resources: the jars to run, other files, and the serialized task definition.
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
Update these dependencies and add the task's jars to the current thread's ClassLoader.
updateDependencies(taskFiles, taskJars)
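To make the dependency update concrete, here is a hedged sketch of what adding downloaded jars to the current thread's class loader involves (an assumption-laden simplification: the real updateDependencies also fetches the files and uses Spark's own URL class loader; addJarsToContextClassLoader and localJars below are hypothetical names):

import java.io.File
import java.net.{URL, URLClassLoader}

// Hypothetical sketch: expose already-downloaded jars to the current thread by
// wrapping them in a URLClassLoader and installing it as the context class loader.
def addJarsToContextClassLoader(localJars: Seq[File]): Unit = {
  val urls: Array[URL] = localJars.map(_.toURI.toURL).toArray
  val parent = Thread.currentThread.getContextClassLoader
  val loader = new URLClassLoader(urls, parent)
  Thread.currentThread.setContextClassLoader(loader)
}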
Deserialize the task definition with the serializer configured in SparkEnv, producing a Task instance.
The concrete Task implementation is either ShuffleMapTask or ResultTask.
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
If killed is true, do not run the task; instead jump to the catch handler.
// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
if (killed) {
// Throw an exception rather than returning, because returning within a try{} block
// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
// exception will be caught by the catch block, leading to an incorrect ExceptionFailure
// for the task.
throw TaskKilledException
}
attemptedTask = Some(task)
logDebug("Task " + taskId +"'s epoch is " + task.epoch)
env.mapOutputTracker.updateEpoch(task.epoch)
Create a TaskContext and run the task through Task.run, waiting for it to complete.
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
val value = task.run(taskId.toInt)
val taskFinish = System.currentTimeMillis()
At this point the task has finished; if it was killed while running, jump to the catch handler.
// If the task has been killed, let's fail it.
if (task.killed) {
throw TaskKilledException
}
Serialize the task's return value.
val resultSer = SparkEnv.get.serializer.newInstance()
val beforeSerialization = System.currentTimeMillis()
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.currentTimeMillis()
Record the task's metrics.
for (m <- task.metrics) {
m.hostname = Utils.localHostName()
m.executorDeserializeTime = (taskStart - startTime).toInt
m.executorRunTime = (taskFinish - taskStart).toInt
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
}
val accumUpdates = Accumulators.values
Wrap the task's return value in a DirectTaskResult instance and serialize it.
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
Check whether the serialized task result exceeds the Akka frame size. If it does, hand the result to the BlockManager (stored with StorageLevel.MEMORY_AND_DISK_SER) and send only an IndirectTaskResult; otherwise the result fits in an actor message and the DirectTaskResult is sent as-is. Either way the result reaches the scheduler through the StatusUpdate event below.
val serializedResult = {
if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
logInfo("Storing result for " + taskId + " in local BlockManager")
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
ser.serialize(new IndirectTaskResult[Any](blockId))
} else {
logInfo("Sending result for " + taskId + " directly to driver")
serializedDirectResult
}
}
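The decision above follows a general size-threshold pattern; the following hedged sketch (not the Executor source; maxFrameSize, storeBlock and the message types are hypothetical) shows the idea of sending a small pointer when the payload would not fit in one actor message:

import java.nio.ByteBuffer

// Hypothetical sketch of the "large result goes through a block store" decision.
sealed trait ResultMessage
case class DirectResult(bytes: ByteBuffer) extends ResultMessage
case class ResultPointer(blockId: String) extends ResultMessage

def packageResult(taskId: Long, bytes: ByteBuffer, maxFrameSize: Int,
                  storeBlock: (String, ByteBuffer) => Unit): ResultMessage = {
  if (bytes.limit >= maxFrameSize - 1024) {   // leave headroom for message overhead
    val blockId = "taskresult_" + taskId
    storeBlock(blockId, bytes)                // e.g. hand the bytes to a block manager
    ResultPointer(blockId)                    // only the small pointer travels over the wire
  } else {
    DirectResult(bytes)                       // small enough to send inline
  }
}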
Update the task's status through execBackend: set the state to FINISHED and send a StatusUpdate event to the master.
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
logInfo("Finished task ID " + taskId)
} catch {
If an exception occurred, a FAILED status update is sent.
case ffe: FetchFailedException => {
val reason = ffe.toTaskEndReason
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
}
case TaskKilledException => {
logInfo("Executor killed task " + taskId)
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
}
case t: Throwable => {
val serviceTime = (System.currentTimeMillis() - taskStart).toInt
val metrics = attemptedTask.flatMap(t => t.metrics)
for (m <- metrics) {
m.executorRunTime = serviceTime
m.jvmGCTime = gcTime - startGCTime
}
val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
// TODO: Should we exit the whole executor here? On the one hand, the failed task may
// have left some weird state around depending on when the exception was thrown, but on
// the other hand, maybe we could detect that when future tasks fail and exit then.
logError("Exception in task ID " + taskId, t)
//System.exit(1)
}
} finally {
Remove this thread's shuffle memory entry from shuffleMemoryMap.
// TODO: Unregister shuffle memory only for ResultTask
val shuffleMemoryMap = env.shuffleMemoryMap
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
Remove this task from runningTasks.
runningTasks.remove(taskId)
}
}
}
Status updates during task execution
ExecutorBackend.statusUpdate
In yarn mode the implementation class is CoarseGrainedExecutorBackend, which sends a StatusUpdate event to the master (driver) actor.
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
driver ! StatusUpdate(executorId, taskId, state, data)
}
On the master side, the status update is handled by the scheduler backend.
Implementation class: CoarseGrainedSchedulerBackend.DriverActor
case StatusUpdate(executorId, taskId, state, data) =>
The status update is processed by TaskSchedulerImpl.statusUpdate.
scheduler.statusUpdate(taskId, state, data.value)
If the task state is a finished state (FINISHED, FAILED, KILLED, or LOST):
if (TaskState.isFinished(state)) {
if (executorActor.contains(executorId)) {
Each task occupies one CPU core; now that the task has finished, add one back to the free core count.
freeCores(executorId) += 1
Then continue to run the remaining tasks on this executor; see the scheduler scheduling analysis for details.
makeOffers(executorId)
} else {
// Ignoring the update since we don't know about the executor.
val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
logWarning(msg.format(taskId, state, sender, executorId))
}
}
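To make the core bookkeeping concrete, here is a hypothetical sketch of the pattern (not the CoarseGrainedSchedulerBackend source; class and method names are assumptions): the free-core count of an executor is decremented when a task is launched there and incremented back when the task finishes, after which new offers are made:

import scala.collection.mutable

// Hypothetical sketch of per-executor CPU core accounting.
class CoreAccounting(coresPerExecutor: Int) {
  private val freeCores = mutable.HashMap[String, Int]()   // executorId -> available cores

  def registerExecutor(executorId: String): Unit =
    freeCores(executorId) = coresPerExecutor

  def onTaskLaunched(executorId: String): Unit =
    freeCores(executorId) -= 1                             // each task occupies one core

  def onTaskFinished(executorId: String): Unit = {
    freeCores(executorId) += 1                             // the core is free again
    makeOffers(executorId)                                 // offer it for further tasks
  }

  private def makeOffers(executorId: String): Unit = {
    // ask the TaskScheduler which pending tasks can run with freeCores(executorId) cores
  }
}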
Processing flow of TaskSchedulerImpl.statusUpdate
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
synchronized {
try {
If the incoming task state is LOST and the task exists in taskIdToExecutorId:
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
Get the executorId of the worker this task was running on.
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)
If this executor is still an active executor, remove it from the scheduler (removeExecutor), which in turn triggers TaskSetManager.executorLost, and record it as failedExecutor for use at the end of the function.
if (activeExecutorIds.contains(execId)) {
removeExecutor(execId)
failedExecutor = Some(execId)
}
}
taskIdToTaskSetId.get(tid) match {
case Some(taskSetId) =>
If the task state is a finished (non-RUNNING) state, remove the task from the corresponding bookkeeping maps.
if (TaskState.isFinished(state)) {
taskIdToTaskSetId.remove(tid)
if (taskSetTaskIds.contains(taskSetId)) {
taskSetTaskIds(taskSetId) -= tid
}
taskIdToExecutorId.remove(tid)
}
activeTaskSets.get(taskSetId).foreach { taskSet =>
If the task completed successfully, remove it from the TaskSet's running list and fetch its result through TaskResultGetter.
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
Handling for a failed task:
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
}
case None =>
logInfo("Ignoring update with state %s from TID %s because its task set is gone"
.format(state, tid))
}
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
}
}
If there is a failed executor, notify the dagScheduler about it.
// Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor != None) {
dagScheduler.executorLost(failedExecutor.get)
Then trigger a new round of task offers and execution.
backend.reviveOffers()
}
}
The TaskState.LOST state with the executor in activeExecutorIds
When the task state is LOST and the executor is an active one (i.e. it has run tasks before):
private def removeExecutor(executorId: String) {
Remove the executor from activeExecutorIds.
activeExecutorIds -= executorId
Get the host of the worker this executor belongs to.
val host = executorIdToHost(executorId)
Get all executors on that host and remove the current executor from the set.
val execs = executorsByHost.getOrElse(host, new HashSet)
execs -= executorId
if (execs.isEmpty) {
executorsByHost -= host
}
Remove the executor from the executor-to-host map.
executorIdToHost -= executorId
The main purpose here is to invoke TaskSetManager.executorLost.
rootPool.executorLost(executorId, host)
}
The TaskSetManager.executorLost function:
This function handles tasks lost because their executor was lost, re-adding that executor's tasks to the pending task lists.
override def executorLost(execId: String, host: String) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
// Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
// task that used to have locations on only this host might now go to the no-prefs list. Note
// that it's okay if we add a task to the same queue twice (if it had multiple preferred
// locations), because findTaskFromList will skip already-running tasks.
Regenerate this TaskSet's pending queues, since the current executor has been removed.
for (index <- getPendingTasksForExecutor(execId)) {
addPendingTask(index, readding=true)
}
for (index <- getPendingTasksForHost(host)) {
addPendingTask(index, readding=true)
}
// Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
If the tasks of this stage are shuffle tasks (a ShuffleMapTask stage):
if (tasks(0).isInstanceOf[ShuffleMapTask]) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
if (successful(index)) {
successful(index) = false
copiesRunning(index) -= 1
tasksSuccessful -= 1
addPendingTask(index)
// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
// stage finishes when a total of tasks.size tasks finish.
Send a CompletionEvent to the DAGScheduler with reason Resubmitted,
sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null)
}
}
}
For tasks that are still in the running state and were running on the lost executor:
// Also re-enqueue any tasks that were running on the node
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
Mark the task as failed, remove it from the running list, and re-add the task to the pending tasks queue.
handleFailedTask(tid, TaskState.FAILED, None)
}
}
The DAGScheduler handles the CompletionEvent:
...........................
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
handleTaskCompletion(completion)
.........................
case Resubmitted =>
logInfo("Resubmitted " + task + ", so marking it as still running")
pendingTasks(stage) += task
The (TaskState.FAILED, TaskState.KILLED, TaskState.LOST) states
.........................
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
Remove the task from the running set.
taskSet.removeRunningTask(tid)
This function extracts the error information and delegates the exception handling to TaskSchedulerImpl.handleFailedTask.
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
The TaskSchedulerImpl.handleFailedTask function:
def handleFailedTask(
taskSetManager: TaskSetManager,
tid: Long,
taskState: TaskState,
reason: Option[TaskEndReason]) = synchronized {
taskSetManager.handleFailedTask(tid, taskState, reason)
If the task was not KILLED, trigger a new round of task offers and execution.
if (taskState != TaskState.KILLED) {
// Need to revive offers again now that the task set manager state has been updated to
// reflect failed tasks that need to be re-run.
backend.reviveOffers()
}
}
Flow of TaskSetManager.handleFailedTask
TaskSetManager.handleFailedTask processes the exception information from task execution.
def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) {
val info = taskInfos(tid)
if (info.failed) {
return
}
removeRunningTask(tid)
val index = info.index
info.markFailed()
var failureReason = "unknown"
if (!successful(index)) {
logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
copiesRunning(index) -= 1
If this call came from TaskSetManager.executorLost (task LOST), reason is None and the case block below is not executed; otherwise this is a task execution exception, i.e. a status update whose state is not LOST.
// Check if the problem is a map output fetch failure. In that case, this
// task will never succeed on any node, so tell the scheduler about it.
reason.foreach {
case fetchFailed: FetchFailed =>
Fetch failure: remove all of this taskset's running tasks, remove the taskset from the scheduler, and return without executing the rest of the flow.
logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
sched.dagScheduler.taskEnded(tasks(index), fetchFailed, null, null, info, null)
successful(index) = true
tasksSuccessful += 1
sched.taskSetFinished(this)
removeAllRunningTasks()
return
case TaskKilled =>
The task was killed: report the end of this task and return without executing the rest of the flow.
logWarning("Task %d was killed.".format(tid))
sched.dagScheduler.taskEnded(tasks(index), reason.get, null, null, info, null)
return
case ef: ExceptionFailure =>
sched.dagScheduler.taskEnded(
tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
if (ef.className == classOf[NotSerializableException].getName()) {
// If the task result wasn't serializable, there's no point in trying to re-execute it.
logError("Task %s:%s had a not serializable result: %s; not retrying".format(
taskSet.id, index, ef.description))
abort("Task %s:%s had a not serializable result: %s".format(
taskSet.id, index, ef.description))
return
}
val key = ef.description
failureReason = "Exception failure: %s".format(ef.description)
val now = clock.getTime()
val (printFull, dupCount) = {
if (recentExceptions.contains(key)) {
val (dupCount, printTime) = recentExceptions(key)
if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
recentExceptions(key) = (0, now)
(true, 0)
} else {
recentExceptions(key) = (dupCount + 1, printTime)
(false, dupCount + 1)
}
} else {
recentExceptions(key) = (0, now)
(true, 0)
}
}
if (printFull) {
val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
logWarning("Loss was due to %s\n%s\n%s".format(
ef.className, ef.description, locs.mkString("\n")))
} else {
logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
}
case TaskResultLost =>
failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
logWarning(failureReason)
sched.dagScheduler.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
case _ => {}
}
Re-add the task to the pending queue; if the state is not KILLED, increment the failure count and check whether the maximum number of retries has been reached.
// On non-fetch failures, re-enqueue the task as pending for a max number of retries
addPendingTask(index)
if (state != TaskState.KILLED) {
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
logError("Task %s:%d failed %d times; aborting job".format(
taskSet.id, index, maxTaskFailures))
abort("Task %s:%d failed %d times (most recent failure: %s)".format(
taskSet.id, index, maxTaskFailures, failureReason))
}
}
} else {
logInfo("Ignoring task-lost event for TID " + tid +
" because task " + index + " is already finished")
}
}
The DAGScheduler's handling of taskEnded:
def taskEnded(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: Map[Long, Any],
taskInfo: TaskInfo,
taskMetrics: TaskMetrics) {
eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
}
Handling of the CompletionEvent:
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
handleTaskCompletion(completion)
DAGScheduler.handleTaskCompletion
The fetch-failure case:
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
// Mark the stage that the reducer was in as unrunnable
val failedStage = stageIdToStage(task.stageId)
running -= failedStage
failed += failedStage
..............................
// Mark the map whose fetch failed as broken in the map stage
val mapStage = shuffleToMapStage(shuffleId)
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
}
...........................
failed += mapStage
// Remember that a fetch failed now; this is used to resubmit the broken
// stages later, after a small wait (to give other tasks the chance to fail)
lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
Remove from the stage all output locations that correspond to this executorId.
handleExecutorLost(bmAddress.executorId, Some(task.epoch))
}
case ExceptionFailure(className, description, stackTrace, metrics) =>
// Do nothing here, left up to the TaskScheduler to decide how to handle user failures
case TaskResultLost =>
// Do nothing here; the TaskScheduler handles these failures and resubmits the task.
The TaskState.FINISHED state
This state means the task completed normally.
if (state == TaskState.FINISHED) {
Remove the task from the TaskSet's running queue.
taskSet.removeRunningTask(tid)
Fetch the task's result data.
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
The TaskResultGetter.enqueueSuccessfulTask function:
def enqueueSuccessfulTask(
taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
getTaskResultExecutor.execute(new Runnable {
override def run() {
try {
Deserialize the response data to obtain the result.
val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
If the result is smaller than the Akka actor frame size, the task result is returned directly:
case directResult: DirectTaskResult[_] => directResult
case IndirectTaskResult(blockId) =>
Otherwise the result was too large and is managed by the BlockManager; fetch the result data through the blockManager.
logDebug("Fetching indirect task result for TID %s".format(tid))
Send a GettingResultEvent to the DAGScheduler; see TaskSchedulerImpl.handleTaskGettingResult below.
scheduler.handleTaskGettingResult(taskSetManager, tid)
Fetch the task's result:
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
If the task finished but fetching its result failed, see the TaskResultLost error handling above.
if (!serializedTaskResult.isDefined) {
/* We won't be able to get the task result if the machine that ran the task failed
* between when the task ended and when we tried to fetch the result, or if the
* block manager had to flush the result. */
scheduler.handleFailedTask(
taskSetManager, tid, TaskState.FINISHED, Some(TaskResultLost))
return
}
Deserialize the fetched task result.
val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
serializedTaskResult.get)
Now that the result has been obtained, remove the corresponding block.
sparkEnv.blockManager.master.removeBlock(blockId)
deserializedResult
}
result.metrics.resultSize = serializedData.limit()
See the TaskSchedulerImpl.handleSuccessfulTask handler below.
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
} catch {
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread.getContextClassLoader
taskSetManager.abort("ClassNotFound with classloader: " + loader)
case ex: Throwable =>
taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
}
}
})
}
The TaskSchedulerImpl.handleTaskGettingResult function:
def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
taskSetManager.handleTaskGettingResult(tid)
}
In TaskSetManager:
def handleTaskGettingResult(tid: Long) = {
val info = taskInfos(tid)
info.markGettingResult()
sched.dagScheduler.taskGettingResult(tasks(info.index), info)
}
A GettingResultEvent is sent through the DAGScheduler.
def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
eventProcessActor ! GettingResultEvent(task, taskInfo)
}
Handling of GettingResultEvent: it simply posts the event to the listener bus; no other processing takes place.
case GettingResultEvent(task, taskInfo) =>
listenerBus.post(SparkListenerTaskGettingResult(task, taskInfo))
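For illustration, an application could observe this event through the SparkListener interface; a minimal hedged sketch (assuming this Spark version's SparkListener trait exposes onTaskGettingResult roughly as shown) might be:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskGettingResult}

// Hypothetical listener: just log when a task's (indirect) result starts being fetched.
class ResultFetchLogger extends SparkListener {
  override def onTaskGettingResult(event: SparkListenerTaskGettingResult) {
    println("Fetching result for task " + event.taskInfo.taskId)
  }
}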
The TaskSchedulerImpl.handleSuccessfulTask handler:
def handleSuccessfulTask(
taskSetManager: TaskSetManager,
tid: Long,
taskResult: DirectTaskResult[_]) = synchronized {
taskSetManager.handleSuccessfulTask(tid, taskResult)
}
In TaskSetManager:
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
val info = taskInfos(tid)
val index = info.index
info.markSuccessful()
Remove this task from the running queue.
removeRunningTask(tid)
if (!successful(index)) {
logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
tid, info.duration, info.host, tasksSuccessful, numTasks))
Send a Success message to the dagScheduler.
sched.dagScheduler.taskEnded(
tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
Increment the count of successfully completed tasks, and mark this task as successful in the successful array.
// Mark successful and stop if all the tasks have succeeded.
tasksSuccessful += 1
successful(index) = true
If the number of completed tasks reaches the total number of tasks, the whole TaskSet is finished, i.e. the stage is complete.
if (tasksSuccessful == numTasks) {
sched.taskSetFinished(this)
}
} else {
logInfo("Ignorning task-finished event for TID " + tid + " because task " +
index + " has already completed successfully")
}
}
DAGScheduler handling of the Success case of CompletionEvent:
case Success =>
logInfo("Completed " + task)
if (event.accumUpdates != null) {
Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
}
Remove this task from the pending queue.
pendingTasks(stage) -= task
stageToInfos(stage).taskInfos += event.taskInfo -> event.taskMetrics
Two kinds of Task are handled, depending on the task's type:
task match {
If the task is a ResultTask, no shuffle is required:
case rt: ResultTask[_, _] =>
resultStageToJob.get(stage) match {
case Some(job) =>
If the ActiveJob of this stage has not yet marked this task's partition as finished:
if (!job.finished(rt.outputId)) {
Mark the task's partition as finished.
job.finished(rt.outputId) = true
Increment the job's finished-task count; if all tasks have finished, remove the job and its stage from the relevant bookkeeping structures.
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
idToActiveJob -= stage.jobId
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
jobIdToStageIdsRemove(job.jobId)
listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
}
Call JobWaiter.taskSucceeded (the listener inside ActiveJob) to mark the task as finished and pass the result on for output handling.
job.listener.taskSucceeded(rt.outputId, event.result)
}
case None =>
logInfo("Ignoring result from " + rt + " because its job has finished")
}
Handling when a shuffle task (ShuffleMapTask) completes:
case smt: ShuffleMapTask =>
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
} else {
Record the shuffle result (MapStatus) in the stage's outputLoc. Each addition increments numAvailableOutputs; when numAvailableOutputs equals numPartitions, the map side of the shuffle is complete.
stage.addOutputLoc(smt.partitionId, status)
}
If the stage is still in the running set and all of its pendingTasks have completed:
if (running.contains(stage) && pendingTasks(stage).isEmpty) {
Update the stage's status.
markStageAsFinished(stage)
.......................................
At this point the shuffle stage is complete; register the shuffleId together with the stage's outputLocs in the mapOutputTracker. This records, for every shuffle task, the executor and host it ran on and the size of each output it wrote. Every task's shuffle writer is keyed by the shuffleId; once registered, the next stage reads its data according to this shuffleId's entries in the mapOutputTracker.
mapOutputTracker.registerMapOutputs(
stage.shuffleDep.get.shuffleId,
stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
changeEpoch = true)
}
clearCacheLocs()
Each partition's outputLoc in the stage defaults to Nil; if any partition's value is still Nil, some task failed, so the stage is resubmitted, rerunning only the tasks that did not succeed.
if (stage.outputLocs.exists(_ == Nil)) {
.........................................
submitStage(stage)
} else {
val newlyRunnable = new ArrayBuffer[Stage]
for (stage <- waiting) {
logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
}
Check all the stages still waiting and collect those whose missing parent stages are now empty, i.e. whose parent shuffle dependencies have all completed.
for (stage <- waiting if getMissingParentStages(stage) == Nil) {
newlyRunnable += stage
}
Remove the newly runnable stages from the waiting set,
waiting --= newlyRunnable
and add them to the running set.
running ++= newlyRunnable
for {
stage <- newlyRunnable.sortBy(_.id)
jobId <- activeJobForStage(stage)
} {
Submit the task assignment and execution for the next stage.
logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
submitMissingTasks(stage, jobId)
}
}
}
}
The JobWaiter.taskSucceeded function:
This is the handler invoked after a task completes.
override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
if (_jobFinished) {
throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
}
Process the result through the resultHandler function, which was passed in when the JobWaiter was created.
resultHandler(index, result.asInstanceOf[T])
Increment the number of finished tasks.
finishedTasks += 1
if (finishedTasks == totalTasks) {
If the number of finished tasks equals the total number of tasks, mark the job as finished and set the result to JobSucceeded.
Once _jobFinished is true, the job is done and the caller waiting on it stops waiting.
_jobFinished = true
jobResult = JobSucceeded
this.notifyAll()
}
}
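As a hypothetical illustration of where resultHandler comes from (a sketch of the runJob pattern, not the actual SparkContext source; the partition count and wiring below are assumptions): the caller typically pre-allocates an array and passes a handler that stores each partition's result at its output index:

// Hypothetical sketch: collect per-partition results into an array, the way a
// collect-style runJob caller would.
object ResultHandlerSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 4                                  // assumed partition count
    val results = new Array[Int](numPartitions)
    // The handler JobWaiter.taskSucceeded invokes for each finished ResultTask:
    // store the partition's value at its output index.
    val resultHandler: (Int, Int) => Unit = (index, value) => results(index) = value

    // Simulate four tasks finishing (in the real flow taskSucceeded drives this).
    (0 until numPartitions).foreach(i => resultHandler(i, i * 10))
    println(results.mkString(", "))                        // prints: 0, 10, 20, 30
  }
}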
Implementation of Task.runTask
There are two kinds of Task implementations:
ShuffleMapTask, which performs a shuffle, and
ResultTask, which does not.
ResultTask.runTask
override def runTask(context: TaskContext): U = {
metrics = Some(context.taskMetrics)
try {
Here func is the function passed in when the task instance was created, i.e. in DAGScheduler's runJob, for example the writeToFile function defined in PairRDDFunction.saveAsHadoopDataset. rdd.iterator dispatches to the concrete RDD implementation's compute function, which in turn executes the user-defined functions from the application code, such as the function passed to map.
func(context, rdd.iterator(split, context))
} finally {
context.executeOnCompleteCallbacks()
}
}
ShuffleMapTask.runTask
override def runTask(context: TaskContext): MapStatus = {
val numOutputSplits = dep.partitioner.numPartitions
metrics = Some(context.taskMetrics)
val blockManager = SparkEnv.get.blockManager
val shuffleBlockManager = blockManager.shuffleBlockManager
var shuffle: ShuffleWriterGroup = null
var success = false
try {
Obtain the block writers for this shuffleId.
// Obtain all the block writers for shuffle blocks.
val ser = SparkEnv.get.serializerManager.get(dep.serializerClass, SparkEnv.get.conf)
shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
Run rdd.iterator, producing pairs (Product2), and shuffle each pair to a bucket file according to its key. Once all shuffle tasks of the stage complete, the stage is registered with the mapOutputTracker, from which the next stage reads its data. Each shuffle task produces a MapStatus instance when it finishes, recording the executor and host the task ran on and the size of each output it wrote. The details of reading shuffle data are covered in the later shuffle analysis.
// Write the map output to its associated buckets.
for (elem <- rdd.iterator(split, context)) {
val pair = elem.asInstanceOf[Product2[Any, Any]]
val bucketId = dep.partitioner.getPartition(pair._1)
shuffle.writers(bucketId).write(pair)
}
// Commit the writes. Get the size of each bucket block (total block size).
var totalBytes = 0L
var totalTime = 0L
val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
MapOutputTracker.compressSize(size)
}
// Update shuffle metrics.
val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
shuffleMetrics.shuffleWriteTime = totalTime
metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
success = true
new MapStatus(blockManager.blockManagerId, compressedSizes)
} catch { case e: Exception =>
// If there is an exception from running the task, revert the partial writes
// and throw the exception upstream to Spark.
if (shuffle != null && shuffle.writers != null) {
for (writer <- shuffle.writers) {
writer.revertPartialWrites()
writer.close()
}
}
throw e
} finally {
// Release the writers back to the shuffle block manager.
if (shuffle != null && shuffle.writers != null) {
shuffle.releaseWriters(success)
}
// Execute the callbacks on task completion.
context.executeOnCompleteCallbacks()
}
}
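The bucketId chosen in the shuffle write loop above is simply the partitioner applied to the key; as a hedged illustration (a sketch of hash partitioning, not the actual HashPartitioner source), it amounts to a non-negative modulo of the key's hashCode:

// Hypothetical sketch of how a hash partitioner maps a key to a shuffle bucket.
class SimpleHashPartitioner(numPartitions: Int) {
  def getPartition(key: Any): Int = {
    val raw = if (key == null) 0 else key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw   // keep the bucket index non-negative
  }
}

// Records with the same key always land in the same bucket/file:
// new SimpleHashPartitioner(8).getPartition("some-key")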