import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.partition.*;
public class WordCount {
public static class MyMapper extends Mapper<Object,Text, Text,IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value,Context context) throws IOException,InterruptedException{
StringTokenizer st=new StringTokenizer(value.toString());
while(st.hasMoreTokens()){
String str=st.nextToken();
word.set(str);
context.write(word, one);
System.out.println(str+"="+one.get());
}
}
}
public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
private IntWritable result = new IntWritable();
public void reduce(Text key,Iterable<IntWritable> values ,Context context) throws IOException,InterruptedException{
int sum=0;
for(IntWritable val:values)
{
sum+=val.get();
}
System.out.println(key+"="+sum);
result.set(sum);
context.write(key,result);
}
}
public static void main(String [] args) throws Exception
{
Configuration conf= new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job=new Job(conf,"wordcount" );
/**InputFormat类的作用是将输入的数据分割成splits,并将splits进一步拆成<K,V>
*可以通过setInputFormatClass()方法进行设置
*默认为TextInputFormat.class,默认情况可以不写
**/
job.setInputFormatClass(TextInputFormat.class);
/**
*Mapper类的作用是实现map函数,将splits作为输入生成一个结果
*可以通过setMapperClass()方法进行设置
*默认为Mapper.class,默认情况可以不写,此时输入即输出
*/
job.setMapperClass(MyMapper.class);
/**
* 设置Mapper输出的key的类型
*/
job.setMapOutputKeyClass(Text.class);
/**
* 设置Mapper输出的value的类型
*/
job.setMapOutputValueClass(IntWritable.class);
/**
* Combiner类的作用是实现combine函数,将mapper的输出作为输入,合并具有形同key值得键值对
* 可以通过setCombinerClass()方法进行设置
* 默认为null,默认情况不写,此时输入即输出
*/
job.setCombinerClass(MyReducer.class);
/**
* Partitioner类的作用是实现getPartition函数,用于在洗牌过程中将由Mapper输入的结果分成R份,每份交给一个Reducer
* 可以通过setPartitionerClass()方法进行设置
* 默认为HashPartitioner.class,默认情况可以不写,此时输入即输出
*/
job.setPartitionerClass(HashPartitioner.class);
/**
* Reducer类的作用是实现reduce函数,将有combiner的输出作为输入,得到最终结果
* 可以通过setReducerClass()方法进行设置
* 默认为Reducer.class,默认情况可以不写,此时输入即输出
*/
job.setReducerClass(MyReducer.class);
/**
* OutputFormat类,负责输出最终结果
* 可以通过setOutputFormatClass()方法进行设置
* 默认TextOutputFormat.class,默认情况可以不写,此时输入即输出
*/
//job.setOutputFormatClass(TextOutputFormat.class);
/**
* 设置reduce()输出的key的类型
*/
job.setOutputKeyClass(Text.class);
/**
* 设置reduce()输出的value的类型
*/
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.out.println("运行啦");
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
我们先来看main函数,以便了解hadoop的MapReduce的过程,假设输入为两个文件
InputFormat类将输入文件划分成多个splits,同时将这些splits转化为<K,V>的形式,如下,可以发现,当使用默认的TextInputFormat进行处理的时候,K为偏移量,V为每行的文本。
接着map()方法
对以上结果进行处理,根据MyMapper中的map方法,得到以下结果:
接下来Mapper框架对以上结果进行处理,根据key值进行排序,并合并成集合,得到以下结果:
接下来combine类对以上结果进行处理(实际上combine是一个本地的reducer,所以用MyReducer给他复制,见文章最后)得到结果如下:
接下来Reducer框架对其进行处理,结果如下:
接下来reduce()方法进行处理,结果如下:
以上是wordcount程序执行的全过程,通过wordcount的代码,我们了解了MapReduce框架的执行流程,如下
InputFormat>>map()方法>>Mapper框架>>Combiner类>>Partitioner类>>Reducer框架>>reduce()方法
以上的每个步骤,不设置具体的类时都会有个默认的类,除了InputFormat类以外,其他类的默认类都是输入即输出,但是InputFormat的默认类是输出为<K,V>
因此,对于wordcount程序来说,如果都采用默认类的话,输出应该为
下面来说说Combiner类,本质上是一个本地的Reducer类。其设计初衷是在本地将需要reduce操作的数据合并,以减少不必要的通信代价,以提高hadoop的运行性能。
但值得注意的是,并不是所有的mapreduce程序都可以将reduce过程移植到本地进行combine,这需要在逻辑上考虑这种移植是否可行!要想 进行本地reduce(combine),一个必要的条件是,reduce的输入输出格式必须一样!比如,对于wordcount程序,combine是 可行的,combine的输出(实际上是reduce的输出)与reduce的输入吻合。因此我们才可以有 job.setCombinerClass(MyReducer.class);
实际上,在Combiner和Reducer之间还有一个Partitioner类,该类用于在shuffle过程中将中间值分成R份,每份由一个Reducer负责。通过使用job.setPartitionerClass()来进行设置,默认使用HashPartitioner类。
相关推荐
Hadoop 用mapreduce实现Wordcount实例,绝对能用
wordcount, mapreduce经典,文字计数
hadoop 框架下 mapreduce源码例子 wordcount ,eclipse下,hadoop 2.2 可以运行
mapreduce的Wordcount案例将main生成jar可直接在hdfs上运行
这是大数据架构的编程作业,用python实现mapreduce里的word count
第5章 MapReduce分布式计算框架 2 5.1. MapReduce简介 2 5.2. wordcount经典案例介绍 2 5.3. MapReduce进程介绍 3 5.4. MapReduce编程规范 3 5.5. wordcount经典案例的实现 5 5.5.1. 分析数据准备 5 5.5.2. 新建...
wordcount-mapreduce Hadoop MapReduce WordCount 示例应用程序
myeclipse +maven 搭建的hadoop mapreduce 例子项目,运行了单机wordcount
4 分别在自编 MapReduce 程序 WordCount 运行过程中和运行结束后查看 MapReduce Web 界面。 5. 分别在自编 MapReduce 程序 WordCount 运行过程中和运行结束后练习 MapReduce Shell 常用命令。 。。
使用python实现MapReduce的wordcount实例
<artifactId>wordcount <version>0.0.1-SNAPSHOT <packaging>jar <name>wordcount <url>http://maven.apache.org</url> <project.build.sourceEncoding>UTF-8 <groupId>org.apache.hadoop ...
Hadoop初学MapReduce最经典的范例
WordCount_MapReduce 在 Hadoop 上运行的 MapReduce 程序
用java的MapReduce写了个demo,用于计算文档单词出现个数
Ubuntu环境下新手学习Hadoop,从配置Hadoop开始,实现mapreduce,过程详细,也是自己学习过程的一个记录。
MapReduce的wordcount的jar包
介绍如何在Intellij Idea中通过创建maven工程配置MapReduce的编程环境,WordCount代码。
7.1 概述 7.2 MapReduce体系结构 7.3 MapReduce工作流程 7.4 实例分析:WordCount 7.5 MapReduce的具体应用 7.6 MapReduce编程实践
字数 使用 Hadoop MapReduce 框架的字数统计 使用 mapreduce 的简单字数统计示例...
Hadoop搭建 MapReduce之Wordcount代码实现 代码讲解,通俗易懂。