
[spark-src-core] 7.1 application in spark-PageRank

 

  All of the code below comes from Spark's PageRank example, apart from some comments that I added.

 

val lines = ctx.textFile(args(0), 1)
    //-1 generate links of <src,targets> pair
    var links = lines.map{ s =>
      val parts = s.split("\\s+")
      (parts(0), parts(1)) //-pair of <src,target>
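    }.distinct() //-unnecessary if the input is already deduplicated
      .groupByKey().cache() //-raw: groupByKey builds the table to be joined, mimicking real table data; links is reused in every iteration, so cache it for performance #B
      //-leib: if the next line is enabled, the groupByKey() line above must stay enabled too; otherwise reduceByKey() fails, because urls would then be a plain String and the flatMap() in the for() loop would produce (Char, Double)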
//      .partitionBy(new org.apache.spark.HashPartitioner(2)).cache()

    //-2 generate ranks with a default value, i.e. <source-url, default-rank>
    //-links (above) can be a val if #A stays commented out
    var ranks = links.mapValues(v => 1.0) // ie <raw-links-key,1.0>

    //-3
    for (i <- 1 to iters) {
      //-3.1 map the source urls to their target urls via an inner join; links holds the full url set, so this may be costly
      //- would swapping links and ranks improve performance? No: this is an inner join, not a left join
      // ? when links is very large it weighs heavily on the later, deeper iterations; one could first derive a new links from contribs (mapValues()) before the next join
      //-note: the links and ranks RDDs share the same partitioner, so the join itself needs no shuffle
      val contribs = links.join(ranks).values.flatMap{ case (urls, rank) => //-why a 'case' clause? it is needed to destructure the tuple
        val size = urls.size  //-target(to) urls size
        urls.map(url => (url, rank / size)) //-rank split evenly among the target urls
      }
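      //-3.2 merge the contributed ranks per target url; note: ranks keeps narrowing (it drifts away from the source urls), so less and less data needs to be computed, see #A
      //-why not restore ranks to the original set of nodes? if it were restored, the statistics would fall back to the second round's data
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _) //-damped weighted sum; keeps the same partitioner as the join
      //-#A: if the data is really huge, this approach could cache the result every few rounds, which should make runs of 10+ iterations much faster? cmp #B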
//      val oldlinks = links
//      links = links.join(ranks).map{ case (k,(urls, rank)) => (k,urls)} //-added by leib
//      oldlinks.unpersist(false)

      println("step------------------------------"+i+"---------------------------------")
      ranks.foreach(s => println("-result:" + s._1 + " - " + s._2))
    }
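
To see the "narrowing" noted in step 3.2 without a cluster, here is a minimal sketch of the same loop on plain Scala collections. It is not Spark code: the toy graph, the object name `PageRankSketch` and the helper `step` are made up for illustration, and only the arithmetic (split the rank over the targets, sum, then `0.15 + 0.85 * sum`) mirrors the example above.

```scala
object PageRankSketch {
  // Hypothetical toy graph: a -> b, a -> c, b -> c, c -> d ("d" has no outgoing links)
  val edges: Seq[(String, String)] = Seq("a" -> "b", "a" -> "c", "b" -> "c", "c" -> "d")

  // Same shape as links after distinct().groupByKey(): src -> targets
  val links: Map[String, Seq[String]] =
    edges.distinct.groupBy(_._1).map { case (src, es) => src -> es.map(_._2) }

  // One iteration: inner join on the key, spread the rank over the targets, sum, damp
  def step(ranks: Map[String, Double]): Map[String, Double] = {
    val contribs: Seq[(String, Double)] = links.toSeq.flatMap { case (src, urls) =>
      // inner join: a source that has no rank contributes nothing
      ranks.get(src).toSeq.flatMap { rank =>
        urls.map(url => (url, rank / urls.size))
      }
    }
    contribs.groupBy(_._1).map { case (url, cs) =>
      url -> (0.15 + 0.85 * cs.map(_._2).sum)
    }
  }

  def main(args: Array[String]): Unit = {
    // like links.mapValues(v => 1.0): only source urls get an initial rank
    var ranks: Map[String, Double] = links.map { case (src, _) => src -> 1.0 }
    for (i <- 1 to 3) {
      ranks = step(ranks)
      println(s"step $i: keys = ${ranks.keys.toSeq.sorted.mkString(",")}")
    }
  }
}
```

On this graph the key set of ranks goes from a,b,c to b,c,d, then c,d, then d: a url that nobody links to drops out after one round, and the inner join then ignores its outgoing links in the next round.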

 

ref:

A simple implementation of the PageRank algorithm on Spark

Spark PageRank

 
