处理复杂的要求的时候,有时一个mapreduce程序时完成不了的,往往需要多个mapreduce程序,这个时候就要牵扯到各个任务之间的依赖关系,所谓依赖就是一个M/R Job 的处理结果是另外的M/R 的输入,以此类推,完成几个mapreduce程序,得到最后的结果,下面将直接贴出一个例子的全部代码,因为为了找一个完整的例子实在是太难了,今天找了半天才把这个问题解决。
代码描述,一共包括两个mapreduce作业。也就是两个map和两个reduce函数,第一个job处理后的输出是第二个job的输入,然后交由第二个job来做出最后的结果,代码里面的关键的地方已经有了注释
先是代码的主体部分:
上代码:
/* * anthor TMS */ package 依赖MR处理方法; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MODEL { //第一个Job的map函数 public static class Map_First extends Mapper<Object, Text ,Text , IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text keys = new Text(); public void map(Object key,Text value, Context context ) throws IOException, InterruptedException { String s = value.toString(); String[] allStr = Config.CatString(s); keys.set(allStr[1]); context.write(keys, one); } } //第一个Job的reduce函数 public static class Reduce_First extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable value:values) { sum += value.get(); } result.set(sum); context.write(key, result); } } //第二个job的map函数 public static class Map_Second extends Mapper<Object, Text ,Text , IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text keys = new Text(); public void map(Object key,Text value, Context context ) throws IOException, InterruptedException { String s = value.toString(); String[] allStr = Config.CatString(s); keys.set(allStr[1]); context.write(keys, one); } } //第二个Job的reduce函数 public static class Reduce_Second extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable value:values) { sum += value.get(); } result.set(sum); context.write(key, result); } } //启动函数 public static void main(String[] args) throws IOException { JobConf conf = new JobConf(MODEL.class); //第一个job的配置 Job job1 = new Job(conf,"join1"); job1.setJarByClass(MODEL.class); job1.setMapperClass(Map_First.class); job1.setReducerClass(Reduce_First.class); job1.setMapOutputKeyClass(Text.class);//map阶段的输出的key job1.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value job1.setOutputKeyClass(Text.class);//reduce阶段的输出的key job1.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value //加入控制容器 ControlledJob ctrljob1=new ControlledJob(conf); ctrljob1.setJob(job1); //job1的输入输出文件路径 FileInputFormat.addInputPath(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, new Path(args[1])); //第二个job的配置 Job job2=new Job(conf,"Join2"); job2.setJarByClass(MODEL.class); job2.setMapperClass(Map_Second.class); job2.setReducerClass(Reduce_Second.class); job2.setMapOutputKeyClass(Text.class);//map阶段的输出的key job2.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value job2.setOutputKeyClass(Text.class);//reduce阶段的输出的key job2.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value //作业2加入控制容器 ControlledJob ctrljob2=new ControlledJob(conf); ctrljob2.setJob(job2); //设置多个作业直接的依赖关系 //如下所写: //意思为job2的启动,依赖于job1作业的完成 ctrljob2.addDependingJob(ctrljob1); //输入路径是上一个作业的输出路径,因此这里填args[1],要和上面对应好 FileInputFormat.addInputPath(job2, new Path(args[1])); //输出路径从新传入一个参数,这里需要注意,因为我们最后的输出文件一定要是没有出现过得 //因此我们在这里new Path(args[2])因为args[2]在上面没有用过,只要和上面不同就可以了 FileOutputFormat.setOutputPath(job2,new Path(args[2]) ); //主的控制容器,控制上面的总的两个子作业 JobControl jobCtrl=new JobControl("myctrl"); //添加到总的JobControl里,进行控制 jobCtrl.addJob(ctrljob1); jobCtrl.addJob(ctrljob2); //在线程启动,记住一定要有这个 Thread t=new Thread(jobCtrl); t.start(); while(true){ if(jobCtrl.allFinished()){//如果作业成功完成,就打印成功作业的信息 System.out.println(jobCtrl.getSuccessfulJobList()); jobCtrl.stop(); break; } } } }
工程上右键run进行配置:先配置第一个栏目main里面的Project(项目名)和Main Class(主类名)
接下来是arguments如下所示:
最后点击Apply然后Run,运行成功之后,刷新DFS出现几个文件,如下分别为输入的原始数据文件,第一个mapreduce任务后输出的文件output和第二个mapreduce任务之后输出的文件output1
这里只有两个mapreduce任务,多个也是一样,主要的思想就是先写好每一个mapreduce任务的主体部分,也就是map和reduce函数,然后就是分别配置每一个mapreduce任务(这里要注意设置好输入和输出路径,很容易忘记!!!)此时将job任务加入到控制容器,每一个都要加,再就是使用addDependingJob()添加依赖关系,再用一个总的控制器控制每一个任务。最后用一个线程启动!!!
相关推荐
mapreduce多表关联join多个job相互依赖传递参数
HBase MapReduce完整实例.rar
包含完整实例源码,编译配置文件,测试数据,可执行jar文件,执行脚本及操作步骤。学习完此例子后,你能掌握MapReduce基础编程,及如何编译Java文件,打包jar文件,编写shell执行脚本等。后续学习还可以参看本人的...
Eclipse工程 HBase MapReduce完整实例 可远程执行 包含HBase增删改查 执行Test可看到效果
假设文件的量比较大,每个文档又包含大量的单词,则无法使用传统的线性程序进行处理,而这类问题正是 MapReduce 可以发挥优势的地方。 在前面《MapReduce实例分析:单词计数》教程中已经介绍了用 MapReduce 实现单词...
使用hadoop的eclipse插件开发的mapreduce程序,实现对数据的简单统计处理,包括可视化结果
本节通过单词计数实例来阐述采用 MapReduce 解决实际问题的基本思路和具体实现过程。 设计思路 首先,检查单词计数是否可以使用 MapReduce 进行处理。因为在单词计数程序任务中,不同单词的出现次数之间不存在...
在hadoop平台上,利用mapreduce编程模型,做的简单应用实例,文件系统不同的请更改文件系统路径。用了多次mapreduce。
MapReduce Java API实例-统计单次出现频率示例代码-MapReduceDemo.rar MapReduce Java API实例-统计单次出现频率示例代码-MapReduceDemo.rar MapReduce Java API实例-统计单次出现频率示例代码-MapReduceDemo.rar
MapReduce编程实例浅析,讲述如何进行M/R程序开发。
MapReduce工作MapRedMapReduce工作原理uce工作原理原理
1.社交网络综合评分案例 2.微博精准营销案例 3.物品推荐案例 4.QQ好友推荐案例
用mapreduce进行文本处理,发表在SIGIR2009
mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce ...
Hadoop介绍 HDFS MapReduce 工作原理
MapReduce操作实例-数据去重.pdf 学习资料 复习资料 教学资源
MapReduce简单程序示例
MapReduce集群多用户作业调度方法的研究与实现
大规模数据处理时,MapReduce在三个层面上的基本构思 如何对付大数据处理:分而治之 对相互间不具有计算依赖关系的大数据,实现并行最自然的办法就是采取分而治之的策略 上升到抽象模型:Mapper与Reducer MPI等...
云计算中大数据的MapReduce处理方法简析.pdf