登录zookeeper
zkCli.sh -server centos1:2181
创建topics mytopic
kafka-topics.sh --create --zookeeper centos1:2181,centos2:2181,centos3:2181 --replication-factor 3 --partitions 2 --topic mytopic
zookeeper节点结构
/controller data={"version":1,"brokerid":2,"timestamp":"1495002238024"} //id=2的broker是leader
/controller_epoch data=1
/brokers
/brokers/ids //实时维护active的brokers
/brokers/ids/0
/brokers/ids/1
/brokers/ids/2
/brokers/topics
/brokers/topics/mytopic/partitions/0/state data={"controller_epoch":7,"leader":1,"version":1,"leader_epoch":0,"isr":[1,0,2]} //其中leader指的是该partition的leader,每个partition都有一个leader。"isr":[1,0,2]表示该partition有三个replication,分别位于1,0,2三个broker上。leader维护了其它副本的同步信息。
/brokers/topics/mytopic/partitions/1/state data={"controller_epoch":7,"leader":2,"version":1,"leader_epoch":0,"isr":[2,1,0]}
/brokers/seqid
/admin/delete_topics
/isr_change_notification
/consumers/
/consumers/console-consumer-24372 data=
/consumers/console-consumer-24372/ids data=
/consumers/console-consumer-24372/ids/console-consumer-24372_centos1-1495075711403-999aec1a data={"version":1,"subscription":{"mytopic":1},"pattern":"white_list","timestamp":"1495075711460"}
/consumers/console-consumer-24372/owners data=null
/consumers/console-consumer-24372/owners/mytopic data=null
/consumers/console-consumer-24372/owners/mytopic/0 data=console-consumer-24372_centos1-1495075711403-999aec1a-0
/consumers/console-consumer-24372/owners/mytopic/1 data=console-consumer-24372_centos1-1495075711403-999aec1a-0
/consumers/console-consumer-24372/offsets data=null
/consumers/console-consumer-24372/offsets/mytopic data=null
/consumers/console-consumer-24372/offsets/mytopic/0 data=153
/consumers/console-consumer-24372/offsets/mytopic/1 data=582 //console-consumer-24372 是某个控制台consumer的group name,582是该consumer目前消费的mytopic中分区1中的消息的偏移量。可以直接在zookeeper中修改这个值,从而让该consumer从这个值(偏移量)开始读消息。
/config
/config/changes
/config/clients
/config/topics
kafka目录结构
./.lock
./meta.properties
./cleaner-offset-checkpoint
./replication-offset-checkpoint
./recovery-point-offset-checkpoint
./mytopic-0 //命名方式:topic+分区ID
./mytopic-0/00000000000000000000.index
./mytopic-0/00000000000000000000.timeindex
./mytopic-0/00000000000000000000.log //存放消息的地方
./mytopic-1
./mytopic-1/00000000000000000000.index
./mytopic-1/00000000000000000000.timeindex
./mytopic-1/00000000000000000000.log //存放消息的地方
Kafka 副本机制:
1. 每个分区存放n个副本,可承受n-1个节点失效。
2. 这n个副本中有一个是leader,它同时维护者所有副本的同步状态。
3. 如果leader失效,会通知producer,然后producer将消息重新发送给新的leader。
4. 选择新leader的方法是:所有follower在zookeeper中注册自己,最先注册的是leader,其它是follower。
5. Kafka支持的副本机制有:
同步机制: producer从zookeeper中找到leader,向leader发送消息,消息写入leader本地log。follower从leader中pull消息,每个follower将消息写入本地log,向leader发送确认回执。leader收到follower的确认回执后再想producer发送确认回执。 在consumer端,所有的消息是从leader中pull的。
异步机制:与同步机制不同的是一旦leader向log写入message完成就会向producer发送确认回执。所以这种机制不保证向失效的follower写入成功。
consumer group 与 partition
1. 监听同一个topic的多个consumer,可以属于一个group。同属一个group的多个consumer不会重复接收消息。如果要重复接收所有消息需要配不同的group。
2. 假设partition的数量是m, 同属一个group的consumer数量是n:
a. m=n, 平均为每个consumer分配一个partition
b. m>n, 每个consumer都能分配一个partition,有些consumer会分配到多个partition
c. m<n, 只有m个consumer都能分配一个partition,n-m个consumer接收不到消息。此时如果开启新的consumer,某个旧的consumer将读不到消息。
consumer的数量可以随时调整,不会漏掉消息。
命令:bin/kafka-console-consumer.sh --bootstrap-server centos1:9092 --topic mytopic --consumer-property group.id=group1
3. 可以指定consumer只接收某个partition的消息
命令:bin/kafka-console-consumer.sh --bootstrap-server centos1:9092 --topic mytopic --consumer-property group.id=group1 --partition 0
4.下面的zookeeper节点信息是被group共享的(新版Kafka可能没有把offset存到zookeeper):
/consumers/mygroup/offsets/mytopic/0 data=153
/consumers/mygroup/offsets/mytopic/1 data=582
153、582记录的是mygroup在mytopic中的分区0和分区1分别读取到的偏移量
参考:http://www.jasongj.com/2015/01/02/Kafka%E6%B7%B1%E5%BA%A6%E8%A7%A3%E6%9E%90/
相关推荐
这个Docker Compose 文件定义了一个包含Zookeeper和三个Kafka节点的服务集群。通过指定镜像、端口映射、环境变量和依赖关系等配置,实现了Zookeeper和Kafka的快速部署和集成。同时,在定义了一个名为"mynetwork"的...
Kafka / ZK REST API将提供可用于生产环境的端点,以执行一些针对Kafka和Zookeeper的管理/度量任务。 提供以下功能: 集群/节点/控制器信息描述 经纪人名单 代理配置获取/更新,动态配置获取/更新/删除 日志目录...
命令行脚本有很多弊端,如不能在应用程序、运维框架或是监控平台中集成,需要连接 ZooKeeper,可能会绕过 Kafka 的安全设置等问题。AdminClient 是 Java 客户端提供的工具,可以解决这些问题。 2. 功能概述 ...
Kafka是由LinkedIn公司用Scala语言开发的,一个分布式、分区的、多副本的、多订阅者的,基于Zookeeper协调的分布式日志系统(也可做MQ系统)。主要初衷目标是构建一个用来处理海量日志,用户行为和网站运营统计等的...
日志分析器-分析大数据组件的客户日志,例如HDFS,Hive,HBase,Yarn,MapReduce,Storm,Spark,Spark 2,Knox,Ambari Metrics,Nifi,Accumulo,Kafka,Flume,Oozie,Falcon,Atlas和Zookeeper。 内部架构 分析...
然而,一些项目(例如在原型设计期间)开始使用 Kafka 可能会产生开销——此外,您还需要启动 Zookeeper(即使是单节点设置)。 此外,Kafka 客户端应该处理自定义二进制协议并处理与 Zookeeper 的通信。 Kiwi 为...
zookeeper(kafka附带了它。Kafka使用ZooKeeper来管理集群) mongodb(用于存储审核日志数据) 节点和npm(开发和构建环境) 加工 此应用程序已开发为可与关系数据库一起使用。 它的工作可以通过以下步骤进行解释...
从0开始搭建3个节点额度zookeeper集群 深入分析Zookeeper在disconf配置中心的应用 基于Zookeeper Watcher 核心机制深入源码分析 Zookeeper集群升级、迁移 基于Zookeeper实现分布式服务器动态上下线感知 深入...
2、基础环境监控zookeeper、storm、kafka、redis等基础集群运行状态监控 3、日志监控APP运行关键流程节点状态日志收集和监控 2、引擎框架
kafka的监控消费组,app在日志中进行各种event埋点(如:第三方异常报警、请求耗时异常报警等) business-group: kafka的业务消费组 trace-group: 通过日志进行rpc调用trace跟踪(dapper论文) es: 日志存储db,并...
(1)kafka在187上的节点死掉了,可能是虚拟机的问题,杀掉进程,重新启动一下 nohup bin/kafka-server-start.sh config/server.properties & (2)需要在nginx.conf中,http部分,加入resolver 8.8.8.8; (3)...
13_zookeeper的安装部署和使用.mp4 14_hadoop配置介绍.mp4 15_hadoop高可用配置及群起脚本.mp4 16_hdfs的多目录配置和集群平衡.mp4 17_hadoop的参数调优.mp4 18_kafka的安装部署.mp4 19_kafka命令行简单介绍....
Hadoop、Spark、Kafka、Hbase..... 等,更新中... 综合实践项目 项目名 说明 使用 Spark SQL imooc 访问日志,数据清洗,统计,可视化 入门学习示例 项目名 所属组件 介绍 MapReduce MapReduce 实验 - 计算气温 最大...
如果已经正确安装部署完 zookeeper 和 kafka、以及配置完 /etc/hostname 文件,下面正式开始安装部署 RTSync 同步工具及数据同步展示。 六、配置 /etc/hosts 文件 配置 /etc/hosts 文件的方法及步骤如下: 步骤 1...