背景
Kafka中由Consumer维护消费状态,当Consumer消费消息时,支持2种模式commit消费状态,分别为立即commit和周期commit。前者会导致性能低下,做到消息投递恰好一次,但很少使用,后者性能高,通常用于实际应用,但极端条件下无法保证消息不丢失。
目标
在有效期内,保证每条消息至少可被消费一次
问题分析
请看如上图1,Consumer Thread读取一条消息,更新缓存消费状态,传入消息执行业务逻辑,同时有另外一个调度线程异步周期执行,从缓存中读取消费状态信息,持久化消费状态。假设Consumer Thread更新了缓存消费状态,Scheduler Thread在“执行业务逻辑”完成前就持久化消费状态,正在此时,Consumer失效或宕机了,这条消息就丢失了。
解决思路
等待“执行业务逻辑”成功完成后更新缓存消费状态,就可以保证消息不会丢失。
具体做法:新增一个消费机制策略开关,此开关启动执行图2策略,关闭启动执行图1策略
实现代码
package kafka.consumer import kafka.utils.{IteratorTemplate, Logging, Utils} import java.util.concurrent.{TimeUnit, BlockingQueue} import kafka.serializer.Decoder import java.util.concurrent.atomic.AtomicReference import kafka.message.{MessageAndOffset, MessageAndMetadata} import kafka.common.{KafkaException, MessageSizeTooLargeException} class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int, private val keyDecoder: Decoder[K], private val valueDecoder: Decoder[V], val clientId: String, val consumerAtLeastOnceMessageEnabled: Boolean) extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging { private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null) private var currentTopicInfo: PartitionTopicInfo = null private var consumedOffset: Long = -1L private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId) override def next(): MessageAndMetadata[K, V] = { val item = super.next() if(consumedOffset < 0) throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset)) if (consumerAtLeastOnceMessageEnabled) currentTopicInfo.resetConsumeOffset(consumedOffset) val topic = currentTopicInfo.topic trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark() consumerTopicStats.getConsumerAllTopicStats().messageRate.mark() item } protected def makeNext(): MessageAndMetadata[K, V] = { var currentDataChunk: FetchedDataChunk = null // if we don't have an iterator, get one var localCurrent = current.get() if(localCurrent == null || !localCurrent.hasNext) { if (consumerTimeoutMs < 0) currentDataChunk = channel.take else { currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS) if (currentDataChunk == null) { // reset state to make the iterator re-iterable resetState() throw new ConsumerTimeoutException } } if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) { debug("Received the shutdown command") return allDone } else { currentTopicInfo = currentDataChunk.topicInfo val cdcFetchOffset = currentDataChunk.fetchOffset val ctiConsumeOffset = currentTopicInfo.getConsumeOffset if (ctiConsumeOffset < cdcFetchOffset) { error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data" .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) currentTopicInfo.resetConsumeOffset(cdcFetchOffset) } localCurrent = currentDataChunk.messages.iterator current.set(localCurrent) } // if we just updated the current chunk and it is empty that means the fetch size is too small! if(currentDataChunk.messages.validBytes == 0) throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " + "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow." .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset)) } var item = localCurrent.next() // reject the messages that have already been consumed while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) { item = localCurrent.next() } consumedOffset = item.nextOffset item.message.ensureValid() // validate checksum of message to ensure it is valid new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder) } def clearCurrentChunk() { try { debug("Clearing the current data chunk for this consumer iterator") current.set(null) } } def resetConsumeOffset() { if (!consumerAtLeastOnceMessageEnabled) currentTopicInfo.resetConsumeOffset(consumedOffset) } } class ConsumerTimeoutException() extends RuntimeException()
相关推荐
Kafka Producer机制优化-提高发送消息可靠性
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=6.2.2 -Dfile=/root/kafka-schema-registry-client-6.2.2.jar -Dpackaging=jar 官网下载地址 packages....
赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...
赠送jar包:flink-connector-kafka_2.12-1.14.3.jar 赠送原API文档:flink-connector-kafka_2.12-1.14.3-javadoc.jar 赠送源代码:flink-connector-kafka_2.12-1.14.3-sources.jar 包含翻译后的API文档:flink-...
赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...
赠送jar包:flink-connector-kafka_2.12-1.14.3.jar; 赠送原API文档:flink-connector-kafka_2.12-1.14.3-javadoc.jar; 赠送源代码:flink-connector-kafka_2.12-1.14.3-sources.jar; 赠送Maven依赖信息文件:...
5、kafka监控工具Kafka-Eagle介绍及使用 网址:https://blog.csdn.net/chenwewi520feng/article/details/130581571 本文主要介绍了kafka监控工具Kafka-Eagle的使用。 本文依赖:kafka、zookeeper部署完成。 本分分为...
赠送jar包:flink-connector-kafka-base_2.11-1.10.0.jar; 赠送原API文档:flink-connector-kafka-base_2.11-1.10.0-javadoc.jar; 赠送源代码:flink-connector-kafka-base_2.11-1.10.0-sources.jar; 赠送Maven...
赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...
大数据监控工具kafka监控工具kafka-eagle-bin-1.4.2.tar.gz,比较简单好用。
最新版kafka kafka_2.13-2.6.0.tgz
赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...
kafka-schema-registry-client-3.2.0.jar包,亲测可用,在aliyun仓库内找不到,可以下载此jar包来进行手动安装
kafka-schema-registry-client-3.3.1.jar包,在aliyun 仓库内无法下载,可以下载此jar包然后手动安装
消费kafka某时间段消息用于分析问题,生产环境海量数据,用kafka-console-consumer.sh只能消费全量,文件巨大,无法grep。 代码来源于博主:BillowX_ ,感谢分享 原贴地址:...
赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.10.0.1.pom; 包含...
spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar
kafka-eagle-bin-2.1.0.tar.gz 2022年7月份下载,最新版
赠送jar包:kafka-clients-2.2.0.jar; 赠送原API文档:kafka-clients-2.2.0-javadoc.jar; 赠送源代码:kafka-clients-2.2.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.2.0.pom; 包含翻译后的API文档...
Apache Kafka 3.0.0 (kafka-3.0.0-src.tgz源代码) 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。