好的, 上篇把 kafka.kafka 干的事情解析了一遍, 什么都看不出来, 是的, 什么都看不出来他干了什么。那么这章来电干货。
在kafka中, 主要资源的协调,开始运行时在
class KafkaServer(val config: KafkaConfig) extends Logging
这个类中进行的。
在初始化这个类的时候,他做了一件事情。
val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
咱们来看看 KafkaScheduler的实现
private val executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() { def newThread(runnable: Runnable): Thread = { val t = new Thread(runnable, baseThreadName + threadId.getAndIncrement) t.setDaemon(isDaemon) t } }) executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false) executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
看到结果了吧, 其实就是 ScheduledThreadPoolExecutor, kafka 初始化了一个单线程的 ScheduledThreadPoolExecutor 而且名字叫做 “kafka-logcleaner-”
初始化完成了, 咱们看看 startup 方法里有些什么猫腻。
isShuttingDown = new AtomicBoolean(false) shutdownLatch = new CountDownLatch(1) var needRecovery = true val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE) if (cleanShutDownFile.exists) { needRecovery = false cleanShutDownFile.delete }
首先是 shutdown 的判断吧之类的操作, 很巧妙,使用了一个文件来表示运行状态。
logManager = new LogManager(config, scheduler, SystemTime, 1000L * 60 * 60 * config.logRollHours, 1000L * 60 * config.logCleanupIntervalMinutes, 1000L * 60 * 60 * config.logRetentionHours, needRecovery)
他new 了一个叫做, logManager 的东西,是的,了解kafka 的人都知道,kafka 是全磁盘操作,message全放磁盘上,此类用于磁盘io的操作。相当关键,咱们看一下。
for(dir <- subDirs) { if(!dir.isDirectory()) { warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?") } else { info("Loading log '" + dir.getName() + "'") val topic = Utils.getTopicPartition(dir.getName)._1 val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs) val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize) val log = new Log(dir, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needRecovery) val topicPartion = Utils.getTopicPartition(dir.getName) logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]()) val parts = logs.get(topicPartion._1) parts.put(topicPartion._2, log) } }
首先, 他获取了topic们, 获取一些系统属性,把topic 放到 名叫 logs 的一个 pool 中, new Log 的作用是 加载 目录topic 中的log 信息到内存中。
在log 对象之中,用LogSegment 抽象了 log 的分段,因为 topic 是有 partition 的。
/* The actual segments of the log */ private[log] val segments: SegmentList[LogSegment] = loadSegments()
这个 segments 搜有的加在一起就是一个完整的 topic。
然后是按照 logsegment 的start 排个序,做个验证,完事。
接着, 把 各个topic 信息放到内存中之后,开始用
if(scheduler != null) { info("starting log cleaner every " + logCleanupIntervalMs + " ms") scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs) }
来定时 按照 config.logCleanupIntervalMinutes 配置的分钟做一些事情。做什么事情呢, 清空一下旧的log,按照两种标准清空, 一个是超过一定时间的log,还有一个是超过大小的log。
下面到了跟zk交互的阶段
if(config.enableZookeeper) { kafkaZookeeper = new KafkaZooKeeper(config, this) kafkaZookeeper.startup zkActor = new Actor { def act() { loop { receive { case topic: String => try { kafkaZookeeper.registerTopicInZk(topic) } catch { case e => error(e) // log it and let it go } case StopActor => info("zkActor stopped") exit } } } } zkActor.start }
跟zk交互的过程包括创建以下path:
* /topics/[topic]/[node_id-partition_num]
* /brokers/[0...N] --> host:port
订阅事件
总结一下,磁盘部分在 broker 初始化的时候,加载topic 信息到内存, 定期清理以下log, 跟zk做一些注册,订阅事件。
下回咱们看下,初始化的时候, 是网络连接用的什么神奇的东西。
相关推荐
kafka kafka kafka kafka kafka
kafka
kafka连接工具
kafka kafka kafka
一共包含两个程序,分别是Kafka生产者工具、Kafka消费者工具。 1、使用bootstrap、userName、password连接kafka。 2、可使用text、json格式发送topic消息。 3、异步producer、customer,收发消息畅通无阻。 Kafka...
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
kafka 插件
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
kafka的docker镜像包含了kafka,zookeeper 和kafkamanager,可以通过docker 来load 安装
kafka_2.11-2.0.0.tgz, kafka_2.11-2.0.1.tgz, kafka_2.11-2.1.0.tgz, kafka_2.11-2.1.1.tgz, kafka_2.11-2.2.0.tgz, kafka_2.11-2.2.1.tgz, kafka_2.11-2.2.2.tgz, kafka_2.11-2.3.0.tgz, kafka_2.11-2.3.1.tgz, ...
基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 ...
1、图形化界面可以直观地查看 Kafka 的 Topic 里的内容 2、自由设置 Kafka 数据展示格式 3、使用 Kafka Tool 创建/删除 Topic 4、使用 Kafka Tool 模拟发送 Messages
【BAT必备】kafka面试题【BAT必备】kafka面试题【BAT必备】kafka面试题【BAT必备】kafka面试题【BAT必备】kafka面试题【BAT必备】kafka面试题【BAT必备】kafka面试题【BAT必备】kafka面试题【BAT必备】kafka面试题...
5、kafka监控工具Kafka-Eagle介绍及使用 网址:https://blog.csdn.net/chenwewi520feng/article/details/130581571 本文主要介绍了kafka监控工具Kafka-Eagle的使用。 本文依赖:kafka、zookeeper部署完成。 本分分为...
kafka-map是一个连接kafka的页面工具
kafka安装包及安装步骤(原始安装及docker安装)
kafkapython教程_Kafka快速⼊门(⼗⼆)——Python客户端 Kafka快速⼊门(⼗⼆)——Python客户端 ⼀、confluent-kafka 1、confluent-kafka简介 confluent-kafka是Python模块,是对librdkafka的轻量级封装,⽀持Kafka ...
版本kafka_2.13-2.5.0. 官网下载太慢了,备份一下. 直接可以用 启动方法 方法一:加守护进程启动 bin/kafka-server-start.sh -daemon config/server.properties 方法二:通过后台来启动 nohup kafka-server-start.sh ...
本人在北美刚刚毕业,目前面试的几家大厂包括小公司在面试中都频繁的问道kafka这个技术,作为大数据开发或者java全栈的开发者来说,2020年很有必要系统的学习一下kafka. 1.[全面][Kafka2.11][jdk1.8][ZooKeeper3.4.6...
使用Maven整合Kafka 包括生产者,消费者 Kafka各种配置 //1.设置参数 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "81.68.232.188:9092,81.68.232.188:9093,81...