`
kane_xie
  • 浏览: 143356 次
社区版块
存档分类
最新评论

KafkaConsumer0.9(三)

阅读更多

 

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test_group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
List<TopicPartition> list = new ArrayList<TopicPartition>();
TopicPartition tp = new TopicPartition("test_topic", 0);
list.add(tp);
consumer.assign(list);
consumer.seek(tp, 96);
// consumer.seekToBeginning(tp);
// consumer.seekToEnd(tp);
int commitInterval = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
while (true) {
	ConsumerRecords<String, String> records = consumer.poll(100);
	for (ConsumerRecord<String, String> record : records) {
		buffer.add(record);
		if (buffer.size() >= commitInterval) {
			batchProcessRecords(buffer);
			consumer.commitSync();
			buffer.clear();
		}
	}
}

 

1. 在上一篇中我们看了一个简单的自动提交offset的example,但很多情况下我们为了避免消息丢失,需要确保消息被处理完了之后才提交offset,这就需要手动地提交。在上面这个例子中,我们从kafka中抓取数据并缓存在List中,只有当消息达到一定的数量的时候我们才批量处理,假设我们使用自动提交,如果在我们还没来得及处理之前consumer就异常终止,那么有可能这些消息的offset已经被自动提交掉了,等我们的consumer重新连接上来了之后,上次没有处理完成的消息会被我们完全略过,造成数据丢失,这就是"at-most-once delivery"。解决的办法是,只有在批量处理完消息之后,才用consumer.commitSync()手动地提交offset,但这样的副作用的,假如我们正在批量处理消息,这时consumer异常终止,offset没有被提交但有部分消息已经被处理过了,当consumer重连上来时,这批没有被commit的消息会被重新处理一次,造成会有部分消息被重复处理,这就是"at-least-once delivery"。

 

2. kafka提供load balance机制来确保consumer正常工作,简单的说partitions会被分配给正在监听这个topic的多个consumers(同一个group),当其中一个consumer process异常终止,它之前所占有的partitions会被分配给其他consumer process,从而保证所有的数据都能被正常消费掉。但有时我们并不需要load balance机制,例如:

  • 为了节省网络带宽,我们只希望consumer从某一个partition抓取数据,并存储在本地。这在大数据计算或存储中是很常见的行为。在这种情况下我们并不希望另一台机器的consumer来消费这台机器的partition。
  • 如果程序本身带有HA机制,例如使用类似于YARN,Mesos等集群管理框架,那么当一个consumer终止了之后,它会被重启,或者是另一个consumer会被启动来替代它,在这种情况下我们不需要kafka重新分配partition。

要做到这点很简单,替换掉上个例子的consumer.subscribe(Arrays.asList("test_topic")),我们使用consumer.assign(list),在本例中,consumer只会消费partition0的数据。

 

3. 在之前的版本中,如果我们需要消费旧数据(已经commit offset),我们需要用SimpleConsumer。但是在0.9中,这变得更简单了。在本例中,consumer.seek(tp, 96)表示我们从partition 0的offset 96开始抓取数据,consumer.seekToBeginning(tp)表示从头开始抓取数据,consumer.seekToEnd(tp)表示从最后开始抓取数据,换句话说,只消费consumer启动之后新进来的数据。

 

 

 

 

1
1
分享到:
评论

相关推荐

    springboot kafka整合

    spring boot 与 kafka consumer 整合,可在 jvm 开发平台运行。

    disruptor-kafka-consumer:基于React流的卡夫卡消费者

    卡夫卡消费者的破坏者演示如何在Kafka 0.9 Consumer上使用LMAX Disruptor 好处-&gt;一旦先前的使用者完全处理完消息,便可以使用序列屏障来提交消息。想象力是极限。如果环形缓冲区可以容纳在L3缓存中,则处理速度会更...

    sarama-cluster:Sarama的集群扩展,Apache Kafka 0.9的Go客户端库[已弃用]

    集群扩展,Apache Kafka 0.9(及更高版本)的Go客户端库。 停产通知 请注意,由于已合并并发布(&gt; = v1.19.0),因此该库已正式弃用。 本机实现支持该库无法提供的各种用例。 文献资料 可通过godoc在中获得文档和...

    kafka-node:适用于Apache Kafka 0.8及更高版本的Node.js客户端

    Kafka-node是Apache Kafka 0.9及更高版本的Node.js客户端。 目录 抵消 行政 故障排除/常见问题解答 首次发送时出现KeyedPartitioner错误的HighLevelProducer 如何调试问题? 对于新使用者,如何从分区中的最新...

    kq:基于Kafka的Python作业队列

    KQ:适用于Python的Kafka作业队列 KQ(Kafka队列)是一个轻量级的Python库,可让您使用异步入队和执行作业...from kafka import KafkaConsumer from kq import Worker # Set up logging. formatter = logging . Forma

    c#+毕业设计+源码-librdkafka:ApacheKafkaC/C++库

    高水平平衡的KafkaConsumer(需要经纪人&gt; = 0.9) 简单(传统)消费者 管理员客户端 压缩:snappy,gzip,lz4,zstd 支持 (GSSAPI / Kerberos / SSPI,PLAIN,SCRAM,OAUTHBEARER)支持 完整清单 代理版本支持:

    Kafka常用命令收录

     在0.9.0.0之后的Kafka,出现了几个新变动,一个是在Server端增加了GroupCoordinator这个角色,另一个较大的变动是将topic的offset 信息由之前存储在zookeeper上改为存储到一个特殊的topic(__consumer_offsets)中...

    kafka-influxdb:InfluxDB的高性能Kafka使用者。 支持收集的消息格式

    支持InfluxDB 0.9.x及更高版本。 要获得InfluxDB 0.8.x支持,请签出。 :warning: 该项目应能按预期工作,并且非常欢迎进行错误修复,但是有关新功能的活动很少。 对于较新的项目,我建议改为使用 ,它既更快又更...

    bigpipe:以Kafka为存储介质,提供异步HTTP RPC的中间件

    横向扩展:完全支持rebalanced-consumer-group,多进程部署即可实现分区自动负载均衡 配置热加载:在线服务无需重启即可立即加载配置,流量0损失 优雅退出:处理完剩余任务后退出,流量0损失 超强性能:无锁,协程...

    高朗卡夫卡消费群-Golang开发

    中文文档要求Apache Kafka 0.8.x,0.9.x,0.10.x,0.11.x,1.0.x依赖go-consumergroup Go-consumergroup是一个用golang编写的kafka使用者库,具有重新平衡和chroot支持。 中文文档要求Apache Kafka 0.8.x,0.9.x,...

    具有组重新平衡功能PHP Kafka使用者库-PHP开发

    中文文档要求Apache Kafka 0.7.x,0.8.x,0.9.x,0.10.x依赖项php-zookeeper ph php-consumergroup php-consumergroup是一个具有组和重新平衡支持的kafka使用者库。 中文文档要求Apache Kafka 0.7.x,0.8.x,0.9.x,...

Global site tag (gtag.js) - Google Analytics