1. 背景
推测执行(Speculative Execution)是指在分布式集群环境下,因为程序BUG,负载不均衡或者资源分布不均等原因,造成同一个job的多个task运行速度不一致,有的task运行速度明显慢于其他task(比如:一个job的某个task进度只有10%,而其他所有task已经运行完毕),则这些task拖慢了作业的整体执行进度,为了避免这种情况发生,Hadoop会为该task启动备份任务,让该speculative task与原始task同时处理一份数据,哪个先运行完,则将谁的结果作为最终结果。
推测执行优化机制采用了典型的以空间换时间的优化策略,它同时启动多个相同task(备份任务)处理相同的数据块,哪个完成的早,则采用哪个task的结果,这样可防止拖后腿Task任务出现,进而提高作业计算速度,但是,这样却会占用更多的资源,在集群资源紧缺的情况下,设计合理的推测执行机制可在多用少量资源情况下,减少大作业的计算时间。
关于MRv1中的推测执行机制,我已经在“Hadoop中Speculative Task调度策略”(http://dongxicheng.org/mapreduce/hadoop-speculative-task/)中进行了介绍,有兴趣的读者可先阅读这篇文章。
2. MRv2中采用的算法
MRv2限定了任意一个作业的备份任务的数目上线,该数目是以下三个数值的最大值:
(1)MINIMUM_ALLOWED_SPECULATIVE_TASKS(常量10)
(2)PROPORTION_TOTAL_TASKS_SPECULATABLE(常量0.01) * totalTaskNumber
(3)PROPORTION_RUNNING_TASKS_SPECULATABLE (常量0. 1)* numberRunningTasks
当决定一个任务是够可以启动备份任务时,采用了下面的计算方法:
总是取speculationValue值最大的任务并为之启动备份任务,speculationValue计算方法为:
speculationValue= estimatedEndTime – estimatedReplacementEndTime
其中,estimatedEndTime是通过预测算法推测的该任务的最终完成时刻,计算方法为:
estimatedEndTime = estimatedRunTime + taskAttemptStartTime
其中,taskAttemptStartTime为该任务的启动时间,而estimatedRunTime为推测出来的任务运行时间,计算方法如下:
estimatedEndTime = (timestamp – start) / Math.max(0.0001, progress)
其中,timestamp为当前时刻,而start为任务开始运行时间,(timestamp-start)表示已经运行时间,progress为任务运行进度(0.0~1.0)。
estimatedReplacementEndTime含义为:如果此刻启动该任务,(可推测出来的)任务最终可能的完成时刻。很明显,如果estimatedReplacementEndTime大于estimatedEndTime,则没必要启动备份任务,因为即使启动了,它的完成时刻也会大于当前正在运行任务的完成时刻,只有当estimatedReplacementEndTime小于estimatedEndTime时,才有必要启动备份任务。而MRv2总是选择speculationValue值最大的任务并为之启动备份任务,且启动备份任务之前需检查是否满足以下条件:
(1) 每个任务最多只能有一个备份任务
(2) 已经完成的任务数目比例不小于MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE(0.05,即5%),只有这样,才能有足够的历史信息估算estimatedReplacementEndTime
estimatedReplacementEndTime计算过方法为:{当前时刻}+{已经成功运行完成的任务所使用的平均运行时间}。
简单提炼一下核心思想:在某一时刻,判断一个任务是否拖后腿或者是否是值得为其运行备份任务时,则首先假设为其启动一个备份任务,那我们估算一下它的完成时间estimatedReplacementEndTime,同样,如果按照此刻该任务的计算速度,我们可以估算一下该任务最有可能的完成时间estimatedEndTime,如果estimatedEndTime与estimatedReplacementEndTime之差越大,则表明为该任务启动备份任务的价值越大。
3. 推测执行相关类
Speculator是一个服务,它由DefaultSpeculator实现,DefaultSpeculator接受并处理以下几种事件:
(1)ATTEMPT_STATUS_UPDATE
(2)ATTEMPT_START
(3)TASK_CONTAINER_NEED_UPDATE
(4) JOB_CREATE
DefaultSpeculator每隔一段事件会扫描一次所有正在运行的任务,如果一个任务可以启动备份任务,则会向Task发出一个T_ADD_SPEC_ATTEMPT事件,以启动另外一个任务实例。
DefaultSpeculator依赖于一个执行时间估算器,默认采用了LegacyTaskRuntimeEstimator,此外,MRv2还提供了另外一个实现:ExponentiallySmoothedTaskRuntimeEstimator,该实现采用了平滑算法对结果进行平滑处理。
分享到:
相关推荐
YARN(MRv2)搭建
yarn-v0.23.2.tar.gz 在安装ambari,源码编译的时候下载的文件有问题 手动下载 地址 https://github.com/yarnpkg/yarn/releases/download/v0.23.2/yarn-v0.23.2.tar.gz
1.作业完成时间取决于最慢的任务完成时间 2.推测执行机制 3.执行推测任务的前提条件 4.不能启用推测执行机制情况
前提:配置好执行脚本的主机到其他主机的ssh登录 脚本使用:vim编辑脚本,按照自己的配置修改主机号,我的是hadoop1、2是NN;hadoop2、3是Spark Master;hadoop3还是RM;hadoop4、5、6是DN、NM、Spark Worker。编辑...
yarn-v1.22.5.tar.gz
hadoop YARN应用开发与核心源码剖析
官网直接安装的不支持vite2+vue3的 主要修复: 1.build或者dev项目时不报错,兼容vite2,vue3; 2.加入deep监听watch,直接在父组件中修改图表中的config参数即可完成图表中的数据变更。 yarn npm cnpm pnpm可通用...
YARN的工作机制.doc
关于Yarn内存分配与管理,主要涉及到了ResourceManage、ApplicationMatser、NodeManager这几个概念,相关的优化也要紧紧围绕着这几方面来开展。这里还有一个Container的概念,现在可以先把它理解为运行map/reduce ...
官网直接安装的不支持vite2/3+vue2的 主要修复: 1.build或者dev项目时不报错,兼容vite2/vite3,vue2; 2.加入deep监听watch,直接在父组件中修改图表中的config参数即可完成图表中的数据变更。 yarn npm cnpm ...
yarn-workspace-plugin-since
SPARK2_ON_YARN-2.4.0 jar包下载
从架构、设计模式和代码对Yarn进行的详细剖析和原理机制的分析
Hadoop技术内幕深入解析YARN架构设计与实现原理
Flink 2018峰会 阿里大牛的技术, 在线教程有github:**,第7个文档 详细的讲解Flink和YARN及kubenete的集成,值得收藏
这将使git存储库中的.yarn/cache不存在。 它使用并重写历史记录,因此使用后果自负。 它的工作方式是先将带有--mirror标志的--mirror到临时路径中。 然后,它将计算.yarn/cache中文件的git对象ID列表,并将其删除。...
YARN配置、启动与验证 YARN配置、启动与验证 序号 任务名称 任务一 YARN组件参数配置 任务二 MapReduce组件参数配置 任务三 配置SSH无密钥登录(slave1为主节点) 任务四 分发YARN与MapReduce配置文件 任务五 启动...
Apache Flink 进阶(四):Flink on Yarn/K8s 原理剖析及实践 41 Apache Flink 进阶(五):数据类型和序列化 60 Apache Flink 进阶(六):Flink 作业执行深度解析 71 Apache Flink 进阶(七):网络流控及反压剖析...
深入解析YARN架构设计与实现原理,深入解析YARN架构设计与实现原理深入解析YARN架构设计与实现原理深入解析YARN架构设计与实现原理
npm install -g yarn yarn install 安装失败,使用官方下载的yarn.lock文件