`
bit1129
  • 浏览: 1052720 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark九十九】Spark Streaming的batch interval时间内的数据流转源码分析

 
阅读更多

 

以如下代码为例(SocketInputDStream):

Spark Streaming从Socket读取数据的代码是在SocketReceiver的receive方法中,撇开异常情况不谈(Receiver有重连机制,restart方法,默认情况下在Receiver挂了之后,间隔两秒钟重新建立Socket连接),读取到的数据通过调用store(textRead)方法进行存储。数据的流转需要关注如下几个问题:

1. 数据存储到什么位置了

2. 数据存储的结构如何?

3. 数据什么时候被读取

4. 读取到的数据(batch interval)如何转换为RDD

 

1. SocketReceiver#receive

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      logInfo("Stopped receiving")
      restart("Retrying connecting to " + host + ":" + port)
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }

 

2. SocketReceiver#receive=>SocketReceiver#store

 

  /**
   * Store a single item of received data to Spark's memory.
   * These single items will be aggregated together into data blocks before
   * being pushed into Spark's memory.
   */
  def store(dataItem: T) {
    executor.pushSingle(dataItem)
  }

 

数据存储作为Executor功能之一,store方法调用了executor中的pushSingle操作,此时的Single可以理解为一次数据读取,而dataItem就是一次读取的数据对象

 

 

3. SocketReceiver#store=>executor.pushSingle(ReceiverSupervisorImpl.pushSingle)

 

  /** Push a single record of received data into block generator. */
  def pushSingle(data: Any) {
    blockGenerator.addData(data)
  }

 

数据放入到了blockGenerator数据结构中了,blockGenerator,类型为BlockGenerator,顾名思义是一个block生成器,所谓的block生成器,是指Spark Streaming每隔一段时间(默认200毫秒,   private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200))将接收到的数据合并成一个block,然后将这个block写入到BlockManager,继续沿着个思路分析

 

 

4. executor.pushSingle=>BlockGenerator.addData

  /**
   * Push a single data item into the buffer. All received data items
   * will be periodically pushed into BlockManager.
   */
  def addData (data: Any): Unit = synchronized {
    waitToPush() ///通过阻塞控制Push的速度
    currentBuffer += data,将数据追加到currentBuffer中
  }

 

 当数据写入到currentBuffer中之后,似乎线索已经断了。事实上是BlockGenerator内部开启的两个线程(BlockIntervalTimer和BlockPushingThread)在背后继续处理currentBuffer

 

BlockIntervalTimer默认每200毫秒执行一次updateCurrentBufferer,该函数的功能是将类型为ArrayBuffer的currentBuffer合并成一个小的Block

  private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
  private val blockIntervalTimer =
    new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
 

 

 BlockPushingThread是通过循环调用keepPushingBlocks将BlockIntervalTimer创建的各个Block写入到BlockManager中,

private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
 

 

 上面说到的两个线程的同步是通过ArrayBlockQueue实现的

  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
 

 

 

5. BlockGenerator#updateCurrentBuffer

updateCurrentBuffer由BlockIntervalTimer线程执行

 

  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer 
      currentBuffer = new ArrayBuffer[Any] //这两句对currentBuffere这样的操作,是否有线程安全问题?没有,因为currentBuffer已经标注为@volatile类型的变量
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockInterval) //构造StreamBlockId
        val newBlock = new Block(blockId, newBlockBuffer) //创建出一个Block
        listener.onGenerateBlock(blockId) //通知谁?空实现,listener是作为BlockGenerator的构造函数传入的,这是一个所有通知时间的空实现
        blocksForPushing.put(newBlock)  //添加到阻塞队列中,等待BlockPushingThread读取
        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
 

 

6. BlockGenerator#keepPushingBlocks

keepPushingBlocks由BlockPushingThread执行

 

 

  /** Keep pushing blocks to the BlockManager. */
  private def keepPushingBlocks() {
    logInfo("Started block pushing thread")
    try {
      while(!stopped) {
        //poll是阻塞队列的非阻塞方法,但是如果队列中没有元素,则等待100ms,poll是取一个元素操作
        Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
          case Some(block) => pushBlock(block)
          case None =>
        }
      }
      // Push out the blocks that are still left
      logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
      while (!blocksForPushing.isEmpty) {
        logDebug("Getting block ")
        val block = blocksForPushing.take()
        pushBlock(block)
        logInfo("Blocks left to push " + blocksForPushing.size())
      }
      logInfo("Stopped block pushing thread")
    } catch {
      case ie: InterruptedException =>
        logInfo("Block pushing thread was interrupted")
      case e: Exception =>
        reportError("Error in block pushing thread", e)
    }
  }
 

 

7. BlockGenerator#pushBlock

这个方法是针对一个Block进行push,而不是一次从队列中把所有的Block取出来,一次进行push。

 

 

  private def pushBlock(block: Block) {
    listener.onPushBlock(block.id, block.buffer)
    logInfo("Pushed block " + block.id)
  }
 

 

8. BlockGeneratorListener#onPushBlock

pushBlock是通过Observer模式,通知listener,这个liestener是BlockGenerator的构造函数传入的(其实是作为内部类,在构造时创建的实例)

 

 

  /** Divides received data records into data blocks for pushing in BlockManager. */
  private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
    def onAddData(data: Any, metadata: Any): Unit = { }

    def onGenerateBlock(blockId: StreamBlockId): Unit = { }

    def onError(message: String, throwable: Throwable) {
      reportError(message, throwable)
    }

    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
      pushArrayBuffer(arrayBuffer, None, Some(blockId))
    }
  }, streamId, env.conf)
 

 

9. BlockGenerator#pushArrayBuffer

 

  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
  def pushArrayBuffer(
      arrayBuffer: ArrayBuffer[_],
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
  }
 

 

10. BlockGenerator#pushAndReportBlock

 

 

  /** Store block and report it to driver */
  def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val numRecords = receivedBlock match {
      case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
      case _ => -1
    }

    val time = System.currentTimeMillis
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")

    val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
    val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
    Await.result(future, askTimeout)
    logDebug(s"Reported block $blockId")
  }
 

 

pushAndReportBlock做了两件事,一是Store Block,而是想Tracker汇报有Block加入

 

10.1 receivedBlockHandler.storeBlock(BlockManagerBasedBlockHandler#storeBlock)

 

 

  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
    val putResult: Seq[(BlockId, BlockStatus)] = block match {
      case ArrayBufferBlock(arrayBuffer) =>
        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
      case IteratorBlock(iterator) =>
        blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
      case ByteBufferBlock(byteBuffer) =>
        blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
      case o =>
        throw new SparkException(
          s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
    }
    if (!putResult.map { _._1 }.contains(blockId)) {
      throw new SparkException(
        s"Could not store $blockId to block manager with storage level $storageLevel")
    }
    BlockManagerBasedStoreResult(blockId)
  }
 

 

其中,blockManager是BlockManager类型的变量,定义于org.apache.spark.storage包中,实现向BlockManager写入数据,具体调用putIterator,putBytes,这是Spark存储子系统的内容,此处不赘述,重要的是,在此处写入进了BlockManager

 

10.2 ReceiverTracker#AddBlock

通过下面两个语句,将写入到BlockManager的信息汇报给TrackActor,这是一个进程间的同步调用(ask语法)

    val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
    val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
    Await.result(future, askTimeout)

 

trackerActor对应的实体是ReceiverTracker,AddBlock消息将触发ReceiverTracker.addBlock,进而调用ReceivedBlockTracker.addBlock

 

 

  /** Add new blocks for the given stream */
  private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
  }
 

 

11. ReceivedBlockTracker.addBlock

 

 

  /** Add received block. This event will get written to the write ahead log (if enabled). */
  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    try {
      writeToLog(BlockAdditionEvent(receivedBlockInfo))//写WAL
      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo //getReceivedBlockQueue从Map<streamId,streamReceivedBlockQueue>中获取相应的streamReceivedBlockQueue
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
      true
    } catch {
      case e: Exception =>
        logError(s"Error adding block $receivedBlockInfo", e)
        false
    }
  }
 

 

 

 

 

分享到:
评论

相关推荐

    华为OD机试D卷 - 用连续自然数之和来表达整数 - 免费看解析和代码.html

    私信博主免费获取真题解析以及代码

    Screenshot_2024-05-10-20-21-01-857_com.chaoxing.mobile.jpg

    Screenshot_2024-05-10-20-21-01-857_com.chaoxing.mobile.jpg

    数字图像处理|Matlab-频域增强实验-彩色图像的频域滤波.zip

    数字图像处理|Matlab-频域增强实验-彩色图像的频域滤波.zip

    2024-2030中国定向转向膜市场现状研究分析与发展前景预测报告.docx

    2024-2030中国定向转向膜市场现状研究分析与发展前景预测报告

    开源工时填报管理系统安装包

    开源工时填报管理系统安装包

    激光雷达深度报告:产业化加速,国产供应链迎来投资机遇.pdf

    电子元件 电子行业 行业分析 数据分析 数据报告 行业报告

    node-v0.12.10-darwin-x86.tar.gz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    18-17.网站域名DNS被劫持,网站服务器密码被改.mp4

    18-17.网站域名DNS被劫持,网站服务器密码被改.mp4

    QYResearch:2023年前五大2,3,3',4'-联苯四甲酸二酐(α-BPDA)企业占据全球91%的市场份额.docx

    QYResearch:2023年前五大2,3,3',4'-联苯四甲酸二酐(α-BPDA)企业占据全球91%的市场份额.docx

    2024-2030中国仿生智能餐饮机器人市场现状研究分析与发展前景预测报告.docx

    2024-2030中国仿生智能餐饮机器人市场现状研究分析与发展前景预测报告

    82-82.渗透测试-CVE-2017-8464“震网三代 反弹shell演示课件.mp4

    82-82.渗透测试-CVE-2017-8464“震网三代 反弹shell演示课件.mp4

    node-v6.11.5-darwin-x64.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    33-33.渗透测试渗透测试之SQL注入基于报错注入(下)

    渗透测试渗透测试之SQL注入基于报错注入(下)

    基于Android的云游四海手机端应用.zip

    Android是一种基于Linux内核(不包含GNU组件)的自由及开放源代码的移动操作系统,主要应用于移动设备,如智能手机和平板电脑。该系统最初由安迪·鲁宾开发,后被Google公司收购并注资,随后与多家硬件制造商、软件开发商及电信营运商共同研发改良。 Android操作系统的特点包括: 开放源代码:Android系统采用开放源代码模式,允许开发者自由访问、修改和定制操作系统,这促进了技术的创新和发展,使得Android系统具有高度的灵活性和可定制性。 多任务处理:Android允许用户同时运行多个应用程序,并且可以轻松地在不同应用程序之间切换,提高了效率和便利性。 丰富的应用生态系统:Android系统拥有庞大的应用程序生态系统,用户可以从Google Play商店或其他第三方应用市场下载和安装各种各样的应用程序,满足各种需求。 可定制性:Android操作系统可以根据用户的个人喜好进行定制,用户可以更改主题、小部件和图标等,以使其界面更符合个人风格和偏好。 多种设备支持:Android操作系统可以运行在多种不同类型的设备上,包括手机、平板电脑、智能电视、汽车导航系统等。 此外,Android系统还有一些常见的问题,如应用崩溃、电池耗电过快、Wi-Fi连接问题、存储空间不足、更新问题等。针对这些问题,用户可以尝试一些基本的解决方法,如清除应用缓存和数据、降低屏幕亮度、关闭没有使用的连接和传感器、限制后台运行的应用、删除不需要的文件和应用等。 随着Android系统的不断发展,其功能和性能也在不断提升。例如,最新的Android版本引入了更多的安全性和隐私保护功能,以及更流畅的用户界面和更强大的性能。此外,Android系统也在不断探索新的应用场景,如智能家居、虚拟现实、人工智能等领域。 总之,Android系统是一种功能强大、灵活可定制、拥有丰富应用生态系统的移动操作系统,在全球范围内拥有广泛的用户基础。

    node-v4.8.0-sunos-x86.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    46-46.渗透测试-Kali Linux安全渗透.mp4

    46-46.渗透测试-Kali Linux安全渗透.mp4

    电子周跟踪:华为P70系列开售,台积电指引AI需求依旧强劲.pdf

    电子元件 电子行业 行业分析 数据分析 数据报告 行业报告

    node-v6.13.0-linux-armv7l.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    【名企实践】华为如何打造高绩效团队glq.pptx

    【名企实践】华为如何打造高绩效团队glq.pptx

    node-v4.9.1-linux-x64.tar.xz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

Global site tag (gtag.js) - Google Analytics