`

多个mapreduce工作相互依赖处理方法完整实例(JobControl)

阅读更多

        处理复杂的要求的时候,有时一个mapreduce程序时完成不了的,往往需要多个mapreduce程序,这个时候就要牵扯到各个任务之间的依赖关系所谓依赖就是一个M/R Job 的处理结果是另外的M/R 的输入,以此类推,完成几个mapreduce程序,得到最后的结果,下面将直接贴出一个例子的全部代码,因为为了找一个完整的例子实在是太难了,今天找了半天才把这个问题解决。

         代码描述,一共包括两个mapreduce作业。也就是两个map和两个reduce函数,第一个job处理后的输出是第二个job的输入,然后交由第二个job来做出最后的结果,代码里面的关键的地方已经有了注释

 

先是代码的主体部分:



 

 

上代码:

/*
 * anthor TMS
 */
package 依赖MR处理方法;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 public class MODEL {
	 
	 //第一个Job的map函数
	 public static class Map_First extends Mapper<Object, Text  ,Text , IntWritable>{                                                                                               	    private final static IntWritable one = new IntWritable(1);
		    private Text keys = new Text();
		    public void map(Object key,Text value, Context context ) throws IOException, InterruptedException {
		    	String s = value.toString();
		    	String[]  allStr = Config.CatString(s);
		        keys.set(allStr[1]); 
		    	context.write(keys, one);
		    }
	  }
	
	 //第一个Job的reduce函数
	  public static class Reduce_First extends Reducer<Text, IntWritable, Text, IntWritable> {
		  private IntWritable result = new IntWritable();
		  public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException {
			   int sum = 0;
			   for(IntWritable value:values) {
				   sum  +=  value.get();
			   }
			    result.set(sum);
			   
			    context.write(key, result);
		  }
	  }
	  
	  //第二个job的map函数
	  public static class Map_Second extends Mapper<Object, Text  ,Text , IntWritable>{        
		    private final static IntWritable one = new IntWritable(1);
		    private Text keys = new Text();
		    public void map(Object key,Text value, Context context ) throws IOException, InterruptedException {

		    	String s = value.toString();
		    	String[]  allStr = Config.CatString(s);
		        keys.set(allStr[1]); 
		    	context.write(keys, one);
		    }
	  }
	  
	  //第二个Job的reduce函数
	  public static class Reduce_Second extends Reducer<Text, IntWritable, Text, IntWritable> {
		  private IntWritable result = new IntWritable();
		  public void reduce(Text key,Iterable<IntWritable>values, Context context) throws IOException, InterruptedException {
			   int sum = 0;
			   for(IntWritable value:values) {
				   sum  +=  value.get();
			   }
			    result.set(sum);
			    context.write(key, result);
		  }
	  }
	  
	  //启动函数
	  public static void main(String[] args) throws IOException {
		
		JobConf conf = new JobConf(MODEL.class);
		
		//第一个job的配置
		Job job1 = new Job(conf,"join1");
		job1.setJarByClass(MODEL.class); 

	    job1.setMapperClass(Map_First.class); 
	    job1.setReducerClass(Reduce_First.class); 

		job1.setMapOutputKeyClass(Text.class);//map阶段的输出的key 
		job1.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value 
	
		job1.setOutputKeyClass(Text.class);//reduce阶段的输出的key 
		job1.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value 
		
		//加入控制容器 
		ControlledJob ctrljob1=new  ControlledJob(conf); 
		ctrljob1.setJob(job1); 
		//job1的输入输出文件路径
		FileInputFormat.addInputPath(job1, new Path(args[0])); 
	    FileOutputFormat.setOutputPath(job1, new Path(args[1])); 

	    //第二个job的配置
    	Job job2=new Job(conf,"Join2"); 
	    job2.setJarByClass(MODEL.class); 
	    
	    job2.setMapperClass(Map_Second.class); 
	   job2.setReducerClass(Reduce_Second.class); 
	   
		job2.setMapOutputKeyClass(Text.class);//map阶段的输出的key 
		job2.setMapOutputValueClass(IntWritable.class);//map阶段的输出的value 

		job2.setOutputKeyClass(Text.class);//reduce阶段的输出的key 
		job2.setOutputValueClass(IntWritable.class);//reduce阶段的输出的value 

		//作业2加入控制容器 
		ControlledJob ctrljob2=new ControlledJob(conf); 
		ctrljob2.setJob(job2); 
	
	   //设置多个作业直接的依赖关系 
       //如下所写: 
	   //意思为job2的启动,依赖于job1作业的完成 
	
		ctrljob2.addDependingJob(ctrljob1); 
		
		//输入路径是上一个作业的输出路径,因此这里填args[1],要和上面对应好
		FileInputFormat.addInputPath(job2, new Path(args[1]));
		
		//输出路径从新传入一个参数,这里需要注意,因为我们最后的输出文件一定要是没有出现过得
		//因此我们在这里new Path(args[2])因为args[2]在上面没有用过,只要和上面不同就可以了
		FileOutputFormat.setOutputPath(job2,new Path(args[2]) );

		//主的控制容器,控制上面的总的两个子作业 
		JobControl jobCtrl=new JobControl("myctrl"); 
	
		//添加到总的JobControl里,进行控制
		jobCtrl.addJob(ctrljob1); 
		jobCtrl.addJob(ctrljob2); 


		//在线程启动,记住一定要有这个
		Thread  t=new Thread(jobCtrl); 
		t.start(); 

		while(true){ 

		if(jobCtrl.allFinished()){//如果作业成功完成,就打印成功作业的信息 
		System.out.println(jobCtrl.getSuccessfulJobList()); 
		jobCtrl.stop(); 
		break; 
		}
		}
		}
}

 

   工程上右键run进行配置:先配置第一个栏目main里面的Project(项目名)和Main Class(主类名) 

    

 

接下来是arguments如下所示:



 

最后点击Apply然后Run,运行成功之后,刷新DFS出现几个文件,如下分别为输入的原始数据文件,第一个mapreduce任务后输出的文件output和第二个mapreduce任务之后输出的文件output1



 

 

这里只有两个mapreduce任务,多个也是一样,主要的思想就是先写好每一个mapreduce任务的主体部分,也就是map和reduce函数,然后就是分别配置每一个mapreduce任务(这里要注意设置好输入和输出路径,很容易忘记!!!)此时将job任务加入到控制容器,每一个都要加,再就是使用addDependingJob()添加依赖关系,再用一个总的控制器控制每一个任务。最后用一个线程启动!!!

 

 

 

        

 

  • 大小: 66.1 KB
  • 大小: 73.1 KB
  • 大小: 104.4 KB
  • 大小: 145.5 KB
3
0
分享到:
评论
2 楼 MNTMs 2014-07-28  
我现在用的还是1.1.2
1 楼 SpringJava 2014-07-28  
楼主的hadoop是哪个版本的?

相关推荐

Global site tag (gtag.js) - Google Analytics