Kafka将数据持久化到了硬盘上,允许你配置一定的策略对数据清理,清理的策略有两个,删除和压缩。
数据清理的方式
删除
log.cleanup.policy=delete启用删除策略
直接删除,删除后的消息不可恢复。可配置以下两个策略:
清理超过指定时间清理:
log.retention.hours=16
超过指定大小后,删除旧的消息:
log.retention.bytes=1073741824
为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。
压缩
将数据压缩,只保留每个key最后一个版本的数据。
首先在broker的配置中设置log.cleaner.enable=true启用cleaner,这个默认是关闭的。
在topic的配置中设置log.cleanup.policy=compact启用压缩策略。
压缩策略的细节
如上图,在整个数据流中,每个Key都有可能出现多次,压缩时将根据Key将消息聚合,只保留最后一次出现时的数据。这样,无论什么时候消费消息,都能拿到每个Key的最新版本的数据。
压缩后的offset可能是不连续的,比如上图中没有5和7,因为这些offset的消息被merge了,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,比如,当试图获取offset为5的消息时,实际上会拿到offset为6的消息,并从这个位置开始消费。
这种策略只适合特俗场景,比如消息的key是用户ID,消息体是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。
压缩策略支持删除,当某个Key的最新版本的消息没有内容时,这个Key将被删除,这也符合以上逻辑。
相关推荐
kafka读取写入数据
向kafka插入数据测试
Storm集成Kafka 一、整合说明 二、写入数据到Kafka 三、从Kafka中读取数据 从Kafka中读取数据 Storm从Kafka集群读取数据处理
使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据,数据可以批量进行操作
用java实现向kafka写数据以及从kafka消费数据,kafka版本支持0.10以上
.net core 使用kafka推送消息和接收消息,包含生产端和消费端
安装kafka支持库pip install kafka-python from kafka import KafkaProducer import json ''' 生产者demo 向test_lyl2主题中循环写入10条json数据 注意事项:要写入json数据需加上value_serializer参数,如下...
图解 Kafka 之实战指南.pdf
自己研究大数据多年,写的一个日志数据采集方案笔记,可快速熟悉Flume,Kafka,Hdfs的操作使用,以及相互的操作接口。
消费kafka数据,然后批量导入到Elasticsearch,本例子使用的kafka版本0.10,es版本是6.4,使用bulk方式批量导入到es中,也可以一条一条的导入,不过比较慢。 <groupId>org.elasticsearch <artifactId>elastic...
unity利用kafka接收数据,只需填写ip端口,topic 即可接收消息;适用范围,unity编辑器,发布PC应用 说明:如果发布PC不可用,请手动将Plugins\X64文件夹里的dll 文件拷贝到发布文件kafka-Test_Data\Managed 路径下...
通过java模拟生产环境的日志,flume监控指定目录,采集日志推送到kafka。具体内容可参考 “基于CDH5的flume-kafka对接”这篇
1、Kafka如何防止数据丢失 1)消费端弄丢数据 消费者在消费完消息之后需要执行消费位移的提交,该消费位移表示下一条需要拉取的消息的位置。Kafka默认位移提交方式是自动提交,但它不是在你每消费一次数据之后就...
flink 实现将kafka的数据读取出来,并经过xml配置和重写算子方式实现数据处理并写到kafka
kafka生产数据的java项目、利用项目生成数据放入kafka中
二、写入数据到Kafka 三、从Kafka中读取数据 整合说明 Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下: Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持; Storm Kafka ...
9、NIFI综合应用场景-通过NIFI配置kafka的数据同步 网址:https://blog.csdn.net/chenwewi520feng/article/details/130622776 本文旨在介绍nifi与kafka的交互过程,即生产数据到kafka中,然后通过nifi消费kafka中的...
c# kafka 发送端与接收 完整demo 生成 消费
Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。
pip install --user kafka-python==1.4.3 如果报错压缩相关的错尝试安装下面的依赖 yum install snappy-devel yum install lz4-devel pip install python-snappy pip install lz4 2.生产者 #!/usr/bin/env python...