package com.dt.spark.topn; import java.io.IOException; import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class TopN { /** * * 从所有订单日志中获取Top 5订单及付款金额 * @author yuming * @ail: ymzhang@foxmail.com * @weibo: http://www.weibo.com/yumzhang */ public static class ForTopNMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> { int[] tops; int length; @Override protected void setup(Context context) throws IOException, InterruptedException { length = context.getConfiguration().getInt("topN", 5); tops = new int[length + 1]; } /** * 在Map阶段各个Map分别计算自己的Top N,减少网络传输的压力 * 减少数据量,提高Reduce处理的效率,防止少海量数据的OOM问题 */ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] data = value.toString().split(","); if (data != null && 4 == data.length) { int cost = Integer.valueOf(data[2]); tops[0] = cost; Arrays.sort(tops); //正向排序 } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { for (int i = 0; i < tops.length; i++) { context.write(new IntWritable(tops[i]), new IntWritable(tops[i])); } } } public static class ForTopNReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { int[] tops; int length; @Override protected void setup(Context context) throws IOException, InterruptedException { length = context.getConfiguration().getInt("topN", 5); //default get Top 5 tops = new int[length + 1]; } public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { tops[0] = key.get(); Arrays.sort(tops); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { for (int i = length; i > 0; i--) { //对已经排序好的数组输出 context.write(new IntWritable(length - i + 1), new IntWritable(tops[i])); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.setInt("topN", 5); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: TopN Rank <in> [<in>...] <out>"); System.exit(2); } // set job Job job = new Job(conf, "Sorted TopN Application"); job.setJarByClass(TopN.class); // set Map、Combine and Reduce class job.setMapperClass(ForTopNMapper.class); // job.setCombinerClass(ForSortReducer .class); job.setReducerClass(ForTopNReducer.class); // set input output data format job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); // set path for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } /** * dataTopN1.txt Id,custmId,pay,productId 1,9819,100,121 2,8918,2000,111 3,2813,1234,22 4,9100,10,1101 5,3210,490,111 6,1298,28,1211 7,1010,281,90 8,1818,9000,20 ---------------- dataTopN.txt 10,2222,10,1000 11,9321,1000,293 12,3881,999,328 13,8328,1000,66 */
一图顶千言:
充分利用Map的集群效应,在Map阶段将处理各自处理自己的Top N,然后将数据输出给Reducer,提高集群利用率,防止OOM发生,可能还有更优算法,如果哪位有也可以回复我。
相关推荐
论文研究-DTCNN的人脸识别算法的Map-Reduce并行化实现研究.pdf, 传统人脸识别算法都采用基于特征提取的解决方案,所以有效的特征需要很强的先验知识和丰富的工程经验....
针对蛙跳策略在Map-Reduce作业调度中的应用, 算法具体设计了编码方案和进化算子; 同时, 为提高算法收敛性能, 对蛙跳策略进行改进:结合种群多样性指标增加逆转变异操作。仿真实验结果表明, 提出的改进蛙跳策略在Map-...
map算法MAP算法在Turbo码译码中的实现及性能在数域中,串行级联的MAP算法是用于获得高性能的Turbo码译码器。一般情况下,解码器通过可编程门序列和EPROMs实现从4到512的任何状态码,其码率达到1/3至1/7(通过删余...
Map reduce的执行原理。MapReduce是一种分布式计算模型,是Google提出的,主要用于搜索领域,解决海量数据的计算问题。
FPGA实现 Map Reduce ,加速实现
在VS2010中采用多线程技术实现MDS-MAP定位算法,每个线程模拟一个传感器节点,通信范围之内的节点之间模拟采用RSSI测距的方法,并且利用高斯误差来模拟RSSI的测量误差。实验中任意生成原始的节点拓扑,设定节点通信...
人脸识别,车辆识别,一人一档,一车一档 hadoop map reduce hbase
Hadoop Map-Reduce数据分析
基于Map Reduce的大数据并行关联数据挖掘算法.pdf
NULL 博文链接:https://kf47453.iteye.com/blog/2273906
基于云计算Map-Reduce模型的快速碰撞检测算法.pdf
为了充分利用该项目,您将需要创建自己的map reduce算法实现。 实现应扩展在/src/implementation.js中找到的Implementation类。 在/src/word-count-implementation.js中可以找到扩展实现的示例。 通过重写...
_Map-Reduce_并行聚类算法的研究
纠错码中turbo码的译码原理 MAP算法。自己写,照下来的图片格式。参考王新梅的纠错码一书
google三大核心技术之一,map reduce的论文
Google Map Reduce 中文版 论文 Google Map Reduce 中文版 论文 Google Map Reduce 中文版 论文
本文在研究BIRCH算法、规则关联算法、Hadoop的map/reduce机制的基础上,提 出了一种基于map/reduce的应用于网络安全事件分析的并行关联方法。一方面,通过对BIRCH 算法的改进,在BIRCH的分层次思想中引入预定义的...
GPU上实现Map-Reduce的框架 可以进一步实现各种机器学习
hadoop中map/reduce自学资料合集
MDS-MAP算法。其中包含了MDS子算法 我只实现了在二维上的MDS-MAP算法 只需要少许改动你们即可实现多维的MDS-MAP算法 这个是论文地址:...