1.消费位移确认
Kafka消费者消费位移确认有自动提交与手动提交两种策略。在创建KafkaConsumer对象时,通过参数enable.auto.commit设定,true表示自动提交(默认)。自动提交策略由消费者协调器(ConsumerCoordinator)每隔${auto.commit.interval.ms}毫秒执行一次偏移量的提交。手动提交需要由客户端自己控制偏移量的提交。
(1)自动提交。在创建一个消费者时,默认是自动提交偏移量,当然我们也可以显示设置为自动。例如,我们创建一个消费者,该消费者自动提交偏移量
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", "test");
props.put("enable.auto.commit", true);// 显示设置偏移量自动提交
props.put("auto.commit.interval.ms", 1000);// 设置偏移量提交时间间隔
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 创建消费者
consumer.subscribe(Arrays.asList("test"));// 订阅主题
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
Properties的实例props中存放的key意义:
1)bootstrap.servers表示要连接的Kafka集群中的节点,其中9092表示端口号;
2)enable.auto.commit为true,表示在auto.commit.interval.ms时间后会自动提交topic的offset,其中auto.commit.interval.ms默认值为5000ms;
3)其中foo和bar为要消费的topic名称,由group.id为test作为consumer group统一进行管理;
4)key.deserializer和value.deserializer表示指定将字节序列化为对象。
(2)手动提交offset。 生产环境中,需要在数据消费完全后再提交offset,也就是说在数据从kafka的topic取出来后并被逻辑处理后,才算是数据被消费掉,此时需要手动去提交topic的offset。
在有些场景我们可能对消费偏移量有更精确的管理,以保证消息不被重复消费以及消息不被丢失。假设我们对拉取到的消息需要进行写入数据库处理,或者用于其他网络访问请求等等复杂的业务处理,在这种场景下,所有的业务处理完成后才认为消息被成功消费,这种场景下,我们必须手动控制偏移量的提交。
Kafka 提供了异步提交(commitAsync)及同步提交(commitSync)两种手动提交的方式。两者的主要区别在于同步模式下提交失败时一直尝试提交,直到遇到无法重试的情况下才会结束,同时,同步方式下消费者线程在拉取消息时会被阻塞,直到偏移量提交操作成功或者在提交过程中发生错误。而异步方式下消费者线程不会被阻塞,可能在提交偏移量操作的结果还未返
回时就开始进行下一次的拉取操作,在提交失败时也不会尝试提交。
实现手动提交前需要在创建消费者时关闭自动提交,即设置enable.auto.commit=false。然后在业务处理成功后调用commitAsync()或commitSync()方法手动提交偏移量。由于同步提交会阻塞线程直到提交消费偏移量执行结果返回,而异步提交并不会等消费偏移量提交成功后再继续下一次拉取消息的操作,因此异步提交还提供了一个偏移量提交回调的方法commitAsync(OffsetCommitCallback callback)。当提交偏移量完成后会回调OffsetCommitCallback 接口的onComplete()方法,这样客户端根据回调结果执行不同的逻辑处理。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", "test");
props.put("fetch.max.bytes", 1024);// 为了便于测试,这里设置一次fetch 请求取得的数据最大值为1KB,默认是5MB
props.put("enable.auto.commit", false);// 设置手动提交偏移量
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Arrays.asList("test"));
try {
int minCommitSize = 10;// 最少处理10 条消息后才进行提交
int icount = 0 ;// 消息计算器
while (true) {
// 等待拉取消息
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
// 简单打印出消息内容,模拟业务处理
System.out.printf("partition = %d, offset = %d,key= %s value = %s%n", record. partition(), record.offset(), record.key(),record.value());
icount++;
}
// 在业务逻辑处理成功后提交偏移量
if (icount >= minCommitSize){
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (null == exception) {
// TODO 表示偏移量成功提交
System.out.println("提交成功");
} else {
// TODO 表示提交偏移量发生了异常,根据业务进行相关处理
System.out.println("发生了异常");
}
}
});
icount=0; // 重置计数器
}
}
} catch(Exception e){
// TODO 异常处理
e.printStackTrace();
} finally {
consumer.close();
}
本方案的缺点是必须保证所有数据被处理后,才提交topic的offset。为避免数据的重复消费,可以用第三种方案,根据每个partition的数据消费情况进行提交,称之为“at-least-once”。
3.手动提交partition的offset
Kafka 在0.10.1.1 版本增加了时间戳索引文件,因此我们除了直接根据偏移量索引文件查询消息之外,还可以根据时间戳来访问消息。consumer-API 提供了一个offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,该方法入参为一个Map 对象,Key 为待查询的分区,Value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的偏移量和时间戳。需要注意的是,若待查询的分区不存在,则该方法会被一直阻塞。
假设我们希望从某个时间段开始消费,那们就可以用offsetsForTimes()方法定位到离这个时间最近的第一条消息的偏移量,在查到偏移量之后调用seek(TopicPartition partition, long offset)方法将消费偏移量重置到所查询的偏移量位置,然后调用poll()方法长轮询拉取消息。例如,我们希望从主题“stock-quotation”第0 分区距离当前时间相差12 小时之前的位置开始拉取消息
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", "test");
props.put("enable.auto.commit", true);// 显示设置偏移量自动提交
props.put("auto.commit.interval.ms", 1000);// 设置偏移量提交时间间隔
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.assign(Arrays.asList(new TopicPartition("test", 0)));
try {
Map<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition,Long>();
// 构造待查询的分区
TopicPartition partition = new TopicPartition("stock-quotation", 0);
// 设置查询12 小时之前消息的偏移量
timestampsToSearch.put(partition, (System.currentTimeMillis() - 12 * 3600 * 1000));
// 会返回时间大于等于查找时间的第一个偏移量
Map<TopicPartition, OffsetAndTimestamp> offsetMap = consumer.offsetsForTimes (timestampsToSearch);
OffsetAndTimestamp offsetTimestamp = null;
// 这里依然用for 轮询,当然由于本例是查询的一个分区,因此也可以用if 处理
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {
// 若查询时间大于时间戳索引文件中最大记录索引时间,
// 此时value 为空,即待查询时间点之后没有新消息生成
offsetTimestamp = entry.getValue();
if (null != offsetTimestamp) {
// 重置消费起始偏移量
consumer.seek(partition, entry.getValue().offset());
}
}
while (true) {
// 等待拉取消息
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records){
// 简单打印出消息内容
System.out.printf("partition = %d, offset = %d,key= %s value = %s%n", record.partition(), record.offset(), record.key(),record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
4、消费速度控制
提供 pause(Collection<TopicPartition> partitions)和resume(Collection<TopicPartition>
partitions)方法,分别用来暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据操作。通过这两个方法可以对消费速度加以控制,结合业务使用。
分享到:
相关推荐
kafka是一种高吞吐量、基于发布/订阅模式的消息队列系统。下面是kafka的原理文档,涵盖了kafka的架构、设计理念、消息模型、 Partition机制、日志策略、消息可靠性机制等方面。 一、kafka架构 kafka的架构主要包括...
IT十八掌第三期配套笔记! 1、kafka消息系统的介绍 2、producer有分区类 3、kafka支持的副本模式 4、kafka消费者偏移量考察 5、kafka自定义消费者 6、kafka自定义生产者 ...8、flume集成kafka的几种方式
仅需几个简单的命令即可设置Kafka UI,从而以一种易于理解的方式可视化您的Kafka数据。 您可以在本地或在云中运行该工具。特征多集群管理—集中监视和管理所有集群使用指标仪表板进行性能监控—使用轻量级仪表板...
============伴侣分子由几种成分组成: ChaperoneClient是一个库,可以像Kafka Producer或Consumer一样放入库中,以在消息通过时对其进行审核。 审核统计信息将发送到专门的Kafka主题,例如“陪伴审核”。 ...
9、spark streaming 读取kafka数据的两种方式 8 10、kafka的数据存在内存还是磁盘 8 11、怎么解决kafka的数据丢失 9 12、fsimage和edit的区别? 10 13、列举几个配置文件优化? 10 14、datanode首次加入 cluster 的...
它不是线程安全的,所以实现多线程时通常由两种实现方法:每个线程维护一个KafkaConsumer维护一个或多个KafkaConsumer,并用一个任务线程池处理任务优缺点分析:方法一:实现简单(即把之前的消费对象new几个就完了...
卡夫卡服务 Application可以满足您的Kafka测试需求。 要求 为了构建和运行应用程序,您需要: 文献资料 在本地运行应用程序 ... 使用docker-compose.yml出您的容器。 (修改docker-compose.yml以反映您... 还有其他几种
Kafka的复制和备份 关于分区的几个总结: 一个分区a有一个首领broker A,且a是首领分区 一个分区可以在多个broker中存在副本,此时这些副本都是复制的首领分区的内容,而且生产者和消费者操作分区时,必须是首领分区...
一种用于发送SMS消息的生产质量微服务,该服务演示了几种技术的使用,包括: 夸库斯 GraalVM Kotlin Postgres(持久性) Panache(JPA) Flyway(数据库迁移) Kafka(消息传递) Debezium,Kafka Connect...
Kafka的系统监视工具。 卡夫卡雷 目录 产品特点 跨平台Kafka监控,实时数据显示桌面应用程序 监控的指标基于现实生活中的Kafka部署崩溃和最佳实践的反馈 ...有两种方法可以生成您的Kafka数据: 手动-生成受控数量的
⼤数据中数据采集的⼏种⽅式 ⼀、采集⼤数据的⽅法 1.1通过系统⽇志采集⼤数据 ⽤于系统⽇志采集的⼯具,⽬前使⽤最⼴泛的有:Hadoop 的Chukwa、ApacheFlumeAFacebook的Scribe和LinkedIn的Kafka等。这 ⾥主要学习...
数据仓库提供()服务,不最终面向用户;而数据集市是()的,面向最终用户 * Flink快照机制的核心是barriers,它不包含以下哪个特点? Structured Streamin不支持以下哪...Structured Stream 可以提供以下哪几种类型的保证
消息队列是分布式系统中的一种重要组件,旨在解决高并发环境下的同步处理问题。在高并发环境下,同步处理用户发送的请求可能会导致请求阻塞,进而引发数据库锁表、请求堆积等问题。消息队列可以解决这些问题,实现...
44.创建线程池有哪几种方式? 45.线程池都有哪些状态? 46.线程池中 submit()和 execute()方法有什么区别? 47.在 java 程序中怎么保证多线程的运行安全? 48.多线程锁的升级原理是什么? 49.什么是死锁? 50.怎么...
21、MyBatis 实现一对多有几种方式,怎么操作的? 22、Mybatis 是否支持延迟加载?如果支持,它的实现原理是什么? 23、Mybatis 的一级、二级缓存 24、什么是 MyBatis 的接口绑定?有哪些实现方式? 25、使用 ...
爬虫(Web Crawler)是一种自动化程序,用于从互联网上收集信息。其主要功能是访问网页、提取数据并存储,以便后续分析或展示。爬虫通常由搜索引擎、数据挖掘工具、监测系统等应用于网络数据抓取的场景。 爬虫的...
文章目录Flume优化一、内存参数优化(减少GC)1)-xmx和-xms...Kafka 几种 Sink知道 Flume 的拦截器吗介绍一下什么是选择器了解 Flume 的负载均衡和故障转移吗 Flume优化 一、内存参数优化(减少GC) 解决办法: 1)-xm
数据访问或通知接口规范根据访问数据的数据量、访问频次可采用以下几种数据访问或通知接口:实时数据通知对于实时过车数据可采用消息中间件(kafka等)或socket
几种常见消息队列的对比: ActiveMQ 功能强劲但是并发性能不够好,不适用于高并发的复杂的项目。 架构模式: Kafka 刚开始是为了收集和传输日志,追求高吞吐量(性能很高);但是缺点是不支持事务,不会对消息的重复...