
Spark Source Code Analysis 5 - Master

 

This section analyzes the main functionality of the Master.

The Master's work falls into two parts: 1. election of the Master leader; 2. the Master's management of workers, applications, and drivers.

 

First, let's look at how the Master is started.

The main method calls actorSystem.actorOf() to create the Master actor:

  def main(argStrings: Array[String]) {
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
    actorSystem.awaitTermination()
  }

  def startSystemAndActor(host: String, port: Int, webUiPort: Int, conf: SparkConf)
      : (ActorSystem, Int, Int) =
  {
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf)
    val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
    val timeout = AkkaUtils.askTimeout(conf)
    val respFuture = actor.ask(RequestWebUIPort)(timeout)
    val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
    (actorSystem, boundPort, resp.webUIBoundPort)
  }

Next, the Master runs its preStart method. In preStart it starts the web UI service, schedules the CheckForWorkerTimeOut task, and creates the persistenceEngine and the leaderElectionAgent.

The persistenceEngine is used for master recovery; the leaderElectionAgent is responsible for electing the master leader.

 override def preStart() {
    logInfo("Starting Spark master at " + masterUrl)
    // Listen for remote client disconnection events, since they don't go through Akka's watch()
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    webUi.start()
    masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)

    masterMetricsSystem.registerSource(masterSource)
    masterMetricsSystem.start()
    applicationMetricsSystem.start()

    persistenceEngine = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
      case "FILESYSTEM" =>
        logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
        new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
      case _ =>
        new BlackHolePersistenceEngine()
    }
    leaderElectionAgent = RECOVERY_MODE match {
        case "ZOOKEEPER" =>
          context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
        case _ =>
          context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
      }
  }

Let's take ZooKeeperLeaderElectionAgent and ZooKeeperPersistenceEngine as examples.

SparkZooKeeperSession is the helper used to interact with ZooKeeper, e.g. creating paths and reading znode data (getData).
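SparkZooKeeperSession essentially wraps the standard org.apache.zookeeper.ZooKeeper client. As a rough, self-contained sketch of the raw operations involved (the connection string, paths, and data below are placeholders, the parent znode is assumed to exist, and a ZooKeeper server must be running), the same create / getChildren / getData calls look like this:

  // Rough sketch of the raw ZooKeeper calls that SparkZooKeeperSession wraps.
  // Placeholders: "localhost:2181" and "/spark/leader_election".
  import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}
  import scala.collection.JavaConverters._

  object ZkSessionSketch {
    def main(args: Array[String]) {
      val watcher = new Watcher {
        override def process(event: WatchedEvent) {
          println("ZK event: " + event.getType + " on " + event.getPath)
        }
      }
      val zk = new ZooKeeper("localhost:2181", 30000, watcher)

      // Each master candidate creates an ephemeral, sequentially numbered election file.
      zk.create("/spark/leader_election/master_", "spark://host:7077".getBytes,
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL)

      // getChildren / getData, as checkLeader does below through SparkZooKeeperSession.
      val candidates = zk.getChildren("/spark/leader_election", false).asScala.toList.sorted
      val leaderUrl = new String(zk.getData("/spark/leader_election/" + candidates.head, false, null))
      println("Current leader: " + leaderUrl)

      zk.close()
    }
  }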


The checkLeader function fetches the list of master election files and takes the first one (after sorting) as the leader. It then registers a watcher on leaderFile, so that when leaderFile is deleted a new leader election is triggered. Finally, it calls updateLeadershipStatus to notify the Master that a leader has been elected.

 

 def checkLeader() {
    val masters = zk.getChildren(WORKING_DIR).toList
    val leader = masters.sorted.head
    val leaderFile = WORKING_DIR + "/" + leader

    // Setup a watch for the current leader.
    zk.exists(leaderFile, watcher)

    try {
      leaderUrl = new String(zk.getData(leaderFile))
    } catch {
      // A NoNodeException may be thrown if old leader died since the start of this method call.
      // This is fine -- just check again, since we're guaranteed to see the new values.
      case e: KeeperException.NoNodeException =>
        logInfo("Leader disappeared while reading it -- finding next leader")
        checkLeader()
        return
    }

    // Synchronization used to ensure no interleaving between the creation of a new session and the
    // checking of a leader, which could cause us to delete our real leader file erroneously.
    synchronized {
      val isLeader = myLeaderFile == leaderFile
      if (!isLeader && leaderUrl == masterUrl) {
        // We found a different master file pointing to this process.
        // This can happen in the following two cases:
        // (1) The master process was restarted on the same node.
        // (2) The ZK server died between creating the file and returning the name of the file.
        //     For this case, we will end up creating a second file, and MUST explicitly delete the
        //     first one, since our ZK session is still open.
        // Note that this deletion will cause a NodeDeleted event to be fired so we check again for
        // leader changes.
        assert(leaderFile < myLeaderFile)
        logWarning("Cleaning up old ZK master election file that points to this master.")
        zk.delete(leaderFile)
      } else {
        updateLeadershipStatus(isLeader)
      }
    }
  }
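
The updateLeadershipStatus step boils down to telling the Master actor when its leadership changes. A rough sketch (field names are approximate, not the verbatim Spark code) is: only send a message when the status actually flips, using the ElectedLeader / RevokedLeadership messages handled in receive below.

  // Approximate sketch of updateLeadershipStatus: notify the Master actor only
  // when leadership actually changes, via the ElectedLeader / RevokedLeadership
  // messages that the Master's receive handles below. Field names approximate.
  def updateLeadershipStatus(isLeader: Boolean) {
    if (isLeader && status == LeadershipStatus.NOT_LEADER) {
      status = LeadershipStatus.LEADER
      masterActor ! ElectedLeader
    } else if (!isLeader && status == LeadershipStatus.LEADER) {
      status = LeadershipStatus.NOT_LEADER
      masterActor ! RevokedLeadership
    }
  }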

 

ZooKeeperPersistenceEngine records Application, Driver, and Worker information in ZooKeeper so that this state can be recovered when the leading master goes down.
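
Its shape follows from the calls the Master makes on it (addApplication, addDriver, addWorker, readPersistedData, all visible in receive below): each add writes one serialized object under a status directory, and readPersistedData reads everything back for the new leader. The following is a hypothetical, file-based sketch of that contract, modelled loosely on the FILESYSTEM recovery mode seen in preStart; it is not the actual Spark class and the signatures are simplified.

  import java.io._

  // Hypothetical sketch of the persistence contract (not the actual Spark classes):
  // every add* call serializes one object into a recovery directory, and
  // readPersistedData() reads everything back so the new leader can rebuild state.
  class SimpleFilePersistence(dir: File) {
    dir.mkdirs()

    private def write(name: String, obj: AnyRef) {
      val out = new ObjectOutputStream(new FileOutputStream(new File(dir, name)))
      try out.writeObject(obj) finally out.close()
    }

    private def readAll(prefix: String): Seq[AnyRef] =
      dir.listFiles().filter(_.getName.startsWith(prefix)).toSeq.map { f =>
        val in = new ObjectInputStream(new FileInputStream(f))
        try in.readObject() finally in.close()
      }

    // Method names mirror the calls the Master makes in receive below.
    def addApplication(appId: String, app: AnyRef) { write("app_" + appId, app) }
    def addDriver(driverId: String, driver: AnyRef) { write("driver_" + driverId, driver) }
    def addWorker(workerId: String, worker: AnyRef) { write("worker_" + workerId, worker) }

    def readPersistedData(): (Seq[AnyRef], Seq[AnyRef], Seq[AnyRef]) =
      (readAll("app_"), readAll("driver_"), readAll("worker_"))
  }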

 

Below are the messages the Master handles: the leader election messages, plus the management of workers, applications, and drivers.

 

 override def receive = {
    case ElectedLeader => {
      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty)
        RecoveryState.ALIVE
      else
        RecoveryState.RECOVERING
      logInfo("I have been elected leader! New state: " + state)
      if (state == RecoveryState.RECOVERING) {
        beginRecovery(storedApps, storedDrivers, storedWorkers)
        context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
      }
    }

    case RevokedLeadership => {
      logError("Leadership has been revoked -- master shutting down.")
      System.exit(0)
    }

    case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress) => {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          sender, workerWebUiPort, publicAddress)
        registerWorker(worker)
        persistenceEngine.addWorker(worker)
        sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
        schedule()
      }
    }

    case RequestSubmitDriver(description) => {
      if (state != RecoveryState.ALIVE) {
        val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
        sender ! SubmitDriverResponse(false, None, msg)
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        schedule()

        // TODO: It might be good to instead have the submission client poll the master to determine
        //       the current status of the driver. For now it's simply "fire and forget".

        sender ! SubmitDriverResponse(true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}")
      }
    }

    case RequestKillDriver(driverId) => {
      if (state != RecoveryState.ALIVE) {
        val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
        sender ! KillDriverResponse(driverId, success = false, msg)
      } else {
        logInfo("Asked to kill driver " + driverId)
        val driver = drivers.find(_.id == driverId)
        driver match {
          case Some(d) =>
            if (waitingDrivers.contains(d)) {
              waitingDrivers -= d
              self ! DriverStateChanged(driverId, DriverState.KILLED, None)
            }
            else {
              // We just notify the worker to kill the driver here. The final bookkeeping occurs
              // on the return path when the worker submits a state change back to the master
              // to notify it that the driver was successfully killed.
              d.worker.foreach { w =>
                w.actor ! KillDriver(driverId)
              }
            }
            // TODO: It would be nice for this to be a synchronous response
            val msg = s"Kill request for $driverId submitted"
            logInfo(msg)
            sender ! KillDriverResponse(driverId, success = true, msg)
          case None =>
            val msg = s"Driver $driverId has already finished or does not exist"
            logWarning(msg)
            sender ! KillDriverResponse(driverId, success = false, msg)
        }
      }
    }

    case RequestDriverStatus(driverId) => {
      (drivers ++ completedDrivers).find(_.id == driverId) match {
        case Some(driver) =>
          sender ! DriverStatusResponse(found = true, Some(driver.state),
            driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
        case None =>
          sender ! DriverStatusResponse(found = false, None, None, None, None)
      }
    }

    case RegisterApplication(description) => {
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, sender)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        sender ! RegisteredApplication(app.id, masterUrl)
        schedule()
      }
    }

    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
        case Some(exec) => {
          exec.state = state
          exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
          if (ExecutorState.isFinished(state)) {
            val appInfo = idToApp(appId)
            // Remove this executor from the worker and app
            logInfo("Removing executor " + exec.fullId + " because it is " + state)
            appInfo.removeExecutor(exec)
            exec.worker.removeExecutor(exec)

            // Only retry certain number of times so we don't go into an infinite loop.
            if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
              schedule()
            } else {
              logError("Application %s with ID %s failed %d times, removing it".format(
                appInfo.desc.name, appInfo.id, appInfo.retryCount))
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
        case None =>
          logWarning("Got status update for unknown executor " + appId + "/" + execId)
      }
    }

    case DriverStateChanged(driverId, state, exception) => {
      state match {
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }
    }

    case Heartbeat(workerId) => {
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          logWarning("Got heartbeat from unregistered worker " + workerId)
      }
    }

    case MasterChangeAcknowledged(appId) => {
      idToApp.get(appId) match {
        case Some(app) =>
          logInfo("Application has been re-registered: " + appId)
          app.state = ApplicationState.WAITING
        case None =>
          logWarning("Master change ack from unknown app: " + appId)
      }

      if (canCompleteRecovery) { completeRecovery() }
    }

    case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
      idToWorker.get(workerId) match {
        case Some(worker) =>
          logInfo("Worker has been re-registered: " + workerId)
          worker.state = WorkerState.ALIVE

          val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
          for (exec <- validExecutors) {
            val app = idToApp.get(exec.appId).get
            val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
            worker.addExecutor(execInfo)
            execInfo.copyState(exec)
          }

          for (driverId <- driverIds) {
            drivers.find(_.id == driverId).foreach { driver =>
              driver.worker = Some(worker)
              driver.state = DriverState.RUNNING
              worker.drivers(driverId) = driver
            }
          }
        case None =>
          logWarning("Scheduler state from unknown worker: " + workerId)
      }

      if (canCompleteRecovery) { completeRecovery() }
    }

 

 
