Note:
1. I am currently on Hadoop 1.2.1, so ChainMapper here still uses the old API.
2. The old API only supports the N-Mapper + 1-Reducer pattern; any arrangement works as long as the Reducer is not the very first stage of the chain.
For example:
Map1 -> Map2 -> Reducer -> Map3 -> Map4
(I am not sure whether the new API supports an N-Reducer pattern, but the new API is certainly much simpler and cleaner.)
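To make the pattern concrete, the old-API wiring for the chain above would look roughly like this. This is a minimal sketch: MyChainedJob, Map1..Map4 and Reduce are hypothetical classes, and the key/value type arguments are placeholders that must match from one stage to the next.

// Sketch only: Map1..Map4 and Reduce are hypothetical classes.
// Every pre-reduce mapper is registered with ChainMapper.addMapper,
// the single reducer with ChainReducer.setReducer, and every
// post-reduce mapper with ChainReducer.addMapper.
JobConf conf = new JobConf(MyChainedJob.class);
ChainMapper.addMapper(conf, Map1.class,
        LongWritable.class, Text.class, Text.class, Text.class,
        false, new JobConf(false));
ChainMapper.addMapper(conf, Map2.class,
        Text.class, Text.class, Text.class, IntWritable.class,
        false, new JobConf(false));
ChainReducer.setReducer(conf, Reduce.class,
        Text.class, IntWritable.class, Text.class, IntWritable.class,
        false, new JobConf(false));
ChainReducer.addMapper(conf, Map3.class,
        Text.class, IntWritable.class, Text.class, IntWritable.class,
        false, new JobConf(false));
ChainReducer.addMapper(conf, Map4.class,
        Text.class, IntWritable.class, Text.class, IntWritable.class,
        false, new JobConf(false));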
Task description:
This task takes two steps:
1. Run WordCount over a document.
2. Pick out the words that appear at least 5 times (LOW_LIMIT = 5 in the code below).
WordCount is familiar ground; because of the version constraint, let's first implement it with the old API:
package hadoop_in_action_exersice;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class ChainedJobs {

    public static class TokenizeMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        public static final int LOW_LIMIT = 5;

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer st = new StringTokenizer(line);
            while (st.hasMoreTokens())
                output.collect(new Text(st.nextToken()), one);
        }
    }

    public static class TokenizeReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(ChainedJobs.class);
        conf.setJobName("wordcount");                    // set a user-defined job name
        conf.setOutputKeyClass(Text.class);              // set the key class of the job's output
        conf.setOutputValueClass(IntWritable.class);     // set the value class of the job's output
        conf.setMapperClass(TokenizeMapper.class);       // set the Mapper class
        conf.setCombinerClass(TokenizeReducer.class);    // set the Combiner class
        conf.setReducerClass(TokenizeReducer.class);     // set the Reducer class
        conf.setInputFormat(TextInputFormat.class);      // set the InputFormat implementation
        conf.setOutputFormat(TextOutputFormat.class);    // set the OutputFormat implementation

        // Remove the output folder before running the job(s)
        FileSystem fs = FileSystem.get(conf);
        String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";
        Path op = new Path(outputPath);
        if (fs.exists(op)) {
            fs.delete(op, true);
            System.out.println("Output path already exists; deleted!");
        }

        FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
        FileOutputFormat.setOutputPath(conf, new Path(outputPath));
        JobClient.runJob(conf);                          // run the job
    }
}
The above is a standalone job that completes step 1. To carry straight on to step 2, we need to build on it.
For ease of understanding, here is a sample of what the job above produces, which is also the input to the step that follows:
accessed 3
accessible 4
accomplish 1
accounting 7
accurately 1
acquire 1
across 1
actual 1
actually 1
add 3
added 2
addition 1
additional 4
One real drawback of the old-API approach is that it does not support setup()/cleanup() hooks, so when possible it is best to migrate to Hadoop 2.x.
The new API is much more convenient and concise.
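For comparison, here is roughly what the same three-stage chain would look like with the new API in Hadoop 2.x. This is a sketch only: it assumes TokenizeMapper, TokenizeReducer and RangeFilterMapper have been rewritten against the new org.apache.hadoop.mapreduce Mapper/Reducer base classes.

// Sketch: requires new-API versions of the three classes below, plus:
// import org.apache.hadoop.conf.Configuration;
// import org.apache.hadoop.mapreduce.Job;
// import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
// import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "chained wordcount");
job.setJarByClass(ChainedJobs.class);

// Note: no byValue flag, and plain Configuration objects instead of JobConf.
ChainMapper.addMapper(job, TokenizeMapper.class,
        LongWritable.class, Text.class, Text.class, IntWritable.class,
        new Configuration(false));
ChainReducer.setReducer(job, TokenizeReducer.class,
        Text.class, IntWritable.class, Text.class, IntWritable.class,
        new Configuration(false));
ChainReducer.addMapper(job, RangeFilterMapper.class,
        Text.class, IntWritable.class, Text.class, IntWritable.class,
        new Configuration(false));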
Below, a Mapper is added to do the filtering:
public static class RangeFilterMapper extends MapReduceBase
        implements Mapper<Text, IntWritable, Text, IntWritable> {

    @Override
    public void map(Text key, IntWritable value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        // LOW_LIMIT is declared in TokenizeMapper, so it must be qualified here
        if (value.get() >= TokenizeMapper.LOW_LIMIT) {
            output.collect(key, value);
        }
    }
}
This Mapper is simple: for each key, it emits the pair only if its value >= LOW_LIMIT. From the sample above, only "accounting 7" would survive the filter.
So the job chain so far looks like this:
TokenizeMapper -> TokenizeReducer -> RangeFilterMapper
Our main function therefore becomes:
// Requires two extra imports over the previous version:
// import org.apache.hadoop.mapred.lib.ChainMapper;
// import org.apache.hadoop.mapred.lib.ChainReducer;
public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(ChainedJobs.class);
    conf.setJobName("wordcount");                     // set a user-defined job name
//  conf.setOutputKeyClass(Text.class);               // set the key class of the job's output
//  conf.setOutputValueClass(IntWritable.class);      // set the value class of the job's output
//  conf.setMapperClass(TokenizeMapper.class);        // set the Mapper class
//  conf.setCombinerClass(TokenizeReducer.class);     // set the Combiner class
//  conf.setReducerClass(TokenizeReducer.class);      // set the Reducer class
//  conf.setInputFormat(TextInputFormat.class);       // set the InputFormat implementation
//  conf.setOutputFormat(TextOutputFormat.class);     // set the OutputFormat implementation

    // Step 1: mapper for word count
    JobConf wordCountMapper = new JobConf(false);
    ChainMapper.addMapper(conf,
            TokenizeMapper.class,
            LongWritable.class,    // input key type
            Text.class,            // input value type
            Text.class,            // output key type
            IntWritable.class,     // output value type
            false,                 // byValue or byReference
            wordCountMapper);

    // Step 2: reducer for word count
    JobConf wordCountReducer = new JobConf(false);
    ChainReducer.setReducer(conf,
            TokenizeReducer.class,
            Text.class, IntWritable.class,
            Text.class, IntWritable.class,
            false,
            wordCountReducer);

    // Step 3: mapper used as a filter
    JobConf rangeFilterMapper = new JobConf(false);
    ChainReducer.addMapper(conf,
            RangeFilterMapper.class,
            Text.class, IntWritable.class,
            Text.class, IntWritable.class,
            false,
            rangeFilterMapper);

    // Remove the output folder before running the job(s)
    FileSystem fs = FileSystem.get(conf);
    String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";
    Path op = new Path(outputPath);
    if (fs.exists(op)) {
        fs.delete(op, true);
        System.out.println("Output path already exists; deleted!");
    }

    FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
    FileOutputFormat.setOutputPath(conf, new Path(outputPath));
    JobClient.runJob(conf);                           // run the job
}
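Assuming the class has been packaged into a jar (the jar file name below is made up for illustration), the chained job runs like any other MapReduce job:

hadoop jar chained-jobs.jar hadoop_in_action_exersice.ChainedJobs

Because the whole chain lives in a single MapReduce job, this launches exactly one job: the chained mappers run back to back inside the map task, and the post-reduce filter mapper runs inside the reduce task right after the reducer, with no intermediate job output written between stages.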
Part of the run's output:
a 40
and 26
are 12
as 6
be 7
been 8
but 5
by 5
can 12
change 5
data 5
files 7
for 28
from 5
has 7
have 8
if 6
in 27
is 16
it 13
more 8
not 5
of 23
on 5
outputs 5
see 6
so 11
that 11
the 54
As you can see, with English text, if stop words (a, the, for, ...) are not removed as is usually done in NLP, the results are heavily skewed.
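Conveniently, the chain itself offers a fix: prepend one more mapper that strips stop words before TokenizeMapper ever sees the text. The sketch below is not part of the original job; StopWordFilterMapper and its tiny word list are made up for illustration, and it additionally needs java.util.Arrays, java.util.HashSet and java.util.Set.

// Hypothetical old-API mapper that drops common English stop words.
// The word list here is a tiny illustrative sample, not a real stop list.
public static class StopWordFilterMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, LongWritable, Text> {

    private static final Set<String> STOP_WORDS =
            new HashSet<String>(Arrays.asList("a", "the", "for", "of", "and", "in", "is"));

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<LongWritable, Text> output, Reporter reporter)
            throws IOException {
        StringBuilder kept = new StringBuilder();
        StringTokenizer st = new StringTokenizer(value.toString());
        while (st.hasMoreTokens()) {
            String token = st.nextToken();
            if (!STOP_WORDS.contains(token.toLowerCase())) {
                kept.append(token).append(' ');
            }
        }
        // Emit the line with stop words removed; types match TokenizeMapper's input
        output.collect(key, new Text(kept.toString()));
    }
}

// Registered in main() in front of TokenizeMapper:
// ChainMapper.addMapper(conf, StopWordFilterMapper.class,
//         LongWritable.class, Text.class, LongWritable.class, Text.class,
//         false, new JobConf(false));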