引言
Kafka中的Message是以topic为基本单位组织的,不同的 topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定 的),每个partition存储一部分Message。借用官方的一张图,可以直观地看到topic和partition的关系。
partition是以文件的形式存储在文件系统中,比如,创建了一个名 为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样 5个目录: page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名规则 为<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。
接下来,本文将分析partition目录中的文件的存储格式和相关的代码所在的位置。
Partition的数据文件
Partition中的每条Message由offset来表示它在这个 partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了 partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条 Message包含了以下三个属性:
- offset
- MessageSize
- data
其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。它的格式和Kafka通讯协议中介绍的MessageSet格式是一致。
Partition的数据文件则包含了若干条上述格式的Message,按offset由小到大排列在一起。它的实现类为FileMessageSet,类图如下:
它的主要方法如下:
- append: 把给定的ByteBufferMessageSet中的Message写入到这个数据文件中。
- searchFor: 从指定的startingPosition开始搜索找到第一个Message其offset是大于或者等于指定的offset,并返回其在文件中的位置 Position。它的实现方式是从startingPosition开始读取12个字节,分别是当前MessageSet的offset和size。如 果当前offset小于指定的offset,那么将position向后移动LogOverHead+MessageSize(其中 LogOverHead为offset+messagesize,为12个字节)。
- read:准确名字应该是slice,它截取其中一部分返回一个新的FileMessageSet。它不保证截取的位置数据的完整性。
- sizeInBytes: 表示这个FileMessageSet占有了多少字节的空间。
- truncateTo: 把这个文件截断,这个方法不保证截断位置的Message的完整性。
- readInto: 从指定的相对位置开始把文件的内容读取到对应的ByteBuffer中。
我们来思考一下,如果一个partition只有一个数据文件会怎么样?
- 新数据是添加在文件末尾(调用FileMessageSet的append方法),不论文件数据文件有多大,这个操作永远都是O(1)的。
- 查找某个offset的Message(调用FileMessageSet的searchFor方法)是顺序查找的。因此,如果数据文件很大的话,查找的效率就低。
那Kafka是如何解决查找效率的的问题呢?有两大法宝:1) 分段 2) 索引。
数据文件的分段
Kafka解决查询效率的手段之一是将数据文件分段,比如有100条 Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文 件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段 中。
为数据文件建索引
数据文件分段使得可以在一个较小的数据文件中查找对应offset的 Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立 了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。
- 相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对 offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么 offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。
- position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。
index文件中并没有为数据文件中的每条Message建立索引,而是 采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的 Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。
在Kafka中,索引文件的实现类为OffsetIndex,它的类图如下:
主要的方法有:
- append方法,添加一对offset和position到index文件中,这里的offset将会被转成相对的offset。
- lookup, 用二分查找的方式去查找小于或等于给定offset的最大的那个offset
小结
我们以几张图来总结一下Message是如何在Kafka中存储的,以及如何查找指定offset的Message的。
Message是按照topic来组织,每个topic可以分成多个的partition,比如:有5个partition的名为为page_visits的topic的目录结构为:
partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件:
可以看到,这个partition有4个LogSegment。
比如:要查找绝对offset为7的Message:
- 首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。
- 打 开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然 offset为6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。
- 打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。
这套机制是建立在offset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。
一句话,Kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性。
相关推荐
实时数据处理:Kafka + Spark Streaming 数据应用层:MLlib 产生一个模型 als算法 数据展示和对接:Zeppelin 选用考量: HDFS不管是在存储的性能,稳定性 吞吐量 都是在主流文件系统中很占有优势的 如果感觉HDFS...
kafkaKafka的特性主要包括: 消息文件存储(消息堆积能力):Kafka具有很强的消息...拉模型(消费者水平扩展):Kafka采用拉模型进行消息的消费,消费者可以根据自己的处理能力来拉取消息,实现了消费者的水平扩展。
kafkaApache Kafka是一个分布式流式计算平台,它主要用于处理实时的数据流。...拉模型(消费者水平扩展):Kafka采用拉模型进行消息的消费,消费者可以根据自己的处理能力来拉取消息,实现了消费者的水平扩展。
Kafka基于发布-订阅的消息队列模型,消息以主题(topics)的形式发布到Kafka集群中的一个或多个分区(partitions)上,并可以被一个或多个消费者应用程序订阅和处理。Kafka提供了持久性存储、数据复制和容错机制,...
Kafka还强调减少数据的序列化和拷贝开销,它会将一些消息组织成Message Set做批量存储和发送,并且客户端在pull数据的时候,尽量以zero-copy的方式传输,利用sendfile(对应java里的 FileChannel.transferTo/...
实时数据处理:Kafka + Spark Streaming 数据应用层:MLlib 产生一个模型 als算法 数据展示和对接:Zeppelin 选用考量: HDFS不管是在存储的性能,稳定性 吞吐量 都是在主流文件系统中很占有优势的 如果感觉HDFS...
实时数据处理:Kafka + Spark Streaming 数据应用层:MLlib 产生一个模型 als算法 数据展示和对接:Zeppelin 选用考量: HDFS不管是在存储的性能,稳定性 吞吐量 都是在主流文件系统中很占有优势的 如果感觉HDFS...
3.kafka中的消息模型 4.大概流程 1.kafka中涉及的名词 消息记录(record): 由一个key,一个value和一个时间戳构成,消息最终存储在主题下的分区中, 记录在生产者中称为生产者记录(ProducerRecord), 在消费者中称为...
apache-atlas-2.3.0-kafka-hook.tar.gz Apache Atlas 框架是一组可扩展的核心基础治理服务,使企业能够有效且高效地满足 Hadoop 内的合规性要求,并允许与整个企业数据生态系统集成。这将通过使用规范和取证模型、...
我已经使用TensorFlow.js来构建ml模型体系结构,并使用Kafka来进行实时数据流传输和流水线操作。 使用的技术: 卡夫卡 将日志从源流水到主题。 消费者订阅主题以并行进行实时ml预测和模型训练。 TensorFlow.js...
实时数据处理:Kafka + Spark Streaming 数据应用层:MLlib 产生一个模型 als算法 数据展示和对接:Zeppelin 选用考量: HDFS不管是在存储的性能,稳定性 吞吐量 都是在主流文件系统中很占有优势的 如果感觉HDFS...
概述Kafka Connect MarkLogic 是一个仅接收器的连接器,用于... MarkLogic 是一个多模型无模式 NoSQL 数据库,用于存储、管理和搜索 JSON、XML 和 RDF 三元组。 更多详情请参考 MarkLogic 官。什么是 Apache 卡夫卡? A
项目架构: 主要是基于Flume+Kafka+Sparkstreaming +HBase+ES来实现实时的用户信息存储轨迹查询任务。 1、资源内容: 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便更改、代码编程思路清晰、...
在这里被广泛用于许多目的,例如数据处理、构建物化视图、持久化域模型和公开服务的输出。 此外,没有传统的数据库,取而代之的是本地状态存储(PostgreSQL 只扮演遗留系统的角色)。 这个代码库试图应用 with 和 的...
因此,在解释Kapaka的工作原理之前,让我们仔细看一下消息传递系统:如果发送了称为消息的数据单元的生产者将数据存储在每个消息存储中的数据(在Kafka中称为主题), 接受方消费者应该从所需主题中获取数据。...
大数据技术指的是用于处理和分析...机器学习:大数据技术也广泛应用于机器学习领域,支持大规模数据的模型训练和预测分析。 流式处理:针对实时数据处理需求,流式处理技术(如Apache Kafka、Apache Flink)可以实时。
网络通信和IO模型 多线程、协程和异步IO的应用 分布式爬虫系统的扩展和负载均衡 实际案例分析和项目实践 实际爬虫系统的设计和实现 爬虫系统的性能优化和调试技巧 爬虫项目开发流程和实践经验分享
网络通信和IO模型 多线程、协程和异步IO的应用 分布式爬虫系统的扩展和负载均衡 实际案例分析和项目实践 实际爬虫系统的设计和实现 爬虫系统的性能优化和调试技巧 爬虫项目开发流程和实践经验分享
网络通信和IO模型 多线程、协程和异步IO的应用 分布式爬虫系统的扩展和负载均衡 实际案例分析和项目实践 实际爬虫系统的设计和实现 爬虫系统的性能优化和调试技巧 爬虫项目开发流程和实践经验分享
- 功能:消费kafka数据,并将处理后的数据存储到Redis中 - 操作步骤 1. 启动Redis > 服务端:[hadoop@mini1 redis]# bin/redis-server ~/apps/redis/etc/redis.conf \ > 客户端:[hadoop@mini1 redis]# bin/redis-...