1.什么是MapReduce
MapReduce是Google公司的核心计算模型,我在前面提到过,Google的三大论文。hadoop受到Google的启发开发出自己的MapReduce框架,基于这个框架写出的应用程序能够在上千台计算机上组成大型集群,并以一种可靠容错的方式并行处理上T级别的数据,实现hadoop在集群上的数据和任务并行计算与处理
1.一个MapReduce作业通常会把输入的数据集切分成若干个独立的数据块,由Map Task以完成并行的方式处理他们。对于Map的输出,框架会首先进行排序,然后把结果输入给Reduce Task。
2.通常计算节点和数据节点在一起,这样可以减少网络带宽中数据的传输,达到高效利用
MapReduce有两个角色:
1.JobTracker:负责任务的管理和调度(一个hadoop集群中只有一台JobTracker)
2.TaskTracker:负责执行工作,在DataNode节点上执行(Map函数和Reduce函数运行的节点)
下图是MapReduce的流程简介:
2.MapReduce处理流程
我们使用hadoop里面的一个简单的例子看一下MapReduce的处理流程。打开$HADOOPHOME\hadoop-1.0.4\src\examples\org\apache\hadoop\examples下面有一个WordCount.java。代码如下:
package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { /** * * @title TokenizerMapper * @description Mapper后面的四个参数含义: * Object:输入到Map中的Key * Text:输入到Map中的Value * Text:Map的输出Key也是Reduce的输入的Key * IntWritable:Map的输出Value也是Reduce的输入Value * @author hadoop * @version WordCount * @copyright (c) SINOSOFT * */ public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); /** * 重写Map函数 * Map函数的意思:使用空格切割value。生成一个一个的单词。 * 模拟Map的输入:<偏移量,"hadoop hadoop haha hive redis redis LVS LVS NB"> * 模拟Map的输出:<hadoop,1>;<hadoop,1>;<haha,1>;<hive,1>;<redis,1>;<redis,1>;<LVS,1>;<LVS,1>;<NB,1> */ public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { //String类的工具类,可以切割字符串。默认使用空格切割字符串 StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } /** * * @title IntSumReducer * @description Reduce函数中Reducer<Text,IntWritable,Text,IntWritable>的泛型含义 * Text:输入到Reduce中的Key,也是Map输出的Key * IntWritable:输入到Reduce中的Value,也是Map输出的Value * Text:Reduce的输出Key * IntWritable:Reduce的输出Value * @author hadoop * @version WordCount * @copyright (c) SINOSOFT * */ public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); /** * 在Reduce函数的输入是Map函数的输出,并且Map输出相同的Key会放在一起,也就生成了List类型的Value * 模拟Reduce的输入前:<hadoop,1>;<hadoop,1>;<haha,1>;<hive,1>;<redis,1>;<redis,1>;<LVS,1>;<LVS,1>;<NB,1> * Reduce中的洗牌:<hadoop,List values>;<haha,List values>;<redis,List values>;<LVS,List values>;<NB,List values>:相同的Key放在一组 * 模拟Reduce的输入:<hadoop,List values>;<haha,List values>;<redis,List values>;<LVS,List values>;<NB,List values> * 模拟Reduce的输出:<hadoop, 2>;<haha, 1>;<redis, 2>;<LVS, 2>;<NB, 1> */ public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path("/user/helloMR")); //Map的输入 FileOutputFormat.setOutputPath(job, new Path("/user/helloHDFS/success"));//Reduce的输出 System.exit(job.waitForCompletion(true) ? 0 : 1); } }
在main方法中有:FileInputFormat.addInputPath(job, new Path("/user/helloMR"));
提供了Map阶段的输入,具体是怎么做到的呢?
在hadoop中使用InputSplit将输入的数据发送给每个单独的Map,InputSplit并非保存数据本身,而是一个分片长度和一个记录数据位置的数组,InputSplit对象可以通过Inputformat()来生成。
数据-->Map将分片-->InputFormat()-->getRecordReader()-->生成RecordReader-->通过createKey();createValue()-->Map需要的<key,Value>
现在可以在eclipse里面运行:
3.WordCount中的数据流程
相关推荐
NULL 博文链接:https://zc985552943.iteye.com/blog/2088181
NULL 博文链接:https://zc985552943.iteye.com/blog/2087950
Hadoop_Hadoop集群(第1期)_CentOS安装配置 Hadoop_Hadoop集群(第2期)_机器信息分布表 Hadoop_Hadoop集群(第4期)_SecureCRT使用 Hadoop_Hadoop集群(第5期)_Hadoop安装配置 Hadoop_Hadoop集群(第5期副刊)_...
1.细细品味Hadoop_Hadoop集群(第1期)_CentOS安装配置 2.细细品味Hadoop_Hadoop集群(第2期)_机器信息分布表 3.细细品味Hadoop_Hadoop集群(第3期)_VSFTP安装配置 4.细细品味Hadoop_Hadoop集群(第4期)_...
hadoop2.X配置详解和mapreduce详解
大数据Hadoop核心模块之MapReduce,文档有概念、案例、代码,Mapreduce中,不可多得文档!
使用和学习过老Hadoop框架(0.20.0及之前版本)的同仁应该很熟悉如下的原MapReduce框架图:图1.Hadoop原MapReduce架构从上图中可以清楚的看出原MapReduce程序的流程及设计思路:可以看得出原来的map-reduce架构是...
英特尔Hadoop发行版开发者指南,主要介绍mapreduce、hbase等源代码的详解。
云计算HADOOP、mapreduce、hdfs详解,有兴趣的可以一起研究!
1)Hadoop集群_第1期_CentOS安装配置_V1.0 2)Hadoop集群_第2期_机器信息分布表_V1.1 3)Hadoop集群_第3期_VSFTP安装配置_V1.0 4)Hadoop集群_第4期_SecureCRT使用_V1.0 5)Hadoop集群_第5期_Hadoop...
Hadoop中mapReduce处理过程详解
024 Hadoop相关命令中的【–config configdir】作用 025 Hadoop 目录结构 026 Eclipse导入Hadoop源码项目 027 HDFS 设计目标 028 HDFS 文件系统架构概述 029 HDFS架构之NameNode和DataNode 030 HDFS 架构讲解总结 ...
如果想将mapreduce结果排序,需将排序对象作为键值。 案例:将利润求和后按照顺序排序 数据源 profit.txt 编号 | 姓名 | 收入 | 支出 1 ls 2850 100 2 ls 3566 200 3 ls 4555 323 1 zs 19000 2000 2 zs 28599 3900 3...
《大数据技术丛书:Hadoop应用开发技术详解》共12章。第1~2章详细地介绍了Hadoop的生态系统、关键技术以及安装和配置;第3章是MapReduce的使用入门,让读者了解整个开发过程;第4~5章详细讲解了分布式文件系统HDFS...
hadoop海量数据处理技术详解,包括hdfs、MapReduce、hive、sqoop等相关技术和伪代码,代码是使用python语言写的。
通过对Hadoop分布式计算平台最核心的分布式文件系统HDFS、MapReduce处理过程,以及数据仓库工具Hive和分布式数据库Hbase的介绍,基本涵盖了Hadoop分布式平台的所有技术核心。通过这一阶段的调研总结,从内部机理的...
MapReduce是Hadoop提供的一套用于进行分布式计算的模型,本身是Doug Cutting根据Google的<MapReduce: Simplified Data Processing on Large Clusters>仿照实现的。 MapReduce由两个阶段组成:Map(映射)阶段和Reduce...
在Hadoop中,用于执行MapReduce任务的机器角色有两个:一个是JobTracker;另一个是TaskTracker,JobTracker是用于调度工作的,TaskTracker是用于执行工作的。一个Hadoop集群中只有一台JobTracker。 在分布式计算...
MapReduce计算模式详解