
[spark-src-core] 2.3 shuffle in spark

 

1.flow

1.1 shuffle abstract

(figure omitted: shuffle abstraction)

1.2 shuffle flow

(figure omitted: shuffle flow)

1.3 sort flow in shuffle

(figure omitted: sort flow in shuffle)

1.4 data structures in memory

(figure omitted: in-memory data structures)
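Since the figure for 1.4 is not reproduced here, a rough, self-contained sketch of the idea instead: ExternalSorter keeps incoming records either in a combining map (when an aggregator is defined) or in a plain pair buffer (when it is not), which is exactly the `if (aggregator.isDefined) map else buffer` choice seen in the code in section 2. This is a simplified illustration, not the real PartitionedAppendOnlyMap / PartitionedPairBuffer classes.

// Simplified illustration only; not the actual Spark classes.
object InMemoryShuffleStructures {
  // Combining structure: merges values for the same (partition, key) on insert,
  // like the map used when an aggregator is defined.
  def combine[K, V](records: Seq[(Int, K, V)], mergeValue: (V, V) => V): Map[(Int, K), V] =
    records.foldLeft(Map.empty[(Int, K), V]) { case (acc, (pid, k, v)) =>
      acc.updated((pid, k), acc.get((pid, k)).map(mergeValue(_, v)).getOrElse(v))
    }

  // Buffering structure: just appends (partition, key, value) triples,
  // like the buffer used when there is no aggregator.
  def buffer[K, V](records: Seq[(Int, K, V)]): Seq[((Int, K), V)] =
    records.map { case (pid, k, v) => ((pid, k), v) }

  def main(args: Array[String]): Unit = {
    val recs = Seq((0, "a", 1), (1, "b", 2), (0, "a", 3))
    println(combine[String, Int](recs, _ + _)) // Map((0,a) -> 4, (1,b) -> 2)
    println(buffer(recs))                      // three entries, nothing merged
  }
}

Both structures key records by (partitionId, key), which is what later allows sorting by partition before writing out the single data file.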



 

2.core code paths
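Before diving into SortShuffleWriter below, a concrete reference point for its two branches: reduceByKey produces a ShuffleDependency with mapSideCombine = true (so the sorter receives the aggregator), while groupByKey sets mapSideCombine = false. A minimal local job that exercises both paths (the local[2] master and app name are just for illustration):

import org.apache.spark.{SparkConf, SparkContext}

// Minimal driver exercising both branches of write() below:
// reduceByKey -> mapSideCombine = true  (ExternalSorter gets the aggregator)
// groupByKey  -> mapSideCombine = false (no aggregator, no ordering passed to the sorter)
object ShuffleBranchesDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("shuffle-branches"))
    val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3), numSlices = 3)

    println(pairs.reduceByKey(_ + _).collect().toSeq)              // combined on the map side
    println(pairs.groupByKey().mapValues(_.toSeq).collect().toSeq) // no map-side combine

    sc.stop()
  }
}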

 

//SortShuffleWriter
override def write(records: Iterator[Product2[K, V]]): Unit = { //-how is this result collected per partition? via the index file
    //-1 sort result data
    //-both below cases will spill if over threshold
    val ts = System.currentTimeMillis()
    //-compare with the reduce-side combine in HashShuffleReader#read(); this is the real map-side combine.
    if (dep.mapSideCombine) { //-acts as the map side's Combiner. note: operators such as reduceByKey()
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") //always supply an aggregator by
      sorter = new ExternalSorter[K, V, C](   //default, though keyOrdering may be absent
        dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      logInfo("-aggr:" + dep.aggregator + ",key ord:" + dep.keyOrdering
        +",ser:" + dep.serializer +",part:"+dep.partitioner + ",dep " + dep)

      sorter.insertAll(records) //-if no ordering is given, the key's hashCode is used to sort within each partition
    } else { //-no map-side combine is requested, e.g. groupByKey()
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the *reduce side*
      // if the operation being run is *sortByKey.*
      sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
      sorter.insertAll(records)
    }
    logInfo("*total cost of sorting(ms) " + (System.currentTimeMillis()-ts))
    //-2 write to the data file, then the index file
    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)//-same as the shuffle output file used in insertAll()
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)

    //-means the result is always written to disk, even if the data is small.
    val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
    shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

    logInfo("-output file:" + outputFile + ",blockid:" + blockId
      + ",part len " + partitionLengths.length + ",total " + partitionLengths.sum
      +",shuffle server id " + blockManager.shuffleServerId + ",shuffleId " + dep.shuffleId)

    //-3 encapsulate the result (serialization happens in Executor#launchTask())
    //-used by MapOutputTracker#getServerStatuses()
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)//-contains no real data, only the shuffle server id and partition lengths
  }

  /** Close this writer, passing along whether the map completed */
  override def stop(success: Boolean): Option[MapStatus] = {
    try {
      if (stopping) {
        return None
      }
      stopping = true
      if (success) {
        return Option(mapStatus)
      } else {
        // The map task failed, so delete our output data.
        shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId)
        return None
      }
    } finally {
      // Clean up our sorter, which may have its own intermediate files
      if (sorter != null) {
        val startTime = System.nanoTime()
        sorter.stop()
        context.taskMetrics.shuffleWriteMetrics.foreach(
          _.incShuffleWriteTime(System.nanoTime - startTime))
        sorter = null
      }
    }
  }
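A note on the index file written by writeIndexFile() above: since writePartitionedFile() lays every partition's bytes back to back in a single data file, the index is conceptually just the running offsets computed from partitionLengths, so the bytes of reduce partition r live between offsets r and r+1. A minimal sketch of that idea (not the actual IndexShuffleBlockResolver code):

// Sketch: the cumulative offsets derived from the per-partition byte lengths
// returned by writePartitionedFile.
object IndexFileSketch {
  def offsetsFor(partitionLengths: Array[Long]): Array[Long] =
    partitionLengths.scanLeft(0L)(_ + _) // numPartitions + 1 entries

  def main(args: Array[String]): Unit = {
    val lengths = Array(120L, 0L, 310L)   // hypothetical partition sizes
    val offsets = offsetsFor(lengths)
    println(offsets.mkString(","))        // 0,120,120,430
    // reducer 2 would read bytes [offsets(2), offsets(3)) = [120, 430) of the data file
  }
}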
 
//ExternalSorter
/**-this file's data structure is the same as a spilled file's.
   * Write all the data added into this ExternalSorter into *a file* in the disk store. This is
   * called by the SortShuffleWriter and can go through an efficient path of just concatenating
   * binary files if we decided to avoid merge-sorting.
   *
   * @param blockId block ID to write to. The index file will be blockId.name + ".index".-note
   * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
   * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
   */
  def writePartitionedFile(
      blockId: BlockId,
      context: TaskContext,
      outputFile: File): Array[Long] = {

    // Track location of each range in the output file
    val lengths = new Array[Long](numPartitions)
    //-case 1: bypass merge-sort (one file per partition, concatenated below)
    if (bypassMergeSort && partitionWriters != null) {
      logInfo("-bypass:" + bypassMergeSort+",pwriters:" + partitionWriters.length)
      // We decided to write separate files for each partition, so just concatenate them. To keep
      // this simple we spill out the current in-memory collection so that everything is in files.-so no order is guaranteed
      spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
      partitionWriters.foreach(_.commitAndClose())
      val out = new FileOutputStream(outputFile, true) //-note here,append is 'true'
      val writeStartTime = System.nanoTime
      util.Utils.tryWithSafeFinally {
        for (i <- 0 until numPartitions) {
          val in = new FileInputStream(partitionWriters(i).fileSegment().file)
          util.Utils.tryWithSafeFinally {
            lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
          } {
            in.close()
          }
        }
      } {
        out.close()
        context.taskMetrics.shuffleWriteMetrics.foreach(
          _.incShuffleWriteTime(System.nanoTime - writeStartTime))
      }
    } else if (spills.isEmpty && partitionWriters == null) { //-reached only when no shuffle data was spilled to disk; same format as spill()
      logInfo("-no spills occurred")
      //-case 2: we only have in-memory data
      val collection = if (aggregator.isDefined) map else buffer
      val it = collection.destructiveSortedWritablePartitionedIterator(comparator) //-same as spillToMergableFile()
      while (it.hasNext) { //-note: this is a nested loop
        val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
          context.taskMetrics.shuffleWriteMetrics.get) //-uses the file's append mode, so only one file is created
        val partitionId = it.nextPartition() //-the current partition id
        while (it.hasNext && it.nextPartition() == partitionId) { //-the data is already ordered by partition id,
          it.writeNext(writer)      //-so writing proceeds one partition at a time
        }
        writer.commitAndClose() //-this partition's data has been fully written out
        val segment = writer.fileSegment()
        lengths(partitionId) = segment.length //-record its size
      }
    } else { //-case 3: spilled files plus remaining in-memory data (merge them and write to the final file)
      logInfo("-merge spilled file and in-mem data?,part it:" + this.partitionedIterator)
      // Not bypassing merge-sort; get an iterator by partition and just write everything directly.
      for ((id, elements) <- this.partitionedIterator) { //-use multiway merge sorter
        if (elements.hasNext) {
          val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
            context.taskMetrics.shuffleWriteMetrics.get)
          for (elem <- elements) {
            writer.write(elem._1, elem._2)
          }
          writer.commitAndClose()
          val segment = writer.fileSegment()
          lengths(id) = segment.length
        }
      }
    }

    context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
    context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
    context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m =>
      if (curWriteMetrics != null) {
        m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten)
        m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime)
        m.incShuffleRecordsWritten(curWriteMetrics.shuffleRecordsWritten)
      }
    }

    lengths
  }
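Case 2 above relies on destructiveSortedWritablePartitionedIterator handing back records grouped by partition, which is why the outer/inner loop can write one partition at a time. The ordering is by partition id first and, when no key ordering is supplied, by the key's hashCode inside a partition. A standalone sketch of that comparator idea (not Spark code; the hash partitioner below is a stand-in):

// Standalone sketch: sort (partitionId, key) pairs by partition id first,
// then by key hashCode within a partition, mirroring the comparator used
// by destructiveSortedWritablePartitionedIterator when no ordering is given.
object PartitionSortSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 3
    def partition(key: String): Int = math.abs(key.hashCode) % numPartitions // hypothetical hash partitioner

    val records = Seq("banana" -> 1, "apple" -> 2, "cherry" -> 3, "apple" -> 4)

    val sorted = records
      .map { case (k, v) => ((partition(k), k), v) }
      .sortBy { case ((pid, k), _) => (pid, k.hashCode) }

    sorted.foreach { case ((pid, k), v) => println(s"partition=$pid key=$k value=$v") }
  }
}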
 

 

3.FAQ(TBD TODO)

   Here are some questions I am not clear about; any clues from readers are highly appreciated:

A. When and where does a ResultTask learn to fetch result data from a ShuffleMapTask? For example, take a job with:

  3 maps (m1, m2, m3) x 2 reduces (r1, r2)

t1: m1, m2, m3 are all running;

t2: all maps keep running except m1, which has finished; r1 is then notified to set up and fetch the result of m1;

t3: m2 finishes. The question: when does r1 know to fetch the result of m2? My guess is that there should be a shared place where the reduces learn where the maps are and when they finish, but I have not found that in the sources yet. (A toy model of such a registry is sketched below.)
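To make the question concrete, here is a toy model of the kind of shared registry that the comment on MapStatus creation above hints at (MapOutputTracker#getServerStatuses()): map tasks register where their output lives, and a reduce task asks the registry rather than the maps directly. This is purely illustrative, not Spark's actual protocol; all names below are made up.

import scala.collection.mutable

// Toy registry: maps announce their output location, reducers query it.
object MapOutputRegistrySketch {
  final case class OutputLoc(host: String, mapId: Int)

  private val registry = mutable.Map.empty[Int, OutputLoc] // mapId -> location

  def mapFinished(mapId: Int, host: String): Unit =
    registry.synchronized { registry(mapId) = OutputLoc(host, mapId) }

  // reduceId is unused in this toy; every reducer sees the same view.
  def locationsKnownTo(reduceId: Int): Seq[OutputLoc] =
    registry.synchronized { registry.values.toSeq.sortBy(_.mapId) }

  def main(args: Array[String]): Unit = {
    mapFinished(1, "node-1")                // t2: m1 done
    println(locationsKnownTo(reduceId = 1)) // r1 sees only m1's output
    mapFinished(2, "node-2")                // t3: m2 done
    println(locationsKnownTo(reduceId = 1)) // r1 now also sees m2's output
  }
}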

 

B. In BlockManager, see the #Question# marker below.

private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
    require(blockId != null, "BlockId is null")
    //-1 get the block's replica locations. note: the block is placed by TaskRunner#run() in the 'indirect result' case
    val locations = Random.shuffle(master.getLocations(blockId)) //-delegates to BlockManagerMasterEndpoint
    //- #Question# why not identify which BlockManager is nearest to this driver (in cluster mode, or even local mode) instead of shuffling randomly?
    for (loc <- locations) { //-multiple hosts may hold the same blockId, so one successful fetch is enough
      logDebug(s"Getting remote block $blockId from $loc")
      //-2 fetch real data
      val data = blockTransferService.fetchBlockSync(
        loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()

      if (data != null) { //-got the data from one of the replica nodes; return immediately
        if (asBlockResult) {
          return Some(new BlockResult(
            dataDeserialize(blockId, data),
            DataReadMethod.Network,
            data.limit()))
        } else {
          return Some(data)
        }
      }
      logDebug(s"The value of block $blockId is null")
    }
    logDebug(s"Block $blockId not found")
    None
  }
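Regarding the #Question# in doGetRemote() above: one conceivable tweak, sketched here purely as an illustration (this is not what this Spark version does), is to bias the shuffled location list so that replicas on the local host are tried before remote ones.

// Illustrative only: BlockLoc is a simplified stand-in for BlockManagerId.
case class BlockLoc(host: String, port: Int, executorId: String)

object LocalityPreference {
  def orderByLocality(locations: Seq[BlockLoc], localHost: String): Seq[BlockLoc] = {
    // Shuffle first (as doGetRemote does), then try same-host replicas before the rest.
    val (local, remote) = scala.util.Random.shuffle(locations).partition(_.host == localHost)
    local ++ remote
  }

  def main(args: Array[String]): Unit = {
    val locs = Seq(BlockLoc("node-a", 7337, "1"), BlockLoc("node-b", 7337, "2"), BlockLoc("node-a", 7338, "3"))
    println(orderByLocality(locs, localHost = "node-a"))
  }
}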

 

 

 

 

 

 
