
[Hadoop] Hadoop Chained Jobs: Using ChainMapper and ChainReducer


Notes:

1. The version I am currently using is 1.2.1, so ChainMapper here still uses the old API.

2. The old API only supports the N-Mapper + 1-Reducer pattern: one or more Mappers and at most one Reducer, where the Reducer simply must not be the first stage of the chain.

For example:

Map1 -> Map2 -> Reducer -> Map3 -> Map4

 

(I am not sure whether the new API supports an N-Reducer pattern, but the new API is indeed much simpler and cleaner.)

 

 

Task description:

The task takes two steps:

1. Run WordCount over a document.

2. Pick out the words that appear at least 5 times (the code below uses LOW_LIMIT = 5 as the threshold).

 

We are all familiar with WordCount. Because of the version constraint, let's first implement it with the old API:

package hadoop_in_action_exersice;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class ChainedJobs {

	public static class TokenizeMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

		private final static IntWritable one = new IntWritable(1);
		public static final int LOW_LIMIT = 5;
		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			String line = value.toString();
			StringTokenizer st = new StringTokenizer(line);
			while(st.hasMoreTokens())
				output.collect(new Text(st.nextToken()), one);
			
		}
		
	}
	
	public static class TokenizeReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		public void reduce(Text key, Iterator<IntWritable> values,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			int sum = 0;
			while(values.hasNext()) {
				sum += values.next().get();
			}
			output.collect(key, new IntWritable(sum));
		}
		
	}
	
	
	public static void main(String[] args) throws IOException {
		
		
		JobConf conf = new JobConf(ChainedJobs.class);
		conf.setJobName("wordcount");                  // set a user-defined job name
		conf.setOutputKeyClass(Text.class);            // set the key class of the job output
		conf.setOutputValueClass(IntWritable.class);   // set the value class of the job output
		conf.setMapperClass(TokenizeMapper.class);     // set the Mapper class for the job
		conf.setCombinerClass(TokenizeReducer.class);  // set the Combiner class for the job
		conf.setReducerClass(TokenizeReducer.class);   // set the Reducer class for the job
		conf.setInputFormat(TextInputFormat.class);    // set the InputFormat implementation for the job
		conf.setOutputFormat(TextOutputFormat.class);  // set the OutputFormat implementation for the job

		// Remove the output folder before running the job(s)
		FileSystem fs = FileSystem.get(conf);
		String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";
		Path op = new Path(outputPath);
		if (fs.exists(op)) {
			fs.delete(op, true);
			System.out.println("Output path already exists, deleted it!");
		}

		FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
		FileOutputFormat.setOutputPath(conf, new Path(outputPath));
		JobClient.runJob(conf);                        // run the job
	}
	
}

 

The above is a single standalone job that completes step 1. To follow it immediately with step 2, we need to modify it.

To make things easier to follow, here is a sample of the output of the job above (which becomes the input to the next step):

accessed	3
accessible	4
accomplish	1
accounting	7
accurately	1
acquire	1
across	1
actual	1
actually	1
add	3
added	2
addition	1
additional	4

 

The old API does not support the setup() / cleanup() hooks, which is a real drawback, so if at all possible it is better to migrate to Hadoop 2.X.

The new API is much more convenient and concise.
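
For comparison, here is a minimal, untested sketch of what the same chain might look like with the new API (org.apache.hadoop.mapreduce.lib.chain), assuming TokenizeMapper, TokenizeReducer and RangeFilterMapper have been ported to extend the new-API Mapper/Reducer classes. It only illustrates the shape of the calls, with each stage getting its own Configuration; everything else in this post sticks with the old API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainedJobsNewApi {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "chained wordcount");
		job.setJarByClass(ChainedJobsNewApi.class);

		// Step1: mapper for word count (assumes a new-API TokenizeMapper)
		ChainMapper.addMapper(job, TokenizeMapper.class,
				LongWritable.class, Text.class, Text.class, IntWritable.class, new Configuration(false));

		// Step2: reducer for word count (assumes a new-API TokenizeReducer)
		ChainReducer.setReducer(job, TokenizeReducer.class,
				Text.class, IntWritable.class, Text.class, IntWritable.class, new Configuration(false));

		// Step3: mapper used as filter (assumes a new-API RangeFilterMapper)
		ChainReducer.addMapper(job, RangeFilterMapper.class,
				Text.class, IntWritable.class, Text.class, IntWritable.class, new Configuration(false));

		FileInputFormat.addInputPath(job, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
		FileOutputFormat.setOutputPath(job, new Path("/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT"));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}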

 

Below is the additional Mapper used for filtering:

public static class RangeFilterMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

	@Override
	public void map(Text key, IntWritable value,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		
		if(value.get() >= TokenizeMapper.LOW_LIMIT) {	// LOW_LIMIT (5) is defined in TokenizeMapper above
			output.collect(key, value);
		}
		
	}
}

This Mapper does something very simple: for each key, if its value is >= LOW_LIMIT, the pair is emitted.

So far, then, the job chain looks like this:

TokenizeMapper -> TokenizeReducer -> RangeFilterMapper

 

So our main function becomes the following:

public static void main(String[] args) throws IOException {

	// Requires two additional imports at the top of the file:
	// import org.apache.hadoop.mapred.lib.ChainMapper;
	// import org.apache.hadoop.mapred.lib.ChainReducer;

	JobConf conf = new JobConf(ChainedJobs.class);
	conf.setJobName("wordcount");                  // set a user-defined job name
//	conf.setOutputKeyClass(Text.class);            // set the key class of the job output
//	conf.setOutputValueClass(IntWritable.class);   // set the value class of the job output
//	conf.setMapperClass(TokenizeMapper.class);     // set the Mapper class for the job
//	conf.setCombinerClass(TokenizeReducer.class);  // set the Combiner class for the job
//	conf.setReducerClass(TokenizeReducer.class);   // set the Reducer class for the job
//	conf.setInputFormat(TextInputFormat.class);    // set the InputFormat implementation for the job
//	conf.setOutputFormat(TextOutputFormat.class);  // set the OutputFormat implementation for the job

	// Step1: mapper for word count
	JobConf wordCountMapper = new JobConf(false);
	ChainMapper.addMapper(conf,
			TokenizeMapper.class,
			LongWritable.class,     // input key type
			Text.class,             // input value type
			Text.class,             // output key type
			IntWritable.class,      // output value type
			false,                  // byValue: true = pass by value, false = pass by reference
			wordCountMapper);

	// Step2: reducer for word count
	JobConf wordCountReducer = new JobConf(false);
	ChainReducer.setReducer(conf,
			TokenizeReducer.class,
			Text.class,
			IntWritable.class,
			Text.class,
			IntWritable.class,
			false,
			wordCountReducer);

	// Step3: mapper used as filter
	JobConf rangeFilterMapper = new JobConf(false);
	ChainReducer.addMapper(conf,
			RangeFilterMapper.class,
			Text.class,
			IntWritable.class,
			Text.class,
			IntWritable.class,
			false,
			rangeFilterMapper);

	// Remove the output folder before running the job(s)
	FileSystem fs = FileSystem.get(conf);
	String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";
	Path op = new Path(outputPath);
	if (fs.exists(op)) {
		fs.delete(op, true);
		System.out.println("Output path already exists, deleted it!");
	}

	FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
	FileOutputFormat.setOutputPath(conf, new Path(outputPath));
	JobClient.runJob(conf);                        // run the job
}

Here is part of the output:

a	40
and	26
are	12
as	6
be	7
been	8
but	5
by	5
can	12
change	5
data	5
files	7
for	28
from	5
has	7
have	8
if	6
in	27
is	16
it	13
more	8
not	5
of	23
on	5
outputs	5
see	6
so	11
that	11
the	54

As you can see, if stop words (a, the, for, ...) are not removed when doing NLP on English text, the results really are heavily affected.
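
If you wanted to deal with that inside the same chain, one possible extension (just a sketch of my own, not part of the original job) would be to insert one more old-API Mapper, say a hypothetical StopWordFilterMapper with a small hard-coded stop-word set, registered with another ChainReducer.addMapper call in the same way as RangeFilterMapper:

public static class StopWordFilterMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

	// Hypothetical, hard-coded stop-word set; a real job would probably load it from a file or the DistributedCache.
	private static final java.util.Set<String> STOP_WORDS = new java.util.HashSet<String>(
			java.util.Arrays.asList("a", "the", "for", "and", "of", "in", "is", "to"));

	@Override
	public void map(Text key, IntWritable value,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		// Emit the (word, count) pair only if the word is not a stop word
		if (!STOP_WORDS.contains(key.toString().toLowerCase())) {
			output.collect(key, value);
		}
	}
}

It would then be appended to the chain with another call such as ChainReducer.addMapper(conf, StopWordFilterMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, false, new JobConf(false));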

 

 
