1. 介绍
原文来自linkedin的一篇PPTproducer-performance-tuning-for-apache-kafka。
2. 本文的一些前提
- 讨论的kafka版本为0.10.0
- 没有broker端的再压缩
- 消息都有8字节的时间戳介绍信息
3. 优化目标
给定一个要发送的数据集,在满足持久性、有序性的前提下优化以下两点:
- 吞吐量
- 延迟
优化专注于优化平均性能,这样对所有的producer都有效。
4. kafka producer原理回顾
4.1 生产者的关键配置
- batch.size: 基于大小的batching策略
- linger.ms: 基于时间的batching策略
- compression.type:压缩的速度上lz4=snappy<gzip。
- max.in.flight.requests.per.connection (affects ordering,设置为1可以保证有序性,但是发送性能会受影响。不为1的时候,如果发生消息重发则会乱序)
- acks (affects durability)
PS: 更大的批次,意味着更好的压缩率、更高的吞吐量。但是负面影响,就是延迟会高些。
4.2 生产者发送原理
这个之前在kafka生产者原理详解一文中做了一些分析。现在来看看kafka的 committer如何来分析的发送者原理的。其分析相对更加简明扼要。
发送者发送消息的过程简单概括为:
- 序列化
- 根据topic的元信息对数据进行分区
- 分区数据经过压缩器处理后放入batch,产生M和CB。分区数据按照batch在Record Accumulator里面组织(used和callback)。一个batch对象本身会占用一些空间,图上的used和callbacks。
假设现在Record Accumulator中已经包含了如下的数据:
当一个batch准备完毕后,用户线程就可以去执行具体的发送操作了。当满足以下条件之一时,我们认为一个batch是已经“准备完毕的”:
- batch.size达到了
- linger.ms时间达到了
- 发现同一个broker的其他batch已经完毕了
- flush()和close()方法被调用
用户线程获取batch的过程如下:
- 从batch队列中轮询获取batch
- 将batch根据leader broker来分组
- 将分完组的batches发送给broker
- 如果max.in.flight.requests.per.connection>1则在管道中排队
PS: 接下来的说明,都假设max.in.flight.requests.per.connection=1
5. 生产者调优
5.1 调优工具
生产者调优,主要可以利用kafka-producer-perf-test.sh(org.apache.kafka.tools.ProducerPerformance)。通过测试不同的配置来对比发送效率。
使用方法例子:
./kafka-producer-perf-test.sh --num-records 1000000 --record-size 1000 --topic
becket_test_3_replicas_1_partition --throughput 1000000 --producer-props bootstrap.
servers=localhost:9092 max.in.flight.requests.per.connection=1 batch.size=100000
compression.type=lz4
PS: kafka 0.8的版本还支持thread-num等选项,现在0.10.1中还没有,不过已经有issue在解决了。相信马上会有了。详情见:KAFKA-3554
3554修复后会有如下功能:
- --num-threads: 发送消息的线程数
- --value-bound: The range of the random integer in the messages. This option is useful when compression is used.Different integer range simulates different compression ratio.
- producer metrics: 在使用ProducerPerformance的时候,还会打印一系列metrics。
关于第三点,是以前没有的特性。这个对生产者调优十分重要。使用ProducerPerformance的时候,打印的度量信息有:
- Select_Rate_Avg (The rate that the sender thread runs to check if it can send some messages)
- Request_Rate_Avg
- Request_Latency_Avg (Not including the callback execution time)
- Request_Size_Avg (After compression)
- Batch_Size_Avg (After compression)
- Records_Per_Request_Avg
- Record_Queue_Time_Avg
- Compression_Rate_Avg
PS:以上度量信息,需要至少1分钟运行时间才能保证稳定。
使用例子:
./kafka-producer-perf-test.sh --num-records 1000000 --record-size 1000 --topic
becket_test_3_replicas_4_partition --throughput 100000 --num-threads 1 --value-bound 50000
--producer-props bootstrap.servers=localhost:9092 compression.type=gzip max.in.flight.
requests.per.connection=1
5.2 用于调优的几个公式
5.2.1 吞吐量计算公式
吞吐量可以用以下公式估算:throughput_Avg(平均吞吐量) ~= Request_Rate_Avg (平均请求速率)* Request_Size_Avg(平均请求大小) / Compression_Rate_Avg (压缩率)估算的实际值会比实际值大一些,因为会有一些request overhead没有考虑进去。
5.2.2 request_size_avg计算
平均请求大小的计算公式为:
Request_Size_Avg(平均请求大小) = Records_Per_Request_Avg (每个请求的消息数)Record_Size (消息大小) Compression_Rate_Avg(压缩率) +Request_Overhead
request overhead取决于:
- topic和分区数量
- 一般都是从几十字节到几百字节
5.2.3 Request_Rate_Upper_Limit
5.2.4 平均延迟计算公式
5.3 调优工具使用示例
假设我们使用以下的生产者来测试:
5.2.1 测试结果分析
根据得到的结果,我们发现吞吐量为9.96MB/s,远远小于我们实际的网络带宽1Gbps。
request_rate_avg和理论上限差距不大,而压缩率又是固定的。所以我们的目标为增大request_size_avg来增加吞吐量。增加吞吐量的方式主要有:
linger.ms与batch size、压缩率以及吞吐量和延迟之间的关系:
5.2.2 batching与压缩时间和吞吐量的关系
上图看出来,batching增大之后,吞吐量反而变差了,而且压缩率也只有少量增长。这种原因主要是:增大batch会显著增加压缩的耗时。
相关测试:
总结: 一般我们说增大批次,都有利于增加吞吐量(减少了网络IO次数)。但是这里之所以行不通是因为增大批次带来的好处无法抵消压缩时间的增长。从上图的实验结果可以看到,采用16KB或者索性采用较大的256KB都是可以的。避免采用处在中间的batch size
5.2.3 线程数与吞吐量的关系
可见:发送者的线程数,不是越多越好,因为线程数过多会影响延迟,而且有时候会产生负面效果。但是一般线程数小于topic分区数都是没啥问题的。
5.2.4 优化
通过增加分区数、线程数、batch size,使得吞吐量得到改善:
5.2.5 关于寻找吞吐量瓶颈的方式
5.3 acks=-1(all)时的延迟调优
5.3.1 原理回顾
acks=all的时候,瓶颈很有肯能发生在replication time。
高水位线的值变更需要等待下一次fetch过来之后才变更。所有ProduceRequest里面的高水位线全部抵达当前offset了,才会返回ProduceResponse。
第二个fetch过来的时候,partition0的高水位线移动到当前offset
假设broker1只有1个replication线程,则replicaiton time为
5.3.2 replication time优化
显而易见的是增加num.replica.fetchers,从而使得并发的fetch来做复制。这样的Replication time则为:
设置多少的replica fetchers合理?一般按照官方的生产建议设置成4就好了。
5.4 生产者RTT时间长优化
5.4.1 场景描述
有个跨洋的pipeline
5.4.2 优化方案
现有情况的计算,发现确实吞吐量比较低。
解决办法是增加send和 recieve buffer。下图可以看到增大吞吐量之后,最多能达到20MB/s的吞吐量。
REL:
https://www.kaimingwan.com/post/framworks/kafka/kafka-producerxing-neng-diao-you
相关推荐
kafka的3554版本中的kafka-producer-perf-test.sh增添了 --num-thread --value-bound 两个参数,并且可以打印print-metrics 下载本资源后,读introduction.txt文件。 没有积分的同学,可以访问 ...
Kafka Producer机制优化-提高发送消息可靠性
kafka配置调优实践
kettle kafka 生产者插件,在plugins 下新建steps文件夹,把zip文件解压放到里面。
第12单元 Kafka producer拦截器与Kafka Streams1
kafka调优
kafka及其性能测试 kafka及其性能测试 kafka及其性能测试 kafka及其性能测试 kafka及其性能测试 kafka及其性能测试
课程内容包括了Kafka java Consumer实战,Kafka集成框架,Kafka分布式集群架构,Kafka性能测试实战,Kafka集群监控实战,Kafka用户行为画像,Kafka性能存储优化及如何提高Kafka吞吐量等企业级技术。 视频大小:1.5G
为了提高 Kafka的自适应能力,消除系统中的复杂性,获得更好的运行性能,提出种针对 Kafka的自适应性能调优方法。该方法充分考虑了 Kafka特征参数与性能的影响权值,并使用抽样的原理来提高数据集的生成效率并优化...
jmeter连接kafka需要的连接器,可用于将静态测试数据通过jemter模拟高并发数据流发送到kafka中,可作为Kafka的生产者。
如果需要指定多个目录,以逗号分隔即可,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是
kettle kafka 消息者生产插件,用于集成到kettle,生产Kafka消息。亲测试可用。
Kafka技术实战学习的优选课程,课程内容全程实战,没有拖泥带水,但不包含基础知识的教学,需要同学们先具备一定的Kafka技术基础再进行学习。课程内容包括了Kafka java Consumer实战,Kafka集成框架等。
Flink 全网最全资源(视频、博客、PPT、入门、原理、实战、性能调优、源码解析、问答等持续更新),包含 Flink 入门、概念、原理、实战、性能调优、大型案例、源码解析
基于新 Kafka Producer 的 Flume kafka sink,高性能且可配置。 它依赖于很少的项目/库,只有 Flume 1.5.2 kafka-clients-0.8.2.1 或更高版本,slf4j。 类似于 Flume 1.6 KafkaSink,但这里有一些不同: Flume 1.6 ...
课程内容包括了JVM性能调优专题,Tomcat性能优化实战,MySQL深度优化,并发编程,源码框架专题,分布式缓存技术Redis,分布式协调任务ZooKeeper,分布式协调任务ZooKeeper,Kafka mongodb sharding-sphere,Netty...
封装抽取了一个kafka生产者的连接池,能很好的用池的方式对kafka生产者连接点进行有效的管理
第01课 Kafka简介, 第02课Kafka架构,第03课 Kafka HA Kafka一致性重要机制之ISR,第04课 Zookeeper与Kafka Kafka如何使用Zookeeper ……第12课 Kafka性能测试
@ hitmands / kafka-producer-stub 简单的Kafka堆栈可促进消费者的本地发展 入门 docker run -tid \ -e ' HKPS_BROKERS=localhost:9092 ' \ -v $( pwd ) /examples:/data \ hitmands/kafka-producer-stub:latest...