`
hugh.wangp
  • 浏览: 288793 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

MapReduce的排序和二次排序

阅读更多

 

自己学习排序和二次排序的知识整理如下。
1.Hadoop的序列化格式介绍:Writable
2.Hadoop的key排序逻辑
3.全排序
4.如何自定义自己的Writable类型
5.如何实现二次排序


1.Hadoop的序列化格式介绍:Writable
要了解和编写MR实现排序必须要知道的第一个知识点就是Writable相关的接口和类,这些是HADOOP自己的序列化格式。更多的可能是要关注他的Subinterfaces:WritableComparable<T>。他是继承Writable和Comparable<T>接口,继而WritableComparable<T>的实现除了具有序列化特性,更重要的是具有了比较的特性,而比较的特性在MapReduce里是很重要的,因为MR中有个基于键的排序过程,所以可以作为键的类型必须具有Comparable<T>的特性。
除了WritableComparable接口外,还有一个接口RawComparaotor。
WritableComparable和RawComparator两个接口的区别是:
WritableComparable是需要把数据流反序列化为对象后,然后做对象之间的比较,而RawComparator是直接比较数据流的数据,不需要数据流反序列化成对象,省去了新建对象的开销。

2.Hadoop的key排序逻辑
Hadoop本身Key的数据类型的排序逻辑其实就是依赖于Hadoop本身的继承与WritableComparable<T>的基本数据类型和其他类型(相关类型可参考《Hadoop权威指南》第二版的90页)的compareTo方法的定义。
Key排序的规则:
1.如果调用jobconf的setOutputKeyComparatorClass()设置mapred.output.key.comparator.class
2.否则,使用key已经登记的comparator
3.否则,实现接口WritableComparable的compareTo()函数来操作
例如IntWritable的比较算法如下:
public int compareTo(Object o) {
    int thisValue = this.value;
    int thatValue = ((IntWritable)o).value;
    return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
  }
 
可以修改compareTo来实现自己所需的比较算法。

虽然我们知道是compareTo这个方法实现Key的排序,但其实我们在使用Hadoop的基本数据类型时不需要关注这个排序如何实现,因为Hadoop的框架会自动调用compareTo这个方法实现key的排序。但是这个排序只是局限在map或者reduce内部。针对于map与map,reduce与reduce之间的排序compareTo就管不着了,虽然这种情况不常出现,但是确实存在这种问题的,而且确实有适用场景,比如说全排序。

3.全排序
这里就需要关注Partition这个阶段,Partition阶段是针对每个Reduce,需要创建一个分区,然后把Map的输出结果映射到特定的分区中。这个分区中可能会有N个Key对应的数据,但是一个Key的所有数据只能在一个分区中。在实现全排序的过程中,如果只有一个reduce,也就是只有一个Partition,那么所有Map的输出都会经过一个Partition到一个reduce里,在一个reduce里可以根据compareTo(也可以采用其他比较算法)来排序,实现全排序。但是这种情况就让MapReduce失去了分布式计算的光环。
所以全排序的大概思路为:确保Partition之间是有序的就OK了,即保证Partition1的最大值小于Partition2的最小值就OK了,即便这样做也还是有个问题:Partition的分布不均,可能导致某些Partition处理的数据量远大于其他Partition处理的数据量。而实现全排序的核心步骤为:取样和Partition。
先“取样”,保证Partition得更均匀: 
1) 对Math.min(10, splits.length)个split(输入分片)进行随机取样,对每个split取10000个样,总共10万个样
2) 10万个样排序,根据reducer的数量(n),取出间隔平均的n-1个样
3) 将这个n-1个样写入partitionFile(_partition.lst,是一个SequenceFile),key是取的样,值是nullValue
4) 将partitionFile写入DistributedCache 
整个全排序的详细介绍可参照:http://www.iteye.com/topic/709986

4.如何自定义自己的Writable类型
自定义自己的Writable类型的场景应该很简单:Hadoop自带的数据类型要么在功能上不能满足需求,要么在性能上满足需求,毕竟Hadoop还在发展,不是所有情况都考虑的,但是他提供了自主的框架实现我们想要的功能。
定义自己的Writable类型需要实现:
a.重载构造函数
b.实现set和get方法
c.实现接口的方法:write()、readFields()、compareTo()
d.(可选)相当于JAVA构造的对象,重写java.lang.Object的hashCode()、equals()、toString()。Partition阶段默认的hashpartitioner会根据hashCode()来选择分区,如果不要对自定义类型做key进行分区,hashCode()可不实现
具体例子可参考hadoop的基本类型IntWritable的实现
public class IntWritable implements WritableComparable {
  private int value;

  public IntWritable() {}

  public IntWritable(int value) { set(value); }

  /** Set the value of this IntWritable. */
  public void set(int value) { this.value = value; }

  /** Return the value of this IntWritable. */
  public int get() { return value; }

  public void readFields(DataInput in) throws IOException {
    value = in.readInt();
  }

  public void write(DataOutput out) throws IOException {
    out.writeInt(value);
  }

  /** Returns true iff <code>o</code> is a IntWritable with the same value. */
  public boolean equals(Object o) {
    if (!(o instanceof IntWritable))
      return false;
    IntWritable other = (IntWritable)o;
    return this.value == other.value;
  }

  public int hashCode() {
    return value;
  }

  /** Compares two IntWritables. */
  public int compareTo(Object o) {
    int thisValue = this.value;
    int thatValue = ((IntWritable)o).value;
    return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
  }

  public String toString() {
    return Integer.toString(value);
  }
}
 

5.如何实现二次排序
二次排序的工作原理涉及到如下几方面:
a.创建key的数据类型,key要包括两次排序的元素
b.setPartitionerClass(Class<? extends Partitioner> theClass)
hadoop0.20.0以后的函数为setPartitionerClass
c.setOutputKeyComparatorClass(Class<? extends RawComparator> theClass)
hadoop0.20.0以后的函数为setSortComparatorClass
d.setOutputValueGroupingComparator(Class<? extends RawComparator> theClass)
hadoop0.20.0以后的函数为setGroupingComparatorClass

根据hadoop自己提供的example:org.apache.hadoop.examplesSecondarySort来说明二次排序具体是如何实现的.

SecondarySort实现IntPair、FirstPartitioner、FirstGroupingComparator、MapClass、Reduce这几个内部类,然后在main函数中调用。先说明下main函数中有哪些地方和普通的MR代码不同。
不同点是多了这两个set:
job.setPartitionerClass(FirstPartitioner.class);
设置自定义的Partition操作,在此是调用我们自定义的内部类FirstPartitioner
job.setGroupingComparatorClass(FirstGroupingComparator.class);
设置哪些value进入哪些key的迭代器中,在此是调用自定义的内部类FirstGroupingComparator

具体的操作逻辑为:

a.定义一个作为key的类型IntPair,在IntPair中有两个变量first、second,SecondarySort就是在对first排序后再对second再排序处理
b.定义分区函数类FirstPartitioner,Key的第一次排序。在FirstPartitioner实现如何处理key的first,把key对应的数据划分到不同的分区中。这样key中first相同的value会被放在同一个reduce中,在reduce中再做第二次排序 
c(代码没有实现,其实内部是有处理).key比较函数类,key的第二次排序,是继承WritableComparator的一个比较器。setSortComparatorClass可以实现。
为什么没有使用setSortComparatorClass()是因为hadoop对key排序的规则(参看2.Hadoop的key排序逻辑)决定的。由于我们在IntPair中已经定义了compareTo()函数。
d.定义分组函数类FirstGroupingComparator,保证只要key的的第一部分相同,value就进入key的value迭代器中。
0
0
分享到:
评论
1 楼 mojunbin 2012-12-02  
呵呵,整体思路都讲到了哦。

相关推荐

    mapreduce二次排序

    mapreduce二次排序,年份升序,按照年份聚合,气温降序

    MapReduce二次排序

    MapReduce二次排序代码实现。 二次排序这里指:在以key为关键字的排序基础上,同时按照value排序

    MapReduce模型--二次排序

    完整的二次排序具有多个层次的排序功能,可以有效提高系统的处理性能。 排序功能分别包括:排序分区、Key值排序、Key值分组 需要注意的是,这多个层次的排序功能均只能针对Key进行,而不能针对Value进行排序。在...

    hadoop 二次排序 原理

    Hadoop 大数据方向 mapreduce计算中的二次排序,讲解透彻

    拓思爱诺大数据-第二次作业MapReduce编程

    四川大学IT企业实训,拓思爱诺大数据第二次作业,MapReduce编程,包括Hadoop wordcount程序,及flowcount流量统计程序,包括重写排序及分区函数

    MapReduce的小应用

    利用MapReduce框架实现了关于音乐播放网站的两个简单问题。主要解决了多个Map多个Reduce的连接问题,二次排序问题,关于Key降序排序的问题。

    mapreduce secondarysort

    mapreduce的二次排序,稍微有点难度,帮助你更好的理解它

    java大数据作业_5Mapreduce、数据挖掘

    课后作业 1.请找出日志中的访问者ip,访问时间,来源...6.简述二次排序算法 有输入数据如下所示: 1 2 2 3 2 1 4 6 3 1 3 8 3 2 需要使用二次排序算法,得到如下处理结果: 1 2 2 1 2 3 3 1 3 2 3 8 4 6 请简述处理过程

    mapreduce高级特性2

    mr各种应用场景的例子,1.1 内存排序1.2 mr数据类型1.3 自定义mr数据类型1.4 使用自定义数据实现内存排序1.5 二次排序1.6 使用自定义mr数据类型实现二次排序1.7 内存排序找出每一组中的最大值1.8 排序找出每一组中的...

    大数据技术 Hadoop开发者第二期 MapReduce HDFS Hive Mahout HBase 共64页.pdf

    10、Mapreduce中value集合的二次排序 ....................... - 38 - 11、Hive SQL手册翻译 ................................... - 47 - 12、Mahout Kmeans简介 .................................... - 57 -

    Hadoop权威指南 第二版(中文版)

     二次排序  联接  map端联接  reduce端联接  边数据分布  利用JobConf来配置作业  分布式缓存  MapReduce库类 第9章 构建Hadoop集群  集群规范  网络拓扑  集群的构建和安装  安装Java  创建Hadoop用户...

    Hadoop硬实战 [(美)霍姆斯著][电子工业出版社][2015.01]_PDF电子书下载 带书签目录 高清完整版.rar )

    技术点21 二次排序的实现 4.2.2 整体并行排序 技术点22 通过多个reducer 对key 进行排序 4.3 抽样 技术点23 蓄水池抽样(reservoir 抽样) 4.4 本章小结 5 优化HDFS 处理大数据的技术 5.1 处理小文件 ...

    《数据算法Hadoop Spark大数据处理技巧》PDF 带目录!!

    目录 第1章二次排序:简介 19 第2章二次排序:详细示例 42 第3章 Top 10 列表 54 第4章左外连接 96 第5章反转排序 127 第6章移动平均 137 第7章购物篮分析 155 第8章共同好友 182 第9章使用MapReduce实现推荐引擎 ...

    Hadoop实战(第2版)

    大数据模式4 处理大数据的MapReduce 模式4.1 Join4.1.1 Repartition Join技术点19 优化repartition join 4.1.2 Replicated Join 4.1.3 Semi-join技术点20 实现semi-join4.1.4 为你的数据挑选最优的...

    Hadoop权威指南(中文版)2015上传.rar

    二次排序 联接 map端联接 reduce端联接 边数据分布 利用JobConf来配置作业 分布式缓存 MapReduce库类 第9章 构建Hadoop集群 集群规范 网络拓扑 集群的构建和安装 安装Java 创建Hadoop用户 安装Hadoop 测试安装 SSH...

    大数据导论-6.1.4-熟悉大数据处理技术——大数据的处理模式.pptx

    一次MapReduce处理引擎的运行被称为MapReduce作业,它由映射(Map)和归约(Reduce)两部分任务组成,这两部分任务又被分为多个阶段。 一个作业 = 映射 + 归约 其中映射任务被分为映射(map)、合并(combine)和...

    hadoop开发者文档

    10、Mapreduce中value集合的二次排序 ....................... - 38 - 11、Hive SQL手册翻译 ................................... - 47 12、Mahout Kmeans简介 .................................... - 57 -

    大数据框架(HADOOP、HIVE、HBASE)优化和简历项目编写(视频+讲义+笔记)

    03_MapReduce 二次排序回顾及Reduce Join实现详解 04_MapReduce 中Map Join实现思路及伪代码详解 05_Hive重点知识回顾总结及小表与大表关联时MapJoin优化 06_Hive中大表与大表关联时SMB Join优化 07_Hive中高级...

    代码之美(中文完整版).pdf

    5.6 版本4:第二次优化:避免重复验证 5.7 版本5:第三次优化:复杂度 O(1) 5.8 版本 6:第四次优化:缓存(Caching) 5.9 从故事中学到的 第6章 集成测试框架:脆弱之美 6.1. 三个类搞定一个验收测试框架 6.2. 框架...

    2017最新大数据架构师精英课程

    94_job二次排序5 t3 Z2 R- ]( a: s* c0 Z 95_从db输入数据进行mr计算: L. M4 I6 y, R2 l/ u/ L 96_输出数据到db中 97_NLineInputFormat& u( k1 T& z( O# P, S* y1 Y 98_KeyValueTextInputFormat* p$ O1 z- h, n" e( ...

Global site tag (gtag.js) - Google Analytics