在学习笔记(一)中,讲解了kafka的安装、部署、以及bash下进行的一些简单操作,而这次将学习kafka的java客户端代码。
1、jar包。
在maven上,我们有两种apache kafka提供的jar包:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.9.0.1</version> </dependency>
这里我们选择的是kafka-clients,因为kafka-clients比kafka_2.11依赖的jar少,而且对于Consumer,没有了低级别api与高级别api的区分,方便了代码的编写。
2、Producer。
接下来我们编写Produer的java代码:
public class Producer extends Thread { private final KafkaProducer producer; public Producer(){ Properties props = new Properties(); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("bootstrap.servers", "localhost:9092"); this.producer = new KafkaProducer(props); } @Override public void run() { int messageNo = 1; while (true) { String messageStr = "Message_" + messageNo; System.out.println("Send:" + messageStr); producer.send(new ProducerRecord("my_test", messageStr)); messageNo++; try { sleep(20); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
在构造函数中,我们创建一个KafkaProducer的实例,props中为必要的参数(已经最少了,不能更少)。
为了一会方便运行,我们继承了Thread类,并且重写了run。在send中还能添加一个callback回调方法,可以在你的IDE中看到这个参数,如果你的业务有需要的话,可以进行定制。
对于producer参数配置,将在后续的笔记中介绍。
3、Consumer。
然后是Consumer:
public class Consumer extends Thread { private final KafkaConsumer<String, String> consumer; public Consumer (){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-consumer-group"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer(props); } @Override public void run() { this.consumer.subscribe(Arrays.asList("my_test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ System.out.println("receive:" + record.value()); } } } }
与Producer类似,我们继承了Thread,并且在构造函数中创建了KafkaConsumer的实例。
consumer可以通过subscribe订阅想要的topic,而poll方法能够拉取消息,参数为超时时间,单位为millisenonds,如果指定时间未拉取到消息,返回ConsumerRecords.empty()。
4、进行测试:
public class KafkaConsumerProducerDemo { public static void main(String[] args) { new Producer().start(); new Consumer().start(); } }
在控制台输出如下:
Send:Message_506
receive:Message_506
Send:Message_507
receive:Message_507
Send:Message_508
receive:Message_508
Send:Message_509
receive:Message_509
Send:Message_510
receive:Message_510
至此,java客户端kafak代码就编写完成了。需要注意的是,对于consumer,要保证消息的处理速度能够跟上producer的生产速度,可以根据业务复杂程度与可控制程度,选择合适的线程方式处理消息(例如线程池,或者是actor)。
相关推荐
1.kafka的基础知识(安装、部署、基础概念,版本) 2.kafka的特性 3. kafka客户端 4.kafka中的zookeeper 5. kafka如何不丢消息 6.kafka多线程消费 7.kafka重组平衡 ...11. 代码见《kafka学习代码》
kafka学习代码
kafka-java-demo 基于java的kafka生产消费者示例。 mvn
kafka消息队列学习笔记,kafka消息队列学习笔记,kafka消息队列学习笔记,kafka消息队列学习笔记,kafka消息队列学习笔记,kafka消息队列学习笔记,kafka消息队列学习笔记,kafka消息队列学习笔记,kafka消息队列...
本篇文章主要介绍了Kafka使用Java客户端进行访问的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
Kafka全套学习笔记
kafka学习笔记(一) ================= 本人整理的学习笔记,该笔记目前只有第一版,适合初学者初步了解kafka
Kafka与Storm整合后java客户端使用实例代码
kafka学习笔记,内容详实
Kafka是一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。在本课程中,你将学习到,Kafka架构原理、安装配置使用、详细的...
Kafka学习笔记,包括Kafka术语、学习过程中单机版Kafka安装与配置、基于Docker的Kafka集群安装与配置、kafka消息机制与原理、学习方法
基于JAVA API方式使用Kafka——编写生产者客户端 记录我的学习之旅,每份文档倾心倾力,带我成我大牛,回头观望满脸笑意,望大家多多给予意见,有问题或错误,请联系 我将及时改正;借鉴文章标明出处,谢谢
Kafka学习笔记 3M大文件,全面涵盖kafka知识点,值得收藏
从kafka官网找例子: http://kafka.apache.org/ –>click Documentation http://kafka.apache.org/documentation/–>click 1.0.X http://kafka.apache.org/10/documentation.html –>click API ...
最全面的Kafka学习笔记,含代码,知识点,命令,设计图等等
Kafka学习笔记,全网最全
kafka客户端工具,可以方便的查看kafka存储的数据,对于查看服务器存储数据非常方便,欢迎大家的使用
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献...
RabbitMQ和Kafka详细笔记以及示例代码
该笔记从零开始学习kafka,笔记内容包括:kafka的简介、下载、安装、配置、运行,以及springboot整合kafka实例