
MapReduce: Using a Combiner (computing an average), plus an in-mapper design pattern example


Without a Combiner or the in-mapper design pattern


import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;



public class digitaver1 {
	
	public static class mapper extends Mapper<LongWritable, Text, Text, IntWritable>{

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// each input line has the form key:value
			String[] ss = value.toString().split(":");
			context.write(new Text(ss[0]), new IntWritable(Integer.parseInt(ss[1])));
		}
		
	}

	public static class reducer extends Reducer<Text, IntWritable, Text, DoubleWritable>{

		@Override
		protected void reduce(Text key, Iterable<IntWritable> value,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			int cnt = 0;
			// iterate the values once with a for-each loop; the original
			// called value.iterator() on every pass, which only worked
			// because Hadoop happens to return the same iterator each time
			for (IntWritable v : value) {
				sum += v.get();
				cnt += 1;
			}
			context.write(key, new DoubleWritable((double) sum / (double) cnt));
		}
	}
	
public static void main(String[] args) {
		
		try {
			Job job = Job.getInstance();
			job.setJarByClass(digitaver1.class);
			job.setJobName("digitaver1");
			
			FileInputFormat.addInputPath(job, new Path(args[0]));
			FileOutputFormat.setOutputPath(job, new Path(args[1]));
			
			job.setMapperClass(mapper.class);
			job.setReducerClass(reducer.class);
			
			job.setInputFormatClass(TextInputFormat.class);
			job.setOutputFormatClass(TextOutputFormat.class);
			
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(IntWritable.class);
			
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(DoubleWritable.class);
			
			System.exit( job.waitForCompletion(true) ? 0 : 1 );
			
		} catch (IOException | InterruptedException | ClassNotFoundException e) {
			e.printStackTrace();
		}
		
	}
}
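For reference, here is a tiny run on hypothetical input (one colon-separated key:value record per line, which is what the mapper's split(":") expects). The output comes from TextOutputFormat, tab-separated:

input        output
a:1          a	2.5
a:4          b	2.0
b:2

Average for a = (1 + 4) / 2 = 2.5; for b = 2 / 1 = 2.0.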


Using a Combiner
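Why can't the reducer above simply double as the combiner? Because an average of averages is not the overall average. Suppose one mapper sees the values 1 and 2 for a key and another mapper sees 6: averaging the per-mapper averages gives (1.5 + 6.0) / 2 = 3.75, but the true average of 1, 2, 6 is 3.0. Partial (sum, count) pairs, by contrast, can be merged in any order. The code below therefore emits a custom pair Writable carrying a partial sum and its count; the class itself is not shown in the original post, but a minimal sketch follows the reducer.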
	public static class mapper extends Mapper<LongWritable, Text, Text, pair>{

		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String[] ss = value.toString().split(":");
			// emit (value, 1): a partial sum together with its count
			pair p = new pair(Integer.parseInt(ss[1]), 1);
			context.write(new Text(ss[0]), p);
		}
		
	}
	
	public static class combiner extends Reducer<Text, pair, Text, pair>{

		@Override
		protected void reduce(Text key, Iterable<pair> value,
				Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			int cnt = 0;
			for (pair p : value) {
				sum += p.getLeft().get();
				cnt += p.getRight().get();
			}
			// re-emit a partial (sum, count) pair, not an average
			context.write(key, new pair(sum, cnt));
		}
		
	}
	
	public static class reducer extends Reducer<Text, pair, Text, DoubleWritable>{

		@Override
		protected void reduce(Text key, Iterable<pair> value,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			int cnt = 0;
			for (pair p : value) {
				sum += p.getLeft().get();
				cnt += p.getRight().get();
			}
			context.write(key, new DoubleWritable((double) sum / (double) cnt));
		}
	}
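The pair class never appears in the original post. A minimal sketch of what it would have to look like, assuming Hadoop's Writable contract (a no-arg constructor plus write/readFields), with the getLeft()/getRight() accessors the code above relies on:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

// hypothetical reconstruction of the pair class used above
public class pair implements Writable {
	private IntWritable left;   // partial sum
	private IntWritable right;  // count of values folded into that sum

	public pair() {             // required by Hadoop for deserialization
		this(0, 0);
	}

	public pair(int left, int right) {
		this.left = new IntWritable(left);
		this.right = new IntWritable(right);
	}

	public IntWritable getLeft()  { return left; }
	public IntWritable getRight() { return right; }

	@Override
	public void write(DataOutput out) throws IOException {
		left.write(out);
		right.write(out);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		left.readFields(in);
		right.readFields(in);
	}
}

Since pair is only used as a map output value, plain Writable is enough; it would need to be WritableComparable if it were ever used as a key.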

The driver (main) is essentially the same as in the first version, except that the map output value class is now pair and the combiner must be registered; see below.
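Concretely, the only driver lines that differ from the first version would be:

		job.setMapOutputValueClass(pair.class);   // map output value is now pair
		job.setCombinerClass(combiner.class);     // register the local aggregation step

Because Hadoop treats the combiner as an optimization and may apply it zero, one, or several times per map task, a combiner's output types must match its input types; that is why this combiner re-emits a pair rather than a DoubleWritable average.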


Using the in-mapper design pattern

Here the mapper performs the local aggregation itself: it folds each record into per-key (sum, count) state held in its own memory across map() calls, and emits nothing until cleanup(), once the mapper has consumed all of its input.
	public static class mapper extends Mapper<LongWritable, Text, Text, pair>{

		// per-key partial state, encoded as "sum:count" strings
		private Map<String, String> map;

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			map = new HashMap<String, String>();
		}

		// All input is processed before anything is handed to the reducer or combiner.
		// Previously the map task would make parts of its output available to the
		// reducers while still running; does buffering like this hurt efficiency?
		// Less data is produced, but emission also starts later?? Is the transfer
		// latency smaller?? Load balancing?? At least the total volume of data
		// crossing the network shrinks.
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String[] ss = value.toString().split(":");
			if (!map.containsKey(ss[0])) {
				// first value seen for this key: state is "value:1"
				map.put(ss[0], ss[1] + ":" + 1);
			} else {
				// fold the new value into the stored "sum:count" state
				String tmp = map.get(ss[0]);
				String[] tt = tmp.split(":");
				int ta = Integer.parseInt(ss[1]) + Integer.parseInt(tt[0]);
				int tb = Integer.parseInt(tt[1]) + 1;
				map.put(ss[0], ta + ":" + tb);
			}
		}
		
		// emit one (key, partial-sum pair) per distinct key, now that this
		// mapper has processed all of its input
		@Override
		protected void cleanup(Context context) throws IOException,
				InterruptedException {
			for(Map.Entry<String, String> e : map.entrySet()){
				String[] tt = e.getValue().split(":");
				pair p = new pair(Integer.parseInt(tt[0]), Integer.parseInt(tt[1]));
				context.write(new Text(e.getKey()), p);
			}
		}
	}
	
	public static class reducer extends Reducer<Text, pair, Text, DoubleWritable>{

		@Override
		protected void reduce(Text key, Iterable<pair> value,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			int cnt = 0;
			for (pair p : value) {
				sum += p.getLeft().get();
				cnt += p.getRight().get();
			}
			context.write(key, new DoubleWritable((double) sum / (double) cnt));
		}
	}
in-mapper design pattern: aggregation happens inside each individual mapper, in its own memory, before any output is emitted.
Combiner: the framework applies local aggregation to each map task's emitted output, and may run it zero, one, or several times, so it is an optimization hint rather than a guarantee. The in-mapper variant trades that uncertainty for memory: its HashMap grows with the number of distinct keys a single mapper sees.
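That memory growth is the main caveat of the in-mapper pattern. A common refinement (a sketch, not from the original post) flushes the in-memory map whenever it grows past a threshold, trading a little aggregation for bounded memory; the reducer keeps working unchanged because the emitted pairs are still just partial sums:

		// hypothetical addition to the mapper above; FLUSH_LIMIT is an assumed
		// value and should be tuned to the task's heap
		private static final int FLUSH_LIMIT = 10000;

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// ... same per-key folding logic as above ...
			if (map.size() >= FLUSH_LIMIT) {
				flush(context);
			}
		}

		private void flush(Context context) throws IOException, InterruptedException {
			for (Map.Entry<String, String> e : map.entrySet()) {
				String[] tt = e.getValue().split(":");
				context.write(new Text(e.getKey()),
						new pair(Integer.parseInt(tt[0]), Integer.parseInt(tt[1])));
			}
			map.clear();
		}

With this change, cleanup() can simply call flush(context) instead of duplicating the emit loop.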