BlockManager is constructed on both the driver and the executors. On the driver side, a BlockManagerMasterActor object is created, which is mainly responsible for collecting block info. On the executor side, a ref to the BlockManagerMasterActor is created and wrapped in BlockManagerMaster, which is used to communicate with the BlockManagerMasterActor. BlockManager wraps BlockManagerMaster: it stores blocks locally and calls into BlockManagerMaster to talk to the master.
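To make the driver/executor split concrete, here is a toy, actor-free model of the master-side bookkeeping: the message names mirror the real protocol shown below, but the MasterRegistry class and its plain method calls are hypothetical stand-ins for Akka messages.

```scala
import scala.collection.mutable

// Simplified stand-in for Spark's BlockManagerId
case class BlockManagerId(executorId: String, host: String, port: Int)

// Hypothetical sketch of the driver-side state behind BlockManagerMasterActor:
// which block managers exist, and which of them hold each block.
class MasterRegistry {
  private val managers  = mutable.Map.empty[BlockManagerId, Long] // id -> maxMemSize
  private val locations = mutable.Map.empty[String, mutable.Set[BlockManagerId]]

  // RegisterBlockManager: an executor announces itself to the driver
  def register(id: BlockManagerId, maxMemSize: Long): Unit =
    managers(id) = maxMemSize

  // UpdateBlockInfo: an executor reports that it now stores blockId
  def updateBlockInfo(id: BlockManagerId, blockId: String): Unit =
    locations.getOrElseUpdate(blockId, mutable.Set.empty) += id

  // GetLocations: which executors hold this block?
  def getLocations(blockId: String): Seq[BlockManagerId] =
    locations.getOrElse(blockId, mutable.Set.empty).toSeq
}
```

In the real code these three calls arrive as actor messages and the reply goes back via `sender !`; the map-based state is the essential idea.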
// Messages handled by BlockManagerMasterActor. UpdateBlockInfo is how an
// executor reports block info to the driver.
def receive = {
  case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
    register(blockManagerId, maxMemSize, slaveActor)
    sender ! true

  case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
    // TODO: Ideally we want to handle all the message replies in receive instead of in the
    // individual private methods.
    updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)

  case GetLocations(blockId) =>
    sender ! getLocations(blockId)

  case GetLocationsMultipleBlockIds(blockIds) =>
    sender ! getLocationsMultipleBlockIds(blockIds)

  case GetPeers(blockManagerId, size) =>
    sender ! getPeers(blockManagerId, size)

  case GetMemoryStatus =>
    sender ! memoryStatus

  case GetStorageStatus =>
    sender ! storageStatus

  case RemoveRdd(rddId) =>
    sender ! removeRdd(rddId)

  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    sender ! true

  case RemoveExecutor(execId) =>
    removeExecutor(execId)
    sender ! true

  case StopBlockManagerMaster =>
    logInfo("Stopping BlockManagerMaster")
    sender ! true
    if (timeoutCheckingTask != null) {
      timeoutCheckingTask.cancel()
    }
    context.stop(self)

  case ExpireDeadHosts =>
    expireDeadHosts()

  case HeartBeat(blockManagerId) =>
    sender ! heartBeat(blockManagerId)

  case other =>
    logWarning("Got unknown message: " + other)
}
// Messages handled by BlockManagerSlaveActor, mainly used by the master to
// tell a client to remove blocks and RDDs.
override def receive = {
  case RemoveBlock(blockId) =>
    blockManager.removeBlock(blockId)

  case RemoveRdd(rddId) =>
    val numBlocksRemoved = blockManager.removeRdd(rddId)
    sender ! numBlocksRemoved
}
// BlockManager calls BlockManagerWorker.syncPutBlock and syncGetBlock to
// write data to, or fetch data from, a remote node.
private[spark] object BlockManagerWorker extends Logging {
  private var blockManagerWorker: BlockManagerWorker = null

  def startBlockManagerWorker(manager: BlockManager) {
    blockManagerWorker = new BlockManagerWorker(manager)
  }

  // Used to write data to a remote node when replicating a block
  def syncPutBlock(msg: PutBlock, toConnManagerId: ConnectionManagerId): Boolean = {
    val blockManager = blockManagerWorker.blockManager
    val connectionManager = blockManager.connectionManager
    val blockMessage = BlockMessage.fromPutBlock(msg)
    val blockMessageArray = new BlockMessageArray(blockMessage)
    val resultMessage = connectionManager.sendMessageReliablySync(
      toConnManagerId, blockMessageArray.toBufferMessage)
    resultMessage.isDefined
  }

  // Used by a running task to fetch data from a remote node
  def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
    val blockManager = blockManagerWorker.blockManager
    val connectionManager = blockManager.connectionManager
    val blockMessage = BlockMessage.fromGetBlock(msg)
    val blockMessageArray = new BlockMessageArray(blockMessage)
    val responseMessage = connectionManager.sendMessageReliablySync(
      toConnManagerId, blockMessageArray.toBufferMessage)
    responseMessage match {
      case Some(message) => {
        val bufferMessage = message.asInstanceOf[BufferMessage]
        logDebug("Response message received " + bufferMessage)
        BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => {
          logDebug("Found " + blockMessage)
          return blockMessage.getData
        })
      }
      case None => logDebug("No response message received")
    }
    null
  }
}
On the remote node, the BlockManagerWorker's onBlockMessageReceive method is invoked to handle these TYPE_PUT_BLOCK and TYPE_GET_BLOCK events.
// Methods in BlockManagerWorker that handle block reads and writes, replying
// with a response through the ConnectionManager.
def onBlockMessageReceive(msg: Message, id: ConnectionManagerId): Option[Message] = {
  logDebug("Handling message " + msg)
  msg match {
    case bufferMessage: BufferMessage => {
      try {
        logDebug("Handling as a buffer message " + bufferMessage)
        val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage)
        logDebug("Parsed as a block message array")
        val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get)
        Some(new BlockMessageArray(responseMessages).toBufferMessage)
      } catch {
        case e: Exception =>
          logError("Exception handling buffer message", e)
          None
      }
    }
    case otherMessage: Any => {
      logError("Unknown type message received: " + otherMessage)
      None
    }
  }
}

def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
  blockMessage.getType match {
    case BlockMessage.TYPE_PUT_BLOCK => {
      val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
      logDebug("Received [" + pB + "]")
      putBlock(pB.id, pB.data, pB.level)
      None
    }
    case BlockMessage.TYPE_GET_BLOCK => {
      val gB = new GetBlock(blockMessage.getId)
      logDebug("Received [" + gB + "]")
      val buffer = getBlock(gB.id)
      if (buffer == null) {
        return None
      }
      Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
    }
    case _ => None
  }
}
ConnectionManager handles connection setup and the sending and receiving of data, built mainly on NIO sockets.
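An NIO transport like this needs message framing on top of the raw byte stream. The sketch below shows the common length-prefix technique with java.nio.ByteBuffer; the Framing object and its method names are hypothetical, and the real wire format is defined by BlockMessage / BufferMessage.

```scala
import java.nio.ByteBuffer

// Hypothetical sketch: each message is a 4-byte length header followed by the
// payload, so the receiver knows how many bytes belong to one message.
object Framing {
  def frame(payload: Array[Byte]): ByteBuffer = {
    val buf = ByteBuffer.allocate(4 + payload.length)
    buf.putInt(payload.length) // write the length header
    buf.put(payload)           // write the payload
    buf.flip()                 // switch to read mode, ready for channel.write
    buf
  }

  def unframe(buf: ByteBuffer): Array[Byte] = {
    val len = buf.getInt()     // read the header back
    val payload = new Array[Byte](len)
    buf.get(payload)
    payload
  }
}
```

`flip()` is the easy-to-forget step: it resets the position to 0 and sets the limit to what was written, which is what a subsequent read (or a socket channel write) expects.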
MemoryStore keeps its blocks in private val entries = new LinkedHashMap[BlockId, Entry](32, 0.75f, true). Before storing data, it first checks whether there is enough free memory; if not, it evicts old blocks, and if the StorageLevel includes useDisk, the evicted blocks are written to disk.
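The third constructor argument (true) puts the LinkedHashMap into access order: every get() moves an entry to the tail, so iteration starts at the least-recently-used block, which is exactly the eviction order MemoryStore wants. A minimal demonstration (the LruDemo object and block names are illustrative only):

```scala
object LruDemo {
  // Returns the iteration order of a small access-ordered LinkedHashMap
  // after touching the oldest entry.
  def lruOrder(): List[String] = {
    // Same constructor shape as MemoryStore's entries map
    val entries = new java.util.LinkedHashMap[String, Array[Byte]](32, 0.75f, true)
    entries.put("rdd_0_0", Array[Byte](1))
    entries.put("rdd_0_1", Array[Byte](2))
    entries.put("rdd_0_2", Array[Byte](3))
    entries.get("rdd_0_0") // touching rdd_0_0 moves it to the MRU end

    // Iteration order is now LRU -> MRU: rdd_0_1, rdd_0_2, rdd_0_0
    val it = entries.keySet().iterator()
    val buf = scala.collection.mutable.ListBuffer.empty[String]
    while (it.hasNext) buf += it.next()
    buf.toList
  }
}
```

With insertion order (the default, third argument false), rdd_0_0 would still be first in line for eviction even though it was just read.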
DiskStore creates a file named after the blockId's name and writes the block's data into that file.
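The core idea fits in a few lines: the file name is just the block name, so a block can be located on disk without any extra index. This is a hypothetical minimal sketch; the real DiskStore also hashes files across subdirectories, which is omitted here.

```scala
import java.nio.file.{Files, Path}

// Toy stand-in for DiskStore: one file per block, named by the blockId's name.
class ToyDiskStore(rootDir: Path) {
  private def fileFor(blockIdName: String): Path = rootDir.resolve(blockIdName)

  def putBytes(blockIdName: String, bytes: Array[Byte]): Unit = {
    Files.write(fileFor(blockIdName), bytes) // creates or truncates the file
  }

  def getBytes(blockIdName: String): Array[Byte] =
    Files.readAllBytes(fileFor(blockIdName))
}
```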
ShuffleBlockManager is an extension of BlockManager, used mainly to store the data written by the shuffle-write side of a shuffle.
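Shuffle output leans on the same name-based storage: each map task writes one block per reducer, and the block name encodes the (shuffleId, mapId, reduceId) triple so a reducer's input can be found by name alone. The case class below is a simplified stand-in for Spark's ShuffleBlockId:

```scala
// Illustrative stand-in for Spark's ShuffleBlockId: the name alone identifies
// which shuffle, which map task, and which reduce partition a block belongs to.
case class ToyShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) {
  def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
}
```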