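Producer, server side:
1. NIO accepts the request: http://blackproof.iteye.com/blog/2239949
2. A handler thread takes the request from the request queue and calls KafkaApis: http://blackproof.iteye.com/blog/2239953
3. KafkaApis dispatches a produce request to the handleProducerOrOffsetCommitRequest method: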
def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
    request.requestId match {
      case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
      case RequestKeys.FetchKey => handleFetchRequest(request)
      case RequestKeys.OffsetsKey => handleOffsetRequest(request)
      case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
      case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) // make this broker leader or follower; a follower starts the matching replica fetcher thread
      case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
      case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
      case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
      case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
      case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
      case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
      case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } catch {
    case e: Throwable =>
      request.requestObj.handleError(e, requestChannel, request)
      error("error when handling request %s".format(request.requestObj), e)
  } finally
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
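The dispatch in handle is a single match on the request id, wrapped so that any failure becomes an error response instead of escaping the handler thread. The following is a simplified sketch of that shape, not Kafka code. It assumes the ids of the 0.8-era RequestKeys (0 = produce through 10 = consumer metadata, to the best of our knowledge). The returned handler names are plain strings standing in for the real handler calls. `RequestDispatch`, `route` and `UnknownApiException` are hypothetical names.

```scala
import scala.util.control.NonFatal

// Hypothetical, simplified model of KafkaApis.handle: route a request id to a handler.
object RequestDispatch {
  final case class UnknownApiException(msg: String) extends RuntimeException(msg)

  // Map a request id (assumed 0.8-era RequestKeys values) to the name of the handler method.
  def route(requestId: Int): String = requestId match {
    case 0  => "handleProducerOrOffsetCommitRequest"
    case 1  => "handleFetchRequest"
    case 2  => "handleOffsetRequest"
    case 3  => "handleTopicMetadataRequest"
    case 4  => "handleLeaderAndIsrRequest"
    case 5  => "handleStopReplicaRequest"
    case 6  => "handleUpdateMetadataRequest"
    case 7  => "handleControlledShutdownRequest"
    case 8  => "handleOffsetCommitRequest"
    case 9  => "handleOffsetFetchRequest"
    case 10 => "handleConsumerMetadataRequest"
    case other => throw UnknownApiException("Unknown api code " + other)
  }

  // Like the try/catch in handle: a failure is turned into an error result (Left).
  def handle(requestId: Int): Either[String, String] =
    try Right(route(requestId))
    catch { case NonFatal(e) => Left(e.getMessage) }
}
```

For example, `RequestDispatch.handle(42)` yields `Left("Unknown api code 42")` rather than throwing.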
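3.1 Append the data to the local log. The local broker is assumed to be the leader, because the client sends each topic-partition's data to the broker that leads that partition.
appendToLocalLog is the main method that does this work.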
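3.2 Depending on the acks level set by the client producer, decide how to respond to the client:
acks = 0: send no response at all; just wake the processor so it handles the following requests.
acks = 1: take the leader's append result and return it.
acks = -1: check whether every replica in the ISR other than the leader has a log end offset greater than or equal to the current offset, and collect any error information.
If this is not yet satisfied and there is no error, a watcher is registered.
If it times out, it is put into the queue of delayed operations.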
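The three acks cases can be condensed into a small decision function. This is a hypothetical sketch, not Kafka's API. It mirrors the branch conditions of handleProducerOrOffsetCommitRequest but ignores offset-commit requests. The acks = -1 check follows the rule described above: every non-leader ISR replica must reach the required offset. The broker's actual satisfaction check may differ between versions. `AcksPolicy`, `decide`, `isrCaughtUp` and the `Action` values are illustrative names.

```scala
// Hypothetical model of how the broker chooses a response strategy from requiredAcks.
object AcksPolicy {
  sealed trait Action
  case object CloseConnection extends Action // acks = 0 and some partition failed
  case object NoResponse      extends Action // acks = 0 and everything succeeded
  case object RespondNow      extends Action // acks = 1, or there is nothing to wait for
  case object WaitForIsr      extends Action // otherwise: park the request until the ISR catches up

  // Same conditions as the if / else-if / else chain in handleProducerOrOffsetCommitRequest.
  def decide(requiredAcks: Int, numPartitions: Int, numPartitionsInError: Int): Action =
    if (requiredAcks == 0) {
      if (numPartitionsInError != 0) CloseConnection else NoResponse
    } else if (requiredAcks == 1 || numPartitions <= 0 || numPartitionsInError == numPartitions) {
      RespondNow
    } else WaitForIsr

  // acks = -1 as described in the text: all non-leader ISR replicas must have a
  // log end offset >= requiredOffset (last appended offset + 1, i.e. r.end + 1 in the code).
  def isrCaughtUp(requiredOffset: Long, followerLogEndOffsets: Seq[Long]): Boolean =
    followerLogEndOffsets.forall(_ >= requiredOffset)
}
```

Note that if every partition already failed, there is nothing to wait for, so even acks = -1 responds immediately.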
def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
  ......
  val sTime = SystemTime.milliseconds
  // append the data to the local log (the local broker is assumed to be the leader)
  val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) // true for an offset commit request, false for a produce request
  debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))

  // first error code among the partition results, or NoError
  val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)
  val numPartitionsInError = localProduceResults.count(_.error.isDefined)
  if(produceRequest.requiredAcks == 0) {
    // acks = 0: the producer expects no response, so none is sent on success
    // no operation needed if producer request.required.acks = 0; however, if there is any exception in handling the request, since
    // no response is expected by the producer the handler will send a close connection response to the socket server
    // to close the socket so that the producer client will know that some exception has happened and will refresh its metadata
    if (numPartitionsInError != 0) {
      info(("Send the close connection response due to error handling produce request " +
        "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
        .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
      requestChannel.closeConnection(request.processor, request)
    } else {
      if (firstErrorCode == ErrorMapping.NoError)
        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
      // two kinds of request: offset commit and produce
      if (offsetCommitRequestOpt.isDefined) {
        val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize)
        requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
      } else
        requestChannel.noOperation(request.processor, request)
    }
  } else if (produceRequest.requiredAcks == 1 ||
      produceRequest.numPartitions <= 0 ||
      numPartitionsInError == produceRequest.numPartitions) {
    // acks = 1 (or nothing left to wait for): respond as soon as the leader has appended
    if (firstErrorCode == ErrorMapping.NoError) {
      // update the offset manager's in-memory offsets cache
      offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
    }
    val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
    val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
      .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
    // send back the response, which carries a ProducerResponseStatus for each partition
    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
  } else {
    // acks = -1: respond only after all in-sync replicas have received the data
    // create a list of (topic, partition) pairs to use as keys for this delayed request
    val producerRequestKeys = produceRequest.data.keys.toSeq
    val statuses = localProduceResults.map(r =>
      r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
    val delayedRequest = new DelayedProduce(
      producerRequestKeys,
      request,
      produceRequest.ackTimeoutMs.toLong,
      produceRequest,
      statuses,
      offsetCommitRequestOpt)

    // check whether the other replicas are done; if not, register a watcher, and on timeout
    // the request goes to the delayed-operation queue (the watcher mechanism)
    // add the produce request for watch if it's not satisfied, otherwise send the response back
    val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
    if (satisfiedByMe)
      producerRequestPurgatory.respond(delayedRequest)
  }
}
appendToLocalLog looks up the local Partition object for each topic-partition and calls its appendMessagesToLeader method:
val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
val info = partitionOpt match {
  case Some(partition) =>
    // append the messages to the leader replica's log
    partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], producerRequest.requiredAcks)
  case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
    .format(topicAndPartition, brokerId))
}
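The handler later counts `numPartitionsInError` with `_.error.isDefined`. That implies a missing partition becomes an error result for that partition instead of failing the whole request. Below is a toy sketch of that per-partition lookup. Each partition here only hands out offsets, and the message set is reduced to a message count. `LocalAppend`, its `Partition` class and `ProduceResult` shape are hypothetical simplifications, not Kafka's classes.

```scala
// Hypothetical, simplified model of appendToLocalLog's per-partition lookup and append.
object LocalAppend {
  final case class TopicAndPartition(topic: String, partition: Int)
  final case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None)
  final class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)

  // Toy leader partition: it only tracks the next offset to assign.
  final class Partition {
    private var nextOffset = 0L
    // Assign offsets to `count` messages and return (firstOffset, lastOffset).
    def appendMessagesToLeader(count: Int): (Long, Long) = {
      val first = nextOffset
      nextOffset += count
      (first, nextOffset - 1)
    }
  }

  // `data` maps each topic-partition to the number of messages sent for it.
  def appendToLocalLog(partitions: Map[TopicAndPartition, Partition],
                       data: Map[TopicAndPartition, Int],
                       brokerId: Int): Seq[ProduceResult] =
    data.toSeq.map { case (tp, count) =>
      try {
        val (start, end) = partitions.get(tp) match {
          case Some(p) => p.appendMessagesToLeader(count)
          case None => throw new UnknownTopicOrPartitionException(
            "Partition %s doesn't exist on %d".format(tp, brokerId))
        }
        ProduceResult(tp, start, end)
      } catch {
        // a failure only affects this partition's result
        case e: UnknownTopicOrPartitionException => ProduceResult(tp, -1L, -1L, Some(e))
      }
    }
}
```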
3.1 appendMessagesToLeader
......
// after the ISR checks, this is the call that actually writes to the log
val info = log.append(messages, assignOffsets = true)
// probably unblock some follower fetch requests since log end offset has been updated
// i.e. check whether this topic-partition has watchers and send back the response of any watcher now satisfied
replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
// we may need to increment high watermark since ISR could be down to 1
// new data has arrived, so try to advance the high watermark by comparing the leader's offset with the other replicas'
maybeIncrementLeaderHW(leaderReplica)
......
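3.1.1 The log.append method:
It checks whether the active segment needs to roll over to a new file, appends the data to the segment, and updates the log end offset.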
// maybe roll the log if this segment is full: get the active segment, rolling to a new file (and flushing the old one) if needed
val segment = maybeRoll(validMessages.sizeInBytes)

// now append to the log: write into the segment's file, adding an index entry if the index interval has been exceeded.
// The index is a memory-mapped file; the log is written through Java's own GatheringByteChannel with its buffer array.
segment.append(appendInfo.firstOffset, validMessages)

// increment the log end offset
updateLogEndOffset(appendInfo.lastOffset + 1)
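The following toy model covers the roll-then-append-then-advance sequence. It assumes a size-based roll only: the real maybeRoll also considers segment age and a full index, and it writes actual files. Each new segment is named by its base offset, the first offset it will hold. `ToyLog`, `maxSegmentBytes` and the in-memory `Segment` are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, in-memory model of Log.append: maybe roll, append, then advance the log end offset.
object ToyLog {
  final class Segment(val baseOffset: Long) {
    var sizeInBytes = 0
  }

  final class Log(maxSegmentBytes: Int) {
    val segments = ArrayBuffer(new Segment(0L))
    var logEndOffset = 0L

    // Return the segment to write into, rolling to a new one if this batch would not fit.
    // (The non-empty guard keeps an oversized batch from creating endless empty segments.)
    private def maybeRoll(batchBytes: Int): Segment = {
      val active = segments.last
      if (active.sizeInBytes > 0 && active.sizeInBytes + batchBytes > maxSegmentBytes) {
        val rolled = new Segment(logEndOffset) // new segment starts at the next offset
        segments += rolled
        rolled
      } else active
    }

    // Append `messageCount` messages totalling `batchBytes`; returns the first assigned offset.
    def append(messageCount: Int, batchBytes: Int): Long = {
      val firstOffset = logEndOffset
      val segment = maybeRoll(batchBytes)
      segment.sizeInBytes += batchBytes
      logEndOffset = firstOffset + messageCount // lastOffset + 1
      firstOffset
    }
  }
}
```

With a 100-byte limit, two 60-byte batches end up in two segments whose base offsets are 0 and 2.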
The segment.append method:
It writes the data into the file channel and decides whether to add an entry to the index.
def append(offset: Long, messages: ByteBufferMessageSet) {
  if (messages.sizeInBytes > 0) {
    trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
    // append an entry to the index (if needed)
    // i.e. once more than indexIntervalBytes have been written since the last index entry (bytesSinceLastIndexEntry), add a new entry
    if(bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(offset, log.sizeInBytes())
      this.bytesSinceLastIndexEntry = 0
    }
    // append the messages
    log.append(messages)
    this.bytesSinceLastIndexEntry += messages.sizeInBytes
  }
}
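The index is sparse: an entry maps an offset to a file position only after more than indexIntervalBytes have been written since the previous entry. A reader then scans forward from the closest earlier entry. The following toy model keeps only byte counts. Its lookup is a linear scan, whereas the real offset index uses binary search over a memory-mapped file. `ToySegment`, `IndexEntry` and `lookup` are hypothetical names.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of LogSegment.append with a sparse offset -> position index.
object ToySegment {
  final case class IndexEntry(offset: Long, position: Int)

  final class Segment(indexIntervalBytes: Int) {
    val index = ArrayBuffer.empty[IndexEntry]
    private var logSizeInBytes = 0
    private var bytesSinceLastIndexEntry = 0

    def append(offset: Long, batchBytes: Int): Unit =
      if (batchBytes > 0) {
        if (bytesSinceLastIndexEntry > indexIntervalBytes) {
          index += IndexEntry(offset, logSizeInBytes) // this batch starts at the current file size
          bytesSinceLastIndexEntry = 0
        }
        logSizeInBytes += batchBytes
        bytesSinceLastIndexEntry += batchBytes
      }

    // Last index entry with offset <= target (or the segment start); scanning begins there.
    def lookup(target: Long): IndexEntry =
      index.filter(_.offset <= target).lastOption.getOrElse(IndexEntry(0L, 0))
  }
}
```

With `indexIntervalBytes = 100` and four 80-byte batches at offsets 0 to 3, only offset 2 (at position 160) gets an index entry.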
3.1.2 unblockDelayedFetchRequests
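It checks whether this topic-partition has watchers. For every watcher that is now satisfied (the replicas have caught up), it sends that watcher's response back through the request channel.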
def unblockDelayedFetchRequests(key: TopicAndPartition) {
  val satisfied = fetchRequestPurgatory.update(key)
  debug("Request key %s unblocked %d fetch requests.".format(key, satisfied.size))

  // send any newly unblocked responses
  satisfied.foreach(fetchRequestPurgatory.respond(_))
}
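The watcher mechanism (purgatory) works as follows. A delayed request is watched under every topic-partition it depends on. `update(key)` re-checks the requests under that key and returns the ones that just became satisfied, and the caller then sends their responses. The following is a simplified sketch, assuming a satisfaction check supplied as a closure. It does not model expiry on timeout or the thread safety of the real purgatory. `ToyPurgatory`, `DelayedRequest` and `tryComplete` are hypothetical, though `checkAndMaybeWatch` and `update` are named after the calls in the code above.

```scala
import scala.collection.mutable

// Hypothetical, single-threaded model of a request purgatory keyed by topic-partition.
object ToyPurgatory {
  final case class TopicAndPartition(topic: String, partition: Int)

  final class DelayedRequest(val keys: Seq[TopicAndPartition], isSatisfied: () => Boolean) {
    private var completed = false
    def check(): Boolean = isSatisfied()
    // A request watched under several keys must only be completed (responded to) once.
    def tryComplete(): Boolean = synchronized { if (completed) false else { completed = true; true } }
  }

  final class Purgatory {
    private val watchers = mutable.Map.empty[TopicAndPartition, mutable.ListBuffer[DelayedRequest]]

    // true: the caller should respond now; false: the request is parked under its keys.
    def checkAndMaybeWatch(r: DelayedRequest): Boolean =
      if (r.check()) r.tryComplete()
      else {
        r.keys.foreach(k => watchers.getOrElseUpdate(k, mutable.ListBuffer.empty[DelayedRequest]) += r)
        false
      }

    // Re-check the requests watched under `key`; drop the satisfied ones and return those completed here.
    def update(key: TopicAndPartition): List[DelayedRequest] = watchers.get(key) match {
      case None => Nil
      case Some(list) =>
        val (ready, waiting) = list.partition(_.check())
        list.clear()
        list ++= waiting
        ready.filter(_.tryComplete()).toList
    }
  }
}
```

In this sketch, a request parked while a replica is behind is returned by `update` exactly once, the first time its condition holds.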
3.1.3 maybeIncrementLeaderHW
private def maybeIncrementLeaderHW(leaderReplica: Replica) {
  val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
  // the new HW is the smallest log end offset in the ISR, i.e. that of the replica furthest behind
  val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
  val oldHighWatermark = leaderReplica.highWatermark
  if(oldHighWatermark.precedes(newHighWatermark)) { // even the slowest in-sync replica is past the old HW, so advance it
    leaderReplica.highWatermark = newHighWatermark
    debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
    // some delayed requests may be unblocked after HW changed
    val requestKey = new TopicAndPartition(this.topic, this.partitionId)
    replicaManager.unblockDelayedFetchRequests(requestKey)
    replicaManager.unblockDelayedProduceRequests(requestKey)
  } else {
    debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
      .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
  }
}
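The high-watermark rule reduces to two lines: take the minimum log end offset across the ISR, and move the high watermark only if that minimum is ahead of the current value. The following sketch uses plain `Long` offsets in place of LogOffsetMetadata and assumes the ISR is non-empty (it always contains the leader). `HighWatermark.maybeIncrement` is a hypothetical name.

```scala
// Hypothetical model of maybeIncrementLeaderHW using plain Long offsets.
object HighWatermark {
  // Returns Some(newHw) when the high watermark advances, None when it stays put.
  def maybeIncrement(oldHw: Long, isrLogEndOffsets: Seq[Long]): Option[Long] = {
    val candidate = isrLogEndOffsets.min // the slowest in-sync replica bounds what is committed
    if (oldHw < candidate) Some(candidate) else None
  }
}
```

For example, with ISR log end offsets 10, 7 and 9 and an old high watermark of 5, the high watermark moves to 7. Only when it moves are the delayed fetch and produce requests re-checked.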