
Kafka producer server side


 

Producer server side:

1. NIO accepts the request: http://blackproof.iteye.com/blog/2239949
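The linked post covers the NIO layer in detail; as orientation, here is a minimal sketch of the acceptor/selector pattern it describes (a deliberately simplified stand-in, not Kafka's actual SocketServer):

import java.net.InetSocketAddress
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel}

object AcceptorSketch extends App {
  val selector = Selector.open()
  val server = ServerSocketChannel.open()
  server.configureBlocking(false)
  server.socket().bind(new InetSocketAddress(9092))
  server.register(selector, SelectionKey.OP_ACCEPT)

  while (true) {
    selector.select()                                // block until some channel is ready
    val keys = selector.selectedKeys().iterator()
    while (keys.hasNext) {
      val key = keys.next(); keys.remove()
      if (key.isAcceptable) {                        // new connection: register it for reads
        val client = server.accept()
        client.configureBlocking(false)
        client.register(selector, SelectionKey.OP_READ)
      } else if (key.isReadable) {
        // here Kafka parses the bytes into a Request and enqueues it on the RequestChannel
      }
    }
  }
}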

 

2. A handler thread takes the request off the request queue and invokes KafkaApis: http://blackproof.iteye.com/blog/2239953
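The handler side is a plain worker pool. Roughly (a simplified rendering of kafka.server.KafkaRequestHandler; error handling and timing metrics omitted):

class KafkaRequestHandler(requestChannel: RequestChannel, apis: KafkaApis) extends Runnable {
  def run() {
    while (true) {
      val request = requestChannel.receiveRequest()  // blocks on the request queue
      if (request eq RequestChannel.AllDone)
        return                                       // shutdown sentinel
      apis.handle(request)                           // dispatch, see step 3 below
    }
  }
}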

 

3. The KafkaApis class dispatches the request to handleProducerOrOffsetCommitRequest:

  def handle(request: RequestChannel.Request) {
    try{
      trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
      request.requestId match {
        case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
        case RequestKeys.FetchKey => handleFetchRequest(request)
        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) //make this replica leader or follower; a follower starts the corresponding replica-fetcher threads
        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
        case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
        case requestId => throw new KafkaException("Unknown api code " + requestId)
      }
    } catch {
      case e: Throwable =>
        request.requestObj.handleError(e, requestChannel, request)
        error("error when handling request %s".format(request.requestObj), e)
    } finally
      request.apiLocalCompleteTimeMs = SystemTime.milliseconds
  }

 

 3.1 Append the data to the local log. The local replica is the leader by default, since the client sends the produce request to the broker hosting the leader of the topic-partition's log.

         appendToLocalLog does the main processing here.

 3.2 Depending on the ack level the producer configured (request.required.acks), decide how to respond to the client; a client-side example follows this list:

           0: return nothing; a no-op is queued so the processor simply moves on to the next request

           1: take the leader's local append result and return it

           -1: check whether the last offset of every ISR replica besides the leader has reached the current offset, and collect any error

                   if that is not yet satisfied and there is no error, register a watcher

                   if the wait times out, the request is expired from the delayed-operation queue
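On the client side the ack level is just the request.required.acks producer property. A minimal example with the 0.8 Scala producer (broker address and topic are placeholders):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "broker1:9092")               // placeholder broker list
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("request.required.acks", "-1")                        // 0, 1 or -1 as described above
val producer = new Producer[String, String](new ProducerConfig(props))
producer.send(new KeyedMessage[String, String]("my-topic", "key", "value"))
producer.close()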

 

  def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
    ......
    val sTime = SystemTime.milliseconds
    //append the data to the local log (the local replica is the leader by default)
    val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)//true for an offset-commit request, false for a plain produce request
    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))

    val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)

    //collect the results and check whether any partition hit an error
    val numPartitionsInError = localProduceResults.count(_.error.isDefined)
    if(produceRequest.requiredAcks == 0) {
      //with acks = 0 no response is needed; the write is treated as successful immediately
      // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since
      // no response is expected by the producer the handler will send a close connection response to the socket server
      // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata
      if (numPartitionsInError != 0) {
        info(("Send the close connection response due to error handling produce request " +
          "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
          .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
        requestChannel.closeConnection(request.processor, request)
      } else {

        if (firstErrorCode == ErrorMapping.NoError)
          offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))

        //two request kinds arrive here: offset commit and produce
        if (offsetCommitRequestOpt.isDefined) {
          val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize)
          requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
        } else
          requestChannel.noOperation(request.processor, request)
      }
    } else if (produceRequest.requiredAcks == 1 ||
        produceRequest.numPartitions <= 0 ||
        numPartitionsInError == produceRequest.numPartitions) {
      //acks = 1: respond with success once the leader alone has acknowledged the write

      if (firstErrorCode == ErrorMapping.NoError) {
        //update the offset manager's in-memory offset cache (offsetsCache)
        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
      }

      val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
      val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
                                           .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
      //send back the response, carrying the per-partition ProducerResponseStatus results
      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
    } else {
      //acks = -1: respond with success only after every ISR replica has received the data
      // create a list of (topic, partition) pairs to use as keys for this delayed request
      val producerRequestKeys = produceRequest.data.keys.toSeq
      val statuses = localProduceResults.map(r =>
        r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
      val delayedRequest =  new DelayedProduce(
        producerRequestKeys,
        request,
        produceRequest.ackTimeoutMs.toLong,
        produceRequest,
        statuses,
        offsetCommitRequestOpt)

      //check whether the other replicas have already caught up; if not, register a watcher, and on timeout the request is expired from the delayed queue
      // add the produce request for watch if it's not satisfied, otherwise send the response back
      val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
      if (satisfiedByMe)
        producerRequestPurgatory.respond(delayedRequest)
    }
  }
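The purgatory used above (checkAndMaybeWatch / respond) can be pictured as per-key watcher lists plus an expiry queue. A toy model to make the flow concrete (an illustrative sketch, not Kafka's actual RequestPurgatory; the expiry/reaper thread is omitted):

import scala.collection.mutable

class Delayed(val keys: Seq[Any]) { @volatile var satisfied = false }

class PurgatorySketch[T <: Delayed](isSatisfied: T => Boolean) {
  private val watchers = mutable.Map.empty[Any, mutable.ListBuffer[T]]

  // true => the request is satisfiable right now and the caller should respond itself
  def checkAndMaybeWatch(req: T): Boolean = synchronized {
    if (isSatisfied(req)) { req.satisfied = true; true }
    else {
      req.keys.foreach(k => watchers.getOrElseUpdate(k, mutable.ListBuffer.empty[T]) += req)
      false  // parked under each key; a reaper would expire it after ackTimeoutMs
    }
  }

  // called when the state behind `key` changes (e.g. a follower fetch advanced its offset);
  // returns the requests that just became satisfied so their responses can be sent
  def update(key: Any): Seq[T] = synchronized {
    val parked = watchers.getOrElse(key, mutable.ListBuffer.empty[T])
    val ready = parked.filter(r => !r.satisfied && isSatisfied(r)).toList
    ready.foreach { r => r.satisfied = true; parked -= r }
    ready
  }
}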
 appendToLocalLog looks up the local Partition object for the topic-partition and calls its appendMessagesToLeader method:

 

 val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
        val info = partitionOpt match {
          case Some(partition) =>
            partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet],producerRequest.requiredAcks) //append the messages through the leader replica
          case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
            .format(topicAndPartition, brokerId))
        }

 3.1 appendMessagesToLeader

          ......
          //after checking the ISR, this is where the actual write into the log happens
          val info = log.append(messages, assignOffsets = true)
          // probably unblock some follower fetch requests since log end offset has been updated
          // check whether this topic-partition has watchers; any fetch watcher whose replicas are now all in sync gets its response sent back on the channel
          replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
          // we may need to increment high watermark since ISR could be down to 1
          //the log just grew, so the high watermark may advance; the HW is what the leader's offset is compared against the other replicas' with
          maybeIncrementLeaderHW(leaderReplica)
          ......

3.1.1 The log.append method:

Check whether the active segment must be rolled into a new file, append the data to the segment, and update the log end offset. (A sketch of the roll decision follows the snippet below.)

        // maybe roll the log if this segment is full: fetch the active segment and check whether a new segment needs to be rolled
        val segment = maybeRoll(validMessages.sizeInBytes)

        // now append to the segment's log file; once the bytes since the last index entry exceed the index interval, an index entry is written as well
        // the index uses a memory-mapped channel; the log writes through Java's own GatheringByteChannel
        segment.append(appendInfo.firstOffset, validMessages)

        // increment the log end offset
        updateLogEndOffset(appendInfo.lastOffset + 1)
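The roll decision itself boils down to three conditions. A self-contained sketch (field and config names here are illustrative; in 0.8 the limits come from log.segment.bytes and the time-based roll setting):

case class SegmentState(sizeBytes: Long, createdMs: Long, indexFull: Boolean)

def shouldRoll(seg: SegmentState, batchBytes: Int, nowMs: Long,
               maxSegmentBytes: Long, maxSegmentMs: Long): Boolean =
  seg.sizeBytes > maxSegmentBytes - batchBytes ||                // no room left for this batch
  (seg.sizeBytes > 0 && nowMs - seg.createdMs > maxSegmentMs) || // time-based roll
  seg.indexFull                                                  // offset index is exhausted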

 The segment.append method:

Writes the data into the segment's file channel and decides whether an index entry is due.

  def append(offset: Long, messages: ByteBufferMessageSet) {
    if (messages.sizeInBytes > 0) {
      trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
      // append an entry to the index (if needed)
      if(bytesSinceLastIndexEntry > indexIntervalBytes) {
        //the bytes written since the last index entry (bytesSinceLastIndexEntry) exceed the index interval, so the requirement for a new index entry is met and one is appended
        index.append(offset, log.sizeInBytes())
        this.bytesSinceLastIndexEntry = 0
      }
      // append the messages
      log.append(messages)
      this.bytesSinceLastIndexEntry += messages.sizeInBytes
    }
  }
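Because an entry is written only every indexIntervalBytes bytes, the offset index is sparse: a read locates the greatest indexed offset at or below the target and scans the log forward from that file position. A self-contained toy of the lookup (the real OffsetIndex binary-searches a memory-mapped file):

case class OffsetPosition(offset: Long, filePosition: Int)

// greatest indexed offset <= target; the log file is then scanned from filePosition
def lookup(entries: IndexedSeq[OffsetPosition], target: Long): OffsetPosition =
  entries.takeWhile(_.offset <= target).last

val entries = Vector(OffsetPosition(0, 0), OffsetPosition(10, 4096), OffsetPosition(20, 8192))
assert(lookup(entries, 17L) == OffsetPosition(10, 4096))  // start the scan at byte 4096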

 3.1.2 unblockDelayedFetchRequests

Checks whether this topic-partition has watchers; every fetch watcher whose replicas have caught up gets its response sent back on the channel.

  def unblockDelayedFetchRequests(key: TopicAndPartition) {
    val satisfied = fetchRequestPurgatory.update(key)
    debug("Request key %s unblocked %d fetch requests.".format(key, satisfied.size))

    // send any newly unblocked responses
    satisfied.foreach(fetchRequestPurgatory.respond(_))
  }

 3.1.3 maybeIncrementLeaderHW

  private def maybeIncrementLeaderHW(leaderReplica: Replica) {
    val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
    //take the minimum log-end offset across the ISR (the most lagging replica)
    val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
    val oldHighWatermark = leaderReplica.highWatermark
    if(oldHighWatermark.precedes(newHighWatermark)) {//even the most lagging replica is ahead of the old HW, so advance the high watermark
      leaderReplica.highWatermark = newHighWatermark
      debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
      // some delayed requests may be unblocked after HW changed
      val requestKey = new TopicAndPartition(this.topic, this.partitionId)
      replicaManager.unblockDelayedFetchRequests(requestKey)
      replicaManager.unblockDelayedProduceRequests(requestKey)
    } else {
      debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
        .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
    }
  }
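A quick worked example of the rule (the HW is the minimum log-end offset across the ISR):

// ISR log-end offsets: leader = 10, followers = 8 and 9
val leos = Seq(10L, 8L, 9L)
val newHW = leos.min  // 8: consumers can read up to offset 7; 8 and 9 await replication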
