`
flychao88
  • 浏览: 743554 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

跟我学Kafka源码之Broker Server

 
阅读更多

本章,我们将进入到Kafka的核心类中进行代码走读,深入分析他的存储交互和消息分发原理。

首先给大家展示一张服务端交互图,因为比较复杂我就没有再画,转发别人的一张图以供参考:

 大家看完这个图以后相信有了一个整体认识,那么下面我们就重点从整体到细节的逐步分解。

 

一、KafkaServerStartable

在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装

我们使用命令行./kafka-server-start.sh -daemon ../config/server.properties 进行启动的时候,也是调用的这个类。

 

class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
  private val server = new KafkaServer(serverConfig)

  def startup() {
    try {
      server.startup()
      AppInfo.registerInfo()
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
        // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
        System.exit(1)
    }
  }

  def shutdown() {
    try {
      server.shutdown()
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
        System.exit(1)
    }
  }

  /**
   * Allow setting broker state from the startable.
   * This is needed when a custom kafka server startable want to emit new states that it introduces.
   */
  def setServerState(newState: Byte) {
    server.brokerState.newState(newState)
  }

  def awaitShutdown() = 
    server.awaitShutdown

}

 

二、Kafka Server类

在KafkaServerStartable中启动了KafkaServer,这代表一个kafka broker, 是kafka的核心,默认的情况下一个KafkaServer就是一个broker。

只需要看看里面startup了哪些modules, 就知道broker做了哪些工作, 后面一个个具体分析。

 

  this.logIdent = "[Kafka Server " + config.brokerId + "], "
  private var isShuttingDown = new AtomicBoolean(false)
  private var shutdownLatch = new CountDownLatch(1)
  private var startupComplete = new AtomicBoolean(false)
  val brokerState: BrokerState = new BrokerState
  val correlationId: AtomicInteger = new AtomicInteger(0)
  var socketServer: SocketServer = null
  var requestHandlerPool: KafkaRequestHandlerPool = null
  var logManager: LogManager = null
  var offsetManager: OffsetManager = null
  var kafkaHealthcheck: KafkaHealthcheck = null
  var topicConfigManager: TopicConfigManager = null
  var replicaManager: ReplicaManager = null
  var apis: KafkaApis = null
  var kafkaController: KafkaController = null
  val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
  var zkClient: ZkClient = null

 

   这个些类的初始化和启动代码如下,整个KafkaServer其实就是在初始和启动任务。

def startup() {
    try {
      info("starting")
      brokerState.newState(Starting)
      isShuttingDown = new AtomicBoolean(false)
      shutdownLatch = new CountDownLatch(1)

      /* start scheduler */
      kafkaScheduler.startup()
    
      /* setup zookeeper */
      zkClient = initZk()

      /* start log manager */
      logManager = createLogManager(zkClient, brokerState)
      logManager.startup()

      socketServer = new SocketServer(config.brokerId,
                                      config.hostName,
                                      config.port,
                                      config.numNetworkThreads,
                                      config.queuedMaxRequests,
                                      config.socketSendBufferBytes,
                                      config.socketReceiveBufferBytes,
                                      config.socketRequestMaxBytes,
                                      config.maxConnectionsPerIp,
                                      config.connectionsMaxIdleMs,
                                      config.maxConnectionsPerIpOverrides)
      socketServer.startup()

      replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)

      /* start offset manager */
      offsetManager = createOffsetManager()

      kafkaController = new KafkaController(config, zkClient, brokerState)
    
      /* start processing requests */
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
      brokerState.newState(RunningAsBroker)
   
      Mx4jLoader.maybeLoad()

      replicaManager.startup()

      kafkaController.startup()
    
      topicConfigManager = new TopicConfigManager(zkClient, logManager)
      topicConfigManager.startup()
    
      /* tell everyone we are alive */
      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
      kafkaHealthcheck.startup()

    
      registerStats()
      startupComplete.set(true)
      info("started")
    }

说明:

 

1、KafkaScheduler用于在后台执行一些任务,用ScheduledThreadPoolExecutor实现

2、由于Kafka是基于zookeeper进行配置管理的, 所以需要创建zkclient和zookeeper集群通信

3、LogManager是kafka文件存储系统管理,负责处理和存储所有Kafka的topic的partiton数据。

4、replicaManager是partition的备份分区管理。

5、broker server是socket server,所有和broker的交互都是通过往socket端口发送request来实现的。

6、offsetManager是定期清除过期的offset数据,即compact操作,以及consumer相关的一些offset操作。

7、kafkaController是为了处理replica,会用一个broker作为master,即controller,用于协调replica的一致性

8、Kafka apis调用handler的事件类型,进行相关的事件的处理。

请求的事件类型共有ProduceKey, FetchKey, OffsetsKey, MetadataKey, LeaderAndIsrKey, StopReplicaKey。包括我们之前说的producer和consumer与服务端的通信,很多就是通过这些key来获取的。
UpdateMetadataKey, ControlledShutdownKey, OffsetCommitKey, OffsetFetchKey

9、TopicConfigManager用于处理topic config的change。

  1. ./bin/kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic testKJ1  

比如你可以这样设置,那么这些topic config如何生效的?

topic-level config默认是被存储在,

/brokers/topics/<topic_name>/config
但是topic很多的情况下,为了避免创建太多的watcher,

所以单独创建一个目录

/brokers/config_changes

来触发配置的变化
所以上面的命令除了,把配置写入topic/config,还有增加一个通知,告诉watcher哪个topic的config发生了变化

/brokers/config_changes/config_change_13321

10、KafkaHealthcheck这个只是心跳检查机制。

 

接下来的博客将分别对这9个启动项进行逐一介绍。

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics