kafka下载:http://kafka.apache.org/downloads
解压下载下来的文件,bin目录下是常用命令,config目录下是配置文件。
kafka已经内置了一个zookeeper环境,可以直接使用。
建议kafka在linux环境下使用。
Kafka的简单测试
以下是在windows环境下的一个kafka测试:
配置则直接使用默认的配置即可。
1,启动zookeeper:
进入bin/windows目录,运行命令:
>zookeeper-server-start.bat ../../config/zookeeper.properties
2,启动kafka:
重新打开一个窗口,进入bin/windows目录,运行命令:
>kafka-server-start.bat ../../config/server.properties
3,创建一个Topic:
重新打开一个窗口,进入bin/windows目录,运行命令:
>kafka-topics.bat -create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 -topic test
创建Topic完成后,可通过命令查看Topic:
>kafka-topics.bat -list --zookeeper localhost:2181
4,创建一个消息消费者:
重新打开一个窗口,进入bin/windows目录,运行命令:
>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
执行命令后,暂时不会有任何信息返回,消费者在等待信息。
5,创建一个消息生产者:
重新打开一个窗口,进入bin/windows目录,运行命令:
>kafka-console-producer.bat --broker-list localhost:9092 --topic test
此时输入信息,然后按回车键。则完成了一条消息的生产,此时可看到在消息消费者窗口,会打印出刚刚生产的消息。
此时一个Kafka的使用实例完成。
Java代码完成Kafka测试
也可以直接通过Java代码来完成上述过程:
如上面步骤3:创建一个Topic,Java代码如下:
public static void main(String[] args) { // TODO Auto-generated method stub Properties prop = new Properties(); prop.put("bootstrap.servers", "localhost:9092"); AdminClient ac = AdminClient.create(prop); ArrayList<NewTopic> topics = new ArrayList<NewTopic>(); NewTopic topic = new NewTopic("topic-test2",1,(short)1); topics.add(topic); CreateTopicsResult result = ac.createTopics(topics); try { System.out.println("show create topics result:"); result.all().get(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
然后通过命令
>kafka-topics.bat -list --zookeeper localhost:2181
可查看刚刚通过程序新增的Topic。
创建一个消息生产者,
public void createMsg(){ prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> p = new KafkaProducer<String,String>(prop); for(int i=0;i<50;i++){ p.send(new ProducerRecord<String,String>("topic-test1","topictest message" + i)); } p.close(); }
生产的消息能够通过命令查看:
>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic-test1 --from-beginning
创建一个消息消费者,把收到的消息信息展示出来:
public void doconsume(){ prop.put("group.id", "test"); prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); final KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(prop); consumer.subscribe(Arrays.asList("topic-test1"),new ConsumerRebalanceListener(){ public void onPartitionsRevoked( Collection<TopicPartition> partitions) { // TODO Auto-generated method stub } public void onPartitionsAssigned( Collection<TopicPartition> partitions) { // TODO Auto-generated method stub consumer.seekToBeginning(partitions); } }); while(true){ ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100)); for(ConsumerRecord<String,String> record: records) System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value()); } }
相关推荐
Apache Kafka入门介绍
kafka入门:简介、使用场景、设计原理、主要配置及集群搭建 kafka入门:简介、使用场景、设计原理、主要配置及集群搭建 kafka入门:简介、使用场景、设计原理、主要配置及集群搭建
kafka入门教程,一个简单的入门教程!
介绍Kafka的优缺点、使用场景、设计原理、集群安装部署。
kafka入门到精通
kafka 入门
Kafka入门,非常详细
本详细介绍了Kafka基础理论和实践的相关代码,实践步骤,为零基础的想入门大数据行业的人提供了学习相关资料
kafkakafka入门简介.docxkafka入门简介.docx
Kafka 入门基础篇.pptx
Kafka入门教程与详解
大数据组件-kafka入门学习,通过该部分学习,可以了解掌握kafka的基本常识
研究了一段时间的kafka。做了一个kafka安装到java接口的使用。适合kafka入门
springboot下集成kafka的小例子,把代码写在world里。
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
简单介绍了kafka的入门操作,快速搭建一个实例,以及讲解了kafka消费者组的概念
Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...
kafka入门demo
简单介绍了kafka的入门操作,快速搭建一个实例,以及讲解了kafka消费者组的概念
1.kafka的初认识 2.Kafka 基础实战 :消费者和生产者实例 3.Kafka 核心源码剖析 4.Kafka 用户日志上报实时统计