kafka.common.ConsumerRebalanceFailedException :log-push-record-consumer-group_mobile-pushremind02.lf.xxx.com-1399456594831-99f15e63 can't rebalance after 3 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown Source)
at com.xxx.mafka.client.consumer.DefaultConsumerProcessor.getKafkaStreams(DefaultConsumerProcessor.java:149)
at com.xxx.mafka.client.consumer.DefaultConsumerProcessor.recvMessage(DefaultConsumerProcessor.java:63)
at com.xxx.service.mobile.push.kafka.MafkaPushRecordConsumer.main(MafkaPushRecordConsumer.java:22)
at com.xxx.service.mobile.push.Bootstrap.main(Bootstrap.Java:34)
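Cause analysis for the error above:
Several consumers in the same consumer group were started one after another; that is, multiple consumers within one group share the load of consuming data from multiple partitions.
Solution:
1. Adjust the ZooKeeper settings (in the Kafka consumer configuration)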
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000
rebalance.backoff.ms=2000
rebalance.max.retries=10
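As a rough illustration, the four settings above could be assembled in Java as follows. This is only a sketch built on java.util.Properties; the class name ConsumerZkSettings and the method build are hypothetical, and a real consumer would also need entries such as group.id and zookeeper.connect before the properties are handed to the old high-level consumer.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: collects the settings listed in this post.
final class ConsumerZkSettings {

    // Returns the four ZooKeeper/rebalance settings with the values shown above.
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("zookeeper.session.timeout.ms", "5000");
        props.setProperty("zookeeper.connection.timeout.ms", "10000");
        props.setProperty("rebalance.backoff.ms", "2000");
        props.setProperty("rebalance.max.retries", "10");
        return props;
    }

    public static void main(String[] args) {
        // Print each setting in key=value form, like a .properties file.
        Properties props = build();
        for (String key : props.stringPropertyNames()) {
            System.out.println(key + "=" + props.getProperty(key));
        }
    }
}
```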
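When using the high-level API, this problem usually appears because the zookeeper.sync.time.ms interval is configured too short (the old consumer uses this value as the rebalance retry backoff when rebalance.backoff.ms is not set explicitly). Other causes cannot be ruled out, but in the author's experience this is the usual one.
Here is the reason. In a consumer group (with fewer consumers than partitions), any change in the set of consumers triggers a rebalance. The first step is for each consumer to release its current resources, if it holds any: it shuts down the ConsumerFetcherThread, closes all of its connections to the Kafka brokers, and gives up the partitions it is currently consuming, which in practice means deleting the ephemeral nodes (/xxx/consumer/owners/topic-xxx/partitions[0-n]). Every consumer in the group then computes which partitions it should consume and registers the corresponding ephemeral nodes to claim them. A registered node means "I own this partition", and no other consumer may use it.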
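The "each consumer computes its own partitions" step can be pictured with a small sketch. It assumes a range-style assignment over sorted consumer ids, which is one common scheme and not necessarily the exact code path in every Kafka version; the class name RangeAssignmentSketch and the consumer ids are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a range-style split of partitions 0..n-1 across consumers.
final class RangeAssignmentSketch {

    static Map<String, List<Integer>> assign(List<String> consumerIds, int partitionCount) {
        if (consumerIds.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        // Every consumer sorts the same id list, so all of them reach the same result.
        List<String> sorted = new ArrayList<>(consumerIds);
        Collections.sort(sorted);

        int perConsumer = partitionCount / sorted.size();
        int extra = partitionCount % sorted.size(); // the first 'extra' consumers get one more

        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            int start = perConsumer * i + Math.min(i, extra);
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                owned.add(p);
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // Two hypothetical consumers sharing five partitions.
        System.out.println(assign(Arrays.asList("consumer-a", "consumer-b"), 5));
    }
}
```

Because the result depends only on the sorted consumer list and the partition count, two consumers that see different membership views will compute overlapping ranges, which is where ownership conflicts come from.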
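With that in mind, the rest is easier. When a consumer runs a rebalance, it follows a retry-on-failure policy governed by a time interval and a maximum number of attempts: each time it fails to acquire its partitions, it tries again. As an analogy, suppose a company has one meeting room. Department B has booked it for a certain time slot, but when the time comes and B goes to the room, department A is still using it. B can only wait and check back every so often. If B checks back too quickly, the room is still occupied on every visit and B runs out of attempts; if the interval is a bit longer, A may well have left by the second visit.
Likewise, when a new consumer joins and triggers a rebalance, the existing (old) consumers recompute their assignment and release the partitions they hold, but this takes some processing time, so the new consumer's attempt to claim those partitions may well fail in the meantime. If the settings give the old consumers enough time to release their resources, the problem does not occur.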
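The retry behaviour described above can be simulated in a few lines. This is a simplified model rather than Kafka's code: it uses a simulated clock, assumes the old consumer releases the partition a fixed releaseDelayMs after the rebalance starts, and assumes the new consumer tries immediately and then waits backoffMs after each failure. All names are hypothetical.

```java
// Simplified model of "retry with backoff until the old owner lets go".
final class RebalanceRetrySketch {

    // Returns the 1-based attempt on which the claim succeeds, or -1 if every attempt fails.
    static int attemptsNeeded(long releaseDelayMs, long backoffMs, int maxRetries) {
        long now = 0; // simulated time in milliseconds since the rebalance started
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            if (now >= releaseDelayMs) {
                return attempt; // the partition is free, so the claim succeeds
            }
            now += backoffMs; // claim failed: back off before the next attempt
        }
        return -1; // gave up, comparable to ConsumerRebalanceFailedException
    }

    public static void main(String[] args) {
        // Old consumer needs 3 s to release; compare a short and a long backoff.
        System.out.println("backoff 500 ms:  " + attemptsNeeded(3000, 500, 3));
        System.out.println("backoff 2000 ms: " + attemptsNeeded(3000, 2000, 3));
    }
}
```

In this model, three attempts spaced 500 ms apart all land before the 3 s release and the consumer gives up, while the same three attempts spaced 2000 ms apart succeed on the last try.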
Official explanation:
consumer rebalancing fails (you will see ConsumerRebalanceFailedException): This is due to conflicts when two consumers are trying to own the same topic partition. The log will show you what caused the conflict (search for "conflict in ").
- If your consumer subscribes to many topics and your ZK server is busy, this could be caused by consumers not having enough time to see a consistent view of all consumers in the same group. If this is the case, try Increasing rebalance.max.retries and rebalance.backoff.ms.
- Another reason could be that one of the consumers is hard killed. Other consumers during rebalancing won't realize that consumer is gone after zookeeper.session.timeout.ms time. In the case, make sure that rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms.
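If rebalance.backoff.ms is set too short, the old consumer has not yet released its resources when the new consumer retries, and the new consumer exits once its failed retries reach the threshold.
Make sure that rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms.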
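The inequality above is easy to check before deploying a configuration. The following sketch tests it and derives the smallest retry count that satisfies it for a given backoff; the class and method names are hypothetical, and the numbers in main are the example values from this post.

```java
// Checks rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms.
final class RebalanceWindowCheck {

    // True when the total retry window is strictly longer than the ZooKeeper session timeout.
    static boolean outlastsSession(int maxRetries, long backoffMs, long sessionTimeoutMs) {
        return (long) maxRetries * backoffMs > sessionTimeoutMs;
    }

    // Smallest retry count n such that n * backoffMs > sessionTimeoutMs.
    static int minRetries(long backoffMs, long sessionTimeoutMs) {
        if (backoffMs <= 0) {
            throw new IllegalArgumentException("backoffMs must be positive");
        }
        return (int) (sessionTimeoutMs / backoffMs) + 1;
    }

    public static void main(String[] args) {
        // Values from the configuration in this post: 10 retries, 2000 ms backoff, 5000 ms session.
        System.out.println("config ok: " + outlastsSession(10, 2000, 5000));
        System.out.println("minimum retries at 2000 ms backoff: " + minRetries(2000, 5000));
    }
}
```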
For how Kafka stores its nodes in ZooKeeper, see: Kafka's storage structure in ZooKeeper