1. The question
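batch.size and linger.ms are two of the settings with the largest impact on Kafka producer performance. batch.size is the size, in bytes, of the per-partition batch that serves as the producer's basic unit of bulk sending. It defaults to 16384 bytes, i.e. 16 KB. linger.ms is the setting the sender thread uses when it checks whether a batch is ready, to decide whether that batch has waited long enough (has "expired"). It defaults to 0 ms in the client version analyzed here.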
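As a minimal sketch, the two settings can be supplied through a plain java.util.Properties object using their string keys. That object is what the KafkaProducer(Properties) constructor accepts. The broker address and the 64 KB / 5 ms values are illustrative assumptions, not tuning advice. kafka-clients is deliberately not imported, so the snippet compiles on its own.

```java
import java.util.Properties;

// Builds producer settings using the documented string keys.
// The values are illustrative assumptions; in a project that depends on
// kafka-clients, the result would be passed to new KafkaProducer<>(props).
class ProducerSettings {
    static Properties build(int batchSizeBytes, long lingerMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Upper bound, in bytes, of one per-partition batch (default 16384).
        props.put("batch.size", Integer.toString(batchSizeBytes));
        // How long a batch that is not full may wait for more records before it counts as ready.
        props.put("linger.ms", Long.toString(lingerMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(64 * 1024, 5);
        System.out.println(props.getProperty("batch.size") + " " + props.getProperty("linger.ms"));
    }
}
```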
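So does the producer send once batch.size bytes have accumulated, or does it send at linger.ms intervals? The conclusion comes first: the producer starts sending as soon as either condition is met. That means either a batch has reached batch.size or linger.ms has elapsed.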
2. Source code analysis
The main code of the sender thread is shown below. We are mainly interested in when the sender thread blocks:
    void run(long now) {
        Cluster cluster = metadata.fetch();
        // result.nextReadyCheckDelayMs is the delay until the next readiness check,
        // which is also how long the selector may block
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        if (guaranteeMessageOrder) {
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // For now we only care about result.nextReadyCheckDelayMs
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request, now);
        // poll() eventually calls the selector; pollTimeout is how long the selector blocks
        this.client.poll(pollTimeout, now);
    }
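The only part of run(..) that decides how long the thread sleeps is the pollTimeout computation. Below is a hypothetical, stripped-down restatement of just those lines. The class and method names are made up for illustration. readyNodes stands for result.readyNodes after unconnected nodes have been removed. Any other timeouts NetworkClient#poll(..) may apply are left out.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical restatement of the pollTimeout lines of Sender#run(..).
// readyNodes = result.readyNodes after nodes whose connection is not ready
// were removed; other timeouts inside NetworkClient#poll(..) are ignored.
class PollTimeout {
    static long compute(long nextReadyCheckDelayMs, long notReadyTimeout, Set<String> readyNodes) {
        // Sleep no longer than the earliest batch deadline or reconnect delay.
        long pollTimeout = Math.min(nextReadyCheckDelayMs, notReadyTimeout);
        // Some node already has sendable data: poll without blocking.
        if (!readyNodes.isEmpty()) {
            pollTimeout = 0;
        }
        return pollTimeout;
    }

    public static void main(String[] args) {
        Set<String> none = Collections.emptySet();
        System.out.println(compute(Long.MAX_VALUE, Long.MAX_VALUE, none)); // nothing buffered
        System.out.println(compute(3, Long.MAX_VALUE, none));              // a batch lingers 3 more ms
        System.out.println(compute(3, Long.MAX_VALUE, Collections.singleton("node-1"))); // a node is ready
    }
}
```

When nothing is buffered, both inputs stay at Long.MAX_VALUE, so the thread may sleep indefinitely. A single ready node forces a non-blocking poll.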
Selector#select(..):
    private int select(long ms) throws IOException {
        if (ms < 0L)
            throw new IllegalArgumentException("timeout should be >= 0");
        if (ms == 0L)
            return this.nioSelector.selectNow();
        else
            return this.nioSelector.select(ms);
    }
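Selector#select(..) treats a zero timeout specially. In java.nio, Selector.select(0) means "block indefinitely", not "do not block". Mapping 0 to selectNow() is what keeps pollTimeout = 0 non-blocking. The standalone sketch below uses hypothetical class and method names. It exercises the three JDK calls involved:

- selectNow() returns immediately.
- select(ms) waits at most ms.
- wakeup() from another thread makes a blocked select return early.

The last call is what Sender#wakeup() ends up invoking on the underlying java.nio selector.

```java
import java.io.IOException;
import java.nio.channels.Selector;

// Exercises the java.nio.channels.Selector calls that Selector#select(..) relies on.
class SelectorDemo {
    // Mirrors the Kafka mapping: 0 -> selectNow(), otherwise select(ms). Returns elapsed ms.
    static long timedSelect(Selector selector, long ms) throws IOException {
        long start = System.nanoTime();
        if (ms == 0L) {
            selector.selectNow();
        } else {
            selector.select(ms);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Blocks in select(selectMs) while another thread calls wakeup() after wakeAfterMs.
    static long blockedThenWokenUp(long selectMs, long wakeAfterMs) throws Exception {
        try (Selector selector = Selector.open()) {
            Thread waker = new Thread(() -> {
                try {
                    Thread.sleep(wakeAfterMs);
                } catch (InterruptedException ignored) {
                    // wake up anyway
                }
                selector.wakeup();
            });
            waker.start();
            long elapsed = timedSelect(selector, selectMs);
            waker.join();
            return elapsed;
        }
    }

    public static void main(String[] args) throws Exception {
        try (Selector selector = Selector.open()) {
            System.out.println("selectNow took ~" + timedSelect(selector, 0) + " ms");
            System.out.println("select(200) took ~" + timedSelect(selector, 200) + " ms");
        }
        System.out.println("select(10000) woken after ~" + blockedThenWokenUp(10_000, 100) + " ms");
    }
}
```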
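We can start from a newly created KafkaProducer on which send has not been called yet. When the sender thread calls accumulator#ready(..), it gets back a result that includes how long the selector may block. Since send has not been called, no Deque<RecordBatch> holds a batch yet. The loop therefore finds nothing, and nextReadyCheckDelayMs keeps its initial value of Long.MAX_VALUE. At this point, as far as this code path is concerned, the selector blocks indefinitely.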
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<Node>();
        // Initialized to the maximum value
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        boolean unknownLeadersExist = false;
        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();
            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                unknownLeadersExist = true;
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                synchronized (deque) {
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; // depends on linger.ms
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        boolean full = deque.size() > 1 || batch.records.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
    }
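The linger.ms and batch.size checks can be seen in isolation in the heavily simplified, hypothetical model of ready(..) below. Partitions are plain lists, and readiness is counted per partition instead of per leader node. Leaders, retries/backoff, muting, buffer exhaustion and flush are ignored. The arithmetic for waitedTimeMs, timeLeftMs, full and expired is kept as in the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of RecordAccumulator#ready(..): partitions are
// plain lists of batches; leaders, backoff, muting, exhaustion and flush are ignored.
class ReadyModel {
    static final class Batch {
        final long createdMs;  // plays the role of batch.lastAttemptMs for a fresh batch
        final boolean full;    // plays the role of batch.records.isFull()
        Batch(long createdMs, boolean full) {
            this.createdMs = createdMs;
            this.full = full;
        }
    }

    static final class Result {
        final int readyPartitions;
        final long nextReadyCheckDelayMs;
        Result(int readyPartitions, long nextReadyCheckDelayMs) {
            this.readyPartitions = readyPartitions;
            this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
        }
    }

    static Result ready(List<List<Batch>> partitions, long lingerMs, long nowMs) {
        int ready = 0;
        long next = Long.MAX_VALUE; // stays at the maximum when nothing is buffered
        for (List<Batch> deque : partitions) {
            if (deque.isEmpty()) continue;
            Batch first = deque.get(0);
            long waitedTimeMs = nowMs - first.createdMs;
            long timeLeftMs = Math.max(lingerMs - waitedTimeMs, 0);
            boolean full = deque.size() > 1 || first.full; // batch.size reached
            boolean expired = waitedTimeMs >= lingerMs;    // linger.ms elapsed
            if (full || expired) {
                ready++;
            } else {
                next = Math.min(timeLeftMs, next);
            }
        }
        return new Result(ready, next);
    }

    public static void main(String[] args) {
        List<List<Batch>> partitions = new ArrayList<>();
        System.out.println(ready(partitions, 5, 100).nextReadyCheckDelayMs); // nothing buffered
        List<Batch> p0 = new ArrayList<>();
        p0.add(new Batch(98, false)); // created 2 ms ago, not full
        partitions.add(p0);
        Result r = ready(partitions, 5, 100);
        System.out.println(r.readyPartitions + " ready, next check in " + r.nextReadyCheckDelayMs + " ms");
    }
}
```

With linger.ms = 0, waitedTimeMs >= timeToWaitMs holds as soon as a batch exists. Any buffered batch is therefore ready at the next check.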
Next, we call send and put one record into memory. Because this creates a new RecordBatch, the sender thread is woken up:
KafkaProducer#doSend(...)
    if (result.batchIsFull || result.newBatchCreated) {
        log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
        this.sender.wakeup();
    }
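The two flags checked in doSend(..) come from RecordAccumulator#append(..). The hypothetical one-partition model below shows when each flag leads to a wakeup. Sizes are plain bytes with no compression or per-record overhead, and batches are never drained.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical one-partition model of the flags append(..) hands back to doSend(..).
// Sizes are plain bytes (no compression, headers or per-record overhead), and
// batches are never drained by a sender in this model.
class AppendModel {
    private static final class Batch {
        int bytes;
    }

    private final int batchSize; // batch.size in bytes
    private final Deque<Batch> deque = new ArrayDeque<>();

    AppendModel(int batchSize) {
        this.batchSize = batchSize;
    }

    // Appends one record and returns true if doSend(..) would call sender.wakeup().
    boolean append(int recordBytes) {
        Batch last = deque.peekLast();
        boolean newBatchCreated = false;
        if (last == null || last.bytes + recordBytes > batchSize) {
            last = new Batch(); // the record does not fit: open a new batch
            deque.addLast(last);
            newBatchCreated = true;
        }
        last.bytes += recordBytes;
        // Same shape as the "full" test in ready(..): more than one batch queued, or the last one is full.
        boolean batchIsFull = deque.size() > 1 || last.bytes >= batchSize;
        return batchIsFull || newBatchCreated;
    }

    public static void main(String[] args) {
        AppendModel m = new AppendModel(100); // batch.size = 100 bytes
        System.out.println(m.append(40)); // opens a batch
        System.out.println(m.append(40)); // grows it to 80/100 bytes
        System.out.println(m.append(20)); // fills it to 100/100 bytes
        System.out.println(m.append(10)); // needs a second batch
    }
}
```

Only an append that opens a batch or fills one wakes the sender. An append that merely grows a partially filled batch does not. In that case, the wait is ended by the linger.ms-based timeout computed in ready(..).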
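The wakeup() call makes the sender thread that was blocked in Selector#select(..) return, and the sender runs accumulator#ready(..) again. This time a Deque<RecordBatch> holds a batch. The returned nextReadyCheckDelayMs is therefore no longer the maximum value but a value derived from linger.ms: the time left until the batch has waited linger.ms. The selector now blocks for at most linger.ms before returning, and the sender then checks again.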
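In other words:

- When a Deque<RecordBatch> is non-empty, the sender thread blocks for at most linger.ms.
- When all of them are empty, it blocks for Long.MAX_VALUE.
- As soon as KafkaProducer#send(..) puts a record into memory and creates a new RecordBatch, the sender thread is woken up.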
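The KafkaProducer#doSend(...) code above also shows that Sender#wakeup() is called when a RecordBatch becomes full. To sum up: the sender thread is activated to send messages as soon as linger.ms has elapsed or a batch has reached batch.size.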
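Both triggers can be seen together in a hypothetical single-partition timeline. Given batch.size, linger.ms, and the arrival time and size of each record, it lists when each batch would be sent and which condition released it. This is a sketch under strong assumptions:

- The sender is idle and reacts instantly.
- Network time, retries and in-flight limits are ignored.
- A tie between a linger deadline and a new record goes to the sender.

Under real load a busy sender lets records accumulate even with linger.ms = 0, so the model shows the smallest batches that setting can produce.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-partition timeline of when batches leave the producer.
// Assumes an idle, instantaneous sender; ignores network time and retries.
class BatchTimeline {
    private static String send(long atMs, String reason, int records) {
        return "t=" + atMs + " " + reason + " records=" + records;
    }

    static List<String> simulate(int batchSize, long lingerMs, long[] arrivalMs, int[] recordBytes) {
        List<String> sends = new ArrayList<>();
        long openedAt = -1; // creation time of the open batch; -1 means no open batch
        int bytes = 0;
        int records = 0;
        for (int i = 0; i < arrivalMs.length; i++) {
            long now = arrivalMs[i];
            // linger.ms ran out before this record arrived, so the batch already left.
            if (openedAt >= 0 && openedAt + lingerMs <= now) {
                sends.add(send(openedAt + lingerMs, "linger.ms", records));
                openedAt = -1;
            }
            // The record does not fit: the open batch counts as full and a new one starts.
            if (openedAt >= 0 && bytes + recordBytes[i] > batchSize) {
                sends.add(send(now, "batch.size", records));
                openedAt = -1;
            }
            if (openedAt < 0) {
                openedAt = now;
                bytes = 0;
                records = 0;
            }
            bytes += recordBytes[i];
            records++;
            // The record filled the batch exactly (or is larger than batch.size on its own).
            if (bytes >= batchSize) {
                sends.add(send(now, "batch.size", records));
                openedAt = -1;
            }
        }
        // Whatever is left goes out once linger.ms has elapsed.
        if (openedAt >= 0) {
            sends.add(send(openedAt + lingerMs, "linger.ms", records));
        }
        return sends;
    }

    public static void main(String[] args) {
        // batch.size = 100 bytes, linger.ms = 10; records of 40, 40, 40 and 10 bytes
        System.out.println(simulate(100, 10, new long[] {0, 1, 2, 30}, new int[] {40, 40, 40, 10}));
    }
}
```

In the example, the first batch leaves at t=2 because the third record no longer fits (batch.size). The next two batches leave only when their linger.ms deadline passes.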
Reference: https://www.cnblogs.com/set-cookie/p/8902340.html