`

初学Hadoop,统计Top10单词

阅读更多
课程明明叫SeachTechnology,本以为可以趁机好好学一下Lucene和Nutch,结果Project却是使用分布式计算框架Map/Reduce的开源项目Hadoop进行文档关键词的自动提取,算了,既来之则安之,都是Doug Cutting的作品啊。
Project要求是给定250个文章的摘要(trial data),通过三个步骤
1.preprocessing such as Part-of-Speech tagging,lemmatization and stemming
2.candidate generation
3.candidate ranking
提取前十的关键字,然后将算法在test data上运行,与人工提取的关键字进行比较来评价算法的优劣,要求使用Map/Reduce以使得算法可以在大规模数据上运行。
学习Hadoop的最好资料应该是[url = http://download.csdn.net/source/1491868]Hadoop: The Definitive Guide[/url]。

花了一天时间在Ubuntu上配置完了环境,先在eclipse下写了个统计Top10单词的程序进行一下standalone模式下的试验。期间涉及到一个停用词的删除问题,发现Lucene下面有一个StopFilter可用?得好好研究下

TopWords.java
import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;


public class TopWords {		
	private static boolean enalbeRemoveStopWords = false;  
	
	public static class Map extends MapReduceBase implements
			Mapper<LongWritable, Text, Text, WordCountPair> {
		private Text word = new Text();
		private Text location = new Text();		
		private final String[] PUNCTUATIONS = {"\\?","\\.","\\[","\\]",",","\\(","\\)"};
		private final String[] STOP_WORDS ={
			"\\ba\\b", "\\ban\\b", "\\band\\b", "\\bare\\b","\\bas\\b","\\bat\\b","\\bbe\\b","\\bbut\\b",
            "\\bby\\b", "\\bfor\\b", "\\bif\\b", "\\bin\\b", "\\binto\\b", "\\bis\\b", "\\bit\\b",
            "\\bno\\b", "\\bnot\\b", "\\bof\\b", "\\bon\\b", "\\bor\\b", "\\bs\\b", "\\bsuch\\b",
            "\\bthat\\b", "\\bthe\\b", "\\btheir\\b", "\\bthen\\b", "\\bthere\\b","\\bthese\\b",
            "\\bthey\\b", "\\bthis\\b", "\\bto\\b", "\\bwas\\b", "\\bwill\\b", "\\bwith\\b" };
		
		/* the output of map is <filename,<word,frequency>>*/
		public void map(LongWritable key, Text value,
				OutputCollector<Text, WordCountPair> output, Reporter reporter)
				throws IOException {
			
			/*get the filename*/
			FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
			String fileName = fileSplit.getPath().getName().replaceAll("\\.txt", " :");								
			location.set(fileName);

			/*normalize the words*/
			String line = value.toString().toLowerCase();
			for(String s : PUNCTUATIONS)
				line = line.replaceAll(s, "");
			if(enalbeRemoveStopWords){
				for(String s : STOP_WORDS)
				line = line.replaceAll(s, "");
			}
			
			StringTokenizer tokenizer = new StringTokenizer(line);
			while (tokenizer.hasMoreTokens()) {
				word.set(tokenizer.nextToken());
				output.collect(location,new WordCountPair(word,1));
			}
		}		
	}

	/*Rudece传进来的是 <filename,List(<word,frequency>) >*/
	public static class Reduce extends MapReduceBase implements
			Reducer<Text, WordCountPair, Text, Text> {
		
List<WordCountPair> list = new ArrayList<WordCountPair>();		
		
		public void reduce(Text key, Iterator<WordCountPair> values,
				OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {

			list.removeAll(list);
			/*get the statistics of all the <word,frequency>pair in a list*/
			while (values.hasNext()) {
				WordCountPair temp = values.next();
				//System.out.println(temp);
				int i;
				if( (i=list.indexOf(temp)) != -1)
					list.get(i).frequency++;
				else
					list.add(new WordCountPair(temp));						
			}
			
			/*sort the list according to frequency and output the top 10*/
			Collections.sort(list);
			//System.out.println(list.toString());
			StringBuilder topWords = new StringBuilder();
			boolean first = true;
			for(int i=0;i<Math.min(10, list.size());i++){
				if(!first)
					topWords.append(",");									
				first = false;
				topWords.append(list.get(i).word.toString());				
			}
			
			output.collect(key, new Text(topWords.toString()));		
		}		
	}

	public static void main(String[] args) throws Exception {
		
		if(args.length < 2){
			System.err.println("Usage:Java TopWords input output [-stop]");
			System.exit(1);
		}
		
		JobConf conf = new JobConf(TopWords.class);
		conf.setJobName("TopWords");

		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(WordCountPair.class);

		conf.setMapperClass(Map.class);
		conf.setReducerClass(Reduce.class);

		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);

		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
		if(args.length == 3){
			if(args[2].equals("-stop"))
				enalbeRemoveStopWords = true;
			else{		
				System.err.println("Usage:Java TopWords input output [-stop]");
				System.exit(1);		
			}
		}
		
		JobClient.runJob(conf);
	}
}



import java.io.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/**
 * WordCountPair是自定义类型,需要实现Writable接口(如果作为Key的话,则需要实现WritableComparable接口
 */

public class WordCountPair implements Writable,Comparable<WordCountPair>{
	public Text word; 
	public int frequency;

	public WordCountPair(Text word, int frequency){
		this.word = new Text(word);
		this.frequency = frequency;
	}

	public WordCountPair(){
		this(new Text(), 1);
	}

	public WordCountPair(WordCountPair wcp){
		this(wcp.word,wcp.frequency);
	}	

	public void readFields(DataInput in) throws IOException {
		word.readFields(in);
		frequency = in.readInt();
	}

	public void write(DataOutput out) throws IOException {
		word.write(out);
		out.writeInt(frequency);		

	}	

	@Override

	public boolean equals(Object o) {

		if (o instanceof WordCountPair) {

			WordCountPair wcp = (WordCountPair) o;

			return word.equals(wcp.word);

		}

		return false;
	}

	/* sort according to the frequency of the word,descending order*/
	public int compareTo(WordCountPair other) {
		return other.frequency - this.frequency;		

	}
	
	public String toString() {
		return word + "," + frequency;

	}
}


分享到:
评论
1 楼 NeverGiveUpToChange 2014-07-08  
       不错,学习了

相关推荐

Global site tag (gtag.js) - Google Analytics