在分析源代码之前,首先对Standalone Cluster Mode的资源调度有一个基本的认识:
首先,运行一个Application需要Driver进程和一组Executor进程。在Standalone Cluster Mode下,Driver和Executor都是在Master的监护下给Worker发消息创建(Driver进程和Executor进程都需要分配内存和CPU,这就需要Master进行资源的调度)。
另外,在为Application的Executor进程s分配CPU内核时,需要考虑CPU内核是尽可能的分散到所有的Worker上分配,还是尽可能在尽量少的Worker上分配,这是有计算任务的特点决定的,是数据密集型还是计算密集型(比如求PI值)
源代码:
/** * Schedule the currently available resources among waiting apps. This method will be called * every time a new app joins or resource availability changes. */ private def schedule() { if (state != RecoveryState.ALIVE) { return } // First schedule drivers, they take strict precedence over applications // Randomization helps balance drivers //首先为已经提交Application,但是Application对应的Driver进程还没有启动的应用分配Driver资源 val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) val numWorkersAlive = shuffledAliveWorkers.size var curPos = 0 ///waitingDrivers列表保存了已经提交Application,但是Application对应的Driver进程还没有启动的DriverInfo信息,每个元素是DriverInfo类型 for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers // We assign workers to each waiting driver in a round-robin fashion. For each driver, we // start from the last worker that was assigned a driver, and continue onwards until we have // explored all alive workers. var launched = false var numWorkersVisited = 0 //对于给定的待申请资源的Driver,遍历所有可用的Worker,寻找满足条件的Worker(CPU和内存足够) while (numWorkersVisited < numWorkersAlive && !launched) { //获取第curPos号的可用Worker val worker = shuffledAliveWorkers(curPos) numWorkersVisited += 1 ////如果该worker满足Driver的资源需求,则启动Driver进程,并且设置为已经动(launched=true) if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver launched = true } ///判断下一个可用的Worker curPos = (curPos + 1) % numWorkersAlive } } // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app // in the queue, then the second app, etc. //将Executor尽量分散到尽可能多的Worker上 if (spreadOutApps) { // Try to spread out each app among all the nodes, until it has all its cores for (app <- waitingApps if app.coresLeft > 0) { val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(canUse(app, _)).sortBy(_.coresFree).reverse val numUsable = usableWorkers.length //assigned数组记录每个Worker上已经分配的CPU内核数 val assigned = new Array[Int](numUsable) // Number of cores to give on each node //toAssign表示为该app将要分配的CPU内核数,它是app申请的内核数和所有Worker的剩余CPU内核数的较小值 var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) var pos = 0 while (toAssign > 0) { //遍历所有的可用Worker,一次分配一个CPU内核,记录在assigned数组中 if (usableWorkers(pos).coresFree - assigned(pos) > 0) { toAssign -= 1 assigned(pos) += 1 } pos = (pos + 1) % numUsable } // Now that we've decided how many cores to give on each node, let's actually give them //根据assigned数组中记录的每个Worker要分配的CPU内核数,进行实际的分配 for (pos <- 0 until numUsable) { if (assigned(pos) > 0) { val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) //启动给定指定内核数的Executor进程(实际上是CoarceGrainedExecutorBackend进程) launchExecutor(usableWorkers(pos), exec) app.state = ApplicationState.RUNNING } } } } else { // Pack each app into as few nodes as possible until we've assigned all its cores //将Executor的分配集中在尽可能上的Worker上,外围循环是Worker,内层循环是Application for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { for (app <- waitingApps if app.coresLeft > 0) { if (canUse(app, worker)) {//针对该Application,如果可以分配资源, //分配的内核数是该Worker可用的内核数与App申请的内核数的最小值 val coresToUse = math.min(worker.coresFree, app.coresLeft) if (coresToUse > 0) { //要么Worker的内核数全部分配,要么app需要的内核数得到满足 val exec = app.addExecutor(worker, coresToUse) launchExecutor(worker, exec) app.state = ApplicationState.RUNNING } } } } } }
相关推荐
Spark Standalone架构设计.docx
1. 解压Spark安装包 2. 配置Spark环境变量 2. 修改 spark-env.sh 文件,完成以下设置: 1. 设置运行master进程的节点, e
Spark 官方文档翻译,Spark 单机版部署方式,在CentOS 7 的环境下的开发方法
Spark standalone 分布式集群搭建,Spark standalone运行模式,Spark Standalone运行架构解析---Spark基本工作流程,Spark Standalone运行架构解析---Spark local cluster模式
这篇博客,Alice为大家带来的是Spark集群环境搭建之——standalone集群模式。 文章目录集群角色介绍集群规划修改配置并分发启动和停止查看web界面测试 集群角色介绍 Spark是基于内存计算的大数据并行计算框架,...
独立部署模式standalone下spark配置,从乌班图到jak,scala,hadoop,spark的安装 部署
Flink_Standalone_Cluster
Spark standalone 单机版部署,看了网上很多方法,事实证明都是错误的,本人亲身经历,在导师的指导下,成功配置成功单机版。
1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合...
standalone模式下executor调度策略 Spark Sql源码阅读 Spark Sql源码阅读 hive on spark调优 Spark SQL 多维聚合分析应用案例 Spark Streaming源码阅读 动态发现新增分区 Dstream join 操作和 RDD join 操作的...
springboot整合spark连接远程服务计算框架使用standAlone模式
│ 03-[掌握]-Spark环境搭建-Standalone集群模式.mp4 │ 06-[理解]-Spark环境搭建-On-Yarn-两种模式.mp4 │ 07-[掌握]-Spark环境搭建-On-Yarn-两种模式演示.mp4 │ 09-[掌握]-Spark代码开发-准备工作.mp4 │ 10...
本资源为网页,不是PDF Apache Spark 2.0.2 中文文档 Spark 概述 编程指南 快速入门 Spark 编程指南 概述 Spark 依赖 Spark 的初始化 Shell 的使用 弹性分布式数据集(RDDS) 并行集合 外部数据集 RDD ...
spark的Standalone模式安装 一、安装流程 1、将spark-2.2.0-bin-hadoop2.7.tgz 上传到 /usr/local/spark/ 下,然后解压 2、进入到conf中修改名字 改为 .sh 结尾的 3、编辑 spark-env.sh 4、修改slaves 的...
知识点介绍、代码演示、逻辑分析、灵活举例、使用图形的方式详细演示代码的流程和细节、整合企业级实战案例,全面讲解并突出重点,让学习也变成一种快乐。 课程亮点 1,知识体系完备,阶段学习者都能学有所获。 2,...
Spark-Core文档是本人经三年总结笔记汇总而来,对于自我学习Spark核心基础知识非常方便,资料中例举完善,内容丰富。具体目录如下: 目录 第一章 Spark简介与计算模型 3 1 What is Spark 3 2 Spark简介 3 3 Spark...
对spark源码进行分析,从如何搭建standalone集群到提交用户程序
Spark Standalone模式集成HDFS配置清单,教你如何配置spark和hdfs平台。由于Linux的防火墙限制,初学者嫌麻烦可以关闭防火墙。
三种方式的spark on kubernetes对比,第一种:spark原生支持Kubernetes资源调度;第二种:google集成的Kubernetes的spark插件sparkoperator;第三种:standalone方式运行spark集群
spark-standalone-cluster-on-docker:通过在Docker上使用JupyterLab接口构建自己的集群,学习Scala,Python(PySpark)和R(SparkR)中的Apache Spark