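A recent project requires sending fairly large messages through Kafka. Leaving aside whether that is a sane thing to do, it can't hurt to dig into it.
I searched online and found a few articles saying: split the message into several chunks; if every chunk uses the same key, they are all sent to the same partition, and since Kafka keeps messages ordered within a single partition, the file can be reassembled in order.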
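To make the chunking idea concrete, here is a minimal sketch in plain Java with no Kafka dependency. `ChunkSplitter`, `Chunk`, and the `index`/`total` fields are hypothetical names invented for illustration; the articles do not prescribe a format. It assumes the consumer receives the chunks in the order they were produced, which is exactly the property this post goes on to check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: splits a payload into ordered chunks that all share one key. */
final class ChunkSplitter {

    /** One slice of the payload plus the metadata needed to put it back together. */
    static final class Chunk {
        final String key;   // identical for every chunk of one payload
        final int index;    // position of this chunk within the payload
        final int total;    // total number of chunks for the payload
        final byte[] data;

        Chunk(String key, int index, int total, byte[] data) {
            this.key = key;
            this.index = index;
            this.total = total;
            this.data = data;
        }
    }

    /** Cuts the payload into pieces of at most chunkSize bytes. */
    static List<Chunk> split(String key, byte[] payload, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        // Round up; an empty payload still produces one (empty) chunk.
        int total = Math.max(1, (payload.length + chunkSize - 1) / chunkSize);
        List<Chunk> chunks = new ArrayList<>(total);
        for (int i = 0; i < total; i++) {
            int from = i * chunkSize;
            int to = Math.min(payload.length, from + chunkSize);
            chunks.add(new Chunk(key, i, total, Arrays.copyOfRange(payload, from, to)));
        }
        return chunks;
    }

    /** Concatenates chunks that are already in index order. */
    static byte[] join(List<Chunk> chunks) {
        int length = 0;
        for (Chunk c : chunks) {
            length += c.data.length;
        }
        byte[] out = new byte[length];
        int pos = 0;
        for (Chunk c : chunks) {
            System.arraycopy(c.data, 0, out, pos, c.data.length);
            pos += c.data.length;
        }
        return out;
    }
}
```

Each `Chunk` would then be sent as one record whose key is `chunk.key`; carrying `index` and `total` lets the receiver detect a missing or out-of-order piece instead of silently producing a corrupt file.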
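So here come the questions: why does the same key end up on the same partition? Can the key be of any type? Are there pitfalls? Let's start by reading the send code in KafkaTemplate.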
public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
    // neither a partition nor a key is specified here
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
    return doSend(producerRecord);
}
It first creates a ProducerRecord. The class has several constructors, which between them let you specify the topic, partition, key and value.
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
    if (this.producer == null) {
        synchronized (this) {
            if (this.producer == null) {
                this.producer = this.producerFactory.createProducer();
            }
        }
    }
    // Double-checked locking creates a singleton producer. createProducer uses the delegation pattern;
    // my understanding is that Spring does this to shield callers from changes in the underlying Kafka code.
    final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
    this.producer.send(producerRecord, new Callback() {
    ........
SettableListenableFuture is a future that can be completed with set(Object) or setException(Throwable). Hmm, the producer is a singleton; noting that for later. On to the producer's send first.
// first make sure the metadata for the topic is available
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs); // refresh metadata, mainly to sync the partition info for the topic
long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
serializedKey = keySerializer.serialize(record.topic(), record.key());
serializedValue = valueSerializer.serialize(record.topic(), record.value());
int partition = partition(record, serializedKey, serializedValue, metadata.fetch()); // pick the partition
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); // add the Kafka log header; see Kafka's on-disk storage format for details
ensureValidRecordSize(serializedSize); // check the message size against maxRequestSize and totalMemorySize
TopicPartition tp = new TopicPartition(record.topic(), partition);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs); // append the message to an in-memory queue, where it waits to be sent to the server
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
return result.future;
A new question: does Kafka send messages in batches? Noting that one too. First, the partition method:
return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
The partition method is simple, ha. Well, not quite that simple: partitioner is an interface, which means I could implement my own Partitioner. Being lazy, let's just look at the default one, DefaultPartitioner.
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); // the partition list for the topic, taken from metadata
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = counter.getAndIncrement(); // seeing this I thought I was in trouble; nearly got burned
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return DefaultPartitioner.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions; // and here my mood recovered, ha; no need to panic so easily
    }
}
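The keyed branch can be tried out in isolation. The sketch below is an illustration only: `KeyPartitionSketch` is a made-up class, and `Arrays.hashCode` is a stand-in for `Utils.murmur2`, so the partition numbers it yields will not match what a real producer picks. What it does show is the property that matters here: a deterministic hash of the key bytes, made non-negative and taken modulo the partition count, sends equal keys to the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical stand-alone version of the keyed branch of a hash partitioner. */
final class KeyPartitionSketch {

    /** Clears the sign bit so the modulo below can never be negative. */
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    /** Stand-in hash (NOT murmur2): any deterministic hash of the bytes behaves the same way. */
    static int hash(byte[] keyBytes) {
        return Arrays.hashCode(keyBytes);
    }

    /** Same key bytes and same partition count always yield the same partition. */
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return toPositive(hash(keyBytes)) % numPartitions;
    }

    /** Convenience overload; UTF-8 plays the role of the key serializer here. */
    static int partitionFor(String key, int numPartitions) {
        return partitionFor(key.getBytes(StandardCharsets.UTF_8), numPartitions);
    }
}
```

Note that the result depends on `numPartitions`: if the topic's partition count changes between two chunks of the same file, the same key can map to a different partition.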
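Conclusion: the articles were right. With a non-null key the partition is the murmur2 hash of the serialized keyBytes modulo the number of partitions, so equal keys always land on the same partition as long as the partition count stays the same. The key can be of any type, because only its serialized bytes are hashed. The pitfall is a null key, which gets spread round-robin across the available partitions. That's it for this post.
Er, it seems I forgot something: the things I marked earlier still haven't been looked at. So let's continue, starting with the in-memory queue used for sending, which looks rather interesting.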
Deque<RecordBatch> dq = dequeFor(tp); // one queue per TopicPartition; it holds RecordBatch objects, each containing a data buffer and an output stream
synchronized (dq) {
    RecordBatch last = dq.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); // append to the buffer; returns null when there is not enough room
        if (future != null)
            return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
    }
}
// we don't have an in-progress record batch try to allocate a new batch
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
ByteBuffer buffer = free.allocate(size, maxTimeToBlock); // allocate a new buffer
synchronized (dq) {
    // Need to check if producer is closed again after grabbing the dequeue lock.
    if (closed)
        throw new IllegalStateException("Cannot send after the producer is closed.");
    RecordBatch last = dq.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());
        if (future != null) {
            // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
            free.deallocate(buffer); // i.e. another thread created a new batch in the meantime? so release the buffer we just allocated
            return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
        }
    }
    MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); // create a new MemoryRecords, which wraps the buffer and an output stream
    RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); // create a new RecordBatch
    FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));
    dq.addLast(batch); // add it to the tail of dq
    incomplete.add(batch);
    return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
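The shape of this append logic is easier to see in a stripped-down model. The following is a simplified sketch, not the real RecordAccumulator: `AccumulatorSketch`, `Batch` and `AppendResult` are hypothetical names, partitions are identified by a plain string, one lock guards everything (the real code locks per deque and allocates the buffer outside the lock), and a batch only counts bytes instead of holding them.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of a per-partition queue of batches. */
final class AccumulatorSketch {

    /** A fixed-capacity batch that only tracks how many bytes it holds. */
    static final class Batch {
        final int capacity;
        int used;
        int recordCount;

        Batch(int capacity) {
            this.capacity = capacity;
        }

        /** Returns false when the record does not fit (the listing's tryAppend returns null). */
        boolean tryAppend(byte[] value) {
            if (used + value.length > capacity) {
                return false;
            }
            used += value.length;
            recordCount++;
            return true;
        }

        boolean isFull() {
            return used >= capacity;
        }
    }

    /** The two flags the caller inspects to decide whether to wake the sender. */
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final int batchSize;
    private final Map<String, Deque<Batch>> batches = new HashMap<>();

    AccumulatorSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Appends to the last batch of the partition, or opens a new batch when it has no room. */
    synchronized AppendResult append(String topicPartition, byte[] value) {
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        Batch last = dq.peekLast();
        if (last != null && last.tryAppend(value)) {
            return new AppendResult(dq.size() > 1 || last.isFull(), false);
        }
        // No batch with room: open one big enough for at least this record.
        Batch batch = new Batch(Math.max(batchSize, value.length));
        batch.tryAppend(value);
        dq.addLast(batch);
        return new AppendResult(dq.size() > 1 || batch.isFull(), true);
    }

    synchronized int batchCount(String topicPartition) {
        Deque<Batch> dq = batches.get(topicPartition);
        return dq == null ? 0 : dq.size();
    }
}
```

With a batch size of 10 bytes, three 4-byte records for the same partition end up in two batches: the third record does not fit, so a second batch is opened and the result reports both "full" (more than one batch queued) and "new batch created".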
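Combining this with the code seen earlier: the sender is woken up when dq.size() > 1 || batch.records.isFull() holds, or when a new RecordBatch has just been created. Now for the sender itself. It is created earlier, when the KafkaProducer is constructed; here is a fragment: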
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
Since it's a thread, it has to run, so let's look at run:
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

// if there are any partitions whose leaders are not known yet, force metadata update
if (result.unknownLeadersExist)
    this.metadata.requestUpdate();

// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
    }
}

// create produce requests
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);

List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
    this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

sensors.updateProduceRequestMetrics(batches);
List<ClientRequest> requests = createProduceRequests(batches, now);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
    log.trace("Nodes with data ready to send: {}", result.readyNodes);
    log.trace("Created {} produce requests: {}", requests.size(), requests);
    pollTimeout = 0;
}
for (ClientRequest request : requests)
    client.send(request, now);

// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
this.client.poll(pollTimeout, now);
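In a word: complicated. I don't feel like reading all of it, so let's just look at RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); and find out what exactly makes something "ready".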
boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
    TopicPartition part = entry.getKey();
    Deque<RecordBatch> deque = entry.getValue();
    Node leader = cluster.leaderFor(part);
    if (leader == null) {
        unknownLeadersExist = true;
    } else if (!readyNodes.contains(leader)) {
        synchronized (deque) {
            RecordBatch batch = deque.peekFirst();
            if (batch != null) {
                boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; // at least one attempt was made, and the retry backoff since the last attempt has not elapsed yet
                long waitedTimeMs = nowMs - batch.lastAttemptMs; // time already waited
                long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; // time it is supposed to wait
                long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); // time left until the next send
                boolean full = deque.size() > 1 || batch.records.isFull(); // whether some RecordBatch is full
                boolean expired = waitedTimeMs >= timeToWaitMs; // per the earlier code the sender is only woken when a batch fills up or a new one is created, so the waited time may already exceed the required wait
                boolean sendable = full || expired || exhausted || closed || flushInProgress();
                if (sendable && !backingOff) {
                    readyNodes.add(leader);
                } else {
                    // Note that this results in a conservative estimate since an un-sendable partition may have
                    // a leader that will later be found to have sendable data. However, this is good enough
                    // since we'll just wake up and then sleep again for the remaining time.
                    nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                }
            }
        }
    }
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
boolean sendable = full || expired || exhausted || closed || flushInProgress();
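In other words, a partition is ready to send when a RecordBatch is full, the wait time has elapsed, the RecordAccumulator has run out of memory, the producer is closed, or a flush is in progress, provided the batch is not currently backing off from a retry.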
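The per-batch decision can be pulled out into a pure function to play with the timing. This is a sketch under assumptions: `ReadyCheckSketch` is a made-up class, the state that the listing reads from fields is passed in as parameters, and for a batch that has never been attempted `lastAttemptMs` is assumed to be the time the batch was created.

```java
/** Hypothetical pure-function version of the per-batch readiness check. */
final class ReadyCheckSketch {

    /** True while a previously attempted batch is still inside its retry backoff window. */
    static boolean backingOff(int attempts, long lastAttemptMs, long retryBackoffMs, long nowMs) {
        return attempts > 0 && lastAttemptMs + retryBackoffMs > nowMs;
    }

    /** Mirrors "sendable && !backingOff" from the listing, with all state passed in. */
    static boolean ready(int attempts, long lastAttemptMs, long retryBackoffMs, long lingerMs,
                         long nowMs, boolean full, boolean exhausted, boolean closed,
                         boolean flushInProgress) {
        boolean backingOff = backingOff(attempts, lastAttemptMs, retryBackoffMs, nowMs);
        long waitedTimeMs = nowMs - lastAttemptMs;
        // A retry waits for the backoff; a fresh batch waits for the linger time.
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        boolean sendable = full || expired || exhausted || closed || flushInProgress;
        return sendable && !backingOff;
    }
}
```

For example, with a linger time of 5 ms a fresh, non-full batch created at t=1000 is not ready at t=1002 but is ready at t=1005, while a full batch that failed once at t=1000 with a 100 ms backoff stays not ready until t=1100.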