
Kafka client-side producer



 

The KafkaProducer.send() method does the following:

1. Wait until metadata for the target topic's partitions is available.

2. Serialize the key and value, e.g. with (see the configuration sketch after this list):
   key:   org.apache.kafka.common.serialization.IntegerSerializer
   value: org.apache.kafka.common.serialization.StringSerializer

3. Compute the partition the record should be sent to:
   use the partition set on the record; if it is null, delegate to the partitioner class
   partitioner: org.apache.kafka.clients.producer.internals.DefaultPartitioner

4. Ensure the serialized size of the record does not exceed the configured thresholds:
   MAX_REQUEST_SIZE_CONFIG = "max.request.size"
   BUFFER_MEMORY_CONFIG    = "buffer.memory"

5. Instantiate the TopicPartition and append the record to that partition's queue in the accumulator,
   where it is wrapped into writable records (with a compressor, if configured) and then into a batch.
   Compression setting: COMPRESSION_TYPE_CONFIG = "compression.type"

6. If the result's batch is full, or a new batch had to be created, wake up the sender to transmit.

7. Return the result's future.
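
As a reference for the settings named above, here is a minimal, hypothetical configuration-and-send sketch (the broker address and topic name are placeholders, and the values are illustrative, not recommendations):

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.request.size", "1048576");  // step 4: cap on a single request
        props.put("buffer.memory", "33554432");    // step 4: total accumulator memory
        props.put("compression.type", "gzip");     // step 5: batch compression codec

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);
        // send() returns immediately; batching and network I/O happen on the sender thread
        Future<RecordMetadata> future = producer.send(new ProducerRecord<>("my-topic", 1, "hello"));
        RecordMetadata meta = future.get(); // block until the broker acknowledges
        System.out.printf("partition=%d offset=%d%n", meta.partition(), meta.offset());
        producer.close();
    }
}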

 

 

 

  @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        try {
            // @1 first make sure the metadata for the topic is available
            waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
            byte[] serializedKey;
            try {
                // @2 serialize the key
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                // serialize the value
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }
            // @3 compute the partition
            int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            ensureValidRecordSize(serializedSize); // @4 ensure the record does not exceed the size thresholds
            TopicPartition tp = new TopicPartition(record.topic(), partition);
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // @5 append the serialized record to the accumulator
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback);
            if (result.batchIsFull || result.newBatchCreated) {//@6
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;//@7
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            throw new InterruptException(e);
        } catch (KafkaException e) {
            this.errors.record();
            throw e;
        }
    }
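
For step @3, partition() uses the partition set on the record when present and otherwise falls back to the partitioner. Below is a simplified sketch of the default behavior, assuming the usual hash/round-robin scheme; it is not the exact DefaultPartitioner source (which, for one, restricts the round-robin choice to partitions with an available leader):

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.common.utils.Utils;

// Simplified sketch of default partition selection; illustrative only.
class PartitionChooser {
    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    int choosePartition(Integer recordPartition, byte[] serializedKey, int numPartitions) {
        if (recordPartition != null)
            return recordPartition;                               // explicit partition wins
        if (serializedKey == null)                                // no key: spread writes round-robin
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
        return (Utils.murmur2(serializedKey) & 0x7fffffff) % numPartitions; // hash the key
    }
}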

 

In step 5, the code that appends the record to the accumulator is shown below.

The record is appended to the deque for its topic-partition. If an in-progress batch already sits at the tail of the deque, the record is appended to it. Otherwise a new buffer is allocated (outside the lock, since allocation can block), and the deque is checked a second time in case another thread created a batch in the meantime; if one now exists, the record is appended to it and the buffer is released. If not, the buffer is wrapped into writable MemoryRecords (which carry the compressor) and then into a RecordBatch, which is added to the deque.

In both paths, records.append() invokes the compressor as the record is written, so compression (if configured) happens at append time.

 

   public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        try {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // check if we have an in-progress batch
            Deque<RecordBatch> dq = dequeFor(tp);
            synchronized (dq) { // try appending to an existing in-progress batch
                RecordBatch last = dq.peekLast();
                if (last != null) {
                    FutureRecordMetadata future = last.tryAppend(key, value, callback);
                    if (future != null)
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }

            // we don't have an in-progress record batch try to allocate a new batch
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                // re-check: another thread may have created a batch while we were allocating
                RecordBatch last = dq.peekLast();
                if (last != null) {
                    FutureRecordMetadata future = last.tryAppend(key, value, callback);
                    if (future != null) {
                        // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                        free.deallocate(buffer);
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                    }
                }
                // no in-progress batch for this partition:
                // wrap the buffer into writable MemoryRecords (carrying the compressor)
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); // wrap into a RecordBatch
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));

                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }

   

RecordBatch.tryAppend, which delegates to MemoryRecords:

    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback) {
        if (!this.records.hasRoomFor(key, value)) { // no room left in the buffer
            return null;
        } else {
            this.records.append(0L, key, value); // write the record; the compressor runs here
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }
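
The thunks list above is how a per-record Callback gets tied to the batch's future. From the caller's side this is just the two-argument send(); for example, reusing the producer (and imports) from the earlier sketch:

producer.send(new ProducerRecord<>("my-topic", 1, "hello"),
        new org.apache.kafka.clients.producer.Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null)
                    exception.printStackTrace();  // the send failed
                else
                    System.out.printf("acked: partition=%d offset=%d%n",
                            metadata.partition(), metadata.offset());
            }
        });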

 

In step 6, the Sender's run() method transmits the messages:

1. Check which records are ready to send. A partition is considered ready when any of the following holds (the relevant settings appear in the sketch after this list):
   - The record set is full
   - The record set has sat in the accumulator for at least lingerMs milliseconds
   - The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are immediately considered ready)
   - The accumulator has been closed

2. Drain all sendable batches from the accumulator.

3. Build produce requests from the drained batches.

4. Hand the requests to the network client's selector.

5. The client sends the data over NIO via poll().
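
The first two readiness conditions map directly to producer configuration: a batch counts as full at batch.size bytes, and an unfilled batch still becomes sendable once it has lingered for linger.ms. For example (illustrative values, added to the Properties from the first sketch):

props.put("batch.size", "16384"); // a batch this large is ready to send immediately
props.put("linger.ms", "5");      // smaller batches become ready after 5 ms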

 

public void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        
        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now); // build the produce requests
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now); // send the data over NIO
    }

 

 

 
