MapReduce: Using a Combiner (averaging example), together with the in-mapper design pattern
Blog category: Hadoop
Without a Combiner or the in-mapper design pattern:

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class digitaver1 {

    public static class mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input lines look like "key:value"
            String[] ss = value.toString().split(":");
            context.write(new Text(ss[0]), new IntWritable(Integer.parseInt(ss[1])));
        }
    }

    public static class reducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int cnt = 0;
            // Iterate with a single for-each loop; do not call value.iterator()
            // on every pass as the original code did.
            for (IntWritable v : value) {
                sum += v.get();
                cnt += 1;
            }
            context.write(key, new DoubleWritable((double) sum / (double) cnt));
        }
    }

    public static void main(String[] args) {
        try {
            Job job = new Job();
            job.setJarByClass(digitaver1.class);
            job.setJobName("digitaver1");
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.setMapperClass(mapper.class);
            job.setReducerClass(reducer.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException | InterruptedException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
```

Using a Combiner (the map output value is now a custom `pair` Writable holding a running sum and count):

```java
public static class mapper extends Mapper<LongWritable, Text, Text, pair> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] ss = value.toString().split(":");
        // Emit (value, 1): a partial sum and a partial count
        pair p = new pair(Integer.parseInt(ss[1]), 1);
        context.write(new Text(ss[0]), p);
    }
}

public static class combiner extends Reducer<Text, pair, Text, pair> {
    @Override
    protected void reduce(Text key, Iterable<pair> value, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        int cnt = 0;
        for (pair p : value) {
            sum += p.getLeft().get();
            cnt += p.getRight().get();
        }
        // Re-emit a (sum, count) pair, NOT an average, so the reducer
        // can still compute the exact mean.
        context.write(key, new pair(sum, cnt));
    }
}

public static class reducer extends Reducer<Text, pair, Text, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<pair> value, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        int cnt = 0;
        for (pair p : value) {
            sum += p.getLeft().get();
            cnt += p.getRight().get();
        }
        context.write(key, new DoubleWritable((double) sum / (double) cnt));
    }
}
```

The main function is almost identical; the only changes are registering the combiner with `job.setCombinerClass(combiner.class)` and setting `job.setMapOutputValueClass(pair.class)`.

Using the in-mapper design pattern (aggregation happens in a HashMap inside the mapper, and results are emitted only in `cleanup()`):

```java
import java.util.HashMap;
import java.util.Map;

public static class mapper extends Mapper<LongWritable, Text, Text, pair> {
    private Map<String, String> map;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        map = new HashMap<String, String>();
    }

    // All input is processed before anything is handed to the reducer.
    // Previously the map task would stream partial output to the reducers
    // as it ran; with this pattern, does efficiency suffer? Less data is
    // shuffled, but output starts later?? Smaller blocking delay?? Better
    // load balance?? Less total traffic on the network??
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] ss = value.toString().split(":");
        if (!map.containsKey(ss[0])) {
            // First value for this key: store "value:1" (sum:count)
            map.put(ss[0], ss[1] + ":" + 1);
        } else {
            // Fold the new value into the stored running sum and count
            String tmp = map.get(ss[0]);
            String[] tt = tmp.split(":");
            int ta = Integer.parseInt(ss[1]) + Integer.parseInt(tt[0]);
            int tb = Integer.parseInt(tt[1]) + 1;
            map.put(ss[0], ta + ":" + tb);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit one (sum, count) pair per key, once all input has been seen
        for (Map.Entry<String, String> e : map.entrySet()) {
            String[] tt = e.getValue().split(":");
            pair p = new pair(Integer.parseInt(tt[0]), Integer.parseInt(tt[1]));
            context.write(new Text(e.getKey()), p);
        }
    }
}

public static class reducer extends Reducer<Text, pair, Text, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<pair> value, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        int cnt = 0;
        for (pair p : value) {
            sum += p.getLeft().get();
            cnt += p.getRight().get();
        }
        context.write(key, new DoubleWritable((double) sum / (double) cnt));
    }
}
```
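All three variants above reference a custom `pair` Writable that the post never shows. Here is a minimal sketch of what it presumably looks like; the class name and the `getLeft()`/`getRight()` accessors are taken from the calling code, everything else is an assumption:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

// A (sum, count) value type for the averaging job. Hadoop serializes
// map output values, so the class must implement Writable and provide
// a no-argument constructor for deserialization.
public class pair implements Writable {
    private IntWritable left;   // running sum
    private IntWritable right;  // running count

    public pair() {
        this.left = new IntWritable();
        this.right = new IntWritable();
    }

    public pair(int left, int right) {
        this.left = new IntWritable(left);
        this.right = new IntWritable(right);
    }

    public IntWritable getLeft()  { return left; }
    public IntWritable getRight() { return right; }

    @Override
    public void write(DataOutput out) throws IOException {
        left.write(out);
        right.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        left.readFields(in);
        right.readFields(in);
    }
}
```

If `pair` were ever used as a key rather than a value, it would also need to implement `WritableComparable`; as a map output value, `Writable` alone suffices.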
in-mapper design pattern: aggregation happens inside each mapper, in memory, before any record is emitted.
Combiner: aggregation runs on each map task's emitted output; the framework may apply it zero, one, or several times, so it must not change the final result.
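This is also why both optimized versions ship `(sum, count)` pairs instead of partial averages: averaging is not associative, so a combiner that emitted per-mapper averages would change the final answer. A small standalone check (plain Java, independent of Hadoop; the array values are illustrative):

```java
public class AvgPitfall {
    // True mean of all values pooled together (what (sum, count) pairs preserve)
    static double pooledAvg(int[] a, int[] b) {
        int sum = 0, cnt = 0;
        for (int v : a) { sum += v; cnt++; }
        for (int v : b) { sum += v; cnt++; }
        return (double) sum / cnt;
    }

    // What a broken combiner emitting per-mapper averages would produce
    static double avgOfAvgs(int[] a, int[] b) {
        double sumA = 0, sumB = 0;
        for (int v : a) sumA += v;
        for (int v : b) sumB += v;
        return (sumA / a.length + sumB / b.length) / 2;
    }

    public static void main(String[] args) {
        int[] mapperA = {1, 2};  // values seen by one map task
        int[] mapperB = {3};     // values seen by another
        System.out.println(pooledAvg(mapperA, mapperB)); // 2.0
        System.out.println(avgOfAvgs(mapperA, mapperB)); // 2.25 -- wrong
    }
}
```

The mismatch (2.0 vs 2.25) appears whenever the map tasks see different numbers of values per key, which is the normal case.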