名称解释:
Broker:负责消息的存储和转发,也可以叫消息中介节点
Topic:每种消息的分类叫做主题(Topic)。
Partition:每一个Topic被切分为多个Partitions。
背景
Producer构造Message对象时,传入key参数,当Producer发送Message,会根据key确定目标Partition,当Kafka集群中某个Partition所有存活的节点都失效或挂掉。会造成Producer尝试重新发送message.send.max.retries(默认值为3)次后抛出Exception,每次尝试都会休眠一定时间(默认值为100ms)。用户捕捉到异常其结果,停止发送会阻塞,继续发送消息会丢失。
解决思路
Kafka中默认发送消息方式不变,给用户提供一种可选择方式,增加一个消息发送失效转移的开关,当Producer发送到目标Partition的(所有副本本来存活的)节点都失效或挂掉,就转发到其他Partition上。
M:表示Partition的主分区。
S:表示Partition的从分区。下面图2所示,消息轮询发送到Partition的0-3上。
图1
上图本来一条消息是发送到Partition-0所在Broker的,当Partition-0所在Broker全部失效或挂掉后,如下图所示。
图2
消息(轮询均衡)发送到其他Partition上,待失效的Brokers恢复,发送消息恢复到图1状态。
Producer代码修改
代码修改如下:
ProducerConfig.Scala类代码中增加一行:
val sendSwitchBrokerEnabled = props.getBoolean(“send.switch.broker.enable”, false)
DefaultEventHandler.scala类增加一行
val isSendSwitchBrokerEnabled = config.sendSwitchBrokerEnabled
DefaultEventHandler.scala类中增加如下方法:
private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader], isSendSwitchBrokerEnabled: Boolean = false): Int = { val numPartitions = topicPartitionList.size if(numPartitions <= 0) throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist") var partition = if (key == null) { // If the key is null, we don't really need a partitioner // So we look up in the send partition cache for the topic to decide the target partition val id = sendPartitionPerTopicCache.get(topic) id match { case Some(partitionId) => // directly return the partitionId without checking availability of the leader, // since we want to postpone the failure until the send operation anyways partitionId case None => val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) if (availablePartitions.isEmpty) throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) val index = Utils.abs(Random.nextInt) % availablePartitions.size val partitionId = availablePartitions(index).partitionId sendPartitionPerTopicCache.put(topic, partitionId) partitionId } } else partitioner.partition(key, numPartitions) if(partition < 0 || partition >= numPartitions) throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic + "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]") trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition)) if (isSendSwitchBrokerEnabled) { if(partitionsLeaderInTopicsCache.isEmpty()) { val availablePartitions = topicPartitionList.map { partitionAndLeader => if(partitionAndLeader.leaderBrokerIdOpt.isDefined) { partitionsLeaderInTopicsCache.put(TopicAndPartition(topic, partitionAndLeader.partitionId), partitionAndLeader.leaderBrokerIdOpt.get) } } } if(!partitionsLeaderInTopicsCache.containsKey(TopicAndPartition(topic, partition))) { val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) if (availablePartitions.isEmpty) throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) val index = Utils.abs(Random.nextInt) % availablePartitions.size partition = availablePartitions(index).partitionId } } partition }
相关推荐
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=6.2.2 -Dfile=/root/kafka-schema-registry-client-6.2.2.jar -Dpackaging=jar 官网下载地址 packages....
赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...
赠送jar包:flink-connector-kafka_2.12-1.14.3.jar 赠送原API文档:flink-connector-kafka_2.12-1.14.3-javadoc.jar 赠送源代码:flink-connector-kafka_2.12-1.14.3-sources.jar 包含翻译后的API文档:flink-...
赠送jar包:flink-connector-kafka_2.12-1.14.3.jar; 赠送原API文档:flink-connector-kafka_2.12-1.14.3-javadoc.jar; 赠送源代码:flink-connector-kafka_2.12-1.14.3-sources.jar; 赠送Maven依赖信息文件:...
Simple application demonstrate kafka java springboot
赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...
赠送jar包:flink-connector-kafka-base_2.11-1.10.0.jar; 赠送原API文档:flink-connector-kafka-base_2.11-1.10.0-javadoc.jar; 赠送源代码:flink-connector-kafka-base_2.11-1.10.0-sources.jar; 赠送Maven...
kettle kafka 生产者插件,在plugins 下新建steps文件夹,把zip文件解压放到里面。
5、kafka监控工具Kafka-Eagle介绍及使用 网址:https://blog.csdn.net/chenwewi520feng/article/details/130581571 本文主要介绍了kafka监控工具Kafka-Eagle的使用。 本文依赖:kafka、zookeeper部署完成。 本分分为...
大数据监控工具kafka监控工具kafka-eagle-bin-1.4.2.tar.gz,比较简单好用。
最新版kafka kafka_2.13-2.6.0.tgz
kafka-schema-registry-client-3.2.0.jar包,亲测可用,在aliyun仓库内找不到,可以下载此jar包来进行手动安装
赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...
kafka-schema-registry-client-3.3.1.jar包,在aliyun 仓库内无法下载,可以下载此jar包然后手动安装
已编译 Kafka-Manager-1.3.3.22 linux下直接解压解压kafka-manager-1.3.3.22.zip到/opt/module目录 [root@hadoop102 module]$ unzip kafka-manager-1.3.3.22.zip 4)进入到/opt/module/kafka-manager-1.3.3.22/...
赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.10.0.1.pom; 包含...
spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar
kafka-eagle-bin-2.1.0.tar.gz 2022年7月份下载,最新版
Apache Kafka 3.0.0 (kafka-3.0.0-src.tgz源代码) 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。