本章,我们将进入到Kafka的核心类中进行代码走读,深入分析他的存储交互和消息分发原理。
首先给大家展示一张服务端交互图,因为比较复杂我就没有再画,转发别人的一张图以供参考:
大家看完这个图以后相信有了一个整体认识,那么下面我们就重点从整体到细节的逐步分解。
一、KafkaServerStartable
在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装
我们使用命令行./kafka-server-start.sh -daemon ../config/server.properties 进行启动的时候,也是调用的这个类。
class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { private val server = new KafkaServer(serverConfig) def startup() { try { server.startup() AppInfo.registerInfo() } catch { case e: Throwable => fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e) // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code System.exit(1) } } def shutdown() { try { server.shutdown() } catch { case e: Throwable => fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e) System.exit(1) } } /** * Allow setting broker state from the startable. * This is needed when a custom kafka server startable want to emit new states that it introduces. */ def setServerState(newState: Byte) { server.brokerState.newState(newState) } def awaitShutdown() = server.awaitShutdown }
二、Kafka Server类
在KafkaServerStartable中启动了KafkaServer,这代表一个kafka broker, 是kafka的核心,默认的情况下一个KafkaServer就是一个broker。
只需要看看里面startup了哪些modules, 就知道broker做了哪些工作, 后面一个个具体分析。
this.logIdent = "[Kafka Server " + config.brokerId + "], " private var isShuttingDown = new AtomicBoolean(false) private var shutdownLatch = new CountDownLatch(1) private var startupComplete = new AtomicBoolean(false) val brokerState: BrokerState = new BrokerState val correlationId: AtomicInteger = new AtomicInteger(0) var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null var logManager: LogManager = null var offsetManager: OffsetManager = null var kafkaHealthcheck: KafkaHealthcheck = null var topicConfigManager: TopicConfigManager = null var replicaManager: ReplicaManager = null var apis: KafkaApis = null var kafkaController: KafkaController = null val kafkaScheduler = new KafkaScheduler(config.backgroundThreads) var zkClient: ZkClient = null
这个些类的初始化和启动代码如下,整个KafkaServer其实就是在初始和启动任务。
def startup() { try { info("starting") brokerState.newState(Starting) isShuttingDown = new AtomicBoolean(false) shutdownLatch = new CountDownLatch(1) /* start scheduler */ kafkaScheduler.startup() /* setup zookeeper */ zkClient = initZk() /* start log manager */ logManager = createLogManager(zkClient, brokerState) logManager.startup() socketServer = new SocketServer(config.brokerId, config.hostName, config.port, config.numNetworkThreads, config.queuedMaxRequests, config.socketSendBufferBytes, config.socketReceiveBufferBytes, config.socketRequestMaxBytes, config.maxConnectionsPerIp, config.connectionsMaxIdleMs, config.maxConnectionsPerIpOverrides) socketServer.startup() replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) /* start offset manager */ offsetManager = createOffsetManager() kafkaController = new KafkaController(config, zkClient, brokerState) /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) Mx4jLoader.maybeLoad() replicaManager.startup() kafkaController.startup() topicConfigManager = new TopicConfigManager(zkClient, logManager) topicConfigManager.startup() /* tell everyone we are alive */ kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient) kafkaHealthcheck.startup() registerStats() startupComplete.set(true) info("started") }
说明:
1、KafkaScheduler用于在后台执行一些任务,用ScheduledThreadPoolExecutor实现
2、由于Kafka是基于zookeeper进行配置管理的, 所以需要创建zkclient和zookeeper集群通信
3、LogManager是kafka文件存储系统管理,负责处理和存储所有Kafka的topic的partiton数据。
4、replicaManager是partition的备份分区管理。
5、broker server是socket server,所有和broker的交互都是通过往socket端口发送request来实现的。
6、offsetManager是定期清除过期的offset数据,即compact操作,以及consumer相关的一些offset操作。
7、kafkaController是为了处理replica,会用一个broker作为master,即controller,用于协调replica的一致性
8、Kafka apis调用handler的事件类型,进行相关的事件的处理。
请求的事件类型共有ProduceKey, FetchKey, OffsetsKey, MetadataKey, LeaderAndIsrKey, StopReplicaKey。包括我们之前说的producer和consumer与服务端的通信,很多就是通过这些key来获取的。UpdateMetadataKey, ControlledShutdownKey, OffsetCommitKey, OffsetFetchKey
9、TopicConfigManager用于处理topic config的change。
- ./bin/kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic testKJ1
比如你可以这样设置,那么这些topic config如何生效的?
topic-level config默认是被存储在,
/brokers/topics/<topic_name>/config 但是topic很多的情况下,为了避免创建太多的watcher,
所以单独创建一个目录
/brokers/config_changes
来触发配置的变化
所以上面的命令除了,把配置写入topic/config,还有增加一个通知,告诉watcher哪个topic的config发生了变化
/brokers/config_changes/config_change_13321
10、KafkaHealthcheck这个只是心跳检查机制。
接下来的博客将分别对这9个启动项进行逐一介绍。
相关推荐
Kafka技术内幕:图文详解Kafka源码设计与实现 有书签 有源码
Apache Kafka源码剖析 PDF较大,分5份上传!一起解压即可。
Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载 Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载
基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 ...
Kafka技术内幕:图文详解Kafka源码设计与实现
kafka 技术内幕 图文详解Kafka源码设计与实现 kafka 源码
apache kafka源码剖析高清 带索引书签目录_徐郡明(著) 电子工业出版社,分为part1和part2,一起下载解压
()完整目录,高清,无水印)Kafka源码解析与实战 理论联系实战,迅速使用API开发高并发的消息系统
kafka源码解析新手版本修正版,重新排版、加入了导航收签,添加原文档部分图片无法显示的问题,原文请参考原博主http://blog.csdn.net/column/details/kafka-details.html
apache kafka技术内幕 和 apacke kafka源码分析2本PDF 电子书 网盘下载
Kafka源码解析与实战.zip
Kafka源码解析以及实战,为深度学习kafka的童鞋提供研究教材呦
Apache Kafka源码剖析 PDF较大,分5份上传!一起解压即可。
kafka源码分析, Introduction kafka-Intro kafka-Unix kafka-Producer kafka-Producer-Scala kafka-SocketServer kafka-LogAppend kafka-ISR kafka-Consumer-init-Scala
apache kafka源码剖析高清 带索引书签目录_徐郡明(著) 电子工业出版社
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
Apache Kafka源码剖析试读文章, kafka 开发的好文章
Kafka技术内幕:图文详解Kafka源码设计与实现.郑奇煌(2017.11).pdfKafka技术内幕:图文详解Kafka源码设计与实现.郑奇煌(2017.11).pdf
Apache Kafka源码剖析 PDF较大,分6份上传!一起解压即可。