First comes a series of option definitions, including minSupport, analyzerName, chunkSize, weight, minDF, and so on.

    Option chunkSizeOpt = obuilder.withLongName("chunkSize").withArgument(
        abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
        "The chunkSize in MegaBytes. 100-10000 MB").withShortName("chunk").create();
    // term weight: TF or TFIDF
    Option weightOpt = obuilder.withLongName("weight").withRequired(false).withArgument(
        abuilder.withName("weight").withMinimum(1).withMaximum(1).create()).withDescription(
        "The kind of weight to use. Currently TF or TFIDF").withShortName("wt").create();
    // minimum document frequency, minDF
    Option minDFOpt = obuilder.withLongName("minDF").withRequired(false).withArgument(
        abuilder.withName("minDF").withMinimum(1).withMaximum(1).create()).withDescription(
        "The minimum document frequency. Default is 1").withShortName("md").create();
    ……

Next, the values the user supplied for those options are read:

    Path inputDir = new Path((String) cmdLine.getValue(inputDirOpt));
    Path outputDir = new Path((String) cmdLine.getValue(outputDirOpt));

    int chunkSize = 100;
    if (cmdLine.hasOption(chunkSizeOpt)) {
      chunkSize = Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
    }
    int minSupport = 2;
    if (cmdLine.hasOption(minSupportOpt)) {
      String minSupportString = (String) cmdLine.getValue(minSupportOpt);
      minSupport = Integer.parseInt(minSupportString);
    }
    ……

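The input directory of SparseVectorsFromSequenceFiles holds the SequenceFiles produced by SequenceFilesFromDirectory. SequenceFile is a Hadoop-specific file format that stores key/value pairs. SparseVectorsFromSequenceFiles first runs the input SequenceFiles through DocumentProcessor and saves the result in the tokenized-documents directory under the output directory.
DocumentProcessor is a job with a single map phase and no reduce phase. It emits the original key unchanged, and it tokenizes the extracted value into a list of words: the text of the value with punctuation removed, split on whitespace.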
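To make the tokenization step concrete, here is a minimal plain-Java sketch of what the description above amounts to. It is not Mahout code: the real mapper delegates to the configured Lucene Analyzer, and the exact tokens depend on the analyzer class. The names TokenizeSketch and tokenize, and the lowercasing step, are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical stand-in for the Analyzer-based tokenization step; not Mahout code. */
class TokenizeSketch {

    /**
     * Lowercases the text (an assumption, the post does not mention case),
     * replaces every run of non-letter, non-digit characters with a space,
     * and splits the result on whitespace.
     */
    static List<String> tokenize(String text) {
        String cleaned = text.toLowerCase(Locale.ROOT)
                             .replaceAll("[^\\p{L}\\p{N}]+", " ")
                             .trim();
        List<String> tokens = new ArrayList<>();
        if (cleaned.isEmpty()) {
            return tokens; // nothing but punctuation or whitespace
        }
        for (String token : cleaned.split("\\s+")) {
            tokens.add(token);
        }
        return tokens;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("Hello, Mahout! TF-IDF is simple."));
    }
}
```

In the real job the key (the document's relative path) passes through untouched, and only the value is replaced by its token list.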
SparseVectorsFromSequenceFiles contains the following lines:

    Configuration conf = getConf();
    Path tokenizedPath = new Path(outputDir, DocumentProcessor.TOKENIZED_DOCUMENT_OUTPUT_FOLDER);
    //TODO: move this into DictionaryVectorizer , and then fold SparseVectorsFrom with EncodedVectorsFrom
    // to have one framework for all of this.
    DocumentProcessor.tokenizeDocuments(inputDir, analyzerClass, tokenizedPath, conf);

Now look at the job that processes the text, DocumentProcessor.tokenizeDocuments(). It has a single mapper, SequenceFileTokenizerMapper.

    public static void tokenizeDocuments(Path input,
                                         Class<? extends Analyzer> analyzerClass,
                                         Path output,
                                         Configuration baseConf)
      throws IOException, InterruptedException, ClassNotFoundException {
      Configuration conf = new Configuration(baseConf);
      // this conf parameter needs to be set to enable serialisation of conf values
      conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
                                    + "org.apache.hadoop.io.serializer.WritableSerialization");
      conf.set(ANALYZER_CLASS, analyzerClass.getName());

      Job job = new Job(conf);
      job.setJobName("DocumentProcessor::DocumentTokenizer: input-folder: " + input);
      job.setJarByClass(DocumentProcessor.class);

      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(StringTuple.class);
      FileInputFormat.setInputPaths(job, input);
      FileOutputFormat.setOutputPath(job, output);

      job.setMapperClass(SequenceFileTokenizerMapper.class);
      job.setInputFormatClass(SequenceFileInputFormat.class);
      job.setNumReduceTasks(0); // map-only job
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      HadoopUtil.delete(conf, output);

      boolean succeeded = job.waitForCompletion(true);
      if (!succeeded) {
        throw new IllegalStateException("Job failed!");
      }
    }

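After tokenization, the TF-IDF computation begins.
Computing TF-IDF
The term-frequency stage is carried out by the static method createTermFrequencyVectors() of the DictionaryVectorizer class. If the maxDFSigma supplied by the user is greater than 0, its output directory is tf-vectors-toprune; otherwise it is tf-vectors.
The first step of the TF-IDF computation is a word count.
If the n-gram size is 1, the startWordCounting() method does it directly, with this job configuration:
    outputKey = Text
    outputValue = LongWritable
    Mapper = TermCountMapper (org.apache.mahout.vectorizer.term)
    Combiner = TermCountCombiner
    Reducer = TermCountReducer
    output format = SequenceFileOutputFormat
    output directory = wordcount
Put simply, this is the first program in any Hadoop introduction: WordCount.
Otherwise, CollocDriver.generateAllGrams() does it, using two jobs:
    generateCollocations
    computeNGramsPruneByLLR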
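For the unigram case, the word-count step can be sketched in plain Java without Hadoop. This is only an illustration of the map, combine, and reduce logic collapsed into one loop. The class WordCountSketch is hypothetical, and dropping terms whose total falls below minSupport is an assumption about where that option is applied, since the post does not show the reducer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory equivalent of the wordcount job; not Mahout code. */
class WordCountSketch {

    /**
     * Counts every term across all tokenized documents, then removes terms
     * whose total count is below minSupport (assumed behavior).
     */
    static Map<String, Long> countTerms(List<List<String>> tokenizedDocs, long minSupport) {
        Map<String, Long> counts = new TreeMap<>();
        for (List<String> doc : tokenizedDocs) {
            for (String term : doc) {
                counts.merge(term, 1L, Long::sum); // "map" emits (term, 1), "reduce" sums
            }
        }
        counts.values().removeIf(count -> count < minSupport);
        return counts;
    }

    public static void main(String[] args) {
        List<List<String>> docs = Arrays.asList(
            Arrays.asList("a", "b", "a"),
            Arrays.asList("b", "c"));
        System.out.println(countTerms(docs, 2));
    }
}
```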
The second step assigns an id to every word (assign ids to feature list).
createDictionaryChunks handles this. Its input directory is wordcount and its output files are dictionary.file-*, one file per chunk, each with its own chunk index.

    int i = 0;
    for (Pair<Writable,Writable> record
         : new SequenceFileDirIterable<Writable,Writable>(filesPattern, PathType.GLOB, null, null, true, conf)) {
      if (currentChunkSize > chunkSizeLimit) {
        // current chunk is full: close it and start the next dictionary file
        Closeables.closeQuietly(dictWriter);
        chunkIndex++;

        chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex);
        chunkPaths.add(chunkPath);

        dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
        currentChunkSize = 0;
      }

      Writable key = record.getFirst();
      int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8;
      currentChunkSize += fieldSize;
      dictWriter.append(key, new IntWritable(i++)); // assign the id!
    }
    maxTermDimension[0] = i;

Ids start at 0, and the final number of words, i, is stored in maxTermDimension[0].
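The chunking logic is easier to follow without the SequenceFile plumbing. The sketch below mirrors the loop in plain Java, with each chunk held as an in-memory map instead of a dictionary.file-* file. DictionaryChunkSketch and ASSUMED_ENTRY_OVERHEAD are hypothetical names; the real value of DICTIONARY_BYTE_OVERHEAD is not given in the post, so the constant here is a placeholder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory version of the dictionary chunking loop; not Mahout code. */
class DictionaryChunkSketch {

    // Placeholder for DICTIONARY_BYTE_OVERHEAD; the real value is an unknown here.
    static final int ASSUMED_ENTRY_OVERHEAD = 4;

    /** Assigns consecutive ids starting at 0 and opens a new chunk once the size estimate exceeds the limit. */
    static List<Map<String, Integer>> assignIds(List<String> terms, long chunkSizeLimit) {
        List<Map<String, Integer>> chunks = new ArrayList<>();
        Map<String, Integer> current = new LinkedHashMap<>();
        chunks.add(current);
        long currentChunkSize = 0;
        int nextId = 0;
        for (String term : terms) {
            if (currentChunkSize > chunkSizeLimit) {
                current = new LinkedHashMap<>(); // start the next chunk
                chunks.add(current);
                currentChunkSize = 0;
            }
            // same estimate as the original loop: overhead + 2 bytes per char + one int
            currentChunkSize += ASSUMED_ENTRY_OVERHEAD + term.length() * 2 + Integer.SIZE / 8;
            current.put(term, nextId++); // ids keep increasing across chunks
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(assignIds(Arrays.asList("aa", "bb", "cc"), 10));
    }
}
```

Note that the id counter is shared by all chunks, so every term gets a globally unique id even though the dictionary is split across files.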
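The third step builds the partial vectors (PartialVector).
After the initial tokenization, each document is stored in a SequenceFile as a key/value pair, where the key is the document's relative path and the value is the list of all words in the document.
The dictionary from the previous step maps every word to an id and is also written to SequenceFiles.
The mapper emits the tokenized documents unchanged; part of the reducer is shown here:

    protected void reduce(Text key, Iterable<StringTuple> values, Context context)
      throws IOException, InterruptedException {
      Iterator<StringTuple> it = values.iterator();
      if (!it.hasNext()) {
        return;
      }
      StringTuple value = it.next();

      Vector vector = new RandomAccessSparseVector(dimension, value.length()); // guess at initial size

      for (String term : value.getEntries()) {
        if (!term.isEmpty() && dictionary.containsKey(term)) { // unigram
          int termId = dictionary.get(term);
          vector.setQuick(termId, vector.getQuick(termId) + 1);
        }
      }
      if (vector.getNumNondefaultElements() > 0) {
        VectorWritable vectorWritable = new VectorWritable(vector);
        context.write(key, vectorWritable);
      }
    }

The reducer takes the tokenized documents and the dictionary as input and produces one vector per document. The vector is a RandomAccessSparseVector, which is essentially a hash map, and it stores the id of each word in the document together with that word's frequency.
The result is then written out as key/vector pairs.
Because the dictionary from the previous step can be large and has been split into chunks, each reduce pass looks up ids in only one dictionary chunk. The documents are processed once per chunk, and the partial results are merged at the end. The merge is done by a job set up in PartialVectorMerger.mergePartialVectors().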
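The chunk-by-chunk processing and the final merge can be sketched as follows. This is a plain-Java illustration under simplifying assumptions: a sparse vector is modeled as a Map from term id to count, and the merge is a simple element-wise sum. The class PartialVectorSketch and its methods are hypothetical and do not reproduce the actual PartialVectorMerger job.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of partial TF vectors and their merge; not Mahout code. */
class PartialVectorSketch {

    /** Counts only the terms that appear in the given dictionary chunk. */
    static Map<Integer, Double> partialVector(List<String> tokens, Map<String, Integer> dictionaryChunk) {
        Map<Integer, Double> vector = new HashMap<>();
        for (String term : tokens) {
            Integer termId = dictionaryChunk.get(term);
            if (!term.isEmpty() && termId != null) {
                vector.merge(termId, 1.0, Double::sum);
            }
        }
        return vector;
    }

    /** Combines the partial vectors of one document into its full TF vector. */
    static Map<Integer, Double> merge(List<Map<Integer, Double>> partials) {
        Map<Integer, Double> merged = new HashMap<>();
        for (Map<Integer, Double> partial : partials) {
            partial.forEach((id, tf) -> merged.merge(id, tf, Double::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Integer> chunk1 = new HashMap<>();
        chunk1.put("a", 0);
        chunk1.put("b", 1);
        Map<String, Integer> chunk2 = new HashMap<>();
        chunk2.put("c", 2);
        List<String> tokens = Arrays.asList("a", "b", "a", "c");
        System.out.println(merge(Arrays.asList(
            partialVector(tokens, chunk1), partialVector(tokens, chunk2))));
    }
}
```

Because every term id lives in exactly one chunk, the partial vectors of a document never overlap, so summing them is the same as taking their union.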
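IDF is not computed by default. When it is requested through the options, it is computed after the partial vectors (TF) from the previous step, with the TF directory as input. As the condition below shows, document frequencies are also calculated when pruning is enabled.

    if (shouldPrune || processIdf) {
      docFrequenciesFeatures =
          TFIDFConverter.calculateDF(new Path(outputDir, tfDirName), outputDir, conf, chunkSize);
    }
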
The IDF computation is fairly straightforward.
Start with the Mapper of this job:

    protected void map(WritableComparable<?> key, VectorWritable value, Context context)
      throws IOException, InterruptedException {
      Vector vector = value.get();
      Iterator<Vector.Element> it = vector.iterateNonZero();

      while (it.hasNext()) {
        Vector.Element e = it.next();
        context.write(new IntWritable(e.index()), ONE);
      }
      context.write(TOTAL_COUNT, ONE);
    }

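The input is key/vector pairs. The mapper reads the vector and, for each word with a non-zero entry, emits the word's dictionary id with a count of 1, so the key is now the word's id. It also emits one TOTAL_COUNT record per document.
Reducer:

    protected void reduce(IntWritable key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
      long sum = 0;
      for (LongWritable value : values) {
        sum += value.get();
      }
      context.write(key, new LongWritable(sum));
    }

Values with the same key are summed, so this is yet another word-count program. The result is the DF of each word key: the number of documents the word occurs in, not the number of times it occurs within a document.
The output directory is df-count, and as with the TF computation the result is written to HDFS in several chunks.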
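The mapper and reducer together amount to the following plain-Java loop. It is a sketch that models each TF vector as a Map from term id to count. DocumentFrequencySketch is a hypothetical class, and using -1 as the key for the total document count is an assumption, since the post does not show how TOTAL_COUNT is defined.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory equivalent of the document-frequency job; not Mahout code. */
class DocumentFrequencySketch {

    // Assumed sentinel key under which the number of documents is accumulated.
    static final int TOTAL_COUNT_KEY = -1;

    /** For each term id, counts the documents whose TF vector has a non-zero entry for it. */
    static Map<Integer, Long> documentFrequencies(List<Map<Integer, Double>> tfVectors) {
        Map<Integer, Long> df = new TreeMap<>();
        for (Map<Integer, Double> vector : tfVectors) {
            for (Map.Entry<Integer, Double> entry : vector.entrySet()) {
                if (entry.getValue() != 0.0) {
                    df.merge(entry.getKey(), 1L, Long::sum); // one per document, regardless of TF
                }
            }
            df.merge(TOTAL_COUNT_KEY, 1L, Long::sum); // one per document
        }
        return df;
    }

    public static void main(String[] args) {
        Map<Integer, Double> doc1 = new HashMap<>();
        doc1.put(0, 2.0);
        doc1.put(1, 1.0);
        Map<Integer, Double> doc2 = new HashMap<>();
        doc2.put(0, 5.0);
        System.out.println(documentFrequencies(Arrays.asList(doc1, doc2)));
    }
}
```

Term 0 has a TF of 2 in one document and 5 in the other, yet its DF is 2, which is the distinction the paragraph above draws.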
When required, a standard deviation is computed:

    double stdDev = BasicStats.stdDevForGivenMean(dfDir, stdCalcDir, 0.0, conf);
    long vectorCount = docFrequenciesFeatures.getFirst()[1];
    maxDF = (int) (100.0 * maxDFSigma * stdDev / vectorCount);

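A small worked example helps with the units here. The expression multiplies by 100 and divides by the number of vectors, so maxDF reads as a percentage of documents. The same kind of percentage comparison, (100.0 * df) / vectorCount > maxDf, appears in the TF-IDF reducer further down. The sketch below just evaluates both expressions; MaxDfSketch and the sample numbers are made up for illustration.

```java
/** Hypothetical worked example of the maxDF arithmetic; not Mahout code. */
class MaxDfSketch {

    /** Same expression as in the post: sigma multiples of the DF standard deviation, as a percentage of documents. */
    static int maxDfPercent(double maxDFSigma, double stdDev, long vectorCount) {
        return (int) (100.0 * maxDFSigma * stdDev / vectorCount);
    }

    /** Same comparison as in the TF-IDF reducer: true when the term occurs in too large a share of documents. */
    static boolean exceedsMaxDf(long df, long vectorCount, long maxDf) {
        return maxDf > -1 && (100.0 * df) / vectorCount > maxDf;
    }

    public static void main(String[] args) {
        // Made-up inputs: 1000 documents, DF standard deviation of 50, maxDFSigma of 3.
        int maxDf = maxDfPercent(3.0, 50.0, 1000);
        System.out.println("maxDF percent = " + maxDf);
        System.out.println("df=200 pruned? " + exceedsMaxDf(200, 1000, maxDf));
        System.out.println("df=100 pruned? " + exceedsMaxDf(100, 1000, maxDf));
    }
}
```

With these inputs the threshold works out to 15 percent, so a term found in 200 of 1000 documents (20 percent) is over the limit while one found in 100 documents (10 percent) is not.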
This is followed by a pruneVectors step:

    HighDFWordsPruner.pruneVectors(tfDir,
                                   prunedTFDir,
                                   prunedPartialTFDir,
                                   maxDF,
                                   conf,
                                   docFrequenciesFeatures,
                                   -1.0f,
                                   false,
                                   reduceTasks);

Finally, TF-IDF is computed:

    public static void processTfIdf(Path input,
                                    Path output,
                                    Configuration baseConf,
                                    Pair<Long[], List<Path>> datasetFeatures,
                                    int minDf,
                                    long maxDF,
                                    float normPower,
                                    boolean logNormalize,
                                    boolean sequentialAccessOutput,
                                    boolean namedVector,
                                    int numReducers)

The Mapper emits its input unchanged; part of the Reducer is shown here:

    protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> values, Context context)
      throws IOException, InterruptedException {
      Iterator<VectorWritable> it = values.iterator();
      if (!it.hasNext()) {
        return;
      }
      Vector value = it.next().get();
      Iterator<Vector.Element> it1 = value.iterateNonZero();
      Vector vector = new RandomAccessSparseVector((int) featureCount, value.getNumNondefaultElements());
      while (it1.hasNext()) {
        Vector.Element e = it1.next();
        if (!dictionary.containsKey(e.index())) {
          continue;
        }
        long df = dictionary.get(e.index());
        if (maxDf > -1 && (100.0 * df) / vectorCount > maxDf) {
          continue; // term occurs in too large a share of documents
        }
        if (df < minDf) {
          df = minDf;
        }
        vector.setQuick(e.index(),
                        tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));
      }
      if (sequentialAccess) {
        vector = new SequentialAccessSparseVector(vector);
      }

      if (namedVector) {
        vector = new NamedVector(vector, key.toString());
      }

      VectorWritable vectorWritable = new VectorWritable(vector);
      context.write(key, vectorWritable);
    }

Note the line

    vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));

It first takes the word's id, then computes tf*idf and writes the result back into the vector.
The final context.write(key, vectorWritable) emits the document's relative path as the key and the vector of tf*idf word weights as the value.
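The body of tfidf.calculate is not shown in the post. The sketch below fills it in with the classic Lucene-style weighting, sqrt(tf) * (ln(numDocs / (df + 1)) + 1), which to my understanding is what Mahout's TFIDF class delegates to; treat that formula as an assumption and check it against the version in use. TfIdfSketch and its methods are hypothetical, and the sketch works in double where the real code may use float.

```java
/** Hypothetical TF-IDF weighting under an assumed Lucene-style formula; not Mahout code. */
class TfIdfSketch {

    /** Assumed formula: sqrt(tf) * (ln(numDocs / (df + 1)) + 1). */
    static double tfIdf(int tf, int df, int numDocs) {
        double tfPart = Math.sqrt(tf);
        double idfPart = Math.log(numDocs / (double) (df + 1)) + 1.0;
        return tfPart * idfPart;
    }

    /** Mirrors the reducer's handling of minDf: a DF below minDf is raised to minDf before weighting. */
    static double weight(int tf, long df, int minDf, int numDocs) {
        if (df < minDf) {
            df = minDf;
        }
        return tfIdf(tf, (int) df, numDocs);
    }

    public static void main(String[] args) {
        // A term seen 4 times in a document, in a collection of 10 documents.
        System.out.println("common term (df=9): " + tfIdf(4, 9, 10));
        System.out.println("rare term   (df=1): " + tfIdf(4, 1, 10));
    }
}
```

Under this formula a term that appears in fewer documents gets a larger weight for the same term frequency, which is the point of the IDF factor.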
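At this point, the computation is complete.
[Figure: directories produced by the whole process]
[Figure: information for all jobs in the whole process]
http://hnote.org/big-data/mahout/sparsevectorsfromsequencefiles-2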