`

MapReduce (part 3): implementing a custom composite key (via WritableComparable) to achieve secondary sort

 
阅读更多

package sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class SortText {
	

	private static final String INPUT_PATH = "hdfs://hadoop.master:9000/data1";
	private static final String OUTPUT_PATH = "hdfs://hadoop.master:9000/outSort";

	/**
	 * Job driver: deletes any previous output directory, then configures and
	 * runs the secondary-sort job reading from {@code INPUT_PATH} and writing
	 * to {@code OUTPUT_PATH}.
	 *
	 * @param args unused command-line arguments
	 * @throws Exception if job configuration or execution fails
	 */
	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();

		// Resolve the FileSystem from the output path's own URI so the
		// pre-delete works even when the configured default filesystem is
		// not hdfs://hadoop.master:9000 (FileSystem.get(conf) alone would
		// return the default fs and could miss the target path).
		Path outputPath = new Path(OUTPUT_PATH);
		FileSystem fileSystem = FileSystem.get(outputPath.toUri(), conf);
		if (fileSystem.exists(outputPath)) {
			fileSystem.delete(outputPath, true); // true = recursive
		}

		Job job = new Job(conf, SortText.class.getName());
		job.setJarByClass(SortText.class);

		job.setInputFormatClass(TextInputFormat.class);
		FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));

		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(MyKey.class);
		job.setMapOutputValueClass(LongWritable.class);

		job.setPartitionerClass(HashPartitioner.class);
		// A single reducer yields one globally sorted output file.
		job.setNumReduceTasks(1);

		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(LongWritable.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		FileOutputFormat.setOutputPath(job, outputPath);

		// Propagate the job result as the process exit code instead of
		// silently discarding the boolean returned by waitForCompletion.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

	static class MyMapper extends
			Mapper<LongWritable, Text, MyKey, LongWritable> {
		protected void map(
				LongWritable key,
				Text value,
				org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, MyKey, LongWritable>.Context context)
				throws java.io.IOException, InterruptedException {

			String[] split = value.toString().split("\t");
			context.write(
					new MyKey(Long.parseLong(split[0]), Long
							.parseLong(split[1])),
					new LongWritable(Long.parseLong(split[1])));

		};
	}

	static class MyReducer extends
			Reducer<MyKey, LongWritable, LongWritable, LongWritable> {
		protected void reduce(
				MyKey arg0,
				java.lang.Iterable<LongWritable> arg1,
				org.apache.hadoop.mapreduce.Reducer<MyKey, LongWritable, LongWritable, LongWritable>.Context arg2)
				throws java.io.IOException, InterruptedException {

//			for (LongWritable w : arg1) {
				arg2.write(new LongWritable(arg0.k), new LongWritable(arg0.v));
//			}

		};
	}

	/**
	 * Composite key (k, v) defining the secondary sort order:
	 * primary field k descending, then v ascending for equal k.
	 * equals/hashCode are consistent with compareTo (0 only when both
	 * fields match).
	 */
	static class MyKey implements WritableComparable<MyKey> {

		long k; // primary sort field (descending)
		long v; // secondary sort field (ascending)

		/** No-arg constructor required by Hadoop serialization. */
		public MyKey() {
		}

		public MyKey(long k, long v) {
			this.k = k;
			this.v = v;
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeLong(k);
			out.writeLong(v);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			this.k = in.readLong();
			this.v = in.readLong();
		}

		/**
		 * Orders by k descending, then v ascending. Uses Long.compare
		 * rather than casting a long difference to int: the original
		 * {@code (int) (a - b)} silently overflows/truncates when the
		 * operands differ by more than Integer.MAX_VALUE, returning the
		 * wrong sign and corrupting the shuffle sort order.
		 */
		@Override
		public int compareTo(MyKey o) {
			int byK = Long.compare(o.k, this.k); // reversed => descending k
			if (byK != 0) {
				return byK;
			}
			return Long.compare(this.v, o.v); // ascending v
		}

		@Override
		public int hashCode() {
			final int prime = 31;
			int result = 1;
			result = prime * result + (int) (k ^ (k >>> 32));
			result = prime * result + (int) (v ^ (v >>> 32));
			return result;
		}

		@Override
		public boolean equals(Object obj) {
			if (this == obj)
				return true;
			if (obj == null || getClass() != obj.getClass())
				return false;
			MyKey other = (MyKey) obj;
			return k == other.k && v == other.v;
		}

	}

}


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics