- 浏览: 21526 次
- 性别:
- 来自: 长春
最近访客 更多访客>>
文章分类
最新评论
-
xxjjyy2008:
你的代码我运行报错。。。11/12/08 12:48:39 I ...
Hadoop The Definitive Guide 2nd Edition 读书笔记1 -
dongtianzhe:
恩,谢谢提醒,笔误<div class="qu ...
Hadoop The Definitive Guide 2nd Edition 读书笔记4 -
leibnitz:
很好,这么快就看2nd了!确认一下,上面的描述中:能力调度算法 ...
Hadoop The Definitive Guide 2nd Edition 读书笔记4
MapReduce工作流程:
job提交流程:
1.client节点在提交job之前,先要求JobTracker分配一个新的job id;
2.检查输出路径是否已经存在,如果存在则报错;
3.将job划分成inputsplit,mapreduce程序是以inputsplit作为单位执行的,这些splits会拷贝到HDFS中;
4.将运行job所需的资源,如jar文件、配置文件、inputsplits等上传到HDFS以jobid命名的目录中;
5.client节点通知jobtracker可以运行job。
在提交流程中将job分解成inputsplits并存到HDFS中,这些splits在map操作的时候直接作为map的输入。
job初始化流程:
当jobtracker收到job提交请求后(通过调用jobsubmit),会将这个job放入一个内部的队列等待job调度器的调度,当调度到这个job后,会将其从队列中摘除并对其进行初始化。job的初始化会创建一个封装了tasks、任务状态和执行日志的对象。
job调度器会为每一个split创建一个map任务,而reduce任务的数量是人工指定的,在以前的笔记中我们说过,建立map任务的时候会尽量保证每一个map任务执行所需要的数据存储在本地,所以创建map任务的时候会以本地优先的原则,如果不能再slpit本地创建,会在离存储这个split最近的节点上创建map任务。
任务分配:
每个tasktracker都会周期性的向jobtracker发送心跳通知jobtracker自己是否准备好接受一个新的任务的执行,如果准备好,则jobtracker分配一个任务,任务的执行结果通过心跳的返回值返回。
jobtracker是这样为tasktracker分配任务(task)的:首先从job队列中选择一个job,然后再这个job相关的tasks中为tasktracker选择一个task。
每个tasktracker可以执行的map任务和reduce任务的总数量是固定的,这个数量被称为插槽数,它是根据cpu核数和内存大小确定的。注意tasktracker会先填充map任务的插槽。将比如一台机器cpu是四核的,这台机器可能能并行执行两个map任务或者两个reduce任务,对于这个tasktracker,jobtracker会先选择一个map任务填充空的map插槽,map插槽满了后,再填充reduce插槽。
JobTracker在为takdtracker调度map任务的时候,会考虑这个map任务需要的split的位置,jobtracker会尽量选择与split距离最近的位置,首先尝试在split所在的节点填充map任务插槽,失败后悔在弄一机架的不同节点尝试,否则在不同机架上填充。而对于reduce任务,由于reduce任务所需的数据都是map任务的结果分区后通过网络传输的,大部分数据都不在本地,所以只需要顺序的分配reduce任务即可。
任务执行:
既然tasktracker已经获得task了,就可以开始执行任务了。首先tasktracker会从HDFS拷贝执行需要的资源文件,包括jar文件等,将非jar文件放到一个工作目录中,然后创建一个taskrunner实例来运行task。
为了防止用户定义的map任务和reduce任务影响到tasktracker,TaskRunner会为每个task创建一个新的jvm来运行任务。
进展与状态更新:
在job的运行过程中,用户需要获得job处理过程中的信息反馈,在hadoop中一个job和他的每个task都有一个status,包括job的状态、task的状态、mapreduce的任务进展、计数器的值等等。
在一个任务的运行过程中,任务以任务完成的百分比来保持着对任务进展的追踪,任务的进展程度被称为progress。对于map任务,progress是输入数据被处理的百分比。对于reduce任务,progress跟混洗的三个阶段相关(后面会讲到混洗分为三个阶段:copy、sort、reduce)。比如一个reduce任务执行到reduce阶段,reduce阶段已经处理了一半的输入,那么progress就是5/6,因为copy阶段和sort阶段都已完成,各占1/3,reduce阶段完成了一半,占1/6,加起来就是5/6。
容错机制:
下面我们看一下hadoop是怎样处理任务执行过程的发生的错误的。
1.任务失败:
最常见的出错情况是用户自定义的map函数和reduce函数在执行过程中抛出runtimeexception,当异常被抛出后,JVM子进程会想父进程报告错误,并写入到用户日志中,tasktracker在日志中标记这个任务失败,然后释放这个任务插槽等待下一个任务的执行。
还有一种错误是JVM子进程异常终止:执行任务的JVM子进程会不断的像父进程报告任务执行的信息,当执行任务的JVM子进程突然异常终止后,父进程不会收到终止进程的报告信息,如果父进程10分钟之内收不到子进程的报告信息,这个任务被认为执行失败。
当jobtracker被通知一个任务执行失败,jobtracker重新调度这个任务,并且jobtracker尽量保证这个任务不会被调度到上一个执行失败的tasktracker中。当一个任务失败4次后,整个job执行失败。
对于一些job来说,允许一些任务的失败,我们可以通过设置mapred.max.map.failures.percent和mapred.max.reduce.failures.percent来说明允许失败的任务百分比,任务失败量没达到这个百分比时job不会终止。
2.tasktracker崩溃
当tasktracker当机或者执行速度缓慢的时候,会停止向jobtracker发送心跳信息。当jobtracker 10分钟之内没有收到任何这个tasktracker发送的心跳信息后,jobtracker会通知这个tasktracker从调度队列中将这个节点删除,这个节点执行的所有task都会被重新调度。
job调度:
Hadoop中有多种调度算法可供选择,主要有FIFO调度算法、公平调度算法和能力调度算法。默认算法是FIFO调度。
FIFO不用多说。
公平算法的目标是给每个用户公平的分享集群的计算能力。每个用户(按用户名)都有自己的资源池(poll),用户将要提交的job放在自己的池中,可以指定一个pool需要的最少的插槽数。在公平调度算法下每个用户获得的集群计算能力是相同的,算法是按map任务插槽数和reduce任务插槽数来调度的。每个pool中的jobs平均分配获得的计算能力。如下图:
当一个pool占用的插槽数少于设置的值,就会抢占超过插槽数的那个pool让其释放多占用的那些插槽。
公平调度算法的库在contrib/fairscheduler目录下,要使用这个算法要将jar文件拷贝到lib目录中,并且设置mapred.jobtracker.taskScheduler的值为org.apache.hadoop.mapred.FairScheduler。
能力调度算法中,每个用户对应一个队列(类似于pool),每个队列被分配一个能力值(插槽数),但是每个队列中都是通过FIFO调度的,然后以每个队列为单位来执行公平调度算法。
能力调度算法实际上是允许不同用户或组织以FIFO的形式分割集群的计算能力(一个用户或组织对应一个队列),而公平算法是保证每个pool获得的计算能力(插槽数),在pool中的job不是按FIFO调度的,而是平均分配pool获得的插槽(也可以设置pool中按FIFO调度,这样就与能力调度算法一样了)。
混洗和排序:
在mapreduce过程中,map输出的结果默认是按照key进行排序的,这个排序的过程加上与将map的输出结果传送到reducer作为输入的过程统称为混洗。理解混洗的过程对于理解整个hadoop很有帮助,书中也提到混洗就是hadoop发挥它威力的地方。
1. map side:
map函数执行后会不断的产生结果,这些结果不是简单的写入磁盘的。每个map任务都有一个循环队列,map输出结果首先会存放在队列中,当队列中存放的内容超过一个门限值的时候(通过io.sort.spill.percent设置,默认为0.8, 80%),一个后台线程将队列中的内容写到磁盘中,此时map结果的写入到队列的过程并没有停止,当队列慢了以后,map现成会被阻塞直到队列中所有的数据都写入到磁盘。
在队列的内容被写入磁盘之前,线程首先将数据进行分组,分组的自然是按照最终会传送到哪个reducer进行。对于每个组,线程会对组中的数据按key排序,如果声明了combiner函数,在此时调用,然后将结果写入到一个文件中。
每次队列达到门限值的时候,都会产生一个这样的文件,文件达到一定数目或map任务要结束的时候,这些文件会merge成一个已经分组的有序的文件,如果声明了combiner函数,并且至少有三个这样的中间文件进行merge,就会调用此combiner。最终将这个已分区的有序的文件写入磁盘中。
2. reduce side:
在reduce side,混洗分为三个阶段:拷贝阶段、排序(归并)阶段、reduce阶段
reduce task默认有5个线程来拷贝已完成的map任务的相应分区。当map任务完成并将最终的那个文件写入到磁盘后,拷贝就会开始,reduce task不会等所有的map task都完成,而是有map task完成后,拷贝阶段就开始。reduce task将map task的结果通过http协议传送到队列中,当超过队列规定的容量或者已经获得从map task结果的数目达到门限值,就开始将这些数据写入磁盘中。
当所有的map task的结果都拷贝结束后,reduce task进入排序(归并)阶段,这个阶段会按照设置的归并因子来进行归并,比如有50个map结果,归并因子是10,则会归并5次,每次将10个文件归并为一个文件。
进行一次归并后,便进入到reduce阶段,将上阶段生成的多个文件作为reduce的输入,进行reduce操作,获得的结果进行最后一个归并,得到最终结果并将结果写入到HDFS中。
至此整个mapreduce的执行过程结束了,整个过程书上的图描述的很清楚:
我们可以优化混洗的过程来优化整个job的性能。在map side,我们应该尽量避免map的结果不断的写入到磁盘,可以提高io.sort.mb来增加循环队列的容量;在reduce side,应尽量保证中间数据都在内存中,我们可以设置接受map结果的门限值为0和设置缓冲区溢出百分比为100%来获得最佳性能。
任务执行控制:
上面我们介绍了mapreduce的执行过程,下面我们看一下用户可以通过那些手段来控制执行过程。
1.speculative execution
通过上面的学习,我们知道mapduce模型是将job分解成tasks来并行执行这些任务,但是如果其中一个task执行速度缓慢,将会影响到整个job。
当一个job被分解成成千上万个tasks,由于硬件或软件的原因,其中有一部分task执行缓慢是很正常的现象。但是job是可以正常执行,只不过速度要慢很多。在这种情况下,hadoop不会去诊断和修复执行缓慢的任务,而是尝试去发现执行缓慢的任务并运行另一个相同的任务作为备份。这种处理方式被称为speculative execution.
注意speculative execution并不是同时运行两个相同task,来对比运行效率,这样会两非集群的计算资源。
在所有任务运行之后,如果有任务执行了很长的时间(至少一分钟),但是并没有像其他任务那样很大的进展(progress),这是就为这个任务新建一个新的speculative任务,当这两个向东的任务其中一个结束后,另一个就会被杀死。
speculative execution默认虽然speculative execution的目的是提高job的执行效率,但是在一个很忙的集群中,speculative execution是很费计算资源的,所以我们可以关闭他。
2.JVM重用
默认下,任务执行的JVM是执行时创建的,但是对于那些很小的任务来说是很浪费资源的,这时就可以启动JVM重用功能。
3.跳过不合法的记录
在大的数据集中,有很多记录的格式是不合法的,如果map或reduce遇到不合法的记录而抛出异常,会导致任务失败从而使整个job执行失败,这是我们可以利用hadoop中的skipping mode来自动的跳过不合法记录。
skipping mode被打开后,当遇到不合法记录导致task失败时,tasktracker会重新执行任务并跳过导致异常的记录,由于重新启动任务会占用很多资源,所以一个任务失败两个后才会启动skipping mode。所以skipping mode被打开后含非法记录的map任务执行顺序是这样的:
失败一次;
失败两次;
skipping mode被打开,失败第三次,并记录引起异常的记录;
第四次成功。
注意默认skipping mode值会发现并跳过一个不合法记录,可以设置mapred.map.max.attempts来增加跳过个数。
默认skipping mode是关闭的,如果我们要打开skipping mode,必须用老的api进行job的提交,并且要添加一行SkipBadRecords.setMapperMaxSkipRecords(conf, 1);第二个参数就是可跳跃的非法记录数。下面我们写一个程序做测试:
首先处理一下输入数据,在第一行添加一行非法记录:
map程序:
reduce程序:
执行程序:
看一下控制台输出:
可以看到会map任务会失败三次,前两个失败后,会打开skipping mode,打开后第三次失败会记录那个引起异常的记录,第四次成功了。
执行完输出目录:
结果如下:
输出目录总的skip文件保存被跳过的记录,这个文件时sequence file,可以在控制台看一下这个文件的内容:
job提交流程:
1.client节点在提交job之前,先要求JobTracker分配一个新的job id;
2.检查输出路径是否已经存在,如果存在则报错;
3.将job划分成inputsplit,mapreduce程序是以inputsplit作为单位执行的,这些splits会拷贝到HDFS中;
4.将运行job所需的资源,如jar文件、配置文件、inputsplits等上传到HDFS以jobid命名的目录中;
5.client节点通知jobtracker可以运行job。
在提交流程中将job分解成inputsplits并存到HDFS中,这些splits在map操作的时候直接作为map的输入。
job初始化流程:
当jobtracker收到job提交请求后(通过调用jobsubmit),会将这个job放入一个内部的队列等待job调度器的调度,当调度到这个job后,会将其从队列中摘除并对其进行初始化。job的初始化会创建一个封装了tasks、任务状态和执行日志的对象。
job调度器会为每一个split创建一个map任务,而reduce任务的数量是人工指定的,在以前的笔记中我们说过,建立map任务的时候会尽量保证每一个map任务执行所需要的数据存储在本地,所以创建map任务的时候会以本地优先的原则,如果不能再slpit本地创建,会在离存储这个split最近的节点上创建map任务。
任务分配:
每个tasktracker都会周期性的向jobtracker发送心跳通知jobtracker自己是否准备好接受一个新的任务的执行,如果准备好,则jobtracker分配一个任务,任务的执行结果通过心跳的返回值返回。
jobtracker是这样为tasktracker分配任务(task)的:首先从job队列中选择一个job,然后再这个job相关的tasks中为tasktracker选择一个task。
每个tasktracker可以执行的map任务和reduce任务的总数量是固定的,这个数量被称为插槽数,它是根据cpu核数和内存大小确定的。注意tasktracker会先填充map任务的插槽。将比如一台机器cpu是四核的,这台机器可能能并行执行两个map任务或者两个reduce任务,对于这个tasktracker,jobtracker会先选择一个map任务填充空的map插槽,map插槽满了后,再填充reduce插槽。
JobTracker在为takdtracker调度map任务的时候,会考虑这个map任务需要的split的位置,jobtracker会尽量选择与split距离最近的位置,首先尝试在split所在的节点填充map任务插槽,失败后悔在弄一机架的不同节点尝试,否则在不同机架上填充。而对于reduce任务,由于reduce任务所需的数据都是map任务的结果分区后通过网络传输的,大部分数据都不在本地,所以只需要顺序的分配reduce任务即可。
任务执行:
既然tasktracker已经获得task了,就可以开始执行任务了。首先tasktracker会从HDFS拷贝执行需要的资源文件,包括jar文件等,将非jar文件放到一个工作目录中,然后创建一个taskrunner实例来运行task。
为了防止用户定义的map任务和reduce任务影响到tasktracker,TaskRunner会为每个task创建一个新的jvm来运行任务。
进展与状态更新:
在job的运行过程中,用户需要获得job处理过程中的信息反馈,在hadoop中一个job和他的每个task都有一个status,包括job的状态、task的状态、mapreduce的任务进展、计数器的值等等。
在一个任务的运行过程中,任务以任务完成的百分比来保持着对任务进展的追踪,任务的进展程度被称为progress。对于map任务,progress是输入数据被处理的百分比。对于reduce任务,progress跟混洗的三个阶段相关(后面会讲到混洗分为三个阶段:copy、sort、reduce)。比如一个reduce任务执行到reduce阶段,reduce阶段已经处理了一半的输入,那么progress就是5/6,因为copy阶段和sort阶段都已完成,各占1/3,reduce阶段完成了一半,占1/6,加起来就是5/6。
容错机制:
下面我们看一下hadoop是怎样处理任务执行过程的发生的错误的。
1.任务失败:
最常见的出错情况是用户自定义的map函数和reduce函数在执行过程中抛出runtimeexception,当异常被抛出后,JVM子进程会想父进程报告错误,并写入到用户日志中,tasktracker在日志中标记这个任务失败,然后释放这个任务插槽等待下一个任务的执行。
还有一种错误是JVM子进程异常终止:执行任务的JVM子进程会不断的像父进程报告任务执行的信息,当执行任务的JVM子进程突然异常终止后,父进程不会收到终止进程的报告信息,如果父进程10分钟之内收不到子进程的报告信息,这个任务被认为执行失败。
当jobtracker被通知一个任务执行失败,jobtracker重新调度这个任务,并且jobtracker尽量保证这个任务不会被调度到上一个执行失败的tasktracker中。当一个任务失败4次后,整个job执行失败。
对于一些job来说,允许一些任务的失败,我们可以通过设置mapred.max.map.failures.percent和mapred.max.reduce.failures.percent来说明允许失败的任务百分比,任务失败量没达到这个百分比时job不会终止。
2.tasktracker崩溃
当tasktracker当机或者执行速度缓慢的时候,会停止向jobtracker发送心跳信息。当jobtracker 10分钟之内没有收到任何这个tasktracker发送的心跳信息后,jobtracker会通知这个tasktracker从调度队列中将这个节点删除,这个节点执行的所有task都会被重新调度。
job调度:
Hadoop中有多种调度算法可供选择,主要有FIFO调度算法、公平调度算法和能力调度算法。默认算法是FIFO调度。
FIFO不用多说。
公平算法的目标是给每个用户公平的分享集群的计算能力。每个用户(按用户名)都有自己的资源池(poll),用户将要提交的job放在自己的池中,可以指定一个pool需要的最少的插槽数。在公平调度算法下每个用户获得的集群计算能力是相同的,算法是按map任务插槽数和reduce任务插槽数来调度的。每个pool中的jobs平均分配获得的计算能力。如下图:
当一个pool占用的插槽数少于设置的值,就会抢占超过插槽数的那个pool让其释放多占用的那些插槽。
公平调度算法的库在contrib/fairscheduler目录下,要使用这个算法要将jar文件拷贝到lib目录中,并且设置mapred.jobtracker.taskScheduler的值为org.apache.hadoop.mapred.FairScheduler。
能力调度算法中,每个用户对应一个队列(类似于pool),每个队列被分配一个能力值(插槽数),但是每个队列中都是通过FIFO调度的,然后以每个队列为单位来执行公平调度算法。
能力调度算法实际上是允许不同用户或组织以FIFO的形式分割集群的计算能力(一个用户或组织对应一个队列),而公平算法是保证每个pool获得的计算能力(插槽数),在pool中的job不是按FIFO调度的,而是平均分配pool获得的插槽(也可以设置pool中按FIFO调度,这样就与能力调度算法一样了)。
混洗和排序:
在mapreduce过程中,map输出的结果默认是按照key进行排序的,这个排序的过程加上与将map的输出结果传送到reducer作为输入的过程统称为混洗。理解混洗的过程对于理解整个hadoop很有帮助,书中也提到混洗就是hadoop发挥它威力的地方。
1. map side:
map函数执行后会不断的产生结果,这些结果不是简单的写入磁盘的。每个map任务都有一个循环队列,map输出结果首先会存放在队列中,当队列中存放的内容超过一个门限值的时候(通过io.sort.spill.percent设置,默认为0.8, 80%),一个后台线程将队列中的内容写到磁盘中,此时map结果的写入到队列的过程并没有停止,当队列慢了以后,map现成会被阻塞直到队列中所有的数据都写入到磁盘。
在队列的内容被写入磁盘之前,线程首先将数据进行分组,分组的自然是按照最终会传送到哪个reducer进行。对于每个组,线程会对组中的数据按key排序,如果声明了combiner函数,在此时调用,然后将结果写入到一个文件中。
每次队列达到门限值的时候,都会产生一个这样的文件,文件达到一定数目或map任务要结束的时候,这些文件会merge成一个已经分组的有序的文件,如果声明了combiner函数,并且至少有三个这样的中间文件进行merge,就会调用此combiner。最终将这个已分区的有序的文件写入磁盘中。
2. reduce side:
在reduce side,混洗分为三个阶段:拷贝阶段、排序(归并)阶段、reduce阶段
reduce task默认有5个线程来拷贝已完成的map任务的相应分区。当map任务完成并将最终的那个文件写入到磁盘后,拷贝就会开始,reduce task不会等所有的map task都完成,而是有map task完成后,拷贝阶段就开始。reduce task将map task的结果通过http协议传送到队列中,当超过队列规定的容量或者已经获得从map task结果的数目达到门限值,就开始将这些数据写入磁盘中。
当所有的map task的结果都拷贝结束后,reduce task进入排序(归并)阶段,这个阶段会按照设置的归并因子来进行归并,比如有50个map结果,归并因子是10,则会归并5次,每次将10个文件归并为一个文件。
进行一次归并后,便进入到reduce阶段,将上阶段生成的多个文件作为reduce的输入,进行reduce操作,获得的结果进行最后一个归并,得到最终结果并将结果写入到HDFS中。
至此整个mapreduce的执行过程结束了,整个过程书上的图描述的很清楚:
我们可以优化混洗的过程来优化整个job的性能。在map side,我们应该尽量避免map的结果不断的写入到磁盘,可以提高io.sort.mb来增加循环队列的容量;在reduce side,应尽量保证中间数据都在内存中,我们可以设置接受map结果的门限值为0和设置缓冲区溢出百分比为100%来获得最佳性能。
任务执行控制:
上面我们介绍了mapreduce的执行过程,下面我们看一下用户可以通过那些手段来控制执行过程。
1.speculative execution
通过上面的学习,我们知道mapduce模型是将job分解成tasks来并行执行这些任务,但是如果其中一个task执行速度缓慢,将会影响到整个job。
当一个job被分解成成千上万个tasks,由于硬件或软件的原因,其中有一部分task执行缓慢是很正常的现象。但是job是可以正常执行,只不过速度要慢很多。在这种情况下,hadoop不会去诊断和修复执行缓慢的任务,而是尝试去发现执行缓慢的任务并运行另一个相同的任务作为备份。这种处理方式被称为speculative execution.
注意speculative execution并不是同时运行两个相同task,来对比运行效率,这样会两非集群的计算资源。
在所有任务运行之后,如果有任务执行了很长的时间(至少一分钟),但是并没有像其他任务那样很大的进展(progress),这是就为这个任务新建一个新的speculative任务,当这两个向东的任务其中一个结束后,另一个就会被杀死。
speculative execution默认虽然speculative execution的目的是提高job的执行效率,但是在一个很忙的集群中,speculative execution是很费计算资源的,所以我们可以关闭他。
2.JVM重用
默认下,任务执行的JVM是执行时创建的,但是对于那些很小的任务来说是很浪费资源的,这时就可以启动JVM重用功能。
3.跳过不合法的记录
在大的数据集中,有很多记录的格式是不合法的,如果map或reduce遇到不合法的记录而抛出异常,会导致任务失败从而使整个job执行失败,这是我们可以利用hadoop中的skipping mode来自动的跳过不合法记录。
skipping mode被打开后,当遇到不合法记录导致task失败时,tasktracker会重新执行任务并跳过导致异常的记录,由于重新启动任务会占用很多资源,所以一个任务失败两个后才会启动skipping mode。所以skipping mode被打开后含非法记录的map任务执行顺序是这样的:
失败一次;
失败两次;
skipping mode被打开,失败第三次,并记录引起异常的记录;
第四次成功。
注意默认skipping mode值会发现并跳过一个不合法记录,可以设置mapred.map.max.attempts来增加跳过个数。
默认skipping mode是关闭的,如果我们要打开skipping mode,必须用老的api进行job的提交,并且要添加一行SkipBadRecords.setMapperMaxSkipRecords(conf, 1);第二个参数就是可跳跃的非法记录数。下面我们写一个程序做测试:
首先处理一下输入数据,在第一行添加一行非法记录:
map程序:
import java.io.IOException; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, DoubleWritable> { private static final double MISSING = 99.9; @Override public void map(LongWritable key, Text value, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String year = line.substring(14, 18); double airTemperature = Double.parseDouble(line.substring(104, 108)); if (airTemperature != MISSING) { output.collect(new Text(year), new DoubleWritable(airTemperature)); } } }
reduce程序:
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, DoubleWritable, Text, DoubleWritable> { @Override public void reduce(Text key, Iterator<DoubleWritable> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException { double maxValue = Double.MIN_VALUE; while (values.hasNext()) { maxValue = Math.max(maxValue, values.next().get()); } output.collect(key, new DoubleWritable(maxValue)); } }
执行程序:
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.SkipBadRecords; import org.apache.hadoop.mapreduce.Job; public class MaxTemperature { public static void main(String[] args) { if (args.length != 2) { System.err.println("参数错误"); System.exit(-1); } try { JobConf conf = new JobConf(MaxTemperature.class); FileInputFormat.addInputPath(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); conf.setJobName("ProductMR"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(DoubleWritable.class); conf.setMapperClass(MaxTemperatureMapper.class); conf.setReducerClass(MaxTemperatureReducer.class); // 打开skipping mode SkipBadRecords.setMapperMaxSkipRecords(conf, 1); JobClient.runJob(conf); } catch (IOException e) { // TODO Auto-generated catch block } } }
看一下控制台输出:
可以看到会map任务会失败三次,前两个失败后,会打开skipping mode,打开后第三次失败会记录那个引起异常的记录,第四次成功了。
执行完输出目录:
结果如下:
输出目录总的skip文件保存被跳过的记录,这个文件时sequence file,可以在控制台看一下这个文件的内容:
评论
2 楼
dongtianzhe
2011-04-15
恩,谢谢提醒,笔误
leibnitz 写道
很好,这么快就看2nd了!
确认一下,上面的描述中:
能力调度算法实际上是允许不同用户或组织以FIFO的形式分割集群的计算能力(一个用户或组织对应一个队列),而公平算法是保证每个pool获得的计算能力(插槽数),在pool中的job不是按FIFO调度的,而是平均分配pool获得的插槽(也可以设置pool中按FIFO调度,这样就与公平调度算法一样了)。
是不是应该改为:
能力调度算法实际上是允许不同用户或组织以FIFO的形式分割集群的计算能力(一个用户或组织对应一个队列),而公平算法是保证每个pool获得的计算能力(插槽数),在pool中的job不是按FIFO调度的,而是平均分配pool获得的插槽(也可以设置pool中按FIFO调度,这样就与能力调度算法一样了)。
确认一下,上面的描述中:
能力调度算法实际上是允许不同用户或组织以FIFO的形式分割集群的计算能力(一个用户或组织对应一个队列),而公平算法是保证每个pool获得的计算能力(插槽数),在pool中的job不是按FIFO调度的,而是平均分配pool获得的插槽(也可以设置pool中按FIFO调度,这样就与公平调度算法一样了)。
是不是应该改为:
能力调度算法实际上是允许不同用户或组织以FIFO的形式分割集群的计算能力(一个用户或组织对应一个队列),而公平算法是保证每个pool获得的计算能力(插槽数),在pool中的job不是按FIFO调度的,而是平均分配pool获得的插槽(也可以设置pool中按FIFO调度,这样就与能力调度算法一样了)。
1 楼
leibnitz
2011-04-10
很好,这么快就看2nd了!
确认一下,上面的描述中:
能力调度算法实际上是允许不同用户或组织以FIFO的形式分割集群的计算能力(一个用户或组织对应一个队列),而公平算法是保证每个pool获得的计算能力(插槽数),在pool中的job不是按FIFO调度的,而是平均分配pool获得的插槽(也可以设置pool中按FIFO调度,这样就与公平调度算法一样了)。
是不是应该改为:
能力调度算法实际上是允许不同用户或组织以FIFO的形式分割集群的计算能力(一个用户或组织对应一个队列),而公平算法是保证每个pool获得的计算能力(插槽数),在pool中的job不是按FIFO调度的,而是平均分配pool获得的插槽(也可以设置pool中按FIFO调度,这样就与能力调度算法一样了)。
确认一下,上面的描述中:
能力调度算法实际上是允许不同用户或组织以FIFO的形式分割集群的计算能力(一个用户或组织对应一个队列),而公平算法是保证每个pool获得的计算能力(插槽数),在pool中的job不是按FIFO调度的,而是平均分配pool获得的插槽(也可以设置pool中按FIFO调度,这样就与公平调度算法一样了)。
是不是应该改为:
能力调度算法实际上是允许不同用户或组织以FIFO的形式分割集群的计算能力(一个用户或组织对应一个队列),而公平算法是保证每个pool获得的计算能力(插槽数),在pool中的job不是按FIFO调度的,而是平均分配pool获得的插槽(也可以设置pool中按FIFO调度,这样就与能力调度算法一样了)。
发表评论
-
Hadoop The Definitive Guide 2nd Edition 读书笔记5
2010-12-17 14:30 2310之前我们学习了MapReduce ... -
Hadoop The Definitive Guide 2nd Edition 读书笔记3
2010-12-04 20:11 1414第四章是介绍Hadoop的IO ... -
Hadoop The Definitive Guide 2nd Edition 读书笔记2
2010-12-03 21:01 1413第三章介绍的是Hadoop的分布式文件系统HDFS相关的内容。 ... -
Hadoop The Definitive Guide 2nd Edition 读书笔记1
2010-12-02 13:34 1827Hadoop The Definitive Guide 2nd ... -
MapReduce执行过程
2010-11-19 13:21 1418在研究hadoop之前,有必要将hadoop两个核心技术HDF ... -
ubuntu9.04+hadoop0.20.2+eclipse环境搭建
2010-11-18 20:47 2246看hadoop也有一段时间了,今天花了一些时间把整个开发环境搭 ...
相关推荐
Hadoop The Definitive Guide 2nd Edition.pdf
Hadoop The Definitive Guide 2nd Edition
Hadoop The Definitive Guide 2nd Edition-指南第二版 这个是Hadoop指南英文第二版,高清文字版本,可以复制文字内容。
hadoop指南第二版 hadoop指南2 Hadoop The Definitive Guide 2nd Edition
书名:Hadoop The Definitive Guide 语言:英文 The rest of this book is organized as follows. Chapter 2 provides an introduction to MapReduce. Chapter 3 looks at Hadoop filesystems, and in particular ...
Hadoop The Definitive Guide, 4th Edition.pdf(Hadoop权威指南第4版英文版O'REILLY)
The fourth edition covers Hadoop 2 exclusively. The Hadoop 2 release series is the current active release series and contains the most stable versions of Hadoop. There are new chapters covering YARN ...
Hadoop The Definitive Guide 3rd EditionHadoop The Definitive Guide 3rd Edition
Hadoop The Definitive Guide, 2nd Edition 第二版来了原版,下载吧
Hadoop The Definitive Guide, 4th Edition
Hadoop The Definitive Guide (4th Edition)
1449311520 Hadoop The Definitive Guide 3rd Edition 第三版全,2012、5月
Hadoop The Definitive Guide 3rd Edition Storage and Analysis at Internet Scale By Tom White May 2012 Pages: 688
OReilly.Hadoop.The.Definitive.Guide.June.2009.RETAiL.eBOOk-sUppLeX Description Apache Hadoop is ideal for organizations with a growing need to process massive application datasets. Hadoop: The ...