之前在网上看到了一篇使用MapReduce实现二次排序的博客,自己尝试实现了,并测试成功,代码有所改动。链接如下:
http://blog.csdn.net/zyj8170/article/details/7530728
所谓的二次排序:对Key和Val都进行排序(比如升序),并输出。对Key的自动排序,MapReduce可以替我们解决,但是同时对Val进行排序,则需要其他的做法。
做法一:对每个Key的所有Val,添加到ArrayList,使用Collections.sort方法进行排序,虽然能够实现,但是存在隐患,如果key的val值很多,那么对每个Key的ArrayList消耗的内存就很大,效率不高;
做法二:定制MapReduce的IO类型,定制GroupingComparator类,实现二次排序,通过定制,将默认的MR执行的方式改成自定义的,效率比较高,易于扩展;
现在开始实现做法二。
一、准备待排序的数据(数据跟原博客一样),见第三部分的结果对比:
二、代码如下
package com; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.StringTokenizer; public class SecondSort extends Configured implements Tool { //自定义的类型,(参见本博客的MapReduce定制类型) static class IntPair implements WritableComparable<IntPair> { private int a; private int b; public IntPair() { a = 0; b = 0; } public int getA() { return a; } public void setA(int a) { this.a = a; } public int getB() { return b; } public void setB(int b) { this.b = b; } public void set(int a, int b) { this.a = a; this.b = b; } @Override public int compareTo(IntPair o) { if (this.a == o.a) { if (this.b == o.b) return 0; else return this.b > o.b ? 1 : -1; } else return this.a > o.a ? 1 : -1; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(a); dataOutput.writeInt(b); } @Override public void readFields(DataInput dataInput) throws IOException { a = dataInput.readInt(); b = dataInput.readInt(); } } static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer strTok = new StringTokenizer(value.toString()); int a = Integer.parseInt(strTok.nextToken()); int b = Integer.parseInt(strTok.nextToken()); IntPair mykey = new IntPair(); mykey.set(a, b); context.write(mykey, new IntWritable(b)); } } static class MyKeyGroupComparator extends WritableComparator { MyKeyGroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { IntPair ip1 = (IntPair) a; IntPair ip2 = (IntPair) b; if (ip1.a == ip2.a) return 0; else return ip1.a > ip2.a ? 1 : -1; } } static class Reduce extends Reducer<IntPair, IntWritable, IntWritable, IntWritable> { @Override protected void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { IntWritable myKey = new IntWritable(key.getA()); //显式的分隔分组,便于查看 context.write(new IntWritable(999999999), null); for(IntWritable i :values){ context.write(myKey, i); } } } @Override public int run(String[] strings) throws Exception { //path是HDFS的路径字符串 String path = "/my/inputTest/Test_SecondSort.txt"; if (strings.length != 1) { System.out.println("input:" + path); System.out.print("arg:<out>"); return 1; } Configuration conf = getConf(); Job job = new Job(conf, "SecondSort"); job.setJarByClass(SecondSort.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setGroupingComparatorClass(MyKeyGroupComparator.class); job.setMapOutputKeyClass(IntPair.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); job.setNumReduceTasks(1); FileInputFormat.addInputPath(job, new Path(path)); FileOutputFormat.setOutputPath(job, new Path(strings[0])); boolean success = job.waitForCompletion(true); return success ? 0 : 1; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); int rst = ToolRunner.run(conf, new SecondSort(), args); System.exit(rst); } }
三、见结果对比表,结果为结果数据1(999999999是Group分隔符):
四、如果将Group的标准从根据Inpair的a的值判断改成b的值判断,Group部分代码修改如下(其余不变):
static class MyKeyGroupComparator extends WritableComparator { MyKeyGroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { IntPair ip1 = (IntPair) a; IntPair ip2 = (IntPair) b; // if (ip1.a == ip2.a) // return 0; // else // return ip1.a > ip2.a ? 1 : -1; if (ip1.b == ip2.b) return 0; else return ip1.b > ip2.b ? 1 : -1; } }
结果见结果对比表的结果数据2.
五、结果对比表:
原始数据 | 结果数据1 | 结果数据2 | 结果说明 |
20 21 50 51 50 52 50 53 50 54 60 51 60 53 60 52 60 56 60 57 70 58 60 61 70 54 70 55 70 56 70 57 70 58 1 2 3 4 5 6 7 82 203 21 50 512 50 522 50 53 530 54 40 511 20 53 20 522 60 56 60 57 740 58 63 61 730 54 71 55 71 56 73 57 74 58 12 211 31 42 50 62 7 8 |
999999999 1 2 999999999 3 4 999999999 7 8 7 82 999999999 12 211 999999999 20 21 20 53 20 522 999999999 31 42 999999999 40 511 999999999 50 51 50 52 50 53 50 54 …… |
999999999 1 2 999999999 3 4 999999999 7 8 999999999 7 82 999999999 12 211 999999999 20 21 999999999 20 53 999999999 20 522 999999999 31 42 999999999 40 511 999999999 50 51 999999999 50 52 999999999 50 53 999999999 50 54 …… |
结果1中,完全符合我们的要求,实现了二次排序,并将key为20的全部分到同一个分组中。
结果2中,将key为20的数据,由于b的值各不相同,因此又将其分成了三个Group;同时,对于黄色高亮的数据,b的值同样是53,却没有分到同一个分组。原因见GroupingComparator的作用。 |
六、GroupingComparator的作用
Job的API解释:
翻译之后,Group的作用是:在一个reduce调用的时候,通过这个Comparator来控制Key的分组。
但是还是不太清晰,有待进一步了解。
相关推荐
MapReduce二次排序代码实现。 二次排序这里指:在以key为关键字的排序基础上,同时按照value排序
mapreduce二次排序,年份升序,按照年份聚合,气温降序
利用采样器实现mapreduce任务输出全排序大数据-MapReduce
mapreduce实现全栈排序,简单算法已经在文档中说明,想要了解的可以查看!
完整的二次排序具有多个层次的排序功能,可以有效提高系统的处理性能。 排序功能分别包括:排序分区、Key值排序、Key值分组 需要注意的是,这多个层次的排序功能均只能针对Key进行,而不能针对Value进行排序。在...
简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接
hadoop之MapReduce实现二度好友算法,包含输入数据demo,完整运算代码,在windows10下成功运行,输出结果为cat hello:2,hadoop:2,mr:1,world:1类似。
本次实验,在 Hadoop 平台上,使用 MapReduce 实现了数据的全局排序。将详细阐述了实现所需环境及过程。用阿里云服务器安装, OS: Ubuntu20.04 LTS . Hadoop 支持用三种模式启动:单机模式、伪分布式模式、分布式...
用MapReduce实现TF-IDF,Hadoop版本是2.7.7,参考某教程亲自手写的,可以运行,有问题可以留言
Hadoop 用mapreduce实现Wordcount实例,绝对能用
使用Hadoop MapReduce实现两个矩阵相乘算法
用MapReduce实现KMeans算法,数据的读写都是在HDFS上进行的,在伪分布下运行没有问题。文档中有具体说明。
主要为大家详细介绍了基于MapReduce实现决策树算法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
Hadoop 大数据方向 mapreduce计算中的二次排序,讲解透彻
云计算MapReduce实现KNN算法,使用环境:在vmware虚拟机上安装unbuntu14系统,系统中安装hadoop。文件中包含有MapReduce以及KNN的java代码、包含训练数据的excel表格以及详细的教程文档,文档中手把手教到如何使用...
基于hadoop2.0,mapreduce实现朴素贝叶斯算法,源码,NaieBayes
Hadoop mapreduce 实现InvertedIndexer倒排索引,能用。
MapReduce实现单词分类,可直接运行。MapReduce实现单词分类,可直接运行。