`
ganliang13
  • 浏览: 249519 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

基于hadoop的多个reduce 输出

阅读更多
import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultipOutputWordCount extends Configured implements Tool {
	/*
	 * Mapper<Object, Text, Text, IntWritable>
	 * Object ,读取的字节偏移量
	 * Text Map读取的文本行
	 * Text Map的输出Key
	 * IntWritable 的输出Value
	 */
	public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			//一行行读取文件内容,一行行处理文件
			StringTokenizer itr = new StringTokenizer(value.toString());//对输入行切词,eg:Hello World,Hello Hadoop
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);//<Hello,1>,<World,1>,<Hello,1>,<Hadoop,1>
			}
		}
	}

	/**
	 *   Reducer<Text, IntWritable, Text, IntWritable> 
	 * Text:Reduce 输入Key
	 * IntWritable:Reduce的输入Value
	 * Text: Reduce 输出Key 默认类型
	 * IntWritable,输入Value,默认类型LongWritable
	 */
	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();
		@SuppressWarnings("rawtypes")
		private MultipleOutputs multipleOutputs;

		protected void setup(Context context) throws IOException, InterruptedException {
			multipleOutputs =new MultipleOutputs<Text,IntWritable>(context);
		}
		
		protected void cleanup(Context context) throws IOException,
				InterruptedException {
			multipleOutputs.close();
		}
		@SuppressWarnings("unchecked")
		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			multipleOutputs.write(NullWritable.get(), new Text(key.toString()+":"+result), "1");
			multipleOutputs.write(NullWritable.get(), key, "2");
			multipleOutputs.write(NullWritable.get(), "我是你大爷", "3");
		}
	}
	
	public static class MultipOutputWordFormat extends MultipleTextOutputFormat<Text, IntWritable>{
		
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new MultipOutputWordCount(), args));
	}

	@Override
	public int run(String[] args) throws Exception {
		File jarFile = EJob.createTempJar("bin");
		ClassLoader classLoader = EJob.getClassLoader();
		Thread.currentThread().setContextClassLoader(classLoader);
		//Hadoop 运行环境
		Configuration conf = new Configuration();
		conf.set("mapred.job.tracker", "bfdbjc1:12001");
		
		//任务参数设置
		  //a.创建任务,并设置名称,以便跟踪
		Job job = new Job(conf, "word count");
		  //b.运行主类,Map类,Reduce类
		job.setJarByClass(MultipOutputWordCount.class);
		job.setMapperClass(MultipOutputWordCount.TokenizerMapper.class);
		job.setReducerClass(MultipOutputWordCount.IntSumReducer.class);
		//下面两行不需要写,Map默认输出类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		  //c.设置Reduce输入输出类型,Map默认出及Reduce默认输入是<Text,IntWritable>
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		
		//HDFS输入,如果是路径默认读取路径下所有文件.
		FileInputFormat.addInputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/a.txt"));
		//reduce 输出路径
		FileOutputFormat.setOutputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/output/2da1"));
		
		//Eclipse 本地提交
		((JobConf) job.getConfiguration()).setJar(jarFile.toString());
		
		//等待任务运行完成
		 job.waitForCompletion(true);
		 return 0;
	}
}

 

分享到:
评论

相关推荐

    Hadoop实战中文版

    7.2 探查任务特定信息 7.3 划分为多个输出文件 7.4 以数据库作为输入输出 7.5 保持输出的顺序 7.6 小结 第8章 管理Hadoop 8.1 为实际应用设置特定参数值 8.2 系统体检 8.3 权限设置 8.4 配额管理 8.5 启用...

    Hadoop实战中文版.PDF

    1568.11 多用户作业的调度 1578.11.1 多个JobTracker 1588.11.2 公平调度器 1588.12 小结 160第三部分 Hadoop也疯狂第9章 在云上运行Hadoop 1629.1 Amazon Web Services简介 1629.2 安装AWS 1639.2.1...

    Hadoop实战(第2版)

    join技术点20 实现semi-join4.1.4 为你的数据挑选最优的合并策略4.2 排序4.2.1 二次排序技术点21 二次排序的实现4.2.2 整体并行排序技术点22 通过多个reducer 对key 进行排序4.3 抽样技术点23 蓄水...

    Hadoop实战

    1357.2 探查任务特定信息 1377.3 划分为多个输出文件 1387.4 以数据库作为输入输出 1437.5 保持输出的顺序 1457.6 小结 146第8章 管理Hadoop 1478.1 为实际应用设置特定参数值 1478.2 系统体检 1498.3 权限设置 1518...

    Hadoop实战(陆嘉恒)译

    细则手册7.1 向任务传递作业定制的参数7.2 探查任务特定信息7.3 划分为多个输出文件7.4 以数据库作为输入输出7.5 保持输出的顺序7.6 小结第8 章 管理Hadoop8.1 为实际应用设置特定参数值8.2 系统体检8.3 权限设置8.4...

    akka-mapreduce:基于Scala和Akka的map-reduce框架

    在我们的框架中,最初使用Akka Stream读取数据,我们要做的是提供一个 ,可用作流的接收器,并行处理数据块,并在流结束时汇总来自多个reducer的结果。 我们的Wiki有了地图缩减数据处理问题的。 我们认为,在某些...

    Hadoop硬实战 [(美)霍姆斯著][电子工业出版社][2015.01]_PDF电子书下载 带书签目录 高清完整版.rar )

    技术点22 通过多个reducer 对key 进行排序 4.3 抽样 技术点23 蓄水池抽样(reservoir 抽样) 4.4 本章小结 5 优化HDFS 处理大数据的技术 5.1 处理小文件 技术点24 使用Avro 存储大量小文件 5.2 通过...

    大数据-Inceptor技术白皮书.pdf

    但由于复杂的工作流通常需 要多个阶段的Map/Reduce任务,而Map/Reduce的输入输出必须经过低速磁盘,导致运行复杂迭代任务时非常 低效,因此不适合对延时要求高的交互式分析或者需要复杂迭代的数据分析任务。...

    fourinone-3.04.25

    3、一次性启动多工人进程支持,可以通过程序api一次性启动和管理“ParkServer/工头/工人”多个进程,并附带良好的日志输出功能,用于代替写批处理脚本方式,方便部署和运行。 4、增加了相应指南和demo。 本软件...

    大数据时代银行业的机遇与挑战.docx

    MapReduce框架是进行海量数据并行计算的框架,由一个作业追踪(Master)节点和多个任务追踪(Worker)节点构成。作业追踪节点用于任务划分、任务调度;而任务追踪节点用于接收来自于作业追踪节点分配的Map或者Reduce...

Global site tag (gtag.js) - Google Analytics