`
bit1129
  • 浏览: 1056439 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark二十九】Driver

 
阅读更多

在Spark Standalone集群模式下,Driver运行在客户端,所谓的客户端就是提交代码的那台机器。在Standalone模式下,角色包括:

Driver(Client,这里的Client对应到Spark的代码中是AppClient吗?)如下图所示,Driver位于提交代码的那台机器(提交代码的机器是Client),

Master

Worker(Worker是一个进程,它其中会有多个Executor)

Executor

 

为什么说Driver是在提交代码的那台机器上呢?

SparkConf类中有个关于Driver的参数设置,如下代码在SparkContext的构造方法中

 

  // Set Spark driver host and port system properties
  conf.setIfMissing("spark.driver.host", Utils.localHostName()) ////host是本地,意思是可以设置的??
  conf.setIfMissing("spark.driver.port", "0")

 

 



 



 

 

 

时序:

1.Client(Driver)向Master提交Application----通过spark-sumbit提交,指定master=spark:///

2. Master收到Driver的Application请求,申请资源(实际上是Worker的Executor),启动StandaloneExecutorBackend,StandaloneExecutorBackend是Worker跟外界通信的代表

3.图中的第3步代码中是否有体现?

4.Executor启动后,Driver就可以分配Task(launchTask)

5.作业执行过程中,Worker向Driver汇报任务的执行情况

 

 

用户的程序分成两部分,一个是初始化SparkContext,定义针对数据的各种函数操作实现业务逻辑(对应不同的RDD),当SparkContext通过runJob提交后,接下来的工作由Driver完成?

Driver是作业的驱动器(或者主进程),负责Job的解析,生成Stage,并调度Task到Executor上执行,其核心和精髓是DAGScheduler和TaskScheduler,通过AKKA消息驱动的方式完成

不是很理解!!这些工作都是SparkContext来完成的,SparkContext中有DAGScheduler和TaskScheduler,为什么会分成两部分?

 

Driver分为两部分:

1是SparkContext以及围绕这SparkContext的SparkConf和SparkEnv

2是DAGScheduler,TaskScheduler以及部署模块(部署模块主要是TaskScheduler使用)

 

Driver通过launchTask发送任务给Executor?Executor内部以线程池多线程的方式并行的运行任务(实际顺序是SparkContext.runJob->DagScheduler.runJob->DAGScheduler.submitJob->TaskScheduler.runbJob->TaskSetManager给LocalActor或者CoarseGrainedActor发送lanchTask消息,CoarseGrainedActor受到消息后调用Executor的lauchTask方法)

 

 

SparkConf

SparkConf一旦传递给SparkContext后就不能再修改,因为SparkContext构造时使用了SparkConf的clone方法。

 

 

SparkEnv

1.LiveListenerBus

里面有个org.apache.spark.scheduler.LiveListenerBus用于广播SparkListenerEvents到SparkListeners,SparkListenerEvents都定义在SparkListener.scala中

 

/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 *
 * Until start() is called, all posted events are only buffered. Only after this listener bus
 * has started will events be actually propagated to all attached listeners. This listener bus
 * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
 */

 2. SparkEnv类似集群全局变量,在Driver中有,在Worker的Executors中也有,而Worker的Executors有多个,那么每个Executor的每个线程都会访问SparkEnv变量,Spark使用ThreadLocal来保存SparkEnv变量。因此,SparkEnv是一个重量级的东西。

 

 

CoarseGrainedSchedulerBackend

1. 在org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend其中创建了DriverActor

 

    // TODO (prashant) send conf instead of properties
    driverActor = actorSystem.actorOf(
      Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)

 

 

2.CoarseGrainedSchedulerBackend有一个子类org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend

关注它的start方法,其中的一句:

val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)

这个命令用于在Standalone模式下,通过CoarseGrainedExecutorBackend的命令方式启动Executor?

 

  override def start() {
    super.start()

    // The endpoint for executors to talk to us
    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
      SparkEnv.driverActorSystemName,
      conf.get("spark.driver.host"),
      conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)
    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}",
      "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
      cp.split(java.io.File.pathSeparator)
    }
    val libraryPathEntries =
      sc.conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    ///用于启动Executor的指令?
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    ////将command封装到appDesc类中
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir)

    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) ////App的Client,
    
    ///启动ClientActor
    client.start()

    waitForRegistration()
  }

 

 

3.AppClient类

 

  def start() {
    // Just launch an actor; it will call back into the listener.
    actor = actorSystem.actorOf(Props(new ClientActor))
  }

 

 

 

 

 

Client

org.apache.spark.deploy.Client(是一个object)
org.apache.spark.deploy.yarn.Client(是一个object)
org.apache.spark.deploy.yarn.client(这是一个私有类)
org.apache.spark.deploy.client.AppClient(这是一个私有类)

这几个类都在什么集群模式下起作用,用来做什么的?

 

 

 

 

 

未分类:

1.除了action触发Job提交,checkpoint也会触发job提交

2.提交Job时,首先计算Stage的依赖关系,从后面往前追溯,前面

 

 

 

 

 

 

 

 

 

  • 大小: 53.2 KB
  • 大小: 36.5 KB
  • 大小: 82.6 KB
分享到:
评论

相关推荐

    spark-submit cluster模式时driver-class-path支持hdfs路径

    spark官方版本的driver-class-path不支持hdfs路径,只支持本地路径。本资源解决了这个问题,driver-class-path在cluster模式时可以支持hdfs路径,解决了cluster模式driver有大量jar依赖的问题。

    0438-如何指定Spark1作业中Driver和Executor使用指定范围内端口

    在CDH集群中提交Spark作业,大家也都知道Spark的Driver和Executor之间通讯端口是随机的,Spark会随选择1024和65535(含)之间的端口,因此在集群之间不建议启用防火墙。在前面Fayson介绍了《如何指定Spark2作业中...

    spark期末复习题总结

    1. Spark的核心组件包括Driver、Executor和Task。Driver是作业的主进程,负责作业的调度和解析;Executor是工作节点上的工作单元,负责执行Task;Task是Spark应用程序的基本执行单元。 2. Scala是Spark的主要编程...

    Hadoop原理与技术Spark操作实验

    一、实验目的 1. 理解Spark编程思想; 2. 学会在Spark Shell中编写Scala程序; 3. 学会在Spark Shell中运行Scala程序。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 ...(二)spark运行wordcount程序

    Spark驱动文件 Simba_Spark_JDBC.zip

    Spark驱动文件 Simba_Spark_JDBC.zip Apache Commons Copyright ?2001-2015 The Apache Software Foundation Apache Commons Codec Copyright ?2002-2014 The Apache Software Foundation Apache Hadoop Common ...

    Spark 3.0.0 Driver 启动内幕

    本课程讲解Spark 3.0.0 Driver 启动内幕 的内容,包括:Spark Driver Program 剖析:Spark Driver Program、SparkContext 深度剖析、SparkContext 源码解析;DAGScheduler 解析:DAG 的实例化 、DAGScheduler 划分...

    Spark性能调优与故障处理.pdf

    4. Driver 的配置:Driver 是 Spark 作业的控制中心,Driver 的配置包括 Driver 的内存大小和 CPU core 数量等。 5. Spark Standalone 模式和 Spark Yarn 模式:Spark 支持两种 Cluster 运行模式,分别是 Spark ...

    Spark源码分析2-Driver generate jobs and launch task

    NULL 博文链接:https://frankfan915.iteye.com/blog/2062111

    大数据组件-监控-spark-driver/executor性能的prometheus-grafana模板插件

    大数据组件-监控-spark-driver/executor性能的prometheus-grafana模板插件

    mongo-spark-connector_2.11-2.2.0.jar

    mongodb spark连接器,适用版本spark2.1.X ,Scala2.11.X, java 6 or later,mongodb 2.6 or later,请根据上面的版本选择,不然会报各种错误

    07-尚硅谷大数据技术之Spark源码1

    Driver 是 Spark 应用程序的主入口,负责调度和管理 Executor 的执行。Executor 是 Spark 应用程序的执行单元,负责执行具体的任务。在 Yarn 集群中,Driver 和 Executor 之间的通信基于 Socket。 组件通信(Socket...

    Spark-内核源码解析.docx

    在 Spark 中,Application 指的是用户编写的 Spark 应用程序/代码,包含了 Driver 功能代码和分布在集群中多个节点上运行的 Executor 代码。一个 Spark 应用程序由一个或多个作业 JOB 组成,Driver 负责和 Cluster...

    spark-mong连接jar包

    spark-mong连接jar包 这个主要是spark 用JAVA语言连接mysql , mongodb 数据库的

    Spark源码系列(一)spark-submit提交作业过程

    这个是Spark的App运行图,它通过一个Driver来和集群通信,集群负责作业的分配。今天我要讲的是如何创建这个Driver Program的过程。我们先看一下用SparkSubmit提交的方法吧,下面是从官方上面摘抄的内容。这个是提交...

    Spark On K8s实战教程

    二、Spark on K8s工作原理 具体流程,包括以下几步: ①:用户使用kubectl 创建 SparkApplication 对象,提交sparkApplication的请求到api-server,并把sparkApplication的CRD持久化到etcd; ②:SparkApplication ...

    Spark核心技术原理透视一Spark运行原理.pdf

    2. Driver:驱动程序,Spark中的Driver即运行上述Application的Main()函数,并且创建SparkContext, 其中创建SparkContext的目的是为了准备Spark应用程序的运行环境。在Spark中由SparkContext负责和ClusterManager...

    spark官方文档中文版

    Spark 应用程序都由一个驱动程序(driver programe)构成,驱动程序在集群上运行用户的 mian 函数来执行 各种各样的并行操作(parallel operations)。Spark 的主要抽象是提供一个弹性分布式数据集(RDD),RDD 是指能横跨...

    spark rdd 实战 ,基本语法

    在 Spark 中,Driver 程序负责启动多个 Worker,Worker 从文件系统加载数据并将其转换为 RDD。 RDD 的概念 RDD(Resilient Distributed Dataset)是 Spark 中的核心概念。RDD 是一个只读、分区记录的集合,可以被...

    Spark On K8s实战教程分享

    二、在将 Spark 任务提交到 K8s 集群上时,不同的公司可能会采取不同的方法。以下是目前常见的几种做法以及我们在线上所采用的任务提交和管理方式。 1、使用原生 spark-submit 原生的 spark-submit 命令可以直接提交...

    大数据技术之Spark优化

    在 Spark 中,可以配置的资源包括 Executor 的数量、Driver 的内存、Executor 的内存和 Executor 的 CPU core 数量。 Executor 的数量 Executor 的数量是 Spark 中的一个重要配置项。增加 Executor 的数量,可以...

Global site tag (gtag.js) - Google Analytics