`

Spark源码分析10-Scheduler

 
阅读更多

Spark很重要的一部分是Task的调度(schedule),以下是具体的流程图。



 SchedulableBuilder分为两种,分别是FairSchedulableBuilder和FIFOSchedulableBuilder。两者的主要区别在于:Pool的getSortedTaskSetQueue方法中会调用不同的taskSetSchedulingAlgorithm去对schedulableQueue排序。

 override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
    // Sort this pool's direct children with the configured scheduling
    // algorithm (FIFO or Fair), then recursively flatten each child's own
    // sorted queue into a single ordered list of TaskSetManagers.
    // Both locals are assigned exactly once, so `val` replaces the original `var`.
    val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
    val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
    }
    sortedTaskSetQueue
  }

 调度中另一个重要的类就是SchedulerBackend。它的子类有四种,分别为MesosSchedulerBackend、CoarseMesosSchedulerBackend、SimrSchedulerBackend和SparkDeploySchedulerBackend。MesosSchedulerBackend和CoarseMesosSchedulerBackend用于mesos的部署方式,SimrSchedulerBackend用于hadoop部署方式,SparkDeploySchedulerBackend用于纯spark的部署方式。

 

 

SparkDeploySchedulerBackend是CoarseGrainedSchedulerBackend的子类,它的start方法中调用了父类的start方法,并且创建了一个AppClient实例,调用client.start()方法注册到master。Master会通知worker通过command启动CoarseGrainedExecutorBackend。

override def start() {
    super.start()

    // Akka endpoint that launched executors use to talk back to the driver.
    val driverEndpoint = "akka.tcp://spark@%s:%s/user/%s".format(
      conf.get("spark.driver.host"), conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)

    // The {{...}} placeholders are substituted at executor launch time.
    val executorArgs = Seq(driverEndpoint, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
    val executorCommand = Command(
      "org.apache.spark.executor.CoarseGrainedExecutorBackend", executorArgs, sc.executorEnvs)
    val home = sc.getSparkHome().getOrElse(null)

    // Describe this application so the standalone Master can launch executors for it.
    val desc = new ApplicationDescription(appName, maxCores, sc.executorMemory, executorCommand,
        home, "http://" + sc.ui.appUIAddress)

    // Register with the Master(s) through an AppClient.
    client = new AppClient(sc.env.actorSystem, masters, desc, this, conf)
    client.start()
  }

 以下是clientActor处理的消息

// Handles lifecycle messages from the standalone Master for this application.
override def receive = {
      // Master accepted our registration: record the app id and current
      // master, then notify the listener.
      case RegisteredApplication(appId_, masterUrl) =>
        appId = appId_
        registered = true
        changeMaster(masterUrl)
        listener.connected(appId)

      // Master removed the application; this client actor stops itself.
      case ApplicationRemoved(message) =>
        logError("Master removed our application: %s; stopping client".format(message))
        markDisconnected()
        context.stop(self)

      // A new executor was granted to this app; forward the details to the listener.
      case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
        val fullId = appId + "/" + id
        logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
        listener.executorAdded(fullId, workerId, hostPort, cores, memory)

      // Executor state changed; once it reaches a finished state, report the
      // executor as removed (with the optional message and exit status).
      case ExecutorUpdated(id, state, message, exitStatus) =>
        val fullId = appId + "/" + id
        val messageText = message.map(s => " (" + s + ")").getOrElse("")
        logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
        if (ExecutorState.isFinished(state)) {
          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
        }

      // A (standby) master took over: switch to it and acknowledge.
      case MasterChanged(masterUrl, masterWebUiUrl) =>
        logInfo("Master has changed, new master is at " + masterUrl)
        changeMaster(masterUrl)
        alreadyDisconnected = false
        sender ! MasterChangeAcknowledged(appId)

      // The connection to the current master dropped; mark disconnected and
      // wait for the master to come back (guard limits this to masterAddress).
      case DisassociatedEvent(_, address, _) if address == masterAddress =>
        logWarning(s"Connection to $address failed; waiting for master to reconnect...")
        markDisconnected()

      // Association failed with an address that looks like a master; only log it.
      case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
        logWarning(s"Could not connect to $address: $cause")

      // Explicit shutdown request: mark dead, acknowledge the sender, and stop.
      case StopAppClient =>
        markDead()
        sender ! true
        context.stop(self)
}

 CoarseGrainedSchedulerBackend的start方法中创建了DriverActor,并调用其preStart方法。preStart中schedule了一个定时任务,每隔reviveInterval向自己发送一个ReviveOffers消息,ReviveOffers消息会触发launch task。

   override def preStart() {
      // Remote client disconnects do not go through Akka's watch(), so
      // subscribe to remoting lifecycle events on the event stream instead.
      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

      // Periodically send ReviveOffers to ourselves so that delay scheduling
      // gets regular chances to place pending tasks (interval in millis,
      // configurable via spark.scheduler.revive.interval, default 1000).
      val intervalMs = conf.getLong("spark.scheduler.revive.interval", 1000)
      import context.dispatcher
      context.system.scheduler.schedule(0.millis, intervalMs.millis, self, ReviveOffers)
}

 DriverActor中处理的事件

// DriverActor message loop: tracks executor registrations, task status
// updates, and shutdown requests for CoarseGrainedSchedulerBackend.
def receive = {
      // An executor backend is registering with the driver. Duplicate ids are
      // rejected; otherwise its actor ref, host, free cores and address are
      // recorded, and makeOffers() tries to launch tasks on it.
      case RegisterExecutor(executorId, hostPort, cores) =>
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
        if (executorActor.contains(executorId)) {
          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
        } else {
          logInfo("Registered executor: " + sender + " with ID " + executorId)
          sender ! RegisteredExecutor(sparkProperties)
          executorActor(executorId) = sender
          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
          freeCores(executorId) = cores
          executorAddress(executorId) = sender.path.address
          addressToExecutorId(sender.path.address) = executorId
          totalCoreCount.addAndGet(cores)
          makeOffers()
        }

      // A task changed state on an executor. When the task is finished, the
      // core it held is freed and new offers are made on that executor.
      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          if (executorActor.contains(executorId)) {
            freeCores(executorId) += 1
            makeOffers(executorId)
          } else {
            // Ignoring the update since we don't know about the executor.
            val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
            logWarning(msg.format(taskId, state, sender, executorId))
          }
        }

      // Periodic (scheduled in preStart) or explicit request to launch tasks.
      case ReviveOffers =>
        makeOffers()

      // Forward the kill request to the executor running the task.
      case KillTask(taskId, executorId) =>
        executorActor(executorId) ! KillTask(taskId, executorId)

      // Acknowledge the sender, then stop this actor.
      case StopDriver =>
        sender ! true
        context.stop(self)

      // Ask every registered executor to shut down, then acknowledge.
      case StopExecutors =>
        logInfo("Asking each executor to shut down")
        for (executor <- executorActor.values) {
          executor ! StopExecutor
        }
        sender ! true

      // Remove a single executor (bookkeeping done in removeExecutor), then ack.
      case RemoveExecutor(executorId, reason) =>
        removeExecutor(executorId, reason)
        sender ! true

      // A remote Akka address disconnected; remove whichever executor (if any)
      // was registered at that address.
      case DisassociatedEvent(_, address, _) =>
        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))

    }

 MesosSchedulerBackend的部署方式

 在start()方法中启动了MesosSchedulerDriver

override def start() {
    synchronized {
      // Remember the context class loader so Mesos callbacks can restore it later.
      classLoader = Thread.currentThread.getContextClassLoader

      // driver.run() blocks for the lifetime of the framework, so the Mesos
      // driver is run on its own daemon thread.
      new Thread("MesosSchedulerBackend driver") {
        setDaemon(true)
        override def run() {
          val backend = MesosSchedulerBackend.this
          val framework = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
          driver = new MesosSchedulerDriver(backend, framework, master)
          try {
            val status = driver.run()
            logInfo("driver.run() returned with code " + status)
          } catch {
            case e: Exception => logError("driver.run() failed", e)
          }
        }
      }.start()

      // Block until Mesos has called back to register this scheduler.
      waitForRegister()
    }
  }

 TaskSchedulerImpl调用submitTasks()时会调用MesosSchedulerBackend的reviveOffers(),通知MesosSchedulerDriver去reviveOffers。随后MesosSchedulerDriver会回调resourceOffers方法,resourceOffers方法中创建了mesos的task,并且调用launchTasks方法把task提交到mesos。以下是resourceOffers方法的代码。

 // Mesos callback: converts Mesos resource offers into WorkerOffers for the
 // TaskScheduler, then launches the resulting tasks back onto Mesos.
 override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
    // Swap in the class loader captured in start(); restored in `finally`.
    val oldClassLoader = setClassLoader()
    try {
      synchronized {
        // Build a big list of the offerable workers, and remember their indices so that we can
        // figure out which Offer to reply to for each worker
        val offerableIndices = new ArrayBuffer[Int]
        val offerableWorkers = new ArrayBuffer[WorkerOffer]

        // An offer is usable if it carries enough memory for an executor, or
        // if this slave already hosts one of our executors.
        def enoughMemory(o: Offer) = {
          val mem = getResource(o.getResourcesList, "mem")
          val slaveId = o.getSlaveId.getValue
          mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
        }

        // Keep offerableIndices and offerableWorkers aligned: entry i of both
        // refers to the same original offer.
        for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
          offerableIndices += index
          offerableWorkers += new WorkerOffer(
            offer.getSlaveId.getValue,
            offer.getHostname,
            getResource(offer.getResourcesList, "cpus").toInt)
        }

        // Call into the ClusterScheduler
        val taskLists = scheduler.resourceOffers(offerableWorkers)

        // Build a list of Mesos tasks for each slave
        // (one entry per offer, initialized to an empty task list)
        val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
        for ((taskList, index) <- taskLists.zipWithIndex) {
          if (!taskList.isEmpty) {
            // Map the scheduler's index back to the original offer's index.
            val offerNum = offerableIndices(index)
            val slaveId = offers(offerNum).getSlaveId.getValue
            slaveIdsWithExecutors += slaveId
            mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
            for (taskDesc <- taskList) {
              // Remember which slave each task runs on, for later status updates.
              taskIdToSlaveId(taskDesc.taskId) = slaveId
              mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
            }
          }
        }

        // Reply to the offers
        // (every offer is answered, including those with an empty task list)
        val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
        for (i <- 0 until offers.size) {
          d.launchTasks(offers(i).getId, mesosTasks(i), filters)
        }
      }
    } finally {
      restoreClassLoader(oldClassLoader)
    }
  }

CoarseMesosSchedulerBackend 

CoarseMesosSchedulerBackend也是mesos部署的一种,它和MesosSchedulerBackend的区别在于:CoarseMesosSchedulerBackend先在mesos上启动一个CoarseGrainedExecutorBackend,然后把task提交给CoarseGrainedExecutorBackend运行;而MesosSchedulerBackend是直接将task转化为Mesos的task,提交到mesos上运行。下面是createCommand的代码,其他部分和MesosSchedulerBackend类似。

 def createCommand(offer: Offer, numCores: Int): CommandInfo = {
    // Copy the driver's executor environment variables into the Mesos command.
    val envBuilder = Environment.newBuilder()
    sc.executorEnvs.foreach { case (key, value) =>
      envBuilder.addVariables(Environment.Variable.newBuilder()
        .setName(key)
        .setValue(value)
        .build())
    }
    val commandBuilder = CommandInfo.newBuilder()
      .setEnvironment(envBuilder)

    // Akka URL the launched CoarseGrainedExecutorBackend uses to reach the driver.
    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
      conf.get("spark.driver.host"),
      conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)

    val executorUri = conf.get("spark.executor.uri", null)
    if (executorUri == null) {
      // No distribution URI: run spark-class from the local Spark installation.
      val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
      commandBuilder.setValue(
        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
    } else {
      // Grab everything to the first '.'. We'll use that and '*' to
      // glob the directory "correctly".
      val basename = executorUri.split('/').last.split('.').head
      commandBuilder.setValue(
        "cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d"
          .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
      commandBuilder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri))
    }
    commandBuilder.build()
  }

 SimrSchedulerBackend

目前还没有看懂

 

 

 

 

  • 大小: 72.3 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics