kafka partition

 

I recently had a project that needed to push fairly large messages through Kafka. Setting aside how questionable an idea that is, it can't hurt to dig into how it would work.

A few articles found online suggest splitting the message into multiple chunks and sending every chunk with the same key: the chunks then all land on the same partition, and since Kafka guarantees ordering within a single partition, the receiver can use that to reassemble the original file.
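
To make that concrete, the sending side would look roughly like the sketch below. This is only a sketch: the topic name, the key, the chunk size and the KafkaTemplate<String, byte[]> instance are all made-up assumptions, and in practice each chunk would also need to carry an index and a total count so the consumer knows when it has the whole message.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.springframework.kafka.core.KafkaTemplate;

public class ChunkedSender {

    private final KafkaTemplate<String, byte[]> template;

    public ChunkedSender(KafkaTemplate<String, byte[]> template) {
        this.template = template;
    }

    // Split a large payload into fixed-size chunks and send them all with the same key,
    // so they land on the same partition and arrive in order.
    public void sendInChunks(String payload) {
        final String key = "file-42";          // hypothetical: one key per logical message
        final int chunkSize = 512 * 1024;      // hypothetical chunk size
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        int total = (bytes.length + chunkSize - 1) / chunkSize;
        for (int i = 0; i < total; i++) {
            byte[] chunk = Arrays.copyOfRange(bytes, i * chunkSize,
                    Math.min(bytes.length, (i + 1) * chunkSize));
            // same topic + same key => same partition => the chunks stay in order
            template.send("big-message-topic", key, chunk);
        }
    }
}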

Which raises the obvious questions: why does the same key end up on the same partition, does the key type matter, and are there any pitfalls? Let's start by flipping through KafkaTemplate's send code.

public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
		ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data); // neither a partition nor a key is specified here
		return doSend(producerRecord);
	}

It first news up a ProducerRecord, which has several constructors that let you specify topic, partition, key and value together.
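
For reference, the constructors this post leans on look like this (the topic and values below are made up):

// value only: no key and no partition, so the partitioner decides (round-robin for null keys, as we'll see)
ProducerRecord<String, String> r1 = new ProducerRecord<>("demo-topic", "hello");
// key + value: the key is hashed to pick the partition
ProducerRecord<String, String> r2 = new ProducerRecord<>("demo-topic", "file-42", "hello");
// explicit partition + key + value: the partitioner is bypassed and this partition is used as-is
ProducerRecord<String, String> r3 = new ProducerRecord<>("demo-topic", 0, "file-42", "hello");

On to doSend: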

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
		if (this.producer == null) {
			synchronized (this) {
				if (this.producer == null) {
					this.producer = this.producerFactory.createProducer();
				}
			}
		} // double-checked locking to create the singleton producer; createProducer uses delegation, which I read as Spring shielding itself from changes in the underlying Kafka client code
		
		final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
		this.producer.send(producerRecord, new Callback() {
........
				

SettableListenableFuture: a future that can be completed with set(Object) or setException(Throwable). Hm, and the producer is a singleton; let's mark that for later. Now let's follow producer.send.
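
A quick aside before diving in: on the caller's side, the future returned by KafkaTemplate.send() is typically consumed roughly like this (a sketch; the topic, key and payload are placeholders, and kafkaTemplate is assumed to be an injected KafkaTemplate<String, String>):

ListenableFuture<SendResult<String, String>> future =
        kafkaTemplate.send("demo-topic", "file-42", "chunk-payload");
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
        // set(...) was called on the SettableListenableFuture: partition and offset are known now
        RecordMetadata meta = result.getRecordMetadata();
        System.out.println("sent to partition " + meta.partition() + " at offset " + meta.offset());
    }

    @Override
    public void onFailure(Throwable ex) {
        // setException(...) was called: the send failed
        ex.printStackTrace();
    }
});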

// first make sure the metadata for the topic is available
            long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs); // refresh metadata, mainly to sync the partition info for this topic
            long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);

                serializedKey = keySerializer.serialize(record.topic(), record.key());
           
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
           
            int partition = partition(record, serializedKey, serializedValue, metadata.fetch()); // pick the partition
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); // add the Kafka log overhead; see Kafka's on-disk log format for details
            ensureValidRecordSize(serializedSize); // check the record size against maxRequestSize and totalMemorySize
            TopicPartition tp = new TopicPartition(record.topic(), partition);
           

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs); // append the record to an in-memory queue where it waits to be sent to the server
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
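
Since the whole point of this exercise is large messages, note that ensureValidRecordSize is exactly where an oversized record gets rejected, based on the producer's max.request.size and buffer.memory settings. Here is a rough sketch of the producer-side knobs; the values are purely illustrative, and the broker's message.max.bytes plus the consumer's fetch size would have to be raised to match.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// a single record bigger than max.request.size fails ensureValidRecordSize with RecordTooLargeException
props.put("max.request.size", String.valueOf(5 * 1024 * 1024));
// total memory for the RecordAccumulator; must comfortably exceed a single record's size
props.put("buffer.memory", String.valueOf(64 * 1024 * 1024));
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);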

 

Which raises a new question: are Kafka records actually sent in batches? Another thing to mark for later... First, let's look at partition().

 

 

return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,
            cluster);

 

The partition method looks simple, haha... okay, fine, not quite that simple: partitioner is an interface, which means you can plug in an implementation of your own (a sketch of that in a moment). Let's look at the default DefaultPartitioner first, lazy as I am...

 

 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); // fetch the partition list for this topic from the metadata
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = counter.getAndIncrement(); // this is where I got worried for a second; with no key it just round-robins a counter, which would have been a trap
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return DefaultPartitioner.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions; // and here my mood recovered, haha; no need to be so jumpy: with a key it is simply murmur2(key) mod numPartitions
        }
    }
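
As promised above, here is a minimal sketch of a custom Partitioner against the same interface; the routing rule itself is invented purely to show where custom logic would live.

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Toy rule: keyless records always go to partition 0, keyed records are hashed
// much like DefaultPartitioner does it.
public class ChunkAwarePartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0; // arbitrary choice for records without a key
        }
        return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

Wiring it in is just a producer property, something like props.put("partitioner.class", ChunkAwarePartitioner.class.getName()).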

 

So the conclusion is that those articles were right: same key, same partition (at least as long as the topic's partition count doesn't change). This post could end here.

 

Er, hold on, I'm forgetting something: the things marked earlier haven't been looked into yet. Let's keep going, starting with the in-memory send queue, which looks like the interesting part.

 

 Deque<RecordBatch> dq = dequeFor(tp); // one deque per TopicPartition; it holds RecordBatches, each wrapping a data buffer and an output stream
            synchronized (dq) {
                RecordBatch last = dq.peekLast();
                if (last != null) {
                    FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); // append the record into the batch's buffer; returns null when the buffer has no room left
                    if (future != null)
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }

            // we don't have an in-progress record batch try to allocate a new batch
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
           
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock); // allocate a new buffer
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordBatch last = dq.peekLast();
                if (last != null) {
                    FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
                    if (future != null) {
                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                        free.deallocate(buffer); // meaning some other record created a new batch in the meantime? so release the buffer we just allocated
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                    }
                }
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); // create a new MemoryRecords wrapping the buffer and an output stream
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); // create a new RecordBatch
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));

                dq.addLast(batch); // append it to the tail of the deque
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);

Putting that together with the code above: the sender gets woken up whenever dq.size() > 1 || batch.records.isFull(), or when a brand-new RecordBatch was just created. So let's look at the sender, which is created back when the KafkaProducer is constructed; here's the relevant snippet:

 String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

Since it's a thread, it has got to run, run, run, so let's look at run():

Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);

        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request, now);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now);

 

In a word: complicated, and I don't feel like reading all of it. Let's just look at RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); and figure out what it takes for a partition to count as "ready".

 

        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();

            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                unknownLeadersExist = true;
            } else if (!readyNodes.contains(leader)) {
                synchronized (deque) {
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; // there have been attempts, and the retry backoff since the last attempt has not elapsed yet
                        long waitedTimeMs = nowMs - batch.lastAttemptMs; // how long this batch has already waited
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; // how long it is supposed to wait
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); // time left until the next send
                        boolean full = deque.size() > 1 || batch.records.isFull(); // is some RecordBatch already full?
                        boolean expired = waitedTimeMs >= timeToWaitMs; // recall from earlier that the sender is only woken when a RecordBatch fills up, so the waited time may well have exceeded the intended wait
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            // Note that this results in a conservative estimate since an un-sendable partition may have
                            // a leader that will later be found to have sendable data. However, this is good enough
                            // since we'll just wake up and then sleep again for the remaining time.
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }

        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);

  

boolean sendable = full || expired || exhausted || closed || flushInProgress();

In other words, once some RecordBatch is full, or the wait time is up, or the RecordAccumulator has run out of memory, or the producer is closing, or a flush is in progress, the batch is ready and can be sent.
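
On the configuration side, "full" and "expired" map directly onto two familiar producer settings, batch.size and linger.ms, while buffer.memory covers the "exhausted" case. A rough sketch, with purely illustrative values:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// "full": a RecordBatch counts as full once roughly batch.size bytes are buffered for a partition
props.put("batch.size", String.valueOf(16 * 1024));
// "expired": even a half-empty batch becomes sendable after linger.ms milliseconds
props.put("linger.ms", "5");
// "exhausted": total memory for the RecordAccumulator before send() starts blocking
props.put("buffer.memory", String.valueOf(32 * 1024 * 1024));
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Which also answers the question marked earlier: yes, the producer really does send in batches, with records accumulated per partition and drained by the background io thread.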

 
