`

基于hadoop的推荐算法-mahout版

阅读更多

基于hadoop的推荐算法,讲其中mahout实现的基于项目的推荐算法

分为4步:

1.获得人-物 用户矩阵

    输入为所有人对物品的评价或关联

    map端输出key为人,value为物品+倾好度

    reeduce端输出key为人,vallue为多个物品+倾好度

 

2.获得物-物 项目矩阵

   输入为“用户矩阵”,讲每一行人-物数据中的物品做笛卡尔积,生产成物-物的关联

   map端输出为key为物,value为关联度

   reduce端输出key为物,value为多个物的关联度

(可以根据各种规则生成项目相似度矩阵表,此处算法带过)

修改:

求项目相似矩阵是基于项目的协同过滤算法的核心

公式有很多种,核心是物品i和物品j相关用户的交集与并集的商

mahout使用的公式是1.dot(i,j) = sum(Pi(u)*Pi(u))

                  2.norms(i) = sum(Pi(u)^2)

                  3.simi(i,j) = 1/(1+(norms(i)-2*dot(i,j)+noorm(i))^1/2)

 

mahout的实现方法是

第一个job,用物品-人的矩阵,求得norms,即物品的用户平方和,输出是物-norms

第二个job,Map:用人-物的矩阵,求Pi(u)*Pi(u),即相同用户的物品的评价的乘机,输出物-多个对端物品的Pi(u)*Pi(u)

           Reduce:用物-多个对端物品的Pi(u)*Pi(u)和物-norms,求得物品的相似矩阵(因为这个时候可以汇总所有和这个物品相关的物品的dot)

第三个job,补全物品的相似矩阵

 

 

3.获得用户-项目相似矩阵

输入为人-物 用户矩阵 和 物-物 项目矩阵

Map端输出key为物,value为类VectorOrPrefWritable,是包含物与人的倾好度,或是物与物的相似度

reduce端输出key为物,value为类VectorAndPrefWritable,是汇总当个物品到所有人的倾好度和到所有物品的相似度

 

4.获得用户推荐矩阵

输入为VectorAndPrefWritable

Map端输出为key:人,value:物+系数(map端根据单个物品贡献的系数生成推荐系数,也就是人到物品A的倾好度*物品A到其他物品的相似度)

reduce端输出为key:人,,value:推荐项目+系数(reduce端使用自定公式,汇总所有单物品贡献的四叔,求人到其他项目的倾好度,取topn作为当前用户的推荐项目)

 

再在这里贴几个mahout推荐算法分析的帖子:

http://eric-gcm.iteye.com/blog/1817822

http://eric-gcm.iteye.com/blog/1818033

http://eric-gcm.iteye.com/blog/1820060

 

以下是mahout代码:

 

 ItemSimilarityJob类是mahout使用hadoop做推荐引擎的主要实现类,下面开始分析。

run()函数是启动函数:

 

 

Java代码  收藏代码
  1. public final class RecommenderJob extends AbstractJob {  
  2.   
  3.   public static final String BOOLEAN_DATA = "booleanData";  
  4.   
  5.   private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;  
  6.   private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;  
  7.   private static final int DEFAULT_MIN_PREFS_PER_USER = 1;  
  8.   
  9.   @Override  
  10.   public int run(String[] args) throws Exception {  
  11.     //这里原来有大一堆代码,都是用来载入配置项,不用管它  
  12.   
  13.     //第一步:准备矩阵,将原始数据转换为一个矩阵,在PreparePreferenceMatrixJob这个类中完成  
  14.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  15.       ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{  
  16.               "--input", getInputPath().toString(),  
  17.               "--output", prepPath.toString(),  
  18.               "--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),  
  19.               "--minPrefsPerUser", String.valueOf(minPrefsPerUser),  
  20.               "--booleanData", String.valueOf(booleanData),  
  21.               "--tempDir", getTempPath().toString()});  
  22.   
  23.       numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());  
  24.     }  
  25.   
  26.     //第二步:计算协同矩阵  
  27.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  28.   
  29.       /* special behavior if phase 1 is skipped */  
  30.       if (numberOfUsers == -1) {  
  31.         numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),  
  32.                 PathType.LIST, null, getConf());  
  33.       }  
  34.   
  35.       /* Once DistributedRowMatrix uses the hadoop 0.20 API, we should refactor this call to something like 
  36.        * new DistributedRowMatrix(...).rowSimilarity(...) */  
  37.       //calculate the co-occurrence matrix  
  38.       ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{  
  39.               "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),  
  40.               "--output", similarityMatrixPath.toString(),  
  41.               "--numberOfColumns", String.valueOf(numberOfUsers),  
  42.               "--similarityClassname", similarityClassname,  
  43.               "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),  
  44.               "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),  
  45.               "--threshold", String.valueOf(threshold),  
  46.               "--tempDir", getTempPath().toString()});  
  47.     }  
  48.   
  49.     //start the multiplication of the co-occurrence matrix by the user vectors  
  50.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  51.       Job prePartialMultiply1 = prepareJob(  
  52.               similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,  
  53.               SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  54.               Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  55.               SequenceFileOutputFormat.class);  
  56.       boolean succeeded = prePartialMultiply1.waitForCompletion(true);  
  57.       if (!succeeded)   
  58.         return -1;  
  59.       //continue the multiplication  
  60.       Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),  
  61.               prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,  
  62.               VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  63.               SequenceFileOutputFormat.class);  
  64.       if (usersFile != null) {  
  65.         prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);  
  66.       }  
  67.       prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,  
  68.               maxPrefsPerUser);  
  69.       succeeded = prePartialMultiply2.waitForCompletion(true);  
  70.       if (!succeeded)   
  71.         return -1;  
  72.       //finish the job  
  73.       Job partialMultiply = prepareJob(  
  74.               new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,  
  75.               SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  76.               ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,  
  77.               SequenceFileOutputFormat.class);  
  78.       setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);  
  79.       succeeded = partialMultiply.waitForCompletion(true);  
  80.       if (!succeeded)   
  81.         return -1;  
  82.     }  
  83.   
  84.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  85.       //filter out any users we don't care about  
  86.       /* convert the user/item pairs to filter if a filterfile has been specified */  
  87.       if (filterFile != null) {  
  88.         Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class,  
  89.                 ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,  
  90.                 ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,  
  91.                 SequenceFileOutputFormat.class);  
  92.         boolean succeeded = itemFiltering.waitForCompletion(true);  
  93.         if (!succeeded)   
  94.           return -1;  
  95.       }  
  96.   
  97.       String aggregateAndRecommendInput = partialMultiplyPath.toString();  
  98.       if (filterFile != null) {  
  99.         aggregateAndRecommendInput += "," + explicitFilterPath;  
  100.       }  
  101.       //extract out the recommendations  
  102.       Job aggregateAndRecommend = prepareJob(  
  103.               new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,  
  104.               PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,  
  105.               AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,  
  106.               TextOutputFormat.class);  
  107.       Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();  
  108.       if (itemsFile != null) {  
  109.         aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);  
  110.       }  
  111.   
  112.       if (filterFile != null) {  
  113.         setS3SafeCombinedInputPath(aggregateAndRecommend, getTempPath(), partialMultiplyPath, explicitFilterPath);  
  114.       }  
  115.       setIOSort(aggregateAndRecommend);  
  116.       aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH,  
  117.               new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());  
  118.       aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);  
  119.       aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);  
  120.       boolean succeeded = aggregateAndRecommend.waitForCompletion(true);  
  121.       if (!succeeded)   
  122.         return -1;  
  123.     }  
  124.   
  125.     return 0;  
  126.   }  

 

 

 

       第二步,计算协同矩阵,主要在RowSimilarityJob 这个类中完成

 

  

Java代码  收藏代码
  1. ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{  
  2.               "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),  
  3.               "--output", similarityMatrixPath.toString(),  
  4.               "--numberOfColumns", String.valueOf(numberOfUsers),  
  5.               "--similarityClassname", similarityClassname,  
  6.               "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),  
  7.               "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),  
  8.               "--threshold", String.valueOf(threshold),  
  9.               "--tempDir", getTempPath().toString()});  
  10.     }  

   可以看到这个job的输入路径就是上一篇中,PreparePreferenceMatrixJob中最后一个reducer的输出路径。

 

下边详细分析RowSimilarityJob类的实现:

 

Java代码  收藏代码
  1. public class RowSimilarityJob extends AbstractJob {  
  2.   
  3.   
  4.   @Override  
  5.   public int run(String[] args) throws Exception {  
  6.     //一大堆载入参数的代码,忽略  
  7.       
  8.     //第一个MapReduce  
  9.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  10.       Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,  
  11.           VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);  
  12.       normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);  
  13.       Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();  
  14.       normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));  
  15.       normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());  
  16.       normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());  
  17.       normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());  
  18.       normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);  
  19.       boolean succeeded = normsAndTranspose.waitForCompletion(true);  
  20.       if (!succeeded) {  
  21.         return -1;  
  22.       }  
  23.     }  
  24.     //第二个MapReduce  
  25.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  26.       Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,  
  27.           IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);  
  28.       pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);  
  29.       Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();  
  30.       pairwiseConf.set(THRESHOLD, String.valueOf(threshold));  
  31.       pairwiseConf.set(NORMS_PATH, normsPath.toString());  
  32.       pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());  
  33.       pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());  
  34.       pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);  
  35.       pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);  
  36.       pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);  
  37.       boolean succeeded = pairwiseSimilarity.waitForCompletion(true);  
  38.       if (!succeeded) {  
  39.         return -1;  
  40.       }  
  41.     }  
  42.     //第三个MapReduce  
  43.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  44.       Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,  
  45.           IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,  
  46.           VectorWritable.class);  
  47.       asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);  
  48.       asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);  
  49.       boolean succeeded = asMatrix.waitForCompletion(true);  
  50.       if (!succeeded) {  
  51.         return -1;  
  52.       }  
  53.     }  
  54.   
  55.     return 0;  
  56.   }  

 

 

 可以看到RowSimilityJob也是分成三个MapReduce过程:

 

1、Mapper :VectorNormMapper类,输出 ( userid_index, <itemid_index, pref> )类型

Java代码  收藏代码
  1. public static class VectorNormMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {  
  2.   
  3.     @Override  
  4.     protected void map(IntWritable row, VectorWritable vectorWritable, Context ctx)  
  5.         throws IOException, InterruptedException {  
  6.   
  7.       Vector rowVector = similarity.normalize(vectorWritable.get());  
  8.   
  9.       int numNonZeroEntries = 0;  
  10.       double maxValue = Double.MIN_VALUE;  
  11.   
  12.       Iterator<Vector.Element> nonZeroElements = rowVector.iterateNonZero();  
  13.       while (nonZeroElements.hasNext()) {  
  14.         Vector.Element element = nonZeroElements.next();  
  15.         RandomAccessSparseVector partialColumnVector = new RandomAccessSparseVector(Integer.MAX_VALUE);  
  16.         partialColumnVector.setQuick(row.get(), element.get());  
  17.         //输出 ( userid_index, <itemid_index, pref> )类型  
  18.         ctx.write(new IntWritable(element.index()), new VectorWritable(partialColumnVector));  
  19.   
  20.         numNonZeroEntries++;  
  21.         if (maxValue < element.get()) {  
  22.           maxValue = element.get();  
  23.         }  
  24.       }  
  25.   
  26.       if (threshold != NO_THRESHOLD) {  
  27.         nonZeroEntries.setQuick(row.get(), numNonZeroEntries);  
  28.         maxValues.setQuick(row.get(), maxValue);  
  29.       }  
  30.       norms.setQuick(row.get(), similarity.norm(rowVector));  
  31.       //计算item的总数  
  32.       ctx.getCounter(Counters.ROWS).increment(1);  
  33.     }  
  34. }  

 

Reduer : MergeVectorsReducer类,输入的是(userid_index, <itemid_index, pref>),同一个userid_index在此进行合并,输出( userid_index, vector<itemid_index, pref> )

Java代码  收藏代码
  1.   public static class MergeVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {  
  2.   
  3.     @Override  
  4.     protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)  
  5.         throws IOException, InterruptedException {  
  6.       Vector partialVector = Vectors.merge(partialVectors);  
  7.   
  8.       if (row.get() == NORM_VECTOR_MARKER) {  
  9.         Vectors.write(partialVector, normsPath, ctx.getConfiguration());  
  10.       } else if (row.get() == MAXVALUE_VECTOR_MARKER) {  
  11.         Vectors.write(partialVector, maxValuesPath, ctx.getConfiguration());  
  12.       } else if (row.get() == NUM_NON_ZERO_ENTRIES_VECTOR_MARKER) {  
  13.         Vectors.write(partialVector, numNonZeroEntriesPath, ctx.getConfiguration(), true);  
  14.       } else {  
  15.         ctx.write(row, new VectorWritable(partialVector));  
  16.       }  
  17.     }  
  18.   }  
  19. }  

 2、Mapper:CooccurrencesMapper类,对同一个userid_index下的vector<itemid_index ,pref>进行处理,

收集<item1, item2>对, 输出为( itemid_index, vector<itemid_index, value> )

Java代码  收藏代码
  1. public static class CooccurrencesMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {  
  2.   
  3.     @Override  
  4.     protected void map(IntWritable column, VectorWritable occurrenceVector, Context ctx)  
  5.         throws IOException, InterruptedException {  
  6.       Vector.Element[] occurrences = Vectors.toArray(occurrenceVector);  
  7.       Arrays.sort(occurrences, BY_INDEX);  
  8.   
  9.       int cooccurrences = 0;  
  10.       int prunedCooccurrences = 0;  
  11.       for (int n = 0; n < occurrences.length; n++) {  
  12.         Vector.Element occurrenceA = occurrences[n];  
  13.         Vector dots = new RandomAccessSparseVector(Integer.MAX_VALUE);  
  14.         for (int m = n; m < occurrences.length; m++) {  
  15.           Vector.Element occurrenceB = occurrences[m];  
  16.           if (threshold == NO_THRESHOLD || consider(occurrenceA, occurrenceB)) {  
  17.             dots.setQuick(occurrenceB.index(), similarity.aggregate(occurrenceA.get(), occurrenceB.get()));  
  18.             cooccurrences++;  
  19.           } else {  
  20.             prunedCooccurrences++;  
  21.           }  
  22.         }  
  23.         ctx.write(new IntWritable(occurrenceA.index()), new VectorWritable(dots));  
  24.       }  
  25.       ctx.getCounter(Counters.COOCCURRENCES).increment(cooccurrences);  
  26.       ctx.getCounter(Counters.PRUNED_COOCCURRENCES).increment(prunedCooccurrences);  
  27.     }  
  28.   }  

 

Reducer :SimilarityReducer类,生成协同矩阵

 

Java代码  收藏代码
  1. public static class SimilarityReducer  
  2.       extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {  
  3.   
  4.     @Override  
  5.     protected void reduce(IntWritable row, Iterable<VectorWritable> partialDots, Context ctx)  
  6.         throws IOException, InterruptedException {  
  7.       Iterator<VectorWritable> partialDotsIterator = partialDots.iterator();  
  8.       //取一个vecotr作为该item的行向量  
  9.       Vector dots = partialDotsIterator.next().get();  
  10.       while (partialDotsIterator.hasNext()) {  
  11.         Vector toAdd = partialDotsIterator.next().get();  
  12.         Iterator<Vector.Element> nonZeroElements = toAdd.iterateNonZero();  
  13.         while (nonZeroElements.hasNext()) {  
  14.           Vector.Element nonZeroElement = nonZeroElements.next();  
  15.           //nonZeroElement.index()为itemid,将另一个vecotr中itemid的value加进去  
  16.           dots.setQuick(nonZeroElement.index(), dots.getQuick(nonZeroElement.index()) + nonZeroElement.get());  
  17.         }  
  18.       }  
  19.       //最后得到的dots是协同矩阵中行号为row的一行,行中元素是item对其他的item的相似度  
  20.       Vector similarities = dots.like();  
  21.       double normA = norms.getQuick(row.get());  
  22.       Iterator<Vector.Element> dotsWith = dots.iterateNonZero();  
  23.       while (dotsWith.hasNext()) {  
  24.         Vector.Element b = dotsWith.next();  
  25.         double similarityValue = similarity.similarity(b.get(), normA, norms.getQuick(b.index()), numberOfColumns);  
  26.         if (similarityValue >= treshold) {  
  27.           similarities.set(b.index(), similarityValue);  
  28.         }  
  29.       }  
  30.       if (excludeSelfSimilarity) {  
  31.         similarities.setQuick(row.get(), 0);  
  32.       }  
  33.       ctx.write(row, new VectorWritable(similarities));  
  34.     }  
  35.   }  

 

3、Mapper:UnsymmetrifyMapper类,用来生成对称矩阵的。上一步得到的是非对称矩阵,首先将矩阵偏转,得到偏转矩阵,用原矩阵加上偏转矩阵,可以得到对称矩阵

 

Java代码  收藏代码
  1. public static class UnsymmetrifyMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable>  {  
  2.   
  3.     private int maxSimilaritiesPerRow;  
  4.   
  5.     @Override  
  6.     protected void setup(Mapper.Context ctx) throws IOException, InterruptedException {  
  7.       maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);  
  8.       Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");  
  9.     }  
  10.   
  11.     @Override  
  12.     protected void map(IntWritable row, VectorWritable similaritiesWritable, Context ctx)  
  13.         throws IOException, InterruptedException {  
  14.       Vector similarities = similaritiesWritable.get();  
  15.       // For performance reasons moved transposedPartial creation out of the while loop and reusing the same vector  
  16.       Vector transposedPartial = similarities.like();  
  17.       TopK<Vector.Element> topKQueue = new TopK<Vector.Element>(maxSimilaritiesPerRow, Vectors.BY_VALUE);  
  18.       Iterator<Vector.Element> nonZeroElements = similarities.iterateNonZero();  
  19.       //这个地方用来生成偏转矩阵的,非对称矩阵,用原矩阵加上偏转矩阵,可以得到对称矩阵  
  20.       while (nonZeroElements.hasNext()) {  
  21.         Vector.Element nonZeroElement = nonZeroElements.next();  
  22.         topKQueue.offer(new Vectors.TemporaryElement(nonZeroElement));  
  23.           
  24.         transposedPartial.setQuick(row.get(), nonZeroElement.get());  
  25.         //偏转矩阵中的每一个元素  
  26.         ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));  
  27.           
  28.         transposedPartial.setQuick(row.get(), 0.0);  
  29.       }  
  30.       Vector topKSimilarities = similarities.like();  
  31.       for (Vector.Element topKSimilarity : topKQueue.retrieve()) {  
  32.         topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());  
  33.       }  
  34.       //这里只收集前maxSimilaritiesPerRow个得分最高的item,所以咱们最后的对称矩阵,实际上每行只有  
  35.       //maxSimilaritiesPerRow个是对称的,其他的位置也不管了  
  36.       ctx.write(row, new VectorWritable(topKSimilarities));  
  37.     }  
  38.   }  

 

 Reducer:MergeToTopKSimilaritiesReducer类,就是将上面Map偏转的元素都收集起来,也就是完成了偏转矩阵和(截取了得分前maxSimilaritiesPerRow个)的原矩阵相加的过程,得到了对称矩阵

Java代码  收藏代码
  1. public static class MergeToTopKSimilaritiesReducer  
  2.     extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {  
  3.   
  4.   private int maxSimilaritiesPerRow;  
  5.   
  6.   @Override  
  7.   protected void setup(Context ctx) throws IOException, InterruptedException {  
  8.     maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);  
  9.     Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");  
  10.   }  
  11.   
  12.   @Override  
  13.   protected void reduce(IntWritable row, Iterable<VectorWritable> partials, Context ctx)  
  14.       throws IOException, InterruptedException {  
  15.     Vector allSimilarities = Vectors.merge(partials);  
  16.     Vector topKSimilarities = Vectors.topKElements(maxSimilaritiesPerRow, allSimilarities);  
  17.     ctx.write(row, new VectorWritable(topKSimilarities));  
  18.   }  
  19. }  

 

至此,RowSimilarityJob类的全部工作就完成,最终生成的是一个对称矩阵,也就是协同矩阵

 

 

 

Java代码  收藏代码
  1. //协同矩阵与用户向量相乘  
  2.     //start the multiplication of the co-occurrence matrix by the user vectors  
  3.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  4.       //第一个MapReducer  
  5.       Job prePartialMultiply1 = prepareJob(  
  6.               similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,  
  7.               SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  8.               Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  9.               SequenceFileOutputFormat.class);  
  10.       boolean succeeded = prePartialMultiply1.waitForCompletion(true);  
  11.       if (!succeeded)   
  12.         return -1;  
  13.       //第二个MapReduce  
  14.       //continue the multiplication  
  15.       Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),  
  16.               prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,  
  17.               VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  18.               SequenceFileOutputFormat.class);  
  19.       if (usersFile != null) {  
  20.         prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);  
  21.       }  
  22.       prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,  
  23.               maxPrefsPerUser);  
  24.       succeeded = prePartialMultiply2.waitForCompletion(true);  
  25.       if (!succeeded)   
  26.         return -1;  
  27.       //finish the job  
  28.       //第三个MapReduce  
  29.       Job partialMultiply = prepareJob(  
  30.               new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,  
  31.               SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  32.               ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,  
  33.               SequenceFileOutputFormat.class);  
  34.       setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);  
  35.       succeeded = partialMultiply.waitForCompletion(true);  
  36.       if (!succeeded)   
  37.         return -1;  
  38.     }  

 

 下边也是同样分析一下这个三个MapReduce的细节:

 

1、Mapper: SimilarityMatrixRowWrapperMapper 类,将协同矩阵的一行拿出来,通过包装,封装成VectorOrPrefWritable类,与那边的UserVectorSplitterMapper 的输出类型一致

Java代码  收藏代码
  1. public final class SimilarityMatrixRowWrapperMapper extends  
  2.     Mapper<IntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {  
  3.     
  4.   //将协同矩阵的一行拿出来,通过包装,封装成VectorOrPrefWritable类,与那边的UserVectorSplitterMapper  
  5.   //的输出类型一致  
  6.   @Override  
  7.   protected void map(IntWritable key,  
  8.                      VectorWritable value,  
  9.                      Context context) throws IOException, InterruptedException {  
  10.     Vector similarityMatrixRow = value.get();  
  11.     /* remove self similarity */  
  12.     similarityMatrixRow.set(key.get(), Double.NaN);  
  13.     context.write(new VarIntWritable(key.get()), new VectorOrPrefWritable(similarityMatrixRow));  
  14.   }  
  15.   
  16. }  

 

 

2、Mapper:UserVectorSplitterMapper类

Java代码  收藏代码
  1. //输入格式: theUserID:<itemid_index1,pref1>,<itemid_index2,pref2>........<itemid_indexN,prefN>  
  2.   //输出格式:  itemid1:<theUserID,pref1>  
  3.   //          itemid2:<theUserID,pref2>  
  4.   //          itemid3:<theUserID,pref3>  
  5.   //          ......  
  6.   //          itemidN:<theUserID,prefN>  
Java代码  收藏代码
  1. public final class UserVectorSplitterMapper extends  
  2.     Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {  
  3.     
  4.   @Override  
  5.   protected void map(VarLongWritable key,  
  6.                      VectorWritable value,  
  7.                      Context context) throws IOException, InterruptedException {  
  8.     long userID = key.get();  
  9.     if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {  
  10.       return;  
  11.     }  
  12.     Vector userVector = maybePruneUserVector(value.get());  
  13.     Iterator<Vector.Element> it = userVector.iterateNonZero();  
  14.     VarIntWritable itemIndexWritable = new VarIntWritable();  
  15.     VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();  
  16.     while (it.hasNext()) {  
  17.       Vector.Element e = it.next();  
  18.       itemIndexWritable.set(e.index());  
  19.       vectorOrPref.set(userID, (float) e.get());  
  20.       context.write(itemIndexWritable, vectorOrPref);  
  21.     }  
  22.   }  

 

3、Reduce:ToVectorAndPrefReducer类,收集协同矩阵为itemid的一行,并且收集评价过该item的用户和评分,最后的输出是 itemid_index,VectorAndPrefsWritable(vector,List<userid>,List<pref>)

 

Java代码  收藏代码
  1. public final class ToVectorAndPrefReducer extends  
  2.     Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> {  
  3.   
  4.   //收集所有key为itemid的  
  5.   @Override  
  6.   protected void reduce(VarIntWritable key,  
  7.                         Iterable<VectorOrPrefWritable> values,  
  8.                         Context context) throws IOException, InterruptedException {  
  9.   
  10.     List<Long> userIDs = Lists.newArrayList();  
  11.     List<Float> prefValues = Lists.newArrayList();  
  12.     Vector similarityMatrixColumn = null;  
  13.     for (VectorOrPrefWritable value : values) {  
  14.       if (value.getVector() == null) {  
  15.         // Then this is a user-pref value  
  16.         userIDs.add(value.getUserID());  
  17.         prefValues.add(value.getValue());  
  18.       } else {  
  19.         // Then this is the column vector  
  20.         //协同矩阵的一个行(行号为itemid的一行)  
  21.         if (similarityMatrixColumn != null) {  
  22.           throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get());  
  23.         }  
  24.         similarityMatrixColumn = value.getVector();  
  25.       }  
  26.     }  
  27.   
  28.     if (similarityMatrixColumn == null) {  
  29.       return;  
  30.     }  
  31.     //收集协同矩阵为itemid的一行,并且手机评价过该item的用户和评分  
  32.     VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(similarityMatrixColumn, userIDs, prefValues);  
  33.     context.write(key, vectorAndPrefs);  
  34.   }  
  35.   
  36. }  

 

第四步,协同矩阵和用户向量相乘,得到推荐结果

 

Java代码  收藏代码
  1. //extract out the recommendations  
  2.      Job aggregateAndRecommend = prepareJob(  
  3.              new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,  
  4.              PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,  
  5.              AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,  
  6.              TextOutputFormat.class);  
  7.      Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();  

 

 

Mapper:PartialMultiplyMapper类

 

Java代码  收藏代码
  1. //输入类型:( itemid_index, <userid的数组,pref的数组,协同矩阵行号为itemid_index的行> )  
  2. //输出类型: userid,<该用户对itemid_index1的评分,协同矩阵行号为itemid_index1的行> )  
  3. //        userid,<该用户对itemid_index2的评分,协同矩阵行号为itemid_index2的行> )  
  4. //                       .....    
  5. //                       .....  
  6. //          userid,<该用户对itemid_indexN的评分,协同矩阵行号为itemid_indexN的行> )  

 

 

 

Java代码  收藏代码
  1. public final class PartialMultiplyMapper extends  
  2.     Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,PrefAndSimilarityColumnWritable> {  
  3.   
  4.   @Override  
  5.   protected void map(VarIntWritable key,  
  6.                      VectorAndPrefsWritable vectorAndPrefsWritable,  
  7.                      Context context) throws IOException, InterruptedException {  
  8.   
  9.     Vector similarityMatrixColumn = vectorAndPrefsWritable.getVector();  
  10.     List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();  
  11.     List<Float> prefValues = vectorAndPrefsWritable.getValues();  
  12.   
  13.     VarLongWritable userIDWritable = new VarLongWritable();  
  14.     PrefAndSimilarityColumnWritable prefAndSimilarityColumn = new PrefAndSimilarityColumnWritable();  
  15.   
  16.     for (int i = 0; i < userIDs.size(); i++) {  
  17.       long userID = userIDs.get(i);  
  18.       float prefValue = prefValues.get(i);  
  19.       if (!Float.isNaN(prefValue)) {  
  20.         prefAndSimilarityColumn.set(prefValue, similarityMatrixColumn);  
  21.         userIDWritable.set(userID);  
  22.         context.write(userIDWritable, prefAndSimilarityColumn);  
  23.       }  
  24.     }  
  25.   }  
  26.   
  27. }  

 

 Reducer:AggregateAndRecommendReducer类,Reducer中进行PartialMultiply,按乘积得到的推荐度的大小取出最大的几个item。对于非booleanData,是用pref和相似度矩阵的PartialMultiply得到推荐度的值来进行排序。
而booleanData的pref值都是1.0f,所以去计算矩阵相乘的过程没有意义,直接累加相似度的值即可。
用这个数据排序就可得到推荐结果

 

Java代码  收藏代码
  1. public final class AggregateAndRecommendReducer extends  
  2.     Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable> {  
  3.  @Override  
  4.   protected void reduce(VarLongWritable userID,  
  5.                         Iterable<PrefAndSimilarityColumnWritable> values,  
  6.                         Context context) throws IOException, InterruptedException {  
  7.     if (booleanData) {  
  8.       reduceBooleanData(userID, values, context);  
  9.     } else {  
  10.       reduceNonBooleanData(userID, values, context);  
  11.     }  
  12.   }  
  13.   
  14.   private void reduceBooleanData(VarLongWritable userID,  
  15.                                  Iterable<PrefAndSimilarityColumnWritable> values,  
  16.                                  Context context) throws IOException, InterruptedException {  
  17.     /* having boolean data, each estimated preference can only be 1, 
  18.      * however we can't use this to rank the recommended items, 
  19.      * so we use the sum of similarities for that. */  
  20.     Vector predictionVector = null;  
  21.     for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {  
  22.       predictionVector = predictionVector == null  
  23.           ? prefAndSimilarityColumn.getSimilarityColumn()  
  24.           : predictionVector.plus(prefAndSimilarityColumn.getSimilarityColumn());  
  25.     }  
  26.     writeRecommendedItems(userID, predictionVector, context);  
  27.   }  
  28.   
  29.   private void reduceNonBooleanData(VarLongWritable userID,  
  30.                         Iterable<PrefAndSimilarityColumnWritable> values,  
  31.                         Context context) throws IOException, InterruptedException {  
  32.     /* each entry here is the sum in the numerator of the prediction formula */  
  33.     Vector numerators = null;  
  34.     /* each entry here is the sum in the denominator of the prediction formula */  
  35.     Vector denominators = null;  
  36.     /* each entry here is the number of similar items used in the prediction formula */  
  37.     Vector numberOfSimilarItemsUsed = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);  
  38.   
  39.     for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {  
  40.       Vector simColumn = prefAndSimilarityColumn.getSimilarityColumn();  
  41.       float prefValue = prefAndSimilarityColumn.getPrefValue();  
  42.       /* count the number of items used for each prediction */  
  43.       Iterator<Vector.Element> usedItemsIterator = simColumn.iterateNonZero();  
  44.       while (usedItemsIterator.hasNext()) {  
  45.         int itemIDIndex = usedItemsIterator.next().index();  
  46.         numberOfSimilarItemsUsed.setQuick(itemIDIndex, numberOfSimilarItemsUsed.getQuick(itemIDIndex) + 1);  
  47.       }  
  48.       //vector.times(float) 是向量乘于一个数,也就是向量的每一个值都乘以这个数  
  49.       //vector.plus(vector) 是两个向量相加,每一个位置上的值相加  
  50.         
  51.       //numerators是一个vecotr,每一个元素是这样的  
  52.       /* 
  53.                 例如index为item1的元素的值为: 
  54.        simility(item1, item_2)*pref(userid, item_2) 
  55.       + simility(item_1, item_3)*pref(userid, item_3) 
  56.       + simility(item1, item_4)*pref(userid, item_4) 
  57.       + ……  
  58.       + simility(item_1, item_2)*pref(userid, item_N) 
  59.       */  
  60.       // 注:其中simility(item1, item2)代表物品item1和物品item2的相似度 ,pref(userid, item)代表用于userid对item打分分值   
  61.        
  62.       numerators = numerators == null  
  63.           ? prefValue == BOOLEAN_PREF_VALUE ? simColumn.clone() : simColumn.times(prefValue)  
  64.           : numerators.plus(prefValue == BOOLEAN_PREF_VALUE ? simColumn : simColumn.times(prefValue));  
  65.         
  66.         
  67.         
  68.       simColumn.assign(ABSOLUTE_VALUES);  
  69.       //denominators是一个vecotr,每一个元素是这样的  
  70.       /* 
  71.                 例如index为item1的元素的值为: 
  72.        simility(item1, item_2)+ simility(item_1, item_3)+ …… + simility(item_1, item_2)*pref(userid, item_N) 
  73.       */  
  74.       // 注:其中simility(item1, item2)代表物品item1和物品item2的相似度  
  75.       denominators = denominators == null ? simColumn : denominators.plus(simColumn);  
  76.     }  
  77.   
  78.     if (numerators == null) {  
  79.       return;  
  80.     }  
  81.   
  82.     Vector recommendationVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);  
  83.     Iterator<Vector.Element> iterator = numerators.iterateNonZero();  
  84.     while (iterator.hasNext()) {  
  85.       Vector.Element element = iterator.next();  
  86.       int itemIDIndex = element.index();  
  87.       /* preference estimations must be based on at least 2 datapoints */  
  88.       if (numberOfSimilarItemsUsed.getQuick(itemIDIndex) > 1) {  
  89.         /* compute normalized prediction */  
  90.         //计算归一化预测值  
  91.         double prediction = element.get() / denominators.getQuick(itemIDIndex);  
  92.         recommendationVector.setQuick(itemIDIndex, prediction);  
  93.       }  
  94.     }  
  95.     writeRecommendedItems(userID, recommendationVector, context);  
  96.   }  
  97. }  

 

 

 

分享到:
评论

相关推荐

    基于Hadoop-Mahout的分布式课程推荐算法.pdf

    #资源达人分享计划#

    Mahout推荐算法API详解

    Mahoutt推荐算法,从数据处理能力上,可以划分为2类:单机内存算法实现基于Hadoop的分步式算法实现单机内存算法实现:就是在单机下运行的算法,是由cf.taste项目实现的,像我的们熟悉的UserCF,ItemCF都支持单机内存...

    基于Hadoop的分布式存储计算框架实战设计源码

    这是一个基于Hadoop的分布式存储计算框架实战设计,使用Java语言开发,包含135个文件。主要文件类型包括100个XML文件、14个Java源文件、3个gitignore文件、3个prefs文件、2个Markdown文档、2个classpath文件、2个...

    基于Hadoop的大数据存储与计算框架设计源码

    本资源提供了一套基于Hadoop的大数据存储与计算框架的设计源码,包含135个文件,其中包括100个XML配置文件,14个Java源代码文件,以及3个Git忽略文件。此外,还包括3个Preferences配置文件,2个Markdown文档,以及2...

    基于mahout的协同过滤算法实现

    基于mahout的协同过滤,个性化推荐算法实现。源码是java,可单机运行

    Mahout in action带书签目录 中文 完整版

    它实现的算法都被归入机器学习或者集体智慧的范畴,但是在这里Mahout主要注重协同过滤/推荐引擎、聚类和分类。 Mahout是可伸缩的。Mahout致力于实现海量数据,单机无法处理情况下的机器学习工具。在目前阶段,这种...

    基于Java Mahout的电影推荐系统【100011537】

    基于Mahout的电影推荐系统,数据预处理程序源码,程序处理rating.dat文件,生成movie_preferences.txt文件。Apache Mahout 是 Apache Software Foundation(ASF) 旗下的一个开源项目,提供一些可扩展的机器学习领域...

    Hadoop实战(第2版)

     《Hadoop硬实战》包含: ·Hadoop和MapReduce的基本概念 ·85个实战和测试技术 ·真实的场景,实用的解决方案 ·如何整合MapReduce和R前言 致谢关于本书 第1 部分 背景和基本原理1 跳跃中的Hadoop1.1 什么...

    Hadoop实战中文版

    11.2 其他Hadoop 相关的部分 11.2.1 HBase 11.2.2 ZooKeeper 11.2.3 Cascading 11.2.4 Cloudera 11.2.5 Katta 11.2.6 CloudBase 11.2.7 Aster Data和Greenplum 11.2.8 Hama和Mahout 11.2.9 search-...

    Hadoop大数据分析与挖掘实战 高清版

    10多位技术专家结合自己10多年的经验,以电信、航空、医疗等多个行业的实战案例为主线,深入浅出地讲解了如何基于Hadoop架构技术进行大数据挖掘建模、数据分析和二次开发。 本书共16章,分三个部分:基础篇、实战篇...

    Hadoop实战中文版.PDF

    22412.2 挖掘中国移动的数据 22512.3 在StumbleUpon推荐最佳网站 22912.3.1 分布式StumbleUpon的开端 23012.3.2 HBase和StumbleUpon 23012.3.3 StumbleUpon上的更多Hadoop应用 23612.4 搭建面向企业查询...

    Hadoop海量网络数据处理平台的关键技术

    3.提出了一种异构环境下的高效数据存储机制针对当前基于Hadoop的海量网络数据处理平台中数据存储问题,本文对分布式数据存储技术进行深入研究,并结合服务器性能评估技术提出了一种适用于异构环境下的高效数据存储算法...

    Eclipse下编译Mahout项目运行示例

    Mahout是基于Hadoop之上的机器学习和数据挖掘的一个分布式框架项目。搞机器智能学习算法,这个是首选。本文给出一个完整的编译运行示例,让有兴趣的同行少走段弯路,多一份参考!

    第7章-大数据分析与挖掘技术---大数据基础.pptx

    推荐的定义与评估 9 推荐算法就是利用用户的一些行为,通过一些数学算法,推测出用户可能喜欢的东西 最为典型的两种推荐模式,基于用户(User-based)的推荐和基于物品(Item-based)的推荐,Mahout的推荐程序中...

    Hadoop从业者为什么需要Spark?

    而如果基于Hadoop就需要分别构建实时流处理团队、数据统计分析团队、数据挖掘团队等,而且这些团队之间无论是代码还是经验都不可相互借鉴,会形成巨大的成本,而使用Spark就不存在这个问题; 再说一点,Hadoop现在...

    基于spark、mahout和spring boot构建的智能推荐系统.zip

    推荐模块:包含离线推荐和实时推荐,离线推荐负责把推荐结果存储到存储系统中实时推荐负责产生实时的消息队列,并且消费实时消息产生推荐结果,最后存储在存储模块中。 数据展示模块:负责展示项目中所用的数据。 ...

    基于Spark框架的聚类算法研究

    新型框架Spark部署在Hadoop平台上,它的机器学习算法几乎可以完全替代传统的Mahout Map Reduce的编程模式,但由于Spark的内存模型特点,执行速度快。该文研究了Spark中的机器学习中的聚类算法KMeans,先分析了算法思想,...

    基于Hadoop海量网络数据安全性研究

    为了解决海量网络数据的安全性问题,本文利用模糊Kalgorithm和朴素贝叶斯分类法开发了联合分类算法,并利用Mahout技术开发了基于MapReduce并行计算框架的特征提取分类算法。 相应模型的建立和训练,可以使联合分类器...

    Hadoop实战(陆嘉恒)译

    Hive及Hadoop群11.1 Hive11.1.1 安装与配置Hive11.1.2 查询的示例11.1.3 深入HiveQL11.1.4 Hive小结11.2 其他Hadoop 相关的部分11.2.1 HBase11.2.2 ZooKeeper11.2.3 Cascading11.2.4 Cloudera11.2.5 Katta11.2.6 ...

    Hadoop实战

    第一部分 Hadoop——一种分布式编程框架第1章 Hadoop简介 21.1 为什么写《Hadoop 实战》 31.2 什么是Hadoop 31.3 了解分布式系统和Hadoop 41.4 比较SQL数据库和Hadoop 51.5 理解MapReduce 61.5.1 动手扩展一个简单...

Global site tag (gtag.js) - Google Analytics