`
zhangym195
  • 浏览: 121578 次
  • 性别: Icon_minigender_1
  • 来自: 黑龙江
社区版块
存档分类
最新评论

从海量订单中利用Map Reduce获取Top N的较优算法实现应用程序

阅读更多
package com.dt.spark.topn;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TopN {

	/**
	 * 
	 * 从所有订单日志中获取Top 5订单及付款金额
	 * @author yuming
	 * @ail: ymzhang@foxmail.com
	 * @weibo: http://www.weibo.com/yumzhang
	 */

	public static class ForTopNMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
		int[] tops;
		int length;

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			length = context.getConfiguration().getInt("topN", 5); 
			tops = new int[length + 1];
		}
		/**
		 * 在Map阶段各个Map分别计算自己的Top N,减少网络传输的压力
		 * 减少数据量,提高Reduce处理的效率,防止少海量数据的OOM问题
		 */
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			String[] data = value.toString().split(",");
			if (data != null && 4 == data.length) {
				int cost = Integer.valueOf(data[2]);
				tops[0] = cost;
				Arrays.sort(tops); //正向排序
			}
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			for (int i = 0; i < tops.length; i++) {
				context.write(new IntWritable(tops[i]), new IntWritable(tops[i]));
			}
		}
	}

	public static class ForTopNReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
		int[] tops;
		int length;

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			length = context.getConfiguration().getInt("topN", 5); //default get Top 5
			tops = new int[length + 1];
		}

		public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			tops[0] = key.get();
			Arrays.sort(tops);
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			for (int i = length; i > 0; i--) {
				//对已经排序好的数组输出
				context.write(new IntWritable(length - i + 1), new IntWritable(tops[i]));
			}
		}
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		conf.setInt("topN", 5); 

		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length < 2) {
			System.err.println("Usage: TopN Rank <in> [<in>...] <out>");
			System.exit(2);
		}
		// set job
		Job job = new Job(conf, "Sorted TopN  Application");
		job.setJarByClass(TopN.class);

		// set Map、Combine and Reduce class
		job.setMapperClass(ForTopNMapper.class);
		// job.setCombinerClass(ForSortReducer .class);
		job.setReducerClass(ForTopNReducer.class);

		// set input output data format
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);

		// set path
		for (int i = 0; i < otherArgs.length - 1; ++i) {
			FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
		}
		
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}
}
/**
 * 
dataTopN1.txt
Id,custmId,pay,productId
1,9819,100,121
2,8918,2000,111
3,2813,1234,22
4,9100,10,1101
5,3210,490,111
6,1298,28,1211
7,1010,281,90
8,1818,9000,20
----------------
dataTopN.txt
10,2222,10,1000
11,9321,1000,293
12,3881,999,328
13,8328,1000,66
 */

 一图顶千言:

   充分利用Map的集群效应,在Map阶段将处理各自处理自己的Top N,然后将数据输出给Reducer,提高集群利用率,防止OOM发生,可能还有更优算法,如果哪位有也可以回复我。

 

  • 大小: 24.2 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics