`

Spark源码分析11-BlockManager

 
阅读更多

BlockManager主要在driver和executor中构造。在driver端构造了一个BlockManagerMasterActor对象,主要负责收集block的info。在executor端创建了BlockManagerMasterActor的ref,并且将ref封装到BlockManagerMaster中,用于与BlockManagerMasterActor通信。BlockManager封装了BlockManagerMaster,用于存储block,并调用BlockManagerMaster与master通信。

 

//BlockManagerMasterActor处理的消息。UpdateBlockInfo主要是executor向driver报告block的信息。
// Message handler of the master actor: each case dispatches one message type to the
// matching helper and, for most messages, sends the outcome back to `sender`.
def receive = {
    // Register a block manager, then acknowledge with `true`.
    case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
      register(blockManagerId, maxMemSize, slaveActor)
      sender ! true

    // No reply is sent from this case; per the TODO below, the reply is presumably
    // sent from inside updateBlockInfo -- confirm against that method.
    case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
      // TODO: Ideally we want to handle all the message replies in receive instead of in the
      // individual private methods.
      updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)

    // Query messages: reply with whatever the corresponding helper returns.
    case GetLocations(blockId) =>
      sender ! getLocations(blockId)

    case GetLocationsMultipleBlockIds(blockIds) =>
      sender ! getLocationsMultipleBlockIds(blockIds)

    case GetPeers(blockManagerId, size) =>
      sender ! getPeers(blockManagerId, size)

    case GetMemoryStatus =>
      sender ! memoryStatus

    case GetStorageStatus =>
      sender ! storageStatus

    // Reply with the result of removeRdd.
    case RemoveRdd(rddId) =>
      sender ! removeRdd(rddId)

    // Run the removal helper first, then acknowledge with `true`.
    case RemoveBlock(blockId) =>
      removeBlockFromWorkers(blockId)
      sender ! true

    case RemoveExecutor(execId) =>
      removeExecutor(execId)
      sender ! true

    // Shutdown: acknowledge first, cancel the timeout-checking task if one was set,
    // and only then stop this actor.
    case StopBlockManagerMaster =>
      logInfo("Stopping BlockManagerMaster")
      sender ! true
      if (timeoutCheckingTask != null) {
        timeoutCheckingTask.cancel()
      }
      context.stop(self)

    // No reply is sent for this message.
    case ExpireDeadHosts =>
      expireDeadHosts()

    // Reply with the result of heartBeat.
    case HeartBeat(blockManagerId) =>
      sender ! heartBeat(blockManagerId)

    // Anything else is logged and otherwise ignored.
    case other =>
      logWarning("Got unknown message: " + other)
  }
 
//BlockManagerSlaveActor处理的消息,主要用于master通知client删除block和RDD
  // Message handler of the slave actor: forwards removal requests to the local
  // blockManager. Any message other than the two below is not matched here.
  override def receive = {

    // Remove a single block locally; no reply is sent.
    case RemoveBlock(blockId) =>
      blockManager.removeBlock(blockId)

    // Remove the RDD's blocks locally and reply with the value returned by removeRdd.
    case RemoveRdd(rddId) =>
      val numBlocksRemoved = blockManager.removeRdd(rddId)
      sender ! numBlocksRemoved
  }
 

 

 

//BlockManager会调用BlockManagerWorker的syncPutBlock和syncGetBlock方法,向远端写数据或者从远端拿数据
/**
 * Entry points for exchanging block messages with a remote node through the
 * BlockManager's connection manager.
 *
 * `startBlockManagerWorker` must be called before either `sync*` method: both read
 * the worker stored by it.
 */
private[spark] object BlockManagerWorker extends Logging {
  // Stays null until startBlockManagerWorker has been called.
  private var blockManagerWorker: BlockManagerWorker = null

  /** Creates the worker for `manager` and stores it for use by the `sync*` methods. */
  def startBlockManagerWorker(manager: BlockManager): Unit = {
    blockManagerWorker = new BlockManagerWorker(manager)
  }

  /**
   * Writes a block to a remote node (used when replicating a block).
   *
   * @param msg             the put request to send
   * @param toConnManagerId the connection manager to send it to
   * @return true if `sendMessageReliablySync` produced a response message
   */
  def syncPutBlock(msg: PutBlock, toConnManagerId: ConnectionManagerId): Boolean = {
    val blockManager = blockManagerWorker.blockManager
    val connectionManager = blockManager.connectionManager
    val blockMessage = BlockMessage.fromPutBlock(msg)
    val blockMessageArray = new BlockMessageArray(blockMessage)
    val resultMessage = connectionManager.sendMessageReliablySync(
        toConnManagerId, blockMessageArray.toBufferMessage)
    resultMessage.isDefined
  }

  /**
   * Fetches a block from a remote node (used while a task is running).
   *
   * @param msg             the get request to send
   * @param toConnManagerId the connection manager to send it to
   * @return the data of the first block message in the response, or null when no
   *         response was received or the response contained no block message
   */
  def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
    val blockManager = blockManagerWorker.blockManager
    val connectionManager = blockManager.connectionManager
    val blockMessage = BlockMessage.fromGetBlock(msg)
    val blockMessageArray = new BlockMessageArray(blockMessage)
    val responseMessage = connectionManager.sendMessageReliablySync(
        toConnManagerId, blockMessageArray.toBufferMessage)
    responseMessage match {
      case Some(message) =>
        val bufferMessage = message.asInstanceOf[BufferMessage]
        logDebug("Response message received " + bufferMessage)
        // Only the first block message of the reply is used. Taking it with headOption
        // avoids a nonlocal `return` from inside a foreach closure.
        BlockMessageArray.fromBufferMessage(bufferMessage).headOption match {
          case Some(firstBlockMessage) =>
            logDebug("Found " + firstBlockMessage)
            firstBlockMessage.getData
          case None =>
            // null is kept as the "nothing fetched" result that callers already expect.
            null
        }
      case None =>
        logDebug("No response message received")
        null
    }
  }
}
 

 

    远端的BlockManagerWorker会调用onBlockMessageReceive方法用来处理TYPE_PUT_BLOCK和TYPE_GET_BLOCK 这些事件

 

//BlockManagerWorker中的方法,用来处理block的写入和读取请求,然后通过connectionManager回复response
/**
 * Handles a message delivered by the connection manager.
 *
 * A BufferMessage is parsed into block messages, each one is passed to
 * processBlockMessage, and the replies that were produced are packed into a single
 * buffer message.
 *
 * @param msg the incoming message
 * @param id  the connection manager the message came from (not used in this method)
 * @return the packed replies, or None if handling threw an Exception or the message
 *         is not a BufferMessage
 */
def onBlockMessageReceive(msg: Message, id: ConnectionManagerId): Option[Message] = {
    logDebug("Handling message " + msg)
    msg match {
      case incoming: BufferMessage =>
        try {
          logDebug("Handling as a buffer message " + incoming)
          val requests = BlockMessageArray.fromBufferMessage(incoming)
          logDebug("Parsed as a block message array")
          // Process every request first, then keep only those that yielded a reply.
          val outcomes = requests.map(processBlockMessage)
          val replies = outcomes.collect { case Some(reply) => reply }
          Some(new BlockMessageArray(replies).toBufferMessage)
        } catch {
          case e: Exception =>
            logError("Exception handling buffer message", e)
            None
        }
      case unknown: Any =>
        logError("Unknown type message received: " + unknown)
        None
    }
  }

 /**
  * Processes a single block message according to its type.
  *
  * - TYPE_PUT_BLOCK: stores the block through putBlock; produces no reply.
  * - TYPE_GET_BLOCK: looks the block up through getBlock and replies with a
  *   "got block" message, or with nothing when getBlock returned null.
  * - any other type: produces no reply.
  *
  * @param blockMessage the request to process
  * @return the reply to send back, if any
  */
 def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
    blockMessage.getType match {
      case BlockMessage.TYPE_PUT_BLOCK =>
        val putRequest = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
        logDebug("Received [" + putRequest + "]")
        putBlock(putRequest.id, putRequest.data, putRequest.level)
        None

      case BlockMessage.TYPE_GET_BLOCK =>
        val getRequest = new GetBlock(blockMessage.getId)
        logDebug("Received [" + getRequest + "]")
        // Option(...) turns a null lookup result into None, i.e. no reply.
        Option(getBlock(getRequest.id)).map { data =>
          BlockMessage.fromGotBlock(GotBlock(getRequest.id, data))
        }

      case _ =>
        None
    }
  }

  

ConnectionManager用于连接的建立、数据的传输和接收,主要用了nio socket

 

MemoryStore存储的结构是 private val entries = new LinkedHashMap[BlockId, Entry](32, 0.75f, true)。在存数据前,先会查看是否有足够的memory,如果没有,会删除老的block。如果StorageLevel是useDisk,会将老的block写到disk

 

DiskStore会按照blockId中的name创建文件,并把数据写到文件中

 

ShuffleBlockManager是BlockManager的扩展,主要用于处理shuffle操作时shuffle write数据的存储

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics