Anatomy of a MapReduce Job
In MapReduce, a YARN application is called a Job. The implementation of the Application Master provided by the MapReduce framework is called MRAppMaster.
Timeline of a MapReduce Job
This is the timeline of a MapReduce Job execution:
- Map Phase: several Map Tasks are executed
- Reduce Phase: several Reduce Tasks are executed
Notice that the Reduce Phase may start before the end of the Map Phase. Hence, the two phases can interleave.
Map Phase
We now focus our discussion on the Map Phase. A key decision is how many MapTasks the Application Master needs to start for the current job.
What does the user give us?
Let’s take a step back. When a client submits an application, several kinds of information are provided to the YARN infrastructure. In particular:
- a configuration: this may be partial (some parameters are not specified by the user), in which case the default values are used for the job. Notice that these default values may be the ones chosen by a Hadoop provider like Amazon.
- a JAR containing:
  - a map() implementation
  - an (optional) combiner implementation
  - a reduce() implementation
- input and output information:
  - input directory: is the input directory on HDFS? On S3? How many files?
  - output directory: where will we store the output? On HDFS? On S3?
The number and the size of the files inside the input directory determine the number of Map Tasks of a job.
How many Map Tasks?
The Application Master launches one MapTask for each map split. Typically, there is one map split for each input file. If an input file is too big (more than 1.1 times the split size, which by default equals the HDFS block size), then two or more map splits are associated with the same input file. This is the pseudocode used inside the getSplits() method of the FileInputFormat class:
num_splits = 0
for each input file f:
    remaining = f.length
    while remaining / split_size > split_slop:
        num_splits += 1
        remaining -= split_size
    if remaining != 0:
        num_splits += 1    # the tail of the file becomes the last split
where:
split_slop = 1.1
split_size =~ dfs.blocksize
Notice that the configuration parameter mapreduce.job.maps is ignored in MRv2 (in the past it was just a hint).
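The split-counting loop can be turned into a small runnable sketch. It assumes that the split size is computed as max(minSize, min(maxSize, blockSize)), with minSize = 1 and maxSize = the largest long as defaults, so that it normally equals the block size. It also treats every file as splittable and produces no split for zero-length files. Both are simplifications, and the helper names are hypothetical.

```python
SPLIT_SLOP = 1.1  # a split may be up to 10% larger than split_size


def compute_split_size(block_size: int, min_size: int = 1,
                       max_size: int = 2**63 - 1) -> int:
    """max(minSize, min(maxSize, blockSize)); with the defaults this is the block size."""
    return max(min_size, min(max_size, block_size))


def get_split_lengths(file_length: int, split_size: int) -> list[int]:
    """Lengths of the map splits created for one splittable file."""
    splits = []
    remaining = file_length
    while remaining / split_size > SPLIT_SLOP:
        splits.append(split_size)
        remaining -= split_size
    if remaining != 0:
        splits.append(remaining)  # the tail becomes the last (possibly larger) split
    return splits


def count_map_tasks(file_lengths: list[int], block_size: int) -> int:
    """One MapTask per map split, summed over all input files."""
    split_size = compute_split_size(block_size)
    return sum(len(get_split_lengths(n, split_size)) for n in file_lengths)


MB = 1024 * 1024
# 6 files of 100 MB plus one of 192 MB (1.5 blocks), assuming 128 MB blocks
print(count_map_tasks([100 * MB] * 6 + [192 * MB], 128 * MB))  # 8
print(get_split_lengths(135 * MB, 128 * MB) == [135 * MB])       # True: within the 1.1 slop
```

A 135 MB file stays in a single split because 135/128 is below the 1.1 slop. This avoids launching a MapTask for a tiny 7 MB tail.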
MapTask Launch
The MapReduce Application Master asks the Resource Manager for the Containers needed by the Job: one MapTask container request for each MapTask (map split).
A container request for a MapTask tries to exploit the data locality of the map split. The Application Master asks for:
- a container located on the same Node Manager where the map split is stored (a map split may be stored on multiple nodes due to the HDFS replication factor);
- otherwise, a container located on a Node Manager in the same rack where the map split is stored;
- otherwise, a container on any other Node Manager of the cluster.
This is just a hint to the Resource Scheduler. The Resource Scheduler is free to ignore data locality if the suggested assignment conflicts with the Resource Scheduler’s goals.
When a Container is assigned to the Application Master, the MapTask is launched.
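The locality preference order can be sketched as a function that picks a node for one map split. This models only the order in which the Application Master expresses its preferences. A real Resource Scheduler may ignore them. The function name, the node/rack maps, and the locality labels are all hypothetical.

```python
from typing import Optional


def pick_node(replica_hosts: set[str], free_nodes: list[str],
              rack_of: dict[str, str]) -> Optional[tuple[str, str]]:
    """Return (node, locality) following the node > rack > any preference order."""
    # 1. node-local: a free node that stores a replica of the split
    for node in free_nodes:
        if node in replica_hosts:
            return node, "node-local"
    # 2. rack-local: a free node in the same rack as one of the replicas
    replica_racks = {rack_of[h] for h in replica_hosts}
    for node in free_nodes:
        if rack_of[node] in replica_racks:
            return node, "rack-local"
    # 3. off-rack: any other free node of the cluster
    if free_nodes:
        return free_nodes[0], "off-rack"
    return None  # no free capacity: the request has to wait


racks = {"n1": "r1", "n2": "r1", "n3": "r2"}
print(pick_node({"n1"}, ["n3", "n2"], racks))  # ('n2', 'rack-local')
```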
Map Phase: example of an execution scenario
This is a possible execution scenario of the Map Phase:
- there are two Node Managers, each with 2GB of RAM (NM capacity); since each MapTask requires 1GB, each Node Manager can run 2 containers in parallel (this is the best case: the Resource Scheduler may decide differently)
- there are no other YARN applications running in the cluster
- our job has 8 map splits (e.g., there are 7 files inside the input directory, but only one of them is large enough, e.g., 1.5 times the HDFS block size, to be divided into 2 map splits): we need to run 8 Map Tasks.
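The arithmetic of this scenario is sketched below: how many Map Tasks run at the same time, and how many "waves" are needed. The sketch assumes that memory is the only constraint, that all tasks take the same time, and that the container used by the Application Master itself can be ignored. The function name is hypothetical.

```python
def schedule_waves(num_map_tasks: int, num_nodes: int,
                   nm_capacity_mb: int, task_mb: int) -> list[list[int]]:
    """Group Map Task ids into waves of concurrently running containers."""
    per_node = nm_capacity_mb // task_mb  # containers that fit on one Node Manager
    parallel = per_node * num_nodes       # containers running at the same time
    if parallel == 0:
        raise ValueError("a single MapTask does not fit on any Node Manager")
    tasks = list(range(num_map_tasks))
    return [tasks[i:i + parallel] for i in range(0, num_map_tasks, parallel)]


waves = schedule_waves(8, 2, 2048, 1024)
print(len(waves), waves)  # 2 [[0, 1, 2, 3], [4, 5, 6, 7]]
```

With 4 containers available at once, the 8 Map Tasks need two waves. A ninth split would add a third, mostly idle, wave.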
Map Task Execution Timeline
Let’s now focus on a single Map Task. This is the Map Task execution timeline:
- INIT phase: we set up the Map Task
- EXECUTION phase: for each (key, value) tuple inside the map split, we run the map() function
- SPILLING phase: the map output is stored in an in-memory buffer; when this buffer is almost full, we start (in parallel) the spilling phase in order to free space in it
- SHUFFLE phase: at the end of the spilling phase, we merge all the map outputs and package them for the reduce phase
MapTask: INIT
During the INIT phase, we:
- create a context (TaskAttemptContext.class)
- create an instance of the user Mapper.class
- set up the input (e.g., InputFormat.class, InputSplit.class, RecordReader.class)
- set up the output (NewOutputCollector.class)
- create a mapper context (MapContext.class, Mapper.Context.class)
- initialize the input, e.g.:
  - create a SplitLineReader.class object
  - create a HdfsDataInputStream.class object
MapTask: EXECUTION
The EXECUTION phase is performed by the run method of the Mapper class. The user can override it, but by default it starts by calling the setup method. By default this method does nothing, but the user can override it in order to set up the Task (e.g., initialize class variables). After the setup, map() is invoked for each <key, value> tuple contained in the map split. Therefore, map() receives a key, a value, and a mapper context. Through the context, map() stores its output in a buffer.
Notice that the map split is fetched chunk by chunk (e.g., 64KB) and each chunk is split into several (key, value) tuples (e.g., using SplitLineReader.class). This is done inside the Mapper.Context.nextKeyValue method.
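The chunked reading can be sketched as a generator that refills a byte buffer one chunk at a time and emits one (byte offset, line) tuple per line. This mirrors the key/value shape that a line-oriented reader such as SplitLineReader produces for text input. It is a simplified illustration, not Hadoop's code: it ignores split boundaries and CRLF line endings, and the function name is hypothetical.

```python
import io
from typing import BinaryIO, Iterator

CHUNK_SIZE = 64 * 1024  # illustrative chunk size (the text mentions e.g. 64KB)


def read_records(stream: BinaryIO,
                 chunk_size: int = CHUNK_SIZE) -> Iterator[tuple[int, bytes]]:
    """Yield (byte offset, line) pairs while reading the stream chunk by chunk."""
    buf = b""
    offset = 0  # byte offset (in the stream) of the first byte still in buf
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        buf += chunk
        # emit every complete line currently held in the buffer
        while (nl := buf.find(b"\n")) != -1:
            yield offset, buf[:nl]
            offset += nl + 1
            buf = buf[nl + 1:]
    if buf:  # last line without a trailing newline
        yield offset, buf


if __name__ == "__main__":
    for key, value in read_records(io.BytesIO(b"hello world\nbye\n")):
        print(key, value)  # prints "0 b'hello world'" then "12 b'bye'"
```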
When the map split has been completely processed, the run function calls the cleanup method: by default, no action is performed, but the user may decide to override it.
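The run/setup/map/cleanup sequence is a template method. Below is a Python imitation of it. The Context class and the snake_case method names are hypothetical stand-ins for Mapper.Context, not Hadoop's Java API. Calling cleanup() from a finally block is a choice made in this sketch. As in Hadoop, the default map() passes each pair through unchanged.

```python
from typing import Any, Iterable


class Context:
    """Tiny stand-in for Mapper.Context: iterates input records and collects output."""

    def __init__(self, records: Iterable[tuple[Any, Any]]):
        self._it = iter(records)
        self._current = None
        self.output: list[tuple[Any, Any]] = []  # stands in for the in-memory buffer

    def next_key_value(self) -> bool:
        self._current = next(self._it, None)
        return self._current is not None

    def get_current_key(self):
        return self._current[0]

    def get_current_value(self):
        return self._current[1]

    def write(self, key, value):
        self.output.append((key, value))


class Mapper:
    def setup(self, context):              # default: does nothing
        pass

    def map(self, key, value, context):    # default: identity mapping
        context.write(key, value)

    def cleanup(self, context):            # default: does nothing
        pass

    def run(self, context):
        self.setup(context)
        try:
            while context.next_key_value():
                self.map(context.get_current_key(), context.get_current_value(), context)
        finally:
            self.cleanup(context)


class WordCountMapper(Mapper):
    """User Mapper: overrides only map()."""

    def map(self, key, value, context):
        for word in value.split():
            context.write(word, 1)


ctx = Context([(0, "a b"), (4, "a")])
WordCountMapper().run(ctx)
print(ctx.output)  # [('a', 1), ('b', 1), ('a', 1)]
```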
MapTask: SPILLING
As seen in the EXECUTION phase, map() writes its output (using Mapper.Context.write()) into a circular in-memory buffer (MapTask.MapOutputBuffer). The size of this buffer is fixed and determined by the configuration parameter mapreduce.task.io.sort.mb (default: 100MB).
Whenever this circular buffer is almost full (mapreduce.map.sort.spill.percent: 80% by default), the SPILLING phase is performed in parallel by a separate thread. Notice that if the spilling thread is too slow and the buffer becomes 100% full, then map() cannot continue and has to wait.
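The spill trigger is sketched below as a simplified, sequential model that counts how many spill files a map task produces. Real spilling runs concurrently with map(), which keeps writing into the free part of the circular buffer. The model also ignores the per-record metadata stored alongside each tuple. The function name is hypothetical, and its defaults mirror mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent.

```python
def count_spills(record_sizes: list[int], sort_mb: int = 100,
                 spill_percent: float = 0.80) -> int:
    """Count spill files under a simplified, sequential model of the buffer."""
    capacity = sort_mb * 1024 * 1024
    threshold = capacity * spill_percent  # "almost full" level
    used = 0
    spills = 0
    for size in record_sizes:
        used += size
        if used >= threshold:  # buffer almost full: spill its content to disk
            spills += 1
            used = 0
    if used > 0:               # final spill at the end of the map task
        spills += 1
    return spills


# 250 records of 1 MB each: a spill every 80 MB, plus a final one for the last 10 MB
print(count_spills([1024 * 1024] * 250))  # 4
```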
The SPILLING thread performs the following actions:
- it creates a SpillRecord and an FSOutputStream (local filesystem)
- it sorts the used chunk of the buffer in memory: the output tuples are sorted by (partitionIdx, key) using a quicksort algorithm
- it splits the sorted output into partitions: one partition for each ReduceTask of the job (see later)
- it writes the partitions sequentially into the local file
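The spill steps are sketched below in Python. The buffered (partitionIdx, key, value) tuples are sorted, laid out partition by partition, and indexed with one (start, length) entry per partition, in the spirit of SpillRecord. This is an illustration under stated assumptions: Python's built-in sort (Timsort) replaces Hadoop's quicksort, and the index counts records rather than bytes. The function name is hypothetical.

```python
from itertools import groupby
from operator import itemgetter


def spill(buffer: list[tuple[int, str, int]],
          num_reducers: int) -> tuple[list[tuple[str, int]], list[tuple[int, int]]]:
    """Sort (partitionIdx, key, value) tuples and write them partition by partition.

    Returns the records in file order plus one (start, length) index entry per
    partition, measured in records.
    """
    ordered = sorted(buffer, key=itemgetter(0, 1))  # sort by (partitionIdx, key)
    by_partition = {p: [(k, v) for _, k, v in grp]
                    for p, grp in groupby(ordered, key=itemgetter(0))}
    records, index = [], []
    for p in range(num_reducers):  # every ReduceTask gets a (possibly empty) segment
        segment = by_partition.get(p, [])
        index.append((len(records), len(segment)))
        records.extend(segment)    # segments are written sequentially
    return records, index


print(spill([(1, "b", 1), (0, "z", 1), (1, "a", 1), (0, "c", 1)], 3))
# ([('c', 1), ('z', 1), ('a', 1), ('b', 1)], [(0, 2), (2, 2), (4, 0)])
```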
How Many Reduce Tasks?
The number of ReduceTasks for the job is decided by the configuration parameter mapreduce.job.reduces (default: 1).
What is the partitionIdx associated with an output tuple?
The partitionIdx of an output tuple is the index of a partition. It is decided inside Mapper.Context.write():
partitionIdx = (key.hashCode() & Integer.MAX_VALUE) % numReducers
It is stored as metadata in the circular buffer alongside the output tuple. The user can customize the partitioner by setting the configuration parameter mapreduce.job.partitioner.class.
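The partitioning formula is reproduced below in Python. The sketch assumes a key whose hashCode() is java.lang.String.hashCode(). Hadoop's own Text writable computes its hash differently, so the resulting indexes would not match a real job using Text keys. The mask with Integer.MAX_VALUE clears the sign bit, because Java's % can return a negative result for a negative hash. Both function names are hypothetical.

```python
INTEGER_MAX_VALUE = 0x7FFFFFFF  # Java's Integer.MAX_VALUE


def java_string_hash(s: str) -> int:
    """Emulate java.lang.String.hashCode(): 31-based polynomial over UTF-16 code units."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        unit = int.from_bytes(data[i:i + 2], "big")
        h = (31 * h + unit) & 0xFFFFFFFF  # keep 32 bits, like Java int overflow
    return h - (1 << 32) if h >= (1 << 31) else h  # reinterpret as a signed int


def hash_partition(key: str, num_reducers: int) -> int:
    """partitionIdx = (key.hashCode() & Integer.MAX_VALUE) % numReducers"""
    return (java_string_hash(key) & INTEGER_MAX_VALUE) % num_reducers


print(java_string_hash("hello"), hash_partition("hello", 4))  # 99162322 2
```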
When do we apply the combiner?
If the user specifies a combiner, then the SPILLING thread executes the combiner on the tuples contained in each partition before writing them to the file (the last step of the list above). Basically, we:
- create an instance of the user Reducer.class (the one specified for the combiner!)
- create a Reducer.Context: the output will be stored on the local filesystem
- execute Reducer.run(): see the Reduce Task description
The combiner typically uses the same implementation as the standard reduce() function and thus can be seen as a local reducer.
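The combiner step is sketched below. A word-count style reduce() runs over one sorted partition before that partition is written. The sketch relies on the spill sort having placed equal keys next to each other, which is what lets groupby collect them. The function names are hypothetical.

```python
from itertools import groupby
from operator import itemgetter


def sum_reducer(key, values):
    """Word-count style reduce(): emit (key, sum of the values)."""
    yield key, sum(values)


def combine_partition(sorted_segment, reducer=sum_reducer):
    """Apply a combiner to one partition of a spill.

    sorted_segment is a list of (key, value) pairs already sorted by key, so
    equal keys are adjacent and groupby can collect them.
    """
    combined = []
    for key, group in groupby(sorted_segment, key=itemgetter(0)):
        combined.extend(reducer(key, (value for _, value in group)))
    return combined


segment = [("a", 1), ("a", 1), ("a", 1), ("b", 1)]
print(combine_partition(segment))  # [('a', 3), ('b', 1)]
```

Four tuples shrink to two before they reach the local disk. This reduction is why a combiner can cut both spill size and the data later fetched by the ReduceTasks.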
MapTask: end of EXECUTION
At the end of the EXECUTION phase, the SPILLING thread is triggered for the last time. In more detail, we:
- sort and spill the remaining unspilled tuples
- start the SHUFFLE phase
Notice that each time the buffer was almost full, we got one spill file (SpillRecord + output file). Each spill file contains several partitions (segments).
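Because every spill file holds one key-sorted segment per partition, the spill files can be combined into a single map output with a k-way merge per partition. The sketch below uses Python's heapq.merge for that. As an assumption, it merges all spills in one pass, whereas Hadoop limits how many segments it merges at once (mapreduce.task.io.sort.factor). The function name is hypothetical.

```python
import heapq
from operator import itemgetter


def merge_spills(spills):
    """Merge spill files partition by partition into one sorted segment per partition.

    spills: list of spill files; each spill is a list with one entry per partition,
    and each partition is a list of (key, value) pairs sorted by key.
    """
    num_partitions = len(spills[0])
    return [
        # k-way merge of the p-th segment of every spill file
        list(heapq.merge(*(spill[p] for spill in spills), key=itemgetter(0)))
        for p in range(num_partitions)
    ]


spill_0 = [[("a", 1), ("c", 1)], [("x", 1)]]  # 2 partitions
spill_1 = [[("b", 1)], [("w", 1), ("y", 1)]]
print(merge_spills([spill_0, spill_1]))
# [[('a', 1), ('b', 1), ('c', 1)], [('w', 1), ('x', 1), ('y', 1)]]
```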
MapTask: SHUFFLE
Reduce Phase
[…]
YARN and MapReduce interaction
Source: http://ercoppa.github.io/HadoopInternals/Container.html