`
ladymaidu
  • 浏览: 685829 次
文章分类
社区版块
存档分类
最新评论

MapReduce编程基础

 
阅读更多

1.WordCount示例及MapReduce程序框架

2. MapReduce程序执行流程

3. 深入学习MapReduce编程(1)

4. 参考资料及代码下载

<1>. WordCount示例及MapReduce程序框架

首先通过一个简单的程序来实际运行一个MapReduce程序,然后通过这个程序我们来哦那个结一下MapReduce编程模型。

下载源程序:/Files/xuqiang/WordCount.rar,将该程序打包成wordcount.jar下面的命令,随便写一个文本文件,这里是WordCountMrtrial,并上传到hdfs上,这里的路径是/tmp/WordCountMrtrial,运行下面的命令:

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ ./bin/hadoop jar wordcount.jar WordCount /tmp/WordCountMrtrial /tmp/result

如果该任务运行完成之后,将在hdfs的/tmp/result目录下生成类似于这样的结果:

gentleman 11

get 12
give 8
go 6
good 9
government 16

运行一个程序的基本上就是这样一个过程,我们来看看具体程序:

main函数中首先生成一个Job对象,Job job = new Job(conf, "word count");然后设置job的MapperClass,ReducerClass,设置输入文件路径FileInputFormat.addInputPath(job, new Path(otherArgs[0]));设置输出文件路径:FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));等待程序运行完成:System.exit(job.waitForCompletion(true) ? 0 : 1);可以看出main中仅仅是启动了一个job,然后设置该job相关的参数,具体实现MapReduce是mapper类和reducer类。

TokenizerMapper类中map函数将一行分割成<K2, V2>,然后IntSumReducer的reduce将<K2, list<V2>>转换成最终结果<K3, V3>。

通过这个示例基本上也能总结出简单的MapReduce编程的模型:一个Mapper类,一个Reducer类,一个Driver类。

<2>. MapReduce程序执行流程

这里所描述的执行流程更加注重是从程序的角度去理解,更加全面的流程可参考[这里]。

首先用户指定待处理的文件,在WordCount就是文件WordCountMrtrial,这是hadoop根据设定的InputDataFormat来将输入文件分割成一个record(key/value对),然后将这些record传递给map函数,在WordCount示例中,对应的record就是<line_number行号, line_content该行内容>;

然后map函数根据输入的record,形成<K2, V2>,在WordCount示例中形成<K2, V2>就是<single_word, word_count>,例如<"a", 1>;

如果map过程完成之后,hadoop将这些生成的<K2, V2>按照K2进行分组,形成<K2,list(V2) >,之后传递给reduce函数,在该函数中最终得到程序的输出结果<K3, V3>。

<3>. 深入学习MapReduce编程(1)

3.1 hadoop data types

由于在hadoop需要将key/value对序列化,然后通过网络network发送到集群中的其他机器上,所以说hadoop中的类型需要能够序列化。

具体而言,自定义的类型,如果一个类class实现了Writable interface的话,那么这个可以作为value类型,如果一个class实现了WritableComparable<T> interface的话,那么这个class可以作为value类型或者是key类型。

hadoop本身已经实现了一些预定义的类型predefined classes,并且这些类型实现了WritableComparable<T>接口。

3.2 Mapper

如果一个类想要成为一个mapper,那么该类需要实现Mapper接口,同时继承自MapReduceBase。在MapReduceBase类中,两个方法是特别需要注意的:

void configure( JobConf job):这个方法是在任务被运行之前调用

void close():在任务运行完成之后调用

剩下的工作就是编写map方法,原型如下:

void map(Object key, Text value, Context context

) throws IOException, InterruptedException;

这个方法根据<K1, V1>生成<K2, V2>,然后通过context输出。

同样的在hadoop中预先定义了如下的Mapper:

3.3 Reducer

如果一个类想要成为Reducer的话,需要首先实现Reducer接口,然后需要继承自MapReduceBase。

当reducer接收从mapper传递而来的key/value对,然后根据key来排序,分组,最终生成<K2, list<V2>> ,然后reducer根据<K2, list<V2>>生成<K3, V3>.

同样在hadoop中预定义了一些Reducer:

3.4Partitioner

Partitioner的作用主要是将mapper运行的结果“导向directing”到reducer。如果一个类想要成为Partitioner,那么需要实现Partitioner接口,该接口继承自JobConfigurable,定义如下:

publicinterfacePartitioner<K2,V2>extendsJobConfigurable{
/**
*Gettheparititionnumberforagivenkey(hencerecord)giventhetotal
*numberofpartitionsi.e.numberofreduce-tasksforthejob.
*
*<p>Typicallyahashfunctiononaallorasubsetofthekey.</p>
*
*
@paramkeythekeytobeparitioned.
*
@paramvaluetheentryvalue.
*
@paramnumPartitionsthetotalnumberofpartitions.
*
@returnthepartitionnumberforthe<code>key</code>.
*/
intgetPartition(K2key,V2value,intnumPartitions);
}

hadoop将根据方法getPartition的返回值确定将mapper的值发送到那个reducer上。返回值相同的key/value对将被“导向“至同一个reducer。

3.5 Input Data Format and Output Data Format

3.5.1 Input Data Format

上面我们的假设是MapReduce程序的输入是key/value对,也就是<K1, V1>,但是实际上一般情况下MapReduce程序的输入是big file的形式,那么如何将这个文件转换成<K1, V1>,即file -> <K1, V1>。这就需要使用InputFormat接口了。

下面是几个常用InputFormat的实现类:

当然除了使用hadoop预先定义的InputDataFormat之外,还可以自定义,这是需要实现InputFormat接口。该接口仅仅包含两个方法:

InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;该接口实现将大文件分割成小块split。
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,

Reporter reporter) throws IOException;

该方法输入分割成的split,然后返回RecordReader,通过RecordReader来遍历该split内的record。

3.5.2 Output Data Format

每个reducer将自己的输出写入到结果文件中,这是使用output data format来配置输出的文件的格式。hadoop预先实现了:

3.6 Streaming in Hadoop

3.6.1 执行流程

我们知道在linux中存在所谓的“流”的概念,也就是说我们可以使用下面的命令:

cat input.txt | RandomSample.py 10 >sampled_output.txt

同样在hadoop中我们也可以使用类似的命令,显然这样能够在很大程度上加快程序的开发进程。下面来看看hadoop中流执行的过程:

hadoop streaming从标砖输入STDIN读取数据,默认的情况下使用/t来分割每行,如果不存在/t的话,那么这时正行的内容将被看作是key,而此时的value内容为空;

然后调用mapper程序,输出<K2, V2>;

之后,调用Partitioner来将<K2, V2>输出到对应的reducer上;

reducer根据输入的<K2, list(V2)> 得到最终结果<K3, V3>并输出到STDOUT上。

3.6.2 简单示例程序

下面我们假设需要做这样一个工作,输入一个文件,文件中每行是一个数字,然后得到该文件中的数字的最大值(当然这里可以使用streaming中自带的Aggregate)。 首先我们编写一个python文件(如果对python不是很熟悉,看看[这里]):

3.6.2.1 准备数据

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >url1

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >url2

上传到hdfs上:

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -mkdir urls

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url1 urls/
xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url2 urls/

3.6.2.2 编写mappermultifetch.py

#!/usr/bin/envpython

importsys,urllib,re

title_re
=re.compile("<title>(.*?)</title>",
re.MULTILINE
|re.DOTALL|re.IGNORECASE)

forlineinsys.stdin:
#WeassumethatwearefedaseriesofURLs,oneperline
url=line.strip()
#Fetchthecontentandoutputthetitle(pairsaretab-delimited)
match=title_re.search(urllib.urlopen(url).read())
ifmatch:
printurl,"/t",match.group(1).strip()

该文件的主要作用是给定一个url,然后输出该url代表的html页面的title部分。

在本地测试一下该程序:

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >urls
xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >>urls

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ sudo chmod u+x ./multifetch.py

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ cat urls | ./multifetch.py 将输出:

http://www.cs.brandeis.edu Computer Science Department | Brandeis University

http://www.nytimes.com The New York Times - Breaking News, World News &amp; Multimedia

3.6.2.3 编写reducer reducer.py

编写reducer.py文件

#!/usr/bin/envpython
fromoperatorimportitemgetter
importsys

forlineinsys.stdin:
line
=line.strip()
printline

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ chmod u+x ./reducer.py

现在我们的mapper和reducer已经准备好了,那么首先在本地上运行测试一下程序的功能,下面的命令模拟在hadoop上运行的过程:

首先mapper从stdin读取数据,这里是一行;

然后读取该行的内容作为一个url,然后得到该url代表的html的title的内容,输出<url, url-title-content>;

调用sort命令将mapper输出排序;

将排序完成的结果交给reducer,这里的reducer仅仅是将结果输出。

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$caturls|./multifetch.py|sort|./reducer.py
http:
//www.cs.brandeis.eduComputerScienceDepartment|BrandeisUniversity
http://www.nytimes.comTheNewYorkTimes-BreakingNews,WorldNews&amp;Multimedia

显然程序能够正确

3.6.2.4 在hadoop streaming上运行

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$bin/hadoopjar./mapred/contrib/streaming/hadoop-0.21.0-streaming.jar/
>-mapper/home/xuqiang/hadoop/src/hadoop-0.21.0/multifetch.py/
>-reducer/home/xuqiang/hadoop/src/hadoop-0.21.0/reducer.py/
>-inputurls/*/
>-outputtitles

程序运行完成之后,查看运行结果:

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -cat titles/part-00000

<4>. 参考资料及代码下载

http://pages.cs.brandeis.edu/~cs147a/lab/hadoop-example/

http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#Hadoop+Streaming

<Hadoop In Action>

1. 本博客中的文章均是个人在学习和项目开发中总结。其中难免存在不足之处 ,欢迎留言指正。 2. 本文版权归作者和博客园共有,转载时,请保留本文链接。


分享到:
评论

相关推荐

    大数据技术基础实验报告-MapReduce编程.doc

    大数据技术基础实验报告-MapReduce编程

    实验项目 MapReduce 编程

    4 分别在自编 MapReduce 程序 WordCount 运行过程中和运行结束后查看 MapReduce Web 界面。 5. 分别在自编 MapReduce 程序 WordCount 运行过程中和运行结束后练习 MapReduce Shell 常用命令。 。。

    Hadoop之MapReduce编程实例完整源码

    一个自己写的Hadoop MapReduce实例源码,网上看到不少网友在学习MapReduce编程,但是除了wordcount范例外实例比较少,故上传自己的一个。包含完整实例源码,编译配置文件,测试数据,可执行jar文件,执行脚本及操作...

    大数据分析技术基础PPT课件(共9单元)4-MapReduce 编程.pdf

    大数据分析技术基础PPT课件(共9单元)4-MapReduce 编程.pdf大数据分析技术基础PPT课件(共9单元)4-MapReduce 编程.pdf大数据分析技术基础PPT课件(共9单元)4-MapReduce 编程.pdf大数据分析技术基础PPT课件(共9单元)4-...

    大数据实验5实验报告:MapReduce 初级编程实践

    MapReduce 初级编程实践 姓名: 实验环境:  操作系统:Linux(建议Ubuntu16.04);  Hadoop版本:3.2.2; 实验内容与完成情况: (一)编程实现文件合并和去重操作 对于两个输入文件,即文件 A 和文件 B,请...

    08.mapreduce编程案例--流量统计求和--自定义数据类型.mp4

    08.mapreduce编程案例--流量统计求和--自定义数据类型.mp4

    hadoop mapreduce编程实战

    此文档用于指导在hadoop完全分布式环境上做mapreduce开发,包括了11个mapreduce实例,讲解详细,适合初步接触mapreduce开发的同学,希望对大家有帮助

    hadoop实验+作业.zip

    hadoop实验+作业hadoop实验+作业hadoop实验+作业hadoop实验+作业hadoop实验+作业hadoop实验+作业hadoop实验+作业hadoop实验+作业hadoop实验+作业

    MapReduce初级例程

    一些MapReduce的入门程序 来自《hadoop权威指南》《hadoop in action》

    Hadoop集群程序设计与开发

    包括初识Hadoop、Hadoop基础知识、Hadoop开发环境配置与搭建、Hadoop分布式文件系统、Hadoop的I/O操作、MapReduce编程基础、MapReduce高级编程、初识HBase、初识Hive。通过本书的学习,读者可以较全面地了解Hadoop的...

    Mapreduce实验报告.doc

    Mapreduce实验报告 前言和简介 MapReduce是Google提出的一种编程模型,在这个模型的支持下可以实现大规模并行化计 算。在Mapreduce框架下一个计算机群通过统一的任务调度将一个巨型任务分成许多部分 ,分别解决然后...

    基于MapReduce的学生平均成绩统计

    利用MapReduce实现了求学生成绩的最大值,最小值,及成绩分布。结合我的博客“MapReduce之学生平均成绩”看,效果更好。

    Mapreduce的基础搭建以及Java API编程

    mapreduce的基础搭建 以及Hadoop 完全分布式的配置

    MapReduce 2.0源码分析与编程实

    《MapReduce2.0源码分析与编程实战》强调理论联系实际,帮助读者在掌握MapReduce2.0基本知识和特性的基础上,培养实际编程和解决大数据处理相关问题的能力。《MapReduce2.0源码分析与编程实战》可作为学习MapReduce...

    MapReduce2.0源码分析与实战编程

     《MapReduce 2.0源码分析与编程实战》强调理论联系实际,帮助读者在掌握MapReduce 2.0基本知识和特性的基础上,培养实际编程和解决大数据处理相关问题的能力。《MapReduce 2.0源码分析与编程实战》可作为学习...

    大数据技术基础实验报告-MapReduce编程.docx

    。。。

    大数据技术基础实验报告-MapReduce编程 (2).pdf

    。。。

    mapreduce基础实战.doc

    4. **实战演练**:通过实际的案例来学习MapReduce编程,如词频统计任务、成绩统计、文件内容合并去重等。 5. **工具使用**:掌握如何使用Hadoop Streaming和Pig等工具来进行数据处理,这些工具可以帮助简化MapReduce...

    mapreduce基础实战.pdf

    二、MapReduce编程规范 用户编写的MapReduce程序代码通常分为三个部分:Mapper、Reducer和Driver(客户端提交作业驱动程序)。Mapper和Reducer都需要继承各自的父类,并实现相应的接口方法。 Mapper阶段 Mapper...

    Hadoop技术内幕:深入解析MapReduce架构设计与实现原理

    MapReduce编程模型篇第3章 MapReduce编程模型3.1 MapReduce编程模型概述3.1.1 MapReduce编程接口体系结构3.1.2 新旧MapReduce API比较3.2 MapReduce API基本概念3.2.1 序列化3.2.2 Reporter参数3.2.3 回调...

Global site tag (gtag.js) - Google Analytics