
Source Code Analysis: kafka-0.9.0.0 KafkaProducer


Reading source code builds the kind of foundational knowledge that lets you adapt to almost any change. For many readers, though, source analysis is dry and hard to digest, so from now on every source-analysis post will open with the questions it sets out to answer; if those questions interest you, read on, otherwise feel free to skip it.

This post addresses the following questions:

1. What take-aways does the source analysis offer?

2. How is the producer, KafkaProducer, structured and how is it created?

3. What is the flow when KafkaProducer writes data?

4. How does the collector, RecordAccumulator, gather and process data?

5. How does the sender, Sender, process and ship data?

6. How should the producer's time-related parameters be understood?

7. How should the producer's size-related parameters be understood?

1. Take-aways from this source analysis

1) KafkaProducer is a client class used to send data to the Kafka brokers.

2) KafkaProducer is thread-safe and can be called concurrently from multiple threads; sharing a single instance is generally more efficient than creating several.

3) KafkaProducer maintains a memory pool, 32 MB by default, holding records that have not yet been sent to the brokers, plus a long-lived I/O thread. Always call KafkaProducer.close() when the producer is no longer needed, otherwise these resources leak and can eventually cause OOM-style failures.

4) KafkaProducer.send is asynchronous: it appends the record to the producer's memory pool and returns immediately. If records are produced quickly enough, several of them can be batched into a single request to the brokers.

5) When a send is reported back to the client is controlled by acks. acks=0 reports success immediately without waiting for any broker response; acks=1 reports success once the partition leader has written the record, regardless of the followers; acks=all reports success only after both the leader and the followers have written it. acks=all is the safest and also the slowest option.

6) When a send fails and retries is greater than 0, the producer retries. Retries interact with the delivery semantics: at most once, at least once and exactly once. Exactly once is what everyone wants, i.e. no message is delivered to the brokers more than once.

7) The producer's buffer defaults to 32 MB in total and is carved into 16 KB pages used to batch records per topic partition. The 32 MB is set by buffer.memory and the 16 KB by batch.size; raising either value lets the producer hold more data at the cost of more memory. The buffer is built on Java NIO ByteBuffer, which can use heap memory (managed by the Java GC) or off-heap memory (not managed by the GC); heap memory is used by default.

8) With the default linger.ms=0, every KafkaProducer.send triggers a request to the brokers as soon as the record lands in the buffer, no matter how much buffer space is still free, which produces many small requests. Setting linger.ms to a value greater than 0, e.g. linger.ms=5, makes each send wait up to 5 milliseconds so that several records can be merged into one request (see the configuration sketch after this list).

9) buffer.memory=32MB caps the memory each KafkaProducer may use. If sends arrive fast enough, the 32 MB buffer is quickly exhausted, and by default subsequent send calls block (for up to max.block.ms) waiting for space. block.on.buffer.full is deprecated in 0.9: setting it to true makes the producer block indefinitely, so if you would rather fail fast than block, leave it unset and lower max.block.ms so that a send which cannot obtain buffer space throws an exception after that bound.

10) For key.serializer and value.serializer, Kafka ships ready-made implementations such as org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer that can be used directly.

11) This analysis gives a fresh view of the producer's time-related parameters max.block.ms, request.timeout.ms, retry.backoff.ms, linger.ms, reconnect.backoff.ms and connections.max.idle.ms; see the summary at the end of this article.

12) It likewise gives a fresh view of the size-related parameters max.request.size, buffer.memory and batch.size; also summarized at the end of this article.
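
To make points 4, 5, 8 and 9 above concrete, here is a minimal sketch of a producer configured with those parameters and used asynchronously with a callback. It is an illustration only: the broker address, topic name and the tuned values (retries=3, linger.ms=5, max.block.ms=10000) are assumptions, not taken from this analysis.

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // illustrative broker address
        props.put("acks", "all");              // safest setting: leader and followers must ack
        props.put("retries", 3);               // retry transient failures (watch out for duplicates)
        props.put("batch.size", 16384);        // 16 KB pages inside the buffer pool
        props.put("linger.ms", 5);             // wait up to 5 ms so several records share one request
        props.put("buffer.memory", 33554432);  // 32 MB total buffer
        props.put("max.block.ms", 10000);      // fail after 10 s instead of the 60 s default
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            // send() only hands the record to the RecordAccumulator and returns immediately;
            // the callback fires once the broker response (per the acks setting) has arrived.
            producer.send(new ProducerRecord<>("my-topic", "key-1", "value-1"), new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();  // e.g. timeout or buffer exhausted
                    } else {
                        System.out.println("stored in partition " + metadata.partition()
                                + " at offset " + metadata.offset());
                    }
                }
            });
        } finally {
            producer.close();  // release the 32 MB buffer and the background I/O thread
        }
    }
}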

 

2. Producer creation code analysis

The official Kafka 0.9.0.0 documentation gives the following KafkaProducer usage demo:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:4242");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();

The demo above is the official producer usage example, and it is the entry point for this analysis.

First, the producer configuration is collected in a Properties object. The individual settings are not covered in detail here; their meaning is documented in the official kafka-0.9.0.0 producer configuration reference. The producer itself is then created with:

Producer<String, String> producer = new KafkaProducer<>(props);

The construction logic of KafkaProducer can be summarized as follows:

  • Read and parse the supplied Properties, taking configured values and falling back to defaults for anything not set;
  • Build the RecordAccumulator, which collects every submitted record that is about to be written to the brokers; records go into a buffer of at most 32 MB, split into 16 KB pages that are allocated on demand;
  • Build the KafkaClient, based on multiplexed (selector) I/O, which handles all socket traffic with the brokers;
  • Build the Sender, which ships the data collected in the RecordAccumulator to the brokers;
  • Build a thread and immediately start the Sender running asynchronously on it.

The following diagram gives an overall picture of KafkaProducer:

Two objects in that diagram deserve particular attention:

  • RecordAccumulator (collects records into the buffer; it does not send anything)
  • Sender (sends the data collected in the RecordAccumulator to the brokers)

The KafkaProducer construction code is annotated below:

/** 
 * KafkaProducer constructor that builds a KafkaProducer from Properties
 * @param properties the Properties configuration
 */  
public KafkaProducer(Properties properties) {
    // Wrap the Properties in a ProducerConfig and delegate to the ProducerConfig-based constructor
    this(new ProducerConfig(properties), null, null);
}
/** 
 * KafkaProducer constructor that builds a KafkaProducer from a ProducerConfig
 * @param config the ProducerConfig configuration
 * @param keySerializer serializer for record keys
 * @param valueSerializer serializer for record values
 */
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    try {
        // Kafka's own logging; set the log4j level to trace to see this message
        log.trace("Starting the Kafka producer");
        // Copy the user-supplied configuration into a Map<String, Object> and keep the ProducerConfig on the producerConfig field
        Map<String, Object> userProvidedConfigs = config.originals();
        this.producerConfig = config;
        // Current system time, i.e. the creation time of this KafkaProducer client
        this.time = new SystemTime();
        
        // Kafka metrics config: metrics.num.samples sets the number of samples (default 2), metrics.sample.window.ms sets how long a sample stays valid (default 30000 ms)
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                        TimeUnit.MILLISECONDS);
        // Producer client id; if not configured it becomes "producer-" + an incrementing sequence
        clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
        if (clientId.length() <= 0)
            clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
        // Register the metric reporters plus JMX monitoring
        List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        // Metrics registry built from the metric config and reporters above
        this.metrics = new Metrics(metricConfig, reporters, time);
        // Partitioner implementation used to route records to partitions; the default falls back to an incrementing counter % partition count when a record has no key
        this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
        // retry.backoff.ms controls the backoff before metadata refresh (and retry) attempts, default 100 ms
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        // Kafka metadata, i.e. each topic's partitions and the broker addresses hosting them; retry.backoff.ms (default 100 ms) is the refresh backoff and metadata.max.age.ms (default 300000 ms) is how long metadata stays valid after a refresh
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
        // Maximum size in bytes of a single request, set by max.request.size, default 1048576 bytes, i.e. 1 MB
        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        // Total buffer memory, set by buffer.memory, default 33554432 bytes, i.e. 32 MB
        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        // Compression type, set by compression.type; accepts gzip, snappy, lz4 etc., default none (records are sent uncompressed)
        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
        
        // The following checks exist only for configs deprecated in 0.9 and will go away when those configs are removed
        // If BLOCK_ON_BUFFER_FULL is set to true, the user-supplied METADATA_FETCH_TIMEOUT_CONFIG is not honored
        /* check for user defined settings.
         * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
         * This should be removed with release 0.9 when the deprecated configs are removed.
         */
        if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
            log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
            boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
            if (blockOnBufferFull) {
                this.maxBlockTimeMs = Long.MAX_VALUE;
            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
            } else {
                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            }
        } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
            log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
            this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
        } else {
            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
        }
        // Another check for configs deprecated in 0.9
        // If TIMEOUT_CONFIG was set by the user, use it as the timeout for requests sent to the brokers
        /* check for user defined settings.
         * If the TIME_OUT config is set use that for request timeout.
         * This should be removed with release 0.9
         */
        if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
            log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
                    ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
        } else {
            this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        }

        // Core object: the RecordAccumulator collects every submitted record that is about to be sent to the brokers
        // Note that it only collects data into the buffer; the actual sending is done by the Sender created below
        Map<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("client-id", clientId);
        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                this.totalMemorySize,
                this.compressionType,
                config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                retryBackoffMs,
                metrics,
                time,
                metricTags);
        // KafkaClient: the producer's socket I/O layer, responsible for all socket traffic with the brokers
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
        NetworkClient client = new NetworkClient(
                new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder),
                this.metadata,
                clientId,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                this.requestTimeoutMs, time);
        // Sender: ships the data collected in the RecordAccumulator to the brokers
        this.sender = new Sender(client,
                this.metadata,
                this.accumulator,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                config.getInt(ProducerConfig.RETRIES_CONFIG),
                this.metrics,
                new SystemTime(),
                clientId,
                this.requestTimeoutMs);
        // Thread ioThread: the thread the Sender runs on; as soon as the Sender and ioThread are created, the Sender starts executing asynchronously on it
        String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        // Sensor errors: the error sensor, which feeds error measurements into Metrics
        this.errors = this.metrics.sensor("errors");

        if (keySerializer == null) {
            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.keySerializer.configure(config.originals(), true);
        } else {
            config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            this.keySerializer = keySerializer;
        }
        if (valueSerializer == null) {
            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.valueSerializer.configure(config.originals(), false);
        } else {
            config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            this.valueSerializer = valueSerializer;
        }
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(0, TimeUnit.MILLISECONDS, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}

 

3. Producer write path code analysis

Once the KafkaProducer has been created it can send data to the brokers; a typical send looks like this:

producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

We therefore start from public Future<RecordMetadata> send(ProducerRecord<K, V> record) in KafkaProducer.java. A picture is worth a thousand words, so the flow chart below outlines its processing steps:

The relevant source is annotated here:

/**  
 * Send a record from the producer to the Kafka brokers
 * @param record the record to write, containing topic, key and value
 * @return Future<RecordMetadata> handle for retrieving the result of the asynchronous call
 */  
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}
/**  
 * Send a record from the producer to the Kafka brokers
 * @param record the record to write, containing topic, key and value
 * @param callback callback invoked when the broker has acknowledged the record (or the send failed)
 * @return Future<RecordMetadata> handle for retrieving the result of the asynchronous call
 */  
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    try {
        // first make sure the metadata for the topic is available
        long startTime = time.milliseconds();
        
        // max.block.ms (60000 ms by default): first make sure the metadata for the target topic is available, i.e. that the brokers are reachable
        // If the metadata cannot be fetched, this call blocks and keeps retrying for up to max.block.ms (60 seconds by default), which is long and usually worth tuning down
        waitOnMetadata(record.topic(), this.maxBlockTimeMs);
        
        // Serialize the key
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer");
        }
        
        // Check whether the time spent so far already exceeds max.block.ms (60000 ms by default); if so, this send is abandoned with an exception
        checkMaybeGetRemainingTime(startTime);
        
        // Serialize the value
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer");
        }
        // Again check the elapsed time against max.block.ms; abandon the send with an exception if exceeded
        checkMaybeGetRemainingTime(startTime);
        // Run the partitioner to decide which partition this record goes to
        int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
        // Again check the elapsed time against max.block.ms
        checkMaybeGetRemainingTime(startTime);
        
        // Check that the serialized size does not exceed max.request.size (1 MB by default) or buffer.memory (32 MB by default)
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        ensureValidRecordSize(serializedSize);
        TopicPartition tp = new TopicPartition(record.topic(), partition);
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // Check the elapsed time against max.block.ms one more time and compute the time remaining
        long remainingTime = checkMaybeGetRemainingTime(startTime);
        
        // Key step: hand the record to the collector, the RecordAccumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingTime);
       
        // Key step: if a batch filled up or a new batch was created, wake the Sender so it ships the collected data to the brokers
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        throw e;
    }
}
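
Because send() only appends to the accumulator and returns a Future, the caller decides how to consume the result. A small usage sketch (the class and method names here are illustrative, assuming a Producer<String, String> like the one in the demo above):

import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendUsageSketch {
    // Assumes a configured Producer<String, String>, e.g. the one built in the demo above.
    static void sendBothWays(Producer<String, String> producer) throws Exception {
        // Fire-and-forget: the record goes into the RecordAccumulator and the call returns at once.
        Future<RecordMetadata> future =
                producer.send(new ProducerRecord<>("my-topic", "k", "v"));

        // Pseudo-synchronous: block until the broker has acknowledged the record (per acks).
        // This trades throughput for a per-record delivery result at the call site.
        RecordMetadata metadata = future.get();
        System.out.println("partition=" + metadata.partition() + ", offset=" + metadata.offset());
    }
}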

 

4. Collector code analysis: how the RecordAccumulator gathers data

In the analysis above we saw the following call:

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingTime);

The collector's processing can be summarized as follows (a simplified buffer-pool sketch follows this list):

  • The buffer is built on Java NIO ByteBuffer; ByteBuffer can use GC-managed heap memory or non-GC-managed off-heap memory, and the producer defaults to GC-managed heap memory;
  • Buffering is organized per topic partition: memory is reserved to hold the record bytes in ByteBuffers, at most 32 MB per producer, handed out in 16 KB pages (much like memcached handing out 1 KB / 2 KB / 4 KB / 8 KB slabs; understand one such allocator and the others feel familiar);
  • If the 32 MB is not yet exhausted, a page is allocated to hold this record's bytes; the allocation happens outside the partition's deque lock because it may block, so after re-acquiring the lock the code checks again whether another thread created a batch in the meantime and, if so, releases the freshly allocated page;
  • The result is returned through a future object; the whole path is thread-safe and may be called from multiple threads, and the returned result is what triggers the Sender.
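
The 32 MB / 16 KB paging described above is essentially a bounded pool of fixed-size ByteBuffers. The class below is a simplified sketch of that idea, not Kafka's actual BufferPool (it ignores allocations larger than one page, among other things); the sizes correspond to the buffer.memory and batch.size defaults discussed here.

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeoutException;

// Simplified illustration of a bounded pool of fixed-size buffer pages (not Kafka's BufferPool).
public class SimpleBufferPool {
    private final int pageSize;                                   // e.g. batch.size = 16 KB
    private final Deque<ByteBuffer> free = new ArrayDeque<ByteBuffer>();  // recycled pages
    private long availableMemory;                                 // memory not yet handed out

    public SimpleBufferPool(long totalMemory, int pageSize) {     // e.g. buffer.memory = 32 MB
        this.pageSize = pageSize;
        this.availableMemory = totalMemory;
    }

    // Hand out one page, waiting up to maxTimeToBlockMs if the pool is exhausted.
    public synchronized ByteBuffer allocate(long maxTimeToBlockMs)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + maxTimeToBlockMs;
        while (free.isEmpty() && availableMemory < pageSize) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                throw new TimeoutException("no buffer space within " + maxTimeToBlockMs + " ms");
            wait(remaining);                   // woken up by deallocate()
        }
        if (!free.isEmpty())
            return free.pollFirst();           // reuse a recycled 16 KB page
        availableMemory -= pageSize;
        return ByteBuffer.allocate(pageSize);  // heap buffer, managed by the Java GC
    }

    // Return a page to the pool so that blocked callers can proceed.
    public synchronized void deallocate(ByteBuffer buffer) {
        buffer.clear();
        free.addLast(buffer);
        notifyAll();
    }
}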

The accumulator.append(...) call above handed the record to the RecordAccumulator; that code is analyzed next, again starting from a flow chart of the overall process:

The public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) method in RecordAccumulator.java is annotated below:

/**
 * Append a record to the collector (RecordAccumulator)
 * @param tp the topic partition this record has been assigned to
 * @param key the record key
 * @param value the record value
 * @param callback callback invoked when the send completes
 * @param maxTimeToBlock maximum time to block, derived from max.block.ms (60000 ms, i.e. 60 s, by default) and measured from the producer.send call; an exception is thrown on timeout
 */
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
        // Increment the appendsInProgress counter
    appendsInProgress.incrementAndGet();
    try {
        // Has the RecordAccumulator already been closed?
        if (closed)
            throw new IllegalStateException("Cannot send after the producer is closed.");
        // check if we have an in-progress batch
        // Check whether there is an in-progress batch we can append to
        // Fetch this topic partition's Deque from the ConcurrentMap<TopicPartition, Deque<RecordBatch>>, creating it if absent
        Deque<RecordBatch> dq = dequeFor(tp);
        synchronized (dq) {
            // Lock the partition's Deque against concurrent access, then peek at its last RecordBatch and check whether one exists
            RecordBatch last = dq.peekLast();
            if (last != null) {
                // If the last RecordBatch still has buffer space for this record, append the bytes to its MemoryRecords and get back a non-null FutureRecordMetadata
                FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
                if (future != null)
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
            }
        }
        
        // No in-progress batch could take the record, so prepare to allocate a new one
        // we don't have an in-progress record batch try to allocate a new batch
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        // Take the larger of batch.size (16 KB by default) and the size of this record,
        // and allocate a ByteBuffer of that size; the allocation also honors max.block.ms (60000 ms) and gives up on timeout
        // ByteBuffer can use GC-managed heap memory or non-GC-managed off-heap memory; heap memory is used by default
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordBatch last = dq.peekLast();
            if (last != null) {
                // Re-check: another thread may have created a batch while we were allocating; if it can hold this record, append there
                FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
                if (future != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    // and release the ByteBuffer we just allocated
                    free.deallocate(buffer);
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }
            // Otherwise create a new MemoryRecords over the buffer,
            // wrap it in a new RecordBatch,
            // and track the batch in the IncompleteRecordBatches set
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch);
            // Return RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true)
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        // Decrement the appendsInProgress counter
        appendsInProgress.decrementAndGet();
    }
}

 

5. Sender code analysis: how data is sent to Kafka

The earlier walkthrough already introduced the Sender, which I call the data sender, in these lines:

// Key step: hand the record to the collector, the RecordAccumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingTime);

// Key step: if a batch filled up or a new batch was created, wake the Sender so it ships the collected data to the brokers
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}

Once the record bytes have been collected, the result returned by the RecordAccumulator is checked and, when a batch is full or newly created, the Sender is woken up to perform the actual send. The Sender's processing logic is summarized below:

 
The public void run(long now) method in Sender.java is annotated as follows:

/**
 * One iteration of the Sender: send collected data to the Kafka brokers
 * @param now the current system time in milliseconds
 */
public void run(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    // The Sender holds a reference to the RecordAccumulator; first call RecordAccumulator.ready to find the nodes that currently have data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    // If the leader of some partition is still unknown, mark the Sender's metadata as needing a forced update
    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();

    // remove any nodes we aren't ready to send to
    // Walk the ready nodes again and drop any that are not actually ready; for nodes without an established connection, initiate one, honoring reconnect.backoff.ms (50 ms by default)
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);

    // request.timeout.ms (30000 ms by default) bounds how long data may sit in the RecordAccumulator; batches that have waited longer are dropped here and the error sensors are updated
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    sensors.updateProduceRequestMetrics(batches);
    
    // Convert the accumulator's List<RecordBatch> into the List<ClientRequest> to be sent to the brokers, then hand them to the KafkaClient
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    for (ClientRequest request : requests)
        client.send(request, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    // Call KafkaClient.poll, which drives the Selector (the multiplexer) and performs the actual socket reads and writes
    this.client.poll(pollTimeout, now);
}
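
Note that run(long now) above is a single iteration; it is driven in a loop by the ioThread created in the constructor. Roughly (a simplified, non-verbatim outline of the 0.9 Sender, shown only to put run(long now) in context):

// Outline of Sender.run(), executed on the "kafka-producer-network-thread" ioThread.
public void run() {
    // Main loop: keep draining the accumulator and polling the network until close() is requested.
    while (running) {
        try {
            run(time.milliseconds());   // the per-iteration logic shown above
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // After close() is requested, the real Sender keeps looping until the accumulator is drained
    // and in-flight requests have completed (unless a forced close), then shuts down the client.
}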

 

6. Understanding the producer's time-related parameters

6.1 max.block.ms

1) Scope: the total time spent in send() before the Sender takes over (waiting for metadata, serializing, partitioning, allocating buffer space) must not exceed this value; if it does, a timeout exception is thrown and the send is abandoned

2) Default: 60 seconds, which is quite long and usually worth tuning down

 

6.2 request.timeout.ms

1) Scope: used to expire data that has sat in the collector for too long; by default a batch may wait at most 30 seconds after being added

2) Default: 30000 ms; it bounds how long data placed in the RecordAccumulator may live there before being dropped

 

6.3 retry.backoff.ms

1) Scope: the backoff the producer applies before refreshing per-topic, per-partition metadata from the brokers (and before retrying failed sends)

2) Default: 100 ms

 

6.4 linger.ms

1) Scope: batch together as much of the data destined for the same partition as can be collected within linger.ms and submit it in one request; in practice this extra batching only matters when records arrive in the buffer faster than they can be sent out

2) Default: 0 ms

 

6.5 reconnect.backoff.ms

1) Scope: the backoff the KafkaClient inside the Sender waits before attempting to reconnect after a connection drops

2) Default: 50 ms

 

6.6 connections.max.idle.ms

1) Scope: the longest a connection created by the Selector inside the Sender's KafkaClient may stay idle; beyond that the idle connection is closed

2) Default: 540000 ms, i.e. 9 minutes

 

7. Understanding the producer's size-related parameters

7.1 max.request.size

The serialized key and value bytes plus the record header overhead must not exceed this limit

Default: 1048576 bytes, i.e. 1 MB

 

7.2 buffer.memory

Caps the buffer memory each producer may allocate, 32 MB per producer by default; do not keep creating producers without closing them, or sooner or later you will hit OOM

Default: 33554432 bytes, i.e. 32 MB

 

7.3 batch.size

The official docs describe it together with linger.ms: batch together as much data destined for the same partition as can be collected within linger.ms, which normally only matters when records arrive in the buffer faster than they can be sent. Reading the code, however, also shows that each producer's 32 MB of buffer memory is handed out in batch.size (16 KB) pages, i.e. every allocation gives the collector a 16 KB block to buffer records in

Default: 16384 bytes, i.e. 16 KB
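
A quick back-of-the-envelope check of how the two sizes relate under the defaults (plain arithmetic, nothing Kafka-specific):

public class BatchMath {
    public static void main(String[] args) {
        long bufferMemory = 33554432L;  // buffer.memory default, 32 MB
        int batchSize = 16384;          // batch.size default, 16 KB
        // At most this many full 16 KB batches can sit in the buffer at once.
        System.out.println(bufferMemory / batchSize + " batches");  // prints "2048 batches"
    }
}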

 
