- 浏览: 527868 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (740)
- css (4)
- jquery (8)
- javascript (23)
- html (0)
- uml (0)
- 设计模式 (1)
- 开发工具 (14)
- json (4)
- struts 1.x (3)
- spring (3)
- hibernate (6)
- struts 2.x (17)
- JFreechart (0)
- j2se (48)
- jsp (9)
- flex (22)
- 找工作 (1)
- 技术杂谈 (18)
- 网络编程 (5)
- io流 (1)
- ORACLE (15)
- 报表 (3)
- extjs (11)
- jpbm (2)
- swing (5)
- jspereports (3)
- sql (1)
- linux (15)
- ps (1)
- storm (4)
- hbase (8)
- li (0)
- python (1)
- hive (3)
- 机器学习 (1)
- hdfs (1)
- elasticsearch (1)
- hadoop 2.2 (5)
- hadoop (1)
最新评论
-
Tristan_S:
这个有点意思
ASM -
starryskydog:
程序修改detail band部分的样式 如内容字体大小 ...
使用jasperReport实现动态表头 -
samwong:
Good, so usefule
使用YUI Compressor压缩CSS/JS -
gc715409742:
能够告诉我怎么在web项目中使用YUI Compressor? ...
使用YUI Compressor压缩CSS/JS -
JsonTeye:
您好! 我看你的代码,我现在也在做动态报表,实现功能由用户自己 ...
使用jasperreport动态生成pdf,excel,html
hadoop二次排序(合集)
1.原理
在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。
在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。
2.步骤
(1)自定义key
所有自定义的key应该实现接口WritableComparable,因为是可序列的并且可比较的。并重载方法
@Override
public void write(DataOutput out) {}
@Override
public void readFields(DataInput in) {}
@Override
public int hashCode() {}
@Override
public boolean equals(Object right)
@Override
public int compareTo(StringPair o) {}
(2)由于key是自定义的,所以还需要自定义一下类:
(2.1)分区函数类。这是key的第一次比较。
static class FirstPartitioner extends HashPartitioner<StringPair, LongWritable>
在job中使用setPartitionerClasss设置Partitioner。
(2.2)key比较函数类。这是key的第二次比较。这是一个比较器,需要继承WritableComparator。
public static class SortComparator extends WritableComparator
必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)
另一种方法是 实现接口RawComparator。
在job中使用setSortComparatorClass设置key比较函数类。
(2.3)分组函数类。在reduce阶段,构造一个key对应的value迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器。这是一个比较器,需要继承WritableComparator。
分组函数类也必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)
分组函数类的另一种方法是实现接口RawComparator。
在job中使用setGroupingComparatorClass设置分组函数类。
另外注意的是,如果reduce的输入与输出不是同一种类型,则不要定义Combiner也使用reduce,因为Combiner的输出是reduce的输入。除非重新定义一个Combiner。
例子1:package example;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Text.Comparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import util.HbaseServiceUtil;
public class TwoSortMR {
static class TwoSortMapper extends
Mapper<ImmutableBytesWritable, Result, Text, LongWritable> {
HTable htable;
HTable htable1;
byte[] family = Bytes.toBytes("baseInfo");
HbaseServiceUtil u;
protected void setup(Context context) throws IOException,
InterruptedException {
u = new HbaseServiceUtil();
this.htable = u.getHtable("wb_hbase_relation_attentions", 1);
this.htable1 = u.getHtable("wb_hbase_user", 1);
}
@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
String uid = new String(key.get());
try {
int cachsize = 5000;
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(uid + "_"));
scan.setStopRow(Bytes.toBytes(uid + "_sz"));
scan.setCaching(cachsize);
ResultScanner scanner = htable.getScanner(scan);
Result[] results = scanner.next(cachsize);
// 判断用户是否有对应的微博信息
if (results.length <= 0) {
return;
}
for (Result result : results) {
if (result.isEmpty()) {
continue;
}
byte[] obj = result.getValue(family, Bytes.toBytes("uid2"));
if (obj == null) {
continue;
}
String uid2 = new String(obj);
Get get = new Get(uid2.getBytes());
Result result1 = htable1.get(get);
obj = result1.getValue(family, Bytes.toBytes("fansCount"));
if (obj == null) {
continue;
}
String fansCount = new String(obj);
if (!fansCount.matches("[0123456789].*")) {
continue;
}
long aa = Long.parseLong(fansCount);
try {
context.write(new Text(uid+"|"+aa), new LongWritable(aa));
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
static class TwoSortReducer extends
Reducer<Text, LongWritable, NullWritable, Put> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
// long max = values.iterator().next().get();
StringBuffer max = new StringBuffer();
for (LongWritable value : values) {
max.append(value.get()+"|");
}
System.out.println(key.toString()+" "+max.toString());
Put put = new Put(Bytes.toBytes(key.toString().split("\\|")[0]));
put.add("baseInfo".getBytes(), "maxFansCount".getBytes(),
(max.toString() + "").getBytes());
context.write(NullWritable.get(), put);
}
}
// map阶段的最后会对整个map的List进行分区,每个分区映射到一个reducer
static class FirstPartitioner extends HashPartitioner<Text, LongWritable> {
@Override
public int getPartition(Text key, LongWritable value, int numPartitions) {
return (key.toString().split("\\|")[0].hashCode() & Integer.MAX_VALUE)
% numPartitions;
}
}
// 每个分区内又调用job.setSortComparatorClass或者key的比较函数进行排序
public static class SortComparator extends WritableComparator {
protected SortComparator() {
super(Text.class, true);
}
// 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数
// 这里是先比较用户id,再比较粉丝数,按粉丝数降序排序
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
Text t1 =(Text)w1;
Text t2 =(Text)w2;
long a1 =Long.parseLong(t1.toString().split("\\|")[0].replaceAll("si", ""));
long a2 = Long.parseLong(t2.toString().split("\\|")[0].replaceAll("si", ""));
long a3 = Long.parseLong(t1.toString().split("\\|")[1]);
long a4 = Long.parseLong(t2.toString().split("\\|")[1]);
int cmp = TwoSortMR.compare(a1, a2);
if (cmp != 0) {
return cmp;
}
return -TwoSortMR.compare(a3, a4); //reverse
}
}
// 只要这个比较器比较的两个key相同,他们就属于同一个组.
// 它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key
public static class GroupingComparator extends WritableComparator {
protected GroupingComparator() {
super(Text.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
// return w1.toString().split("\\|")[0].compareTo(w2.toString().split("\\|")[0]);
Text t1 =(Text)w1;
Text t2 =(Text)w2;
long l = Long.parseLong(t1.toString().split("\\|")[0].replaceAll("si", ""));
long r = Long.parseLong(t2.toString().split("\\|")[0].replaceAll("si", ""));
return TwoSortMR.compare(l, r);
}
}
public static int compare(long left, long right) {
// TODO Auto-generated method stub
return left > right ? 1 : (left == right ? 0 : -1);
}
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = HbaseServiceUtil.getConfiguration();
HbaseServiceUtil.setConf(conf, "hdfs4");
String inputTableName = "analyzer_wuqilong1";
String OutputTableName = "analyzer_wuqilong3";
Scan scan = new Scan();
// scan.setStartRow("si".getBytes());
// scan.setStopRow("0000000000000si\uFFFF".getBytes()); //��Ҫ�Ӵ�Χ��
scan.setCaching(100);
scan.setCacheBlocks(false); // don't set to true for MR jobs
conf.set(TableInputFormat.SCAN,
HbaseServiceUtil.convertScanToString(scan));
conf.set(TableInputFormat.INPUT_TABLE, inputTableName);
conf.set(TableOutputFormat.OUTPUT_TABLE, OutputTableName);
Job job = new Job(conf);
job.setInputFormatClass(TableInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);
job.setMapOutputValueClass(LongWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);
job.setMapperClass(TwoSortMapper.class);
job.setReducerClass(TwoSortReducer.class);
// 分区函数
job.setPartitionerClass(FirstPartitioner.class);
job.setSortComparatorClass(SortComparator.class);
// 分组函数
job.setGroupingComparatorClass(GroupingComparator.class);
job.setNumReduceTasks(5);
job.setJarByClass(TwoSortMR.class);
job.setJobName("Test2Sort");
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
例子2: package temp;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class StringPair implements WritableComparable<StringPair> {
private String first;
private long second;
public StringPair() {
}
public void set(String first, long second) {
this.first = first;
this.second = second;
}
public String toString() {
return first + "|" + second;
}
public String getFirst() {
return first;
}
public long getSecond() {
return second;
}
@Override
public void write(DataOutput out) throws IOException {
int length = first.length();
byte[] buf = first.getBytes();
// 先写字符串长度
out.writeInt(length);
// 再写字符串数据
out.write(buf, 0, length);
// 接着long
out.writeLong(second);
}
@Override
public void readFields(DataInput in) throws IOException {
// 先写字符串的长度信息
int length = in.readInt();
byte[] buf = new byte[length];
in.readFully(buf, 0, length);
first = new String(buf);
second = in.readLong();
}
@Override
public int hashCode() {
return first.hashCode();
}
@Override
public boolean equals(Object right) {
return first.equals(((StringPair) right).first);
}
@Override
public int compareTo(StringPair o) {
int c1 = StringPair.compare(getFirst(), o.getFirst());
if (c1 != 0) {
return c1;
} else {
return StringPair.compare(getSecond(), o.getSecond());
}
}
public static int compare(StringPair o1, StringPair o2) {
int c1 = StringPair.compare(o1.getFirst(), o2.getFirst());
if (c1 != 0) {
return c1;
} else {
return -StringPair.compare(o1.getSecond(), o2.getSecond());
}
}
public static int compare(long left, long right) {
return left > right ? 1 : (left == right ? 0 : -1);
}
public static int compare(String left, String right) {
return left.compareTo(right);
}
}
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import util.HbaseServiceUtil;
public class TwoSortMR {
static class TwoSortMapper extends
Mapper<ImmutableBytesWritable, Result, StringPair, LongWritable> {
HTable htable;
HTable htable1;
byte[] family = Bytes.toBytes("baseInfo");
HbaseServiceUtil u;
protected void setup(Context context) throws IOException,
InterruptedException {
u = new HbaseServiceUtil();
this.htable = u.getHtable("wb_hbase_relation_attentions", 1);
this.htable1 = u.getHtable("wb_hbase_user", 1);
}
@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
String uid = new String(key.get());
try {
int cachsize = 5000;
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(uid + "_"));
scan.setStopRow(Bytes.toBytes(uid + "_sz"));
scan.setCaching(cachsize);
ResultScanner scanner = htable.getScanner(scan);
Result[] results = scanner.next(cachsize);
// 判断用户是否有对应的微博信息
if (results.length <= 0) {
return;
}
for (Result result : results) {
if (result.isEmpty()) {
continue;
}
byte[] obj = result.getValue(family, Bytes.toBytes("uid2"));
if (obj == null) {
continue;
}
String uid2 = new String(obj);
Get get = new Get(uid2.getBytes());
Result result1 = htable1.get(get);
obj = result1.getValue(family, Bytes.toBytes("fansCount"));
if (obj == null) {
continue;
}
String fansCount = new String(obj);
if (!fansCount.matches("[0123456789].*")) {
continue;
}
long aa = Long.parseLong(fansCount);
try {
StringPair strs = new StringPair();
strs.set(uid,aa);
context.write(strs, new LongWritable(aa));
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
static class TwoSortReducer extends
Reducer<StringPair, LongWritable, NullWritable, Put> {
@Override
protected void reduce(StringPair key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
// long max = values.iterator().next().get();
StringBuffer max = new StringBuffer();
for (LongWritable value : values) {
max.append(value.get()+"|");
}
System.out.println(key.toString()+" "+max.toString());
Put put = new Put(Bytes.toBytes(key.getFirst()));
put.add("baseInfo".getBytes(), "maxFansCount".getBytes(),
(max.toString() + "").getBytes());
context.write(NullWritable.get(), put);
}
}
// map阶段的最后会对整个map的List进行分区,每个分区映射到一个reducer
static class FirstPartitioner extends HashPartitioner<StringPair, LongWritable> {
@Override
public int getPartition(StringPair key, LongWritable value, int numPartitions) {
return (key.getFirst().hashCode() & Integer.MAX_VALUE)
% numPartitions;
}
}
// 每个分区内又调用job.setSortComparatorClass或者key的比较函数进行排序
public static class SortComparator extends WritableComparator {
protected SortComparator() {
super(StringPair.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable a, WritableComparable b) {
StringPair t1 = (StringPair) a;
StringPair t2 = (StringPair) b;
return StringPair.compare(t1, t2);
}
}
// 只要这个比较器比较的两个key相同,他们就属于同一个组.
// 它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key
public static class GroupingComparator extends WritableComparator {
protected GroupingComparator() {
super(StringPair.class, true);
}
// 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数
// 这里是先比较用户id,再比较粉丝数,按粉丝数降序排序
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable a, WritableComparable b) {
StringPair strs1 = (StringPair) a;
StringPair strs2 = (StringPair) b;
return StringPair.compare(strs1.getFirst(), strs2.getFirst());
}
}
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = HbaseServiceUtil.getConfiguration();
HbaseServiceUtil.setConf(conf, "hdfs4");
String inputTableName = "analyzer_wuqilong1";
String OutputTableName = "analyzer_wuqilong3";
Scan scan = new Scan();
// scan.setStartRow("si".getBytes());
// scan.setStopRow("0000000000000si\uFFFF".getBytes()); //��Ҫ�Ӵ�Χ��
scan.setCaching(100);
scan.setCacheBlocks(false); // don't set to true for MR jobs
conf.set(TableInputFormat.SCAN,
HbaseServiceUtil.convertScanToString(scan));
conf.set(TableInputFormat.INPUT_TABLE, inputTableName);
conf.set(TableOutputFormat.OUTPUT_TABLE, OutputTableName);
Job job = new Job(conf);
job.setInputFormatClass(TableInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);
job.setMapOutputValueClass(LongWritable.class);
job.setMapOutputKeyClass(StringPair.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);
job.setMapperClass(TwoSortMapper.class);
job.setReducerClass(TwoSortReducer.class);
// 分区函数
job.setPartitionerClass(FirstPartitioner.class);
job.setSortComparatorClass(SortComparator.class);
// 分组函数
job.setGroupingComparatorClass(GroupingComparator.class);
job.setNumReduceTasks(5);
job.setJarByClass(TwoSortMR.class);
job.setJobName("TestCombine");
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
相关推荐
Hadoop 大数据方向 mapreduce计算中的二次排序,讲解透彻
hadoop分区二次排序示例,对基站数据,按电话号码升序、到达时间降序进行排序
hadoop分区二次排序代码示例,包含基站数据集,对基站数据,按电话号码升序、到达时间降序进行排序,只需打包成jar,即可在hadoop集群中运行
NULL 博文链接:https://xjward.iteye.com/blog/1816821
主要介绍了hadoop二次排序的原理和实现,本文通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
mapreduce二次排序,年份升序,按照年份聚合,气温降序
大数据学习资料全排序二次排序
完整的二次排序具有多个层次的排序功能,可以有效提高系统的处理性能。 排序功能分别包括:排序分区、Key值排序、Key值分组 需要注意的是,这多个层次的排序功能均只能针对Key进行,而不能针对Value进行排序。在...
二次排序 联接 map端联接 reduce端联接 边数据分布 利用JobConf来配置作业 分布式缓存 MapReduce库类 第9章 构建Hadoop集群 集群规范 网络拓扑 集群的构建和安装 安装Java 创建Hadoop用户...
二次排序 联接 map端联接 reduce端联接 边数据分布 利用JobConf来配置作业 分布式缓存 MapReduce库类 第9章 构建Hadoop集群 集群规范 网络拓扑 集群的构建和安装 安装Java 创建Hadoop用户 安装Hadoop 测试安装 SSH...
join技术点20 实现semi-join4.1.4 为你的数据挑选最优的合并策略4.2 排序4.2.1 二次排序技术点21 二次排序的实现4.2.2 整体并行排序技术点22 通过多个reducer 对key 进行排序4.3 抽样技术点23 蓄水...
目录 第1章二次排序:简介 19 第2章二次排序:详细示例 42 第3章 Top 10 列表 54 第4章左外连接 96 第5章反转排序 127 第6章移动平均 137 第7章购物篮分析 155 第8章共同好友 182 第9章使用MapReduce实现推荐引擎 ...
10、Mapreduce中value集合的二次排序 ....................... - 38 - 11、Hive SQL手册翻译 ................................... - 47 - 12、Mahout Kmeans简介 .................................... - 57 -
10、Mapreduce中value集合的二次排序 ....................... - 38 - 11、Hive SQL手册翻译 ................................... - 47 12、Mahout Kmeans简介 .................................... - 57 -
技术点21 二次排序的实现 4.2.2 整体并行排序 技术点22 通过多个reducer 对key 进行排序 4.3 抽样 技术点23 蓄水池抽样(reservoir 抽样) 4.4 本章小结 5 优化HDFS 处理大数据的技术 5.1 处理小文件 ...
四川大学IT企业实训,拓思爱诺大数据第二次作业,MapReduce编程,包括Hadoop wordcount程序,及flowcount流量统计程序,包括重写排序及分区函数
本文将介绍一种通过使用地理网格进行数据关联,并利用Shuffle过程的二次排序实现高效的统计各条道路上位置点分布情况的方法。交通领域正产生着海量的车辆位置点数据。将这些车辆位置信息和道路进行关联的统计操作则...
好评 住 过 几次 东莞 酒店 海悦 地理位置 早餐 最棒 听说 朋友 说 请来 厨师 来头 呵呵 冲 这个 去 好评 酒店设施 比较 不错 就是 携程 价格 酒店 前台 一样 没有 竞争力 好评 房间 不算 大 中规中矩 北方 服务 ...
03_MapReduce 二次排序回顾及Reduce Join实现详解 04_MapReduce 中Map Join实现思路及伪代码详解 05_Hive重点知识回顾总结及小表与大表关联时MapJoin优化 06_Hive中大表与大表关联时SMB Join优化 07_Hive中高级...