`
cocoIT
  • 浏览: 48806 次
  • 性别: Icon_minigender_1
  • 来自: 福建
文章分类
社区版块
存档分类
最新评论

mapreduce中map处理过程?参数如何解析传递给map方法?

 
阅读更多

1.首先介绍一下wordcount 在mapreduce框架中的 对应关系

大家都知道 mapreduce 分为 map 和reduce 两个部分,那么在wordcount例子中,很显然 对文件word 计数部分为map,对 word 数量累计部分为 reduce;

大家都明白 map接受一个参数,经过map处理后,将处理结果作为reduce的入参分发给reduce,然后在reduce中统计了word 的数量,最终输出到输出结果;

但是初看遇到的问题:

一、map的输入参数是个 Text之类的 对象,并不是 file对象

二、reduce中并没有if-else之类的判断语句 ,来说明 这个word 数量 加 一次,那个word 加一次。那么这个判断到底只是在 map中已经区分了 还是在reduce的时候才判断的

三、map过程到底做了什么,reduce过程到底做了什么?为什么它能够做到多个map多个reduce?

一、

1. 怎么将 文件参数 传递 到 job中呢?

在 client 我们调用了FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

实际上 addInputPath 做了以下的事情(将文件路径加载到了conf中)

public static void addInputPath(Job job,

Path path) throws IOException {

Configuration conf = job.getConfiguration();

path = path.getFileSystem(conf).makeQualified(path);

String dirStr = StringUtils.escapeString(path.toString());

String dirs = conf.get(INPUT_DIR);

conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);

}

我们再来看看 FileInputFormat 是做什么用的, FileInputFormat 实现了 InputFormat 接口 ,这个接口是hadoop用来接收客户端输入参数的。所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。

我们会看到 在 InputFormat 接口中 有getSplits方法,也就是说分片操作实际上实在 map之前 就已经做好了

List<InputSplit>getSplits(JobContext job)

Generate the list of files and make them into FileSplits.

具体实现参考 FileInputFormat getSplits 方法:

上面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它们会被用来计算分片大小。可以通过设置mapred.min.split.size和mapred.max.split.size来设置。splits链表用来存储计算得到的输入分片,files则存储作为由listStatus()获取的输入文件列表。然后对于每个输入文件,判断是否可以分割,通过computeSplitSize计算出分片大小splitSize,计算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保证在minSize和maxSize之间,且如果minSize<=blockSize<=maxSize,则设为blockSize。然后我们根据这个splitSize计算出每个文件的inputSplits集合,然后加入分片列表splits中。注意到我们生成InputSplit的时候按上面说的使用文件路径,分片起始位置,分片大小和存放这个文件的hosts列表来创建。最后我们还设置了输入文件数量:mapreduce.input.num.files。

计算出来的分片有时怎么传递给 map呢 ?

我们使用了 就是InputFormat中的另一个方法createRecordReader() 这个方法:

RecordReader:

RecordReader是用来从一个输入分片中读取一个一个的K -V 对的抽象类,我们可以将其看作是在InputSplit上的迭代器。我们从API接口中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。

可以看到接口中有:

public abstract boolean nextKeyValue() throws IOException, InterruptedException;

public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

public abstract float getProgress() throws IOException, InterruptedException;

public abstract void close() throws IOException;

FileInputFormat<K,V>

Direct Known Subclasses:

CombineFileInputFormat, KeyValueTextInputFormat, NLineInputFormat, SequenceFileInputFormat, TextInputFormat

对于 wordcount 测试用了 NLineInputFormat和TextInputFormat实现类

在 InputFormat 构建一个 RecordReader 出来,然后调用RecordReader initialize 的方法,初始化RecordReader 对象

那么 到底 Map是怎么调用 的呢? 通过前边我们 已经将 文件分片了,并且将文件分片的内容存放到了RecordReader中,

下面继续看看这些RecordReader是如何被MapReduce框架使用的

终于 说道 Map了 ,我么如果要实现Map 那么 一定要继承 Mapper这个类

public abstract class Context

implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

}

protected void setup(Context context) throws IOException, InterruptedException

protected void map(KEYIN key, VALUEIN value, Context context) throws IOException,InterruptedException { }

protected void cleanup(Context context ) throws IOException, InterruptedException { }

public void run(Context context) throws IOException, InterruptedException { }

我们写MapReduce程序的时候,我们写的mapper都要继承这个Mapper.class,通常我们会重写map()方法,map()每次接受一个K-V对,然后我们对这个K-V对进行处理,再分发出处理后的数据。我们也可能重写setup()以对这个map task进行一些预处理,比如创建一个List之类的;我们也可能重写cleanup()方法对做一些处理后的工作,当然我们也可能在cleanup()中写出K-V对。举个例子就是:InputSplit的数据是一些整数,然后我们要在mapper中算出它们的和。我们就可以在先设置个sum属性,然后map()函数处理一个K-V对就是将其加到sum上,最后在cleanup()函数中调用context.write(key,value);

最后我们看看Mapper.class中的run()方法,它相当于map task的驱动,我们可以看到run()方法首先调用setup()进行初始操作,然后对每个context.nextKeyValue()获取的K-V对,就调用map()函数进行处理,最后调用cleanup()做最后的处理。事实上,从context.nextKeyValue()就是使用了相应的RecordReader来获取K-V对的。

我们看看Mapper.class中的Context类,它继承与MapContext,使用了一个RecordReader进行构造。下面我们再看这个MapContext。

public MapContextImpl(Configuration conf, TaskAttemptID taskid,

RecordReader<KEYIN,VALUEIN> reader,

RecordWriter<KEYOUT,VALUEOUT> writer,

OutputCommitter committer,

StatusReporter reporter,

InputSplit split) {

super(conf, taskid, writer, committer, reporter);

this.reader = reader;

this.split = split;

}

RecordReader 看来是在这里构造出来了, 那么 是谁调用这个方法,将这个承载着关键数据信息的 RecordReader 传过来了 ?

我们可以想象 这里 应该被框架调用的可能性比较大了,那么mapreduce 框架是怎么分别来调用map和reduce呢?

还以为分析完map就完事了,才发现这里仅仅是做了mapreduce 框架调用前的一些准备工作,

还是继续分析 下 mapreduce 框架调用吧:

参考:http://www.cppblog.com/javenstudio/articles/43073.html

1.在 job提交 任务之后 首先由jobtrack 分发任务,

在 任务分发完成之后 ,执行 task的时候,这时 调用了 maptask 中的 runNewMapper

在这个方法中调用了 MapContextImpl, 至此 这个map 和框架就可以联系起来了

map 过程参考:

http://www.360doc.com/content/12/0827/09/9318309_232551564.shtml

分享到:
评论

相关推荐

    MapReduce:超大机群上的简单数据处理

    用户自定义的map函数,接受一个输入对,然后产生一个中间key/value对集.MapReduce库把所有具有相同中间key I的中间value聚合在一起,然后把它们传递给reduce函数. 用户自定义的reduce函数,接受一个中间key I和相关的一...

    hadoop mapreduce多表关联join多个job相互依赖传递参数

    mapreduce多表关联join多个job相互依赖传递参数

    MongoDB学习笔记之MapReduce使用示例

    使用 MapReduce 要实现两个函数 Map 函数和 Reduce 函数, Map 函数调用 emit(key, value), 遍历 collection 中所有的记录, 将key 与 value 传递给 Reduce 函数进行处理。Map 函数必须调用 emit(key, value) 返回键值...

    大数据处理流程.pdf

    Hive SQL实际上先被 SQL解析器解析,然后被Hive框架解析成⼀个MapReduce可执⾏计划,并按照该计划⽣产MapReduce任务后交给Hadoop集群处理。 Spark:尽管MapReduce和Hive能完成海量数据的⼤多数批处理⼯作,并且在打...

    完整版大数据云计算课程 Hadoop数据分析平台系列课程 Hadoop 04 MapReduce 共31页.pptx

    熟练地在Hadoop和操作系统以及关系型数据库之前传递数据 能独立制定数据集成方案 熟练地向Hadoop提交作业以及查询作业运行情况 了解Map-Reduce原理,能书写Map-Reduce程序 了解HDFS原理,能熟练地对HDFS中的文件进行...

    mapfileinputformat:MapFiles 的 Hadoop InputFormat,它在将任何内容传递给映射器之前过滤不相关的 FileSplits

    映射文件输入格式MapFiles 的 Hadoop InputFormat,它在将任何内容传递给映射器之前过滤不相关的 FileSplits。目的假设您的文件系统中有一些带有排序键的非常大的文件,并且键已排序。 在编写 MapReduce 作业时,您...

    第七章-《大数据导论》大数据处理平台.pdf

    HDFS HDFS: Hadoop Distributed File System 构建于本地文件系统之上,例如:ext3, xfs等 特点:多备份、一次写入(不允许修改) MapReduce 基本思想: 分而治之: 数据被切分成许多独立分片,被多个Map任务并行处理 ...

    yelp-heat-map:Yelp 学术数据集的 MapReduce 算法,用于创建单词的地理热图

    经度] 值(IntWritable):频率目前已经使用 Yelp 在此处提供的评论和业务的 JSON 数据进行了测试: : 要运行该程序,您还需要获取 json-simple 库并将其作为参数传递给 libjars。 hadoop jar &lt;buil

    大数据面试题-.docx

    Client 端上传文件的时候下列哪项正确 a)数据经过 NameNode 传递给 DataNode b)Client 端将文件切分为 Block,依次上传 c)Client 只上传数据到一台 DataNode,然后由 NameNode 负责 Block 复制工作 11. 下列哪个是 ...

    Hadoop实战中文版

    6.3.1 通过combiner来减少网络流量 6.3.2 减少输入数据量 6.3.3 使用压缩 6.3.4 重用JVM 6.3.5 根据猜测执行来运行 6.3.6 代码重构与算法重写 6.4 小结 第7章 细则手册 7.1 向任务传递作业定制的参数 ...

    大数据的基础知识.pdf

    HDFS YARN MapReduce Map阶段并⾏处理数据 Reduce阶段对Map处理数据的结构进⾏汇总 ⼤数据体系 名词解释 序 序 号 号 名称 名称 描述 描述 1 Sqoop Sqoop是⼀款开源的⼯具,主要⽤于在Hadoop、Hive与传统的数据库...

    同一张地图的结果差异减少了在不同Hadoop平台上实现的传递关闭算法-研究论文

    Apache MapReduce是与Apache Hadoop一起使用的软件框架,该框架已成为事实上的用于在分布式计算环境中处理和存储大量数据的标准平台。 本文介绍的研究重点是针对不同的分布式环境运行时,有效的迭代传递闭合算法的...

    大数据平台常见面试题.pdf

    JobClient 类将应⽤已经配置参数打包成 jar ⽂件存储到 hdfs,并把路径提交到 Jobtracker,然后由 JobTracker 创建每⼀个 Task(即 MapTask 和 ReduceTask)并将它们分发到各个 TaskTracker 服务中去执⾏ 2、...

    PIO-Parallel-Simrank-Engine

    并行 SimRank 算法使用 Delta-Simrank 算法 ( )。 通过将每个 SimRank 迭代作为 Map 和两个 Reduce 任务的...参数说明数据源 - graphEdgelistPath :传递给 GraphX 图形加载器的边列表。 为了有效地存储中间 SimRank 分

    Hadoop实战(陆嘉恒)译

    细则手册7.1 向任务传递作业定制的参数7.2 探查任务特定信息7.3 划分为多个输出文件7.4 以数据库作为输入输出7.5 保持输出的顺序7.6 小结第8 章 管理Hadoop8.1 为实际应用设置特定参数值8.2 系统体检8.3 权限设置8.4...

    Hadoop实战中文版.PDF

    865.1.3 预处理和后处理阶段的链接 865.2 联结不同来源的数据 895.2.1 Reduce侧的联结 905.2.2 基于DistributedCache的复制联结 985.2.3 半联结:map侧过滤后在reduce侧联结 1015.3 创建一个Bloom filter...

    大数据面试题.doc

    Client 端上传文件的时候下列哪项正确 a)数据经过 NameNode 传递给 DataNode b)Client 端将文件切分为 Block,依次上传 c)Client 只上传数据到一台 DataNode,然后由 NameNode 负责 Block 复制工作 11. 下列哪个是 ...

    大数据面试题(1).doc

    Client 端上传文件的时候下列哪项正确 a)数据经过 NameNode 传递给 DataNode b)Client 端将文件切分为 Block,依次上传 c)Client 只上传数据到一台 DataNode,然后由 NameNode 负责 Block 复制工作 11. 下列哪个是 ...

    Hadoop实战

    71.5.2 相同程序在MapReduce中的扩展 91.6 用Hadoop统计单词——运行第一个程序 111.7 Hadoop历史 151.8 小结 161.9 资源 16第2章 初识Hadoop 172.1 Hadoop的构造模块 172.1.1 NameNode 172.1.2 DataNode 182.1.3 ...

    大数据面试题.docx

    Client 端上传文件的时候下列哪项正确 a)数据经过 NameNode 传递给 DataNode b)Client 端将文件切分为 Block,依次上传 c)Client 只上传数据到一台 DataNode,然后由 NameNode 负责 Block 复制工作 11. 下列哪个是 ...

Global site tag (gtag.js) - Google Analytics