`
1028826685
  • 浏览: 920277 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类

Consumer rebalance失败问题定位和解决思路

 
阅读更多

背景

最近在公司使用Kafka的Consumer高级API出现一些问题,问题描述如下: 
大象push推送队列,在(大约每天8点到10点间)发送消息高峰期,消费节点负载处于高位、jvm内存占用大于%80。这时候JVM会频繁持续FullGC而卡住异步线程(stop the world),心跳等异步线程就没法正常收发数据包。这种情况下导致zk会话过期触发Consumer rebalance,zk会话过期很频繁会发rebalance,rebalance过程中因为写入临时节点冲突,Consumer rebalance失败而无法分配分片数据(即Consumer掉线)。

rebalance失败官方解释: 
consumer rebalancing fails (you will see ConsumerRebalanceFailedException): This is due to conflicts when two consumers are trying to own the same topic partition. The log will show you what caused the conflict (search for “conflict in “). 
If your consumer subscribes to many topics and your ZK server is busy, this could be caused by consumers not having enough time to see a consistent view of all consumers in the same group. If this is the case, try Increasing rebalance.max.retries and rebalance.backoff.ms. 
Another reason could be that one of the consumers is hard killed. Other consumers during rebalancing won’t realize that consumer is gone after zookeeper.session.timeout.ms time. In the case, make sure that rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms.

如下配置会导致rebalance失败 
rebalance.max.retries * rebalance.backoff.ms < zookeeper.session.timeout.ms

为什么大象push推送队列jvm内存占用高?

消费端会将消息异步推送到下游系统,异步发出的请求放到内存map当中。如果收到下游的ack,会将请求从内存map删除,或者请求超过5s未收到ack,自动删除。当前流控会在内存map超过2000请求时触发 
大象jvm内存高问题原因梳理: 
大象mafka推送客户端内存占用高导致mafka client掉线。内存占用高初步定为问题为推送端推送量大于下游处理能力,导致推送端请求堆积在netty内存buffer。这个问题可以从高峰期请求推送量(17k/s)和ack量(10k/s)的巨大差距得到印证

目标

保证活跃消费节点能接管或分配到失败消费节点的数据分片,失败消费节点释放握有的数据分片资源。 
完善报警机制,监控和收集rebalance异常情况。

分析

如下图创建一个topicXXX 6分区,消费组中各个消费节点分配情况如下 
这里写图片描述
Consumers在zk上存储结构 
这里写图片描述

Conumer控制逻辑

目前Consumer rebalance的控制策略是由每一个Consumer通过Zookeeper完成的。 
1.每个Consumer客户端启动时,会注册自身到zk自己的Consumer group下. 
2.Watcher Consumer group下Consumers的变化 
3.Watcher Topic变化 
4.Wacher zk session timeout变化

Consumer逻辑说明

a.同一个Consumer Group中的Consumers,Kafka将相应Topic中的每个消息只发送给其中一个Consumer。 
b.Consumer Group中的每个Consumer读取Topic的一个或多个Partitions,并且是唯一的Consumer; 
c.一个Consumer group的多个consumer的所有线程依次有序地消费一个topic的所有partitions,如果Consumer group中所有consumer总线程大于partitions数量

Consumer rebalacne算法

当一个group中,有consumer加入或者离开时,会触发partitions均衡.均衡的最终目的,是提升topic的并发消费能力. 
1) 假如topic1,具有如下partitions: P0,P1,P2,P3 
2) 加入group中,有如下consumer: C0,C1 
3) 首先根据partition索引号对partitions排序: P0,P1,P2,P3 
4) 根据(consumer.id + ‘-‘+ thread序号)排序: C0,C1 
5) 计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整) 
6) 然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)] 
在这种策略下,每一个consumer或者broker的增加或者减少都会触发consumer rebalance。因为每个consumer只负责调整自己所消费的partition,为了保证整个consumer group的一致性,所以当一个consumer触发了rebalance时,该consumer group内的其它所有consumer也应该同时触发rebalance。

解决思路

为什么活跃消费节点无法分配到下线消费节点的分片数据?

下面分析下原因:目前有三种方式会触发rebalance,其一Topic与partition映射表发生变化;其二同一订阅组中消费节点数发生变化,其三zk会话过期 
前两种直接就做rebalance操作了,最后一种消费节点重新注册临时znode到zk上,然后再做rebalance操作,rebalance失败会导致消费节点下线,其他活跃消费节点也无法分配到分片数据,其实质原因是消费节点rebalance失败下线时并没有从zk的Consumer Group下删除自身临时节点,而每个消费节点的分片数据是根据Consumer Group下数量按照计算规则分配的,所以活跃消费节点无法分配到分片数据。 
消费节点分配分片数据简单计算规则:每个消费节点分片数 =总体分片数/消费节点数

会话过期重新注册消费节点问题:消费端无限重试直到注册成功为止,每次重试会休眠一定时间。

rebalance问题:有限次重试(设置阀值)内无法分配分片,则分配分片失败。

经过多次测试和验证,发现如果rebalance失败,是因为多个消费节点在zk注册临时节点产生冲突,另外zk同步数据延时性时间拉长远大于(rebalance.max.retries * rebalance.backoff.ms),rebalance就会失败,如果再次触发rebalance且成功,其分片数据会被分配并消费,所以rebalance失败,其实是一个中间件状态。

解决思路-方案1

1.每当消费节点rebalance失败下线时,删除自身注册的临时节点,再次触发rebalance直到分片数据成功分配为止,如分配不成功报警。 
2.当前两种Watcher触发rebalance时,检查并重新注册消费节点到Consumer Group下,重新获得分片数据。

注明

像这种通过zk来协调做rebalance并不是一个最佳方案,会存在脑裂情况。根据Kafka官方文档,Kafka作者正在开发0.9.x版本中使用中心协调器。大体思想是选举出一个broker作为coordinator,由它watch Zookeeper,从而判断是否有分区或者consumer的增减,然后生成rebalance命令,并检查是否这些rebalance在所有相关的Consumer中被执行成功,如果不成功则重试,若成功则认为此次rebalance成功

分享到:
评论

相关推荐

    phpkafkaconsumer是一个kafkaconsumer库支持group和rebalance

    php-kafka-consumer 主要是对 php_rdkafka 的 consumer 的 API 进行一层封装,增加了原程序中所没有的与 zookeeper 交互的功能。

    php-kafka-consumer.zip

    在此基础上实现了 rebalance 功能以及 group 功能。 经过简单的压力测试,单个进程的消费能力能达到每秒钟7.8W条,压测详细内容见压力测试。依赖php_zookeeperphp_rdkafka (建议使用1.0.0版本)librdkafka(建议...

    rebalance-villagers:Minecraft Bukkit插件。 详情请参阅以下内容

    Minecraft Bukkit插件。 有关详细信息,请参见以下内容: :

    NM_rebalance

    NM_rebalance

    rebalance:基于 Ember CLI 的投资组合再平衡计算器可在 https 获取

    再平衡这是一个基于 Ember CLI 的投资组合再平衡计算器。 您可以在找到实时版本

    kafkaDEMO源码下载地址

    1.BaseApi 2.Streams 3.Producer 4.Consumer 5.Connector 6.Rebalance 7.Offset 整理的kafka资料和代码

    kafka深度分析

    介绍Kafka背景,使用消息系统的优势,常用消息系统对比,Kafka架构介绍,Kafka实现语义分析,Replication及Leader Election机制剖析,Consumer Group Rebalance实现原理介绍,以及Benchmark测试。

    pokemon-rebalance

    Pokémon Showdown 模拟了迄今为止所有游戏中的单打和双打战斗(第 1 代到第 6 代)。 此存储库包含设置您自己的 Pokémon Showdown 服务器所需的文件。 请注意,要设置服务器,您还需要一台服务器计算机。 您可以...

    DockerToolbox-1.12.6.exe

    rebalance()和rescale()之间的根本区别在于形成的任务连接的方式。 虽然rebalance()将在所有发送端任务与所有接收端任务之间创建通信通道【communication channels 】,但rescale()将仅创建从每个任务到下游操作符...

    RebalanceDesign6

    Hadoop HDFS 集群 Balance原理

    kafka-example:卡夫卡的例子

    读我kafka学习代码。基础代码:消费者:com.example.kafka.consumer生产者:com.example.kafka.producer其中消费者均为单线程消费,分为手动提交、自动...但是消费者线程处理任务容易超时,导致rebalance。方法二:实

    深入剖析Kafka设计原理:如何构建高效的消息系统

    接着,文档深入探讨了Kafka中重要的机制,包括Partition副本选举、消费者消费消息的Offset记录机制以及消费者Rebalance机制。特别地,对于Kafka的生产者和消费者客户端行为进行了详细分析,包括消息的发布机制、消息...

    manta-hk:管理Manta家政服务

    如果有任何问题,此命令将报告该错误并提出解决问题的命令。 manta-hk audit manta-hk cruft manta-hk gc manta-hk metering manta-hk rebalance 这些子命令获取有关最近运行的内部管理作业的信息。 有关详细信息,...

    rebalance-lnd:一种脚本,可用于平衡LND节点的闪电通道

    再平衡指数 使用此脚本,您可以轻松地重新平衡lnd节点的各个通道。 安装 此脚本需要运行一个活动的lnd 0.9.0+( )实例。 如果您自己编译,则需要包括routerrpc构建标记。 示例: make tags="autopilotrpc signrpc ...

    企业级消息队列Kafka视频教程

    3,结合工作实践及分析应用,培养解决实际问题的能力。 4,企业级方案设计,完全匹配工作场景。 适用人群 1、对大数据感兴趣的在校生及应届毕业生。 2、对目前职业有进一步提升要求,希望从事大数据行业高薪工作的...

    vpic工作量

    此代码库模拟由一些n Ranks或编写过程组成的Renegotiation 。 首先,一些实用程序类。 utils.VPICReader这是读取VPIC跟踪的包装。 然后可以将这些迹线... oob_left和oob_right是越界缓冲区,这些缓冲区存储要求等级

    Oracle 10.2 ASM 最佳实践 最终版本

    Oracle 10.2 ASM 最佳实践 ...Rebalance and Redistribution Files and Aliases Enhanced management for ASM ASM Command Line Interface (ASMCMD) DBMS_FILE_TRANSFER Utility Enhancements ......

    ib_user_sa.rar_The Tree

    after inserting @node into the tree, update the tree to account for both the new entry and any damage done by rebalance Source Code for Linux v2.13.6.

    疯狂内核之——进程管理子系统

    3.5.2 rebalance_tick()函数 178 3.5.3 load_balance()函数 180 3.5.4 move_tasks()函数 183 3.6 进程退出 187 3.6.1 进程终止 187 3.6.2 进程删除 189 4 进程的并发性体现 191 4.1 内核抢占 193 4.1.1 内核抢占概念...

Global site tag (gtag.js) - Google Analytics