package sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class SortText {
private static final String INPUT_PATH = "hdfs://hadoop.master:9000/data1";
private static final String OUTPUT_PATH = "hdfs://hadoop.master:9000/outSort";
public static void main(String[] args) throws Exception {
FileSystem fileSystem = FileSystem.get(new Configuration());
boolean exists = fileSystem.exists(new Path(OUTPUT_PATH));
if(exists){
fileSystem.delete(new Path(OUTPUT_PATH),true);
}
Job job=new Job(new Configuration(),SortText.class.getName());
job.setJarByClass(SortText.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job,new Path(INPUT_PATH));
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(MyKey.class);
job.setMapOutputValueClass(LongWritable.class);
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
// job.setGroupingComparatorClass(cls);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
job.waitForCompletion(true);
}
static class MyMapper extends
Mapper<LongWritable, Text, MyKey, LongWritable> {
protected void map(
LongWritable key,
Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, MyKey, LongWritable>.Context context)
throws java.io.IOException, InterruptedException {
String[] split = value.toString().split("\t");
context.write(
new MyKey(Long.parseLong(split[0]), Long
.parseLong(split[1])),
new LongWritable(Long.parseLong(split[1])));
};
}
static class MyReducer extends
Reducer<MyKey, LongWritable, LongWritable, LongWritable> {
protected void reduce(
MyKey arg0,
java.lang.Iterable<LongWritable> arg1,
org.apache.hadoop.mapreduce.Reducer<MyKey, LongWritable, LongWritable, LongWritable>.Context arg2)
throws java.io.IOException, InterruptedException {
// for (LongWritable w : arg1) {
arg2.write(new LongWritable(arg0.k), new LongWritable(arg0.v));
// }
};
}
static class MyKey implements WritableComparable<MyKey> {
long k;
long v;
public MyKey() {
}
public MyKey(long k, long v) {
this.k = k;
this.v = v;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(k);
out.writeLong(v);
}
@Override
public void readFields(DataInput in) throws IOException {
this.k = in.readLong();
this.v = in.readLong();
}
@Override
public int compareTo(MyKey o) {
if (this.k == o.k) {
return (int) (this.v - o.v);// v
} else {
return (int) (o.k - this.k);// k
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (k ^ (k >>> 32));
result = prime * result + (int) (v ^ (v >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
MyKey other = (MyKey) obj;
if (k != other.k)
return false;
if (v != other.v)
return false;
return true;
}
}
}
分享到:
相关推荐
实现mr的wordcount功能和自定义分区的功能、自定义排序功能;com.ellis.mr1为类似wc功能,com.ellis.mr2为自定义分区功能,com.ellis.mr3为自定义排序功能
NULL 博文链接:https://username2.iteye.com/blog/2274802
16、MapReduce的基本用法示例-自定义序列化、排序、分区、分组和topN 网址:https://blog.csdn.net/chenwewi520feng/article/details/130454036 本文介绍MapReduce常见的基本用法。 前提是hadoop环境可正常运行。 ...
自定义MapReduce的InputFormat,实现提取指定开始与结束限定符的内容。
mapreduce实现全栈排序,简单算法已经在文档中说明,想要了解的可以查看!
这段Java代码是一个Hadoop MapReduce程序,用于处理输入数据并计算每个不同词汇的最高分数。它包含了配置和运行MapReduce作业的逻辑,以及Mapper和Reducer类的定义。主要功能是读取输入数据,将数据拆分成词汇和相关...
简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接
MapReduce之自定义 OutPutFormat,通过一个案例,实现自定义的一个OutPutFormat,来更加的深刻的理解MR的过程
对相同分区的数据,按照key进行排序(默认按照字典排序)、分组。相同key的value放在一个集合中。 (可选)分组后对数据进行归约。 注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。
MapReduce二次排序代码实现。 二次排序这里指:在以key为关键字的排序基础上,同时按照value排序
利用采样器实现mapreduce任务输出全排序大数据-MapReduce
使用Hadoop MapReduce实现两个矩阵相乘算法
Hadoop 用mapreduce实现Wordcount实例,绝对能用
在 Mapreduce 中,如果需要自定义类的排序规则,需要让类实现 Writable 的子接口 WritableComparable,重写里面的 write, readFields 6 和 compareTo 方法,所以可以自定义一个类作为 key,类中包含 2 个 需要进行...
本书以Hadoop 1.0为基础,深入剖析了Hadoop MapReduce中各个组件的实现细节,包括RPC框架、JobTracker实现、TaskTracker实现、Task实现和作业调度器实现等。书中不仅详细介绍了MapReduce各个组件的内部实现原理,...
包含knn mapreduce环境下的实现全部代码和自定义输入输出文件格式。以及实验数据集。
《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》内容简介:“Hadoop技术内幕”共两册,分别从源代码的角度对“Common+HDFS”和“MapReduce的架构设计和实现原理”进行了极为详细的分析。《Hadoop技术内幕:...
“Hadoop技术内幕”共两册,分别从源代码的角度对“Common+HDFS”和“MapReduce的架构设计和实现原理”进行了极为详细的分析。《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》由Hadoop领域资深的实践者...
《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》由Hadoop领域资深的实践者亲自执笔,首先介绍了MapReduce的设计理念和编程模型,然后从源代码的角度深入分析了RPC框架、客户端、JobTracker、TaskTracker和...
mapreduce二次排序,年份升序,按照年份聚合,气温降序