`
wbj0110
  • 浏览: 1555185 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Mahout之SparseVectorsFromSequenceFiles源码分析

阅读更多

一系列添加选项的操作:包括minSupport,analyzerName,chunkSize,weight,minDF等等。

1
2
3
4
5
6
7
8
9
10
11
12
    Option chunkSizeOpt = obuilder.withLongName("chunkSize").withArgument(
abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
      "The chunkSize in MegaBytes. 100-10000 MB").withShortName("chunk").create();
       //term weight,TF或TFIDF
    Option weightOpt = obuilder.withLongName("weight").withRequired(false).withArgument(
     abuilder.withName("weight").withMinimum(1).withMaximum(1).create()).withDescription(
      "The kind of weight to use. Currently TF or TFIDF").withShortName("wt").create();
    //最小文档频率minDF
    Option minDFOpt = obuilder.withLongName("minDF").withRequired(false).withArgument(
     abuilder.withName("minDF").withMinimum(1).withMaximum(1).create()).withDescription(
      "The minimum document frequency.  Default is 1").withShortName("md").create();
……

一系列获取用户输入的选项的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
      Path inputDir = new Path((String) cmdLine.getValue(inputDirOpt));
      Path outputDir = new Path((String) cmdLine.getValue(outputDirOpt));
   
      int chunkSize = 100;
      if (cmdLine.hasOption(chunkSizeOpt)) {
        chunkSize = Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
      }
      int minSupport = 2;
      if (cmdLine.hasOption(minSupportOpt)) {
        String minSupportString = (String) cmdLine.getValue(minSupportOpt);
        minSupport = Integer.parseInt(minSupportString);
      }
   ……

在SparseVectorsFromSequenceFiles的输入目录为经过SequenceFilesFromDirectory加工过的SequenceFile。SequenceFile是hadoop专有的文件格式,保存的是key/value对。SparseVectorsFromSequenceFiles中首先是将输入目录的SequenceFile通过DocumentProcessor的处理,保存在输出目录的tokenized-documents目录中。
而DocumentProcessor也就是只有一个map,没有reduce的一个job。将原来的key按原样输出,value提取后tokenize一下,转化成List,也就是将value中的文本去掉标点符号,以空格分开后的单词。

SparseVectorsFromSequenceFiles有如下两行:

1
2
3
4
5
      Configuration conf = getConf();
      Path tokenizedPath =
new Path(outputDir, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER);
      //TODO: move this into DictionaryVectorizer , and then fold SparseVectorsFrom with EncodedVectorsFrom to have one framework for all of this.
      DocumentProcessor.tokenizeDocuments(inputDir, analyzerClass, tokenizedPath, conf);

再看看处理文本的job,DocumentProcessor.tokenizeDocuments(),只有一个mapper SequenceFileTokenizerMapper。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
  public static void tokenizeDocuments(Path input,
                                       Class< ? extends Analyzer > analyzerClass,
                                       Path output,
                                       Configuration baseConf)
    throws IOExceptionInterruptedExceptionClassNotFoundException {
    Configuration conf = new Configuration(baseConf);
    // this conf parameter needs to be set enable serialisation of conf values
    conf.set("io.serializations""org.apache.hadoop.io.serializer.JavaSerialization,"
                                  + "org.apache.hadoop.io.serializer.WritableSerialization");
    conf.set(ANALYZER_CLASS, analyzerClass.getName());
    Job job = new Job(conf);
    job.setJobName("DocumentProcessor::DocumentTokenizer: input-folder: " + input);
    job.setJarByClass(DocumentProcessor.class);
     
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(StringTuple.class);
    FileInputFormat.setInputPaths(job, input);
    FileOutputFormat.setOutputPath(job, output);
   
    job.setMapperClass(SequenceFileTokenizerMapper.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    HadoopUtil.delete(conf, output);
    
    boolean succeeded = job.waitForCompletion(true);
    if (!succeeded)
      throw new IllegalStateException("Job failed!");

tokenizer之后,便进行TFIDF计算。

进行TFIDF计算
如果用户输入的maxDFSigma大于0,则输出目录为tf-vectors-toprune,否则为tf-vectors。
由DictionaryVectorizer类的createTermFrequencyVectors()静态方法来完成。
进行TFIDF计算的第一步是WordCount,
if n-gram为1,则直接由startWordCounting()方法来完成

         outputKey=Text
         outputValue=LongWritable
         Mapper= TermCountMapper(org.apache.mahout.vectorizer.term)
	Combiner=TermCountCombiner
	Reducer=TermCountReducer
	输出类型=SequenceFileOutputFormat
	输出目录=wordcount

说白了就是hadoop入门的第一个程序:wordCount
else 由CollocDriver.generateAllGrams()来完成(两个job):

         generateCollocations
	computeNGramsPruneByLLR

第二步,给每个单词编号(assign ids to feature List)。

由createDictionaryChunks处理,输入目录为wordcount,输出文件为dictionary.file-*,每个chunk一个块号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
      int i = 0;
      for (Pair< Writable,Writable > record
           : new SequenceFileDirIterable< Writable,Writable >(filesPattern, PathType.GLOBnullnulltrue, conf)) {
        if (currentChunkSize > chunkSizeLimit) {
          Closeables.closeQuietly(dictWriter);
          chunkIndex++;
          chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex);
          chunkPaths.add(chunkPath);
          dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
          currentChunkSize = 0;
        }
        Writable key = record.getFirst();
        int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8;
        currentChunkSize += fieldSize;
        dictWriter.append(key, new IntWritable(i++));//编号!
      }
      maxTermDimension[0] = i;

从0开始编号,最后的词的数量i保存在maxTermDimension[0]中。

第三步,构造PartialVector
最开始的tokenizer之后,文章以key/value的sequenceFile保存,其中key为相对路径,value为整篇文章的单词组。
上一步得到的dictionary是每个单词对应一个id,也写入sequenceFile里面。
mapper将tokenizer后的文章原样输出,reducer一部分如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  protected void reduce(Text key, Iterable< StringTuple > values, Context context)
          throws IOExceptionInterruptedException {
    Iterator< StringTuple > it = values.iterator();
    if (!it.hasNext()) {
      return;
    }
    StringTuple value = it.next();
    Vector vector = new RandomAccessSparseVector(dimension, value.length()); // guess at initial size
      for (String term : value.getEntries()) {
        if (!term.isEmpty() && dictionary.containsKey(term)) { // unigram
          int termId = dictionary.get(term);
          vector.setQuick(termId, vector.getQuick(termId) + 1);
        }
      }
    if (vector.getNumNondefaultElements() > 0) {
      VectorWritable vectorWritable = new VectorWritable(vector);
      context.write(key, vectorWritable);
    }
  }

此时以tokenizer之后的文章和dictionary作为输入,每篇文章得到一个vector(类型为RandomAccessSparseVector,其实是一个hashMap),vector保存的是每篇文章的id号和频率。
然后以key/vector写入。
由于上一步产生的dictionary可能很大,分过块,每次reduce只从一个dictionary的chunk中提取id,分多次处理,最后再合并。合并采用PartialVectorMerger.mergePartialVectors()方法设置一个job来完成。
默认是不计算IDF的,在参数中指明后会在上一步计算partialVector(TF)后计算IDF,输入为TF目录。

1
2
3
4
    if (shouldPrune || processIdf) {
      docFrequenciesFeatures = TFIDFConverter.calculateDF(new Path(outputDir, tfDirName),
          outputDir, conf, chunkSize);
    }

计算IDF过程比较清晰:
看此过程的Mapper:

1
2
3
4
5
6
7
8
9
10
  protected void map(WritableComparable< ? > key, VectorWritable value, Context context)
    throws IOExceptionInterruptedException {
    Vector vector = value.get();
    Iterator< Vector.Element > it = vector.iterateNonZero();
    while (it.hasNext()) {
      Vector.Element e = it.next();
      context.write(new IntWritable(e.index()), ONE);
    }
    context.write(TOTAL_COUNT, ONE);
  }

输入为key/vector,提取出vector内容,对每一个词,得到他在词典中的id,然后加1.现在key变为这个词的id。
Reducer:

1
2
3
4
5
6
7
8
  protected void reduce(IntWritable key, Iterable< LongWritable > values, Context context)
    throws IOExceptionInterruptedException {
    long sum = 0;
    for (LongWritable value : values) {
      sum += value.get();
    }
    context.write(key, new LongWritable(sum));
  }

相同key的value相加,又是一个wordcount程序。这样每个词key在多少个文档中出现过DF(不是在文档中出现的次数)就得到了。
输出目录为df-count,同计算tf一样,分为几个chunk写入HDFS。
根据要求有一个计算标准差的过程:

1
2
3
   double stdDev = BasicStats.stdDevForGivenMean(dfDir, stdCalcDir, 0.0, conf);
   long vectorCount = docFrequenciesFeatures.getFirst()[1];
   maxDF = (int) (100.0 * maxDFSigma * stdDev / vectorCount);

以及一个pruneVector的过程:

1
2
3
4
5
6
7
8
9
HighDFWordsPruner.pruneVectors(tfDir,
                                          prunedTFDir,
                                          prunedPartialTFDir,
                                          maxDF,
                                          conf,
                                          docFrequenciesFeatures,
                                          -1.0f,
                                          false,
                                          reduceTasks);

最后计算TFIDF:

1
2
3
4
5
6
7
8
9
10
11
  public static void processTfIdf(Path input,
                                  Path output,
                                  Configuration baseConf,
                                  Pair< Long[], List< Path > > datasetFeatures,
                                  int minDf,
                                  long maxDF,
                                  float normPower,
                                  boolean logNormalize,
                                  boolean sequentialAccessOutput,
                                  boolean namedVector,
                                  int numReducers)

Mapper照原样输出,Reducer一部分如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
  protected void reduce(WritableComparable< ? > key, Iterable< VectorWritable > values, Context context)
    throws IOExceptionInterruptedException {
    Iterator< VectorWritable > it = values.iterator();
    if (!it.hasNext()) {
      return;
    }
    Vector value = it.next().get();
    Iterator< Vector.Element > it1 = value.iterateNonZero();
    Vector vector = new RandomAccessSparseVector((int) featureCount, value.getNumNondefaultElements());
    while (it1.hasNext()) {
      Vector.Element e = it1.next();
      if (!dictionary.containsKey(e.index())) {
        continue;
      }
      long df = dictionary.get(e.index());
      if (maxDf > -1 && (100.0 * df) / vectorCount > maxDf) {
        continue;
      }
      if (df < minDf) {
        df = minDf;
      }
      vector.setQuick(e.index(), tfidf.calculate((int) e.get()(int) df, (int) featureCount, (int) vectorCount));
    }

    if (sequentialAccess) {
      vector = new SequentialAccessSparseVector(vector);
    }
    if (namedVector) {
      vector = new NamedVector(vector, key.toString());
    }
    VectorWritable vectorWritable = new VectorWritable(vector);
    context.write(key, vectorWritable);
  }

注意到

vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));

首先得到单词的id,然后计算tf*idf,再写回到这个vector。
最后的context.write(key, vectorWritable)得到了key为此文本的相对路径,value为ft*idf的词的vector。
至此,计算完成。
整个过程产生的目录如下:

整个过程的所有job信息如下:

http://hnote.org/big-data/mahout/sparsevectorsfromsequencefiles-2

 

 

http://soledede.com/

 

大家可以加我个人微信号:scccdgf

 

 

或者关注soledede的微信公众号:soledede
微信公众号:

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics