DAGScheduler 的stage划分
/** org.apache.spark.scheduler.DAGScheduler 中的 submitStage * 提交stage, 先提交缺失的父stage */ private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { val missing = getMissingParentStages(stage).sortBy(_.id) //注意这里 logDebug("missing: " + missing) if (missing.isEmpty) {// 所有的父stage都提交完了 再提交自己 logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { for (parent <- missing) { submitStage(parent)//注意这里 } waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id, None) } }
private def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) if (rddHasUncachedPartitions) { for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency[_, _, _] => //注意shufDep宽依赖会产生新的stage val mapStage = getShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) { missing += mapStage } case narrowDep: NarrowDependency[_] => //注意这里 narrowDep waitingForVisit.push(narrowDep.rdd) } } } } } waitingForVisit.push(stage.rdd) while (waitingForVisit.nonEmpty) { visit(waitingForVisit.pop()) } missing.toList }
相关推荐
spark-3.1.2.tgz版本 & spark-3.1.2-bin-hadoop2.7.tgz版本
本资源是spark-2.0.0-bin-hadoop2.6.tgz百度网盘资源下载,本资源是spark-2.0.0-bin-hadoop2.6.tgz百度网盘资源下载
Apache Spark版本3.1.3。Linux安装包。spark-3.1.3-bin-hadoop3.2.tgz
spark-3.2.0-bin-hadoop3.2.tgz
Spark安装包:spark-3.1.3-bin-without-hadoop.tgz
Spark Doris Connector(apache-doris-spark-connector-2.3_2.11-1.0.1-incubating-src.tar.gz) Spark Doris Connector Version:1.0.1 Spark Version:2.x Scala Version:2.11 Apache Doris是一个现代MPP分析...
spark-hive_2.11-2.3.0 spark-hive-thriftserver_2.11-2.3.0.jar log4j-2.15.0.jar slf4j-api-1.7.7.jar slf4j-log4j12-1.7.25.jar curator-client-2.4.0.jar curator-framework-2.4.0.jar curator-recipes-2.4.0....
spark-3.0.0-bin-hadoop3.2下载安装包
pyspark本地的环境配置包,spark-2.3.4-bin-hadoop2.7.tgz:spark-2.3.4-bin-hadoop2.7.tgz
spark-3.2.4-bin-hadoop3.2-scala2.13 安装包
内容概要:由于cdh6.3.2的spark版本为2.4.0,并且spark-sql被阉割,现基于cdh6.3.2,scala2.12.0,java1.8,maven3.6.3,,对spark-3.2.2源码进行编译 应用:该资源可用于cdh6.3.2集群配置spark客户端,用于spark-sql
linux的spark新版本,匹配hadoop2.7版本,spark-3.2.1-bin-hadoop2.7.tgz
spark-streaming-flume_2.11-2.1.0.jar
spark-3.2.0-bin-hadoop3-without-hive
spark-2.4.0-bin-hadoop2.7
这是每个学习spark必备的jar包,是根据我的个人试验后所得,官网正版,在spark官网下载。 资源包里不仅有需要的jar包,并且给不会再官网上下载的新手官方网址,可以自由下载资源
spark-streaming-kafka-0-10_2.11-2.4.0-cdh6.1.1.jar
spark-3.2.1-bin-hadoop3.2-scala2.13.tgz
spark-streaming-kafka-0-8_2.11-2.4.0.jar
spark-assembly-1.5.2-hadoop2.6.0 在spark编程中使用的一个jar