`
sharp-fcc
  • 浏览: 105578 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

拨开kafka 的羊毛衫

阅读更多

好的, 上篇把 kafka.kafka 干的事情解析了一遍, 什么都看不出来, 是的, 什么都看不出来他干了什么。那么这章来电干货。

 

在kafka中, 主要资源的协调,开始运行时在

class KafkaServer(val config: KafkaConfig) extends Logging

这个类中进行的。

 

在初始化这个类的时候,他做了一件事情。

  val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)

咱们来看看  KafkaScheduler的实现

 

  private val executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() {
    def newThread(runnable: Runnable): Thread = {
      val t = new Thread(runnable, baseThreadName + threadId.getAndIncrement)
      t.setDaemon(isDaemon)
      t
    }
  })
  executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
  executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)

 

看到结果了吧, 其实就是  ScheduledThreadPoolExecutor, kafka 初始化了一个单线程的 ScheduledThreadPoolExecutor 而且名字叫做 “kafka-logcleaner-”

 

初始化完成了, 咱们看看  startup 方法里有些什么猫腻。

    isShuttingDown = new AtomicBoolean(false)
    shutdownLatch = new CountDownLatch(1)
    var needRecovery = true
    val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE)
    if (cleanShutDownFile.exists) {
      needRecovery = false
      cleanShutDownFile.delete
    }

 首先是 shutdown 的判断吧之类的操作, 很巧妙,使用了一个文件来表示运行状态。

 

    logManager = new LogManager(config,
                                scheduler,
                                SystemTime,
                                1000L * 60 * 60 * config.logRollHours,
                                1000L * 60 * config.logCleanupIntervalMinutes,
                                1000L * 60 * 60 * config.logRetentionHours,
                                needRecovery)

 

他new 了一个叫做, logManager 的东西,是的,了解kafka 的人都知道,kafka 是全磁盘操作,message全放磁盘上,此类用于磁盘io的操作。相当关键,咱们看一下。

 

    for(dir <- subDirs) {
      if(!dir.isDirectory()) {
        warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
      } else {
        info("Loading log '" + dir.getName() + "'")
        val topic = Utils.getTopicPartition(dir.getName)._1
        val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
        val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
        val log = new Log(dir, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needRecovery)
        val topicPartion = Utils.getTopicPartition(dir.getName)
        logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
        val parts = logs.get(topicPartion._1)
        parts.put(topicPartion._2, log)
      }
    }

 首先, 他获取了topic们, 获取一些系统属性,把topic 放到 名叫 logs 的一个 pool 中, new Log 的作用是 加载 目录topic 中的log 信息到内存中。

 

在log 对象之中,用LogSegment 抽象了 log 的分段,因为 topic 是有 partition 的。

  /* The actual segments of the log */
  private[log] val segments: SegmentList[LogSegment] = loadSegments()

 这个 segments 搜有的加在一起就是一个完整的 topic。

然后是按照 logsegment 的start 排个序,做个验证,完事。

 

接着, 把 各个topic 信息放到内存中之后,开始用

  if(scheduler != null) {
    info("starting log cleaner every " + logCleanupIntervalMs + " ms")    
    scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
  }

 来定时 按照 config.logCleanupIntervalMinutes 配置的分钟做一些事情。做什么事情呢, 清空一下旧的log,按照两种标准清空, 一个是超过一定时间的log,还有一个是超过大小的log。

 

下面到了跟zk交互的阶段

  if(config.enableZookeeper) {
    kafkaZookeeper = new KafkaZooKeeper(config, this)
    kafkaZookeeper.startup
    zkActor = new Actor {
      def act() {
        loop {
          receive {
            case topic: String =>
              try {
                kafkaZookeeper.registerTopicInZk(topic)
              }
              catch {
                case e => error(e) // log it and let it go
              }
            case StopActor =>
              info("zkActor stopped")
              exit
          }
        }
      }
    }
    zkActor.start
  }

 

跟zk交互的过程包括创建以下path:

 *   /topics/[topic]/[node_id-partition_num]

 *   /brokers/[0...N] --> host:port

订阅事件

 

总结一下,磁盘部分在 broker 初始化的时候,加载topic 信息到内存, 定期清理以下log, 跟zk做一些注册,订阅事件。

 

下回咱们看下,初始化的时候, 是网络连接用的什么神奇的东西。

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics