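One of the most important questions in Spark scheduling is which executor a task runs on; a poor assignment wastes a lot of extra resources. For example, executor1 receives data through a Flume receiver and stores it in block1, while executor2 receives data through a Flume receiver and stores it in block2. The resulting RDD has two partitions and therefore two tasks: task1 processes block1 and task2 processes block2. If task1 is assigned to executor2, executor2 must first fetch block1 from executor1 before it can compute, which adds data-transfer overhead.
So how does Spark choose? Spark uses the RDD's getPreferredLocations to determine which executor a given partition would prefer to run on. The flowchart below shows that when a Task is created, getPreferredLocations() is called first to obtain the preferred location of the current partition, and addPendingTask() then adds the task to the relevant pending lists in advance.
The code and a worked example follow.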
// resourceOffers decides which tasks to submit to each worker.
// (Excerpt: tasks, availableCpus and sortedTaskSets are set up earlier in the method.)
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  var launchedTask = false
  // TaskLocality.values is PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY
  for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
    do {
      launchedTask = false
      for (i <- 0 until offers.size) {
        val execId = offers(i).executorId
        val host = offers(i).host
        for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetId(tid) = taskSet.taskSet.id
          taskIdToExecutorId(tid) = execId
          activeExecutorIds += execId
          executorsByHost(host) += execId
          availableCpus(i) -= 1
          launchedTask = true
        }
      }
    } while (launchedTask)
  }
  tasks // the per-offer task lists are the result
}
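The sweep over offers in resourceOffers can be modelled without Spark. The following is a minimal sketch, assuming one CPU per task and tasks with no locality preference; `OfferSweep`, `Offer` and `assign` are hypothetical names, not Spark APIs. It uses a `while` loop with a flag in place of `do ... while`, which behaves the same way here.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of the offer sweep: each pass hands every offer at most one task,
// and passes repeat until a full pass launches nothing.
object OfferSweep {
  final case class Offer(executorId: String, cores: Int)

  // `pending` holds task ids that may run anywhere (a simplifying assumption).
  def assign(offers: Seq[Offer], pending: Seq[Int]): Map[String, List[Int]] = {
    val availableCpus = offers.map(_.cores).toArray
    val tasks = offers.map(_ => ArrayBuffer.empty[Int])
    var next = 0 // index of the next unassigned task
    var launchedTask = true
    while (launchedTask) {
      launchedTask = false
      for (i <- offers.indices) {
        // Assumes each task needs exactly one CPU.
        if (availableCpus(i) >= 1 && next < pending.size) {
          tasks(i) += pending(next)
          next += 1
          availableCpus(i) -= 1
          launchedTask = true
        }
      }
    }
    offers.map(_.executorId).zip(tasks.map(_.toList)).toMap
  }
}
```

With two offers of two cores each and five pending tasks, the sketch spreads four tasks across the two executors in alternation and leaves the fifth pending, because no CPU remains.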
// Derive allowedLocality from the maxLocality passed in and the locality level allowed at the current time.
def resourceOffer(
    execId: String,
    host: String,
    availableCpus: Int,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  if (!isZombie && availableCpus >= CPUS_PER_TASK) {
    val curTime = clock.getTime()
    // Get the allowed locality level for the current time
    var allowedLocality = getAllowedLocalityLevel(curTime)
    if (allowedLocality > maxLocality) {
      allowedLocality = maxLocality // We're not allowed to search for farther-away tasks
    }
    findTask(execId, host, allowedLocality) match {
      case Some((index, taskLocality)) => {
        // Found a task; do some bookkeeping and return a task description
        …
        return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
      }
      case _ =>
    }
  }
  None
}
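The comparison `allowedLocality > maxLocality` in resourceOffer relies on the locality levels being ordered from most local to least local. The following is a minimal standalone sketch of that ordering, not Spark's own source; `LocalityClamp`, `Locality` and `clamp` are illustrative names, and `isAllowed` is modelled on the helper of the same name that findTask calls.

```scala
// Standalone model of the locality ordering; the names here are illustrative.
object LocalityClamp {
  object Locality extends Enumeration {
    // Declared from most local to least local, so "<" means "more local".
    val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
  }

  // A task at level `condition` may run when it is at least as local as `constraint`.
  def isAllowed(constraint: Locality.Value, condition: Locality.Value): Boolean =
    condition <= constraint

  // The search level is the more local of the time-based level and maxLocality.
  def clamp(timeBased: Locality.Value, maxLocality: Locality.Value): Locality.Value =
    if (timeBased > maxLocality) maxLocality else timeBased
}
```

For instance, `clamp(ANY, NODE_LOCAL)` gives NODE_LOCAL: even when enough time has passed to allow any placement, a pass with maxLocality = NODE_LOCAL still searches no further than node-local tasks.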
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value)] =
{
  for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }
  if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
    for (index <- findTaskFromList(getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL))
    }
  }
  if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
    for {
      rack <- sched.getRackForHost(host)
      index <- findTaskFromList(getPendingTasksForRack(rack))
    } {
      return Some((index, TaskLocality.RACK_LOCAL))
    }
  }
  // Look for no-pref tasks after rack-local tasks since they can run anywhere.
  for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }
  if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
    for (index <- findTaskFromList(allPendingTasks)) {
      return Some((index, TaskLocality.ANY))
    }
  }
  // Finally, if all else has failed, look for a speculative task: a copy of a task that is
  // running slowly, which may be worth re-running on another executor or another host.
  findSpeculativeTask(execId, host, locality)
}
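findTask reads from several pending lists: per executor, per host, no-preference, and all tasks. The following is a minimal sketch of how addPendingTask might populate such lists from each task's preferred locations. It is an illustration rather than Spark's implementation: `PendingDemo`, `Loc`, `PendingTasks` and `pendingFor` are hypothetical names, the shape of a preferred location (a host plus an optional executor id) is an assumption, and rack-level lists are left out.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

object PendingDemo {
  // A preferred location: a host, optionally narrowed to one executor (assumed shape).
  final case class Loc(host: String, executorId: Option[String])

  class PendingTasks {
    val forExecutor = new HashMap[String, ArrayBuffer[Int]]
    val forHost = new HashMap[String, ArrayBuffer[Int]]
    val noPrefs = new ArrayBuffer[Int]
    val all = new ArrayBuffer[Int]

    // Register a task index in every list it may later be picked from.
    def addPendingTask(index: Int, prefs: Seq[Loc]): Unit = {
      for (loc <- prefs) {
        for (id <- loc.executorId) {
          forExecutor.getOrElseUpdate(id, new ArrayBuffer[Int]) += index
        }
        forHost.getOrElseUpdate(loc.host, new ArrayBuffer[Int]) += index
      }
      if (prefs.isEmpty) noPrefs += index
      all += index
    }

    // Task indices that prefer the given executor; empty if there are none.
    def pendingFor(executorId: String): List[Int] =
      forExecutor.get(executorId).map(_.toList).getOrElse(Nil)
  }
}
```

If every task prefers executor2, `pendingFor("executor1")` stays empty, so an offer from executor1 can only be satisfied from the `all` list, which findTask consults only once the allowed level reaches ANY.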
A worked example
Tasks 1-50: preferred location is executor2
Two workers: executor1 with core1 and core2
             executor2 with core3 and core4
Tasks are scheduled once every second
TaskSetManager
myLocalityLevels = PROCESS_LOCAL, NODE_LOCAL, ANY
localityWaits    = 3s,            3s,         0s
1s: localityForCurrentTime = PROCESS_LOCAL
maxLocality = PROCESS_LOCAL  allowedLocality = PROCESS_LOCAL  executor1 = none  executor2 = task1
maxLocality = NODE_LOCAL  allowedLocality = PROCESS_LOCAL  executor1 = none  executor2 = task2
maxLocality = RACK_LOCAL  allowedLocality = PROCESS_LOCAL  executor1 = none  executor2 = none (both of its cores are already in use)
maxLocality = ANY  allowedLocality = PROCESS_LOCAL  executor1 = none  executor2 = none (both of its cores are already in use)
Assume every task assigned to executor2 has finished before the next round.
2s: localityForCurrentTime = PROCESS_LOCAL
maxLocality = PROCESS_LOCAL  allowedLocality = PROCESS_LOCAL  executor1 = none  executor2 = task3
maxLocality = NODE_LOCAL  allowedLocality = PROCESS_LOCAL  executor1 = none  executor2 = task4
maxLocality = RACK_LOCAL  allowedLocality = PROCESS_LOCAL  executor1 = none  executor2 = none (both of its cores are already in use)
maxLocality = ANY  allowedLocality = PROCESS_LOCAL  executor1 = none  executor2 = none (both of its cores are already in use)
Assume every task assigned to executor2 has finished before the next round.
3s: localityForCurrentTime = NODE_LOCAL (because the locality wait for PROCESS_LOCAL is 3s)
maxLocality = PROCESS_LOCAL  allowedLocality = PROCESS_LOCAL  executor1 = none  executor2 = task5
maxLocality = NODE_LOCAL  allowedLocality = NODE_LOCAL  executor1 = none  executor2 = task6
maxLocality = RACK_LOCAL  allowedLocality = NODE_LOCAL  executor1 = none  executor2 = none (both of its cores are already in use)
maxLocality = ANY  allowedLocality = NODE_LOCAL  executor1 = none  executor2 = none (both of its cores are already in use)
…
6s: localityForCurrentTime = ANY (because the locality wait for NODE_LOCAL is 3s)
maxLocality = PROCESS_LOCAL  allowedLocality = PROCESS_LOCAL  executor1 = none  executor2 = task11
maxLocality = NODE_LOCAL  allowedLocality = NODE_LOCAL  executor1 = none  executor2 = task12
maxLocality = RACK_LOCAL  allowedLocality = RACK_LOCAL  executor1 = none  executor2 = none
maxLocality = ANY  allowedLocality = ANY  executor1 = task13  executor2 = none (allowedLocality is now ANY, so executor1 can take a task from the allPendingTasks list)
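The allowedLocality column in the timeline above can be reproduced with a small model. This is a sketch under stated assumptions: elapsed time is counted from the start of the example and the wait clock is never reset, whereas the real TaskSetManager keeps additional state; `LocalityTimeline`, `levelForTime` and `allowed` are hypothetical names, and levels are plain strings for brevity.

```scala
object LocalityTimeline {
  // Levels and waits taken from the example: PROCESS_LOCAL 3s, NODE_LOCAL 3s, ANY 0s.
  val levels = Vector("PROCESS_LOCAL", "NODE_LOCAL", "ANY")
  val waitsSeconds = Vector(3, 3, 0)
  // Full ordering used to compare against maxLocality, including RACK_LOCAL.
  val order = Vector("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")

  // Level reached after `elapsed` seconds, assuming the wait clock is never reset.
  def levelForTime(elapsed: Int): String = {
    var index = 0
    var remaining = elapsed
    while (index < levels.length - 1 && remaining >= waitsSeconds(index)) {
      remaining -= waitsSeconds(index)
      index += 1
    }
    levels(index)
  }

  // allowedLocality is the more local of the time-based level and maxLocality.
  def allowed(elapsed: Int, maxLocality: String): String = {
    val timeBased = levelForTime(elapsed)
    if (order.indexOf(timeBased) > order.indexOf(maxLocality)) maxLocality else timeBased
  }
}
```

Under these assumptions, `allowed(3, "ANY")` is NODE_LOCAL and `allowed(6, "RACK_LOCAL")` is RACK_LOCAL, matching the corresponding rows at 3s and 6s.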
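Summary: 1. Task selection depends mainly on allowedLocality and on each task's preferred location.
2. A task is not guaranteed to run on the machine that holds its data: if a machine has had no runnable task for long enough, it takes a task from the allPendingTasks list.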