`
fly_ever
  • 浏览: 149930 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论
阅读更多

 

kafka下载:http://kafka.apache.org/downloads

解压下载下来的文件,bin目录下是常用命令,config目录下是配置文件。

kafka已经内置了一个zookeeper环境,可以直接使用。

建议kafka在linux环境下使用。

 

Kafka的简单测试

以下是在windows环境下的一个kafka测试:

配置则直接使用默认的配置即可。

1,启动zookeeper:

进入bin/windows目录,运行命令:

>zookeeper-server-start.bat  ../../config/zookeeper.properties

2,启动kafka:

重新打开一个窗口,进入bin/windows目录,运行命令:

>kafka-server-start.bat ../../config/server.properties

3,创建一个Topic:

重新打开一个窗口,进入bin/windows目录,运行命令:

>kafka-topics.bat  -create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 -topic test

创建Topic完成后,可通过命令查看Topic:

>kafka-topics.bat -list --zookeeper localhost:2181

4,创建一个消息消费者:

重新打开一个窗口,进入bin/windows目录,运行命令:

>kafka-console-consumer.bat  --bootstrap-server localhost:9092 --topic test --from-beginning

执行命令后,暂时不会有任何信息返回,消费者在等待信息。

5,创建一个消息生产者:

重新打开一个窗口,进入bin/windows目录,运行命令:

>kafka-console-producer.bat  --broker-list localhost:9092 --topic test

此时输入信息,然后按回车键。则完成了一条消息的生产,此时可看到在消息消费者窗口,会打印出刚刚生产的消息。

此时一个Kafka的使用实例完成。

 

Java代码完成Kafka测试

也可以直接通过Java代码来完成上述过程:

如上面步骤3:创建一个Topic,Java代码如下:

public static void main(String[] args) {
		// TODO Auto-generated method stub

		Properties prop = new Properties();
		prop.put("bootstrap.servers", "localhost:9092");
		
		AdminClient ac = AdminClient.create(prop);
		ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
		NewTopic topic = new NewTopic("topic-test2",1,(short)1);
		topics.add(topic);
		
		CreateTopicsResult result = ac.createTopics(topics);
		try {
			System.out.println("show create topics result:");
			result.all().get();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

 然后通过命令

>kafka-topics.bat -list --zookeeper localhost:2181

可查看刚刚通过程序新增的Topic。

 

创建一个消息生产者,

public void createMsg(){
		prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		
		Producer<String,String> p = new KafkaProducer<String,String>(prop);
		for(int i=0;i<50;i++){
			p.send(new ProducerRecord<String,String>("topic-test1","topictest message" + i));
		}
		p.close();
	}

 

生产的消息能够通过命令查看:

>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic-test1  --from-beginning

 

创建一个消息消费者,把收到的消息信息展示出来:

public void doconsume(){
		prop.put("group.id", "test");
		prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		final KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(prop);
		consumer.subscribe(Arrays.asList("topic-test1"),new ConsumerRebalanceListener(){

			public void onPartitionsRevoked(
					Collection<TopicPartition> partitions) {
				// TODO Auto-generated method stub
				
			}

			public void onPartitionsAssigned(
					Collection<TopicPartition> partitions) {
				// TODO Auto-generated method stub
				consumer.seekToBeginning(partitions);
				}
		
		});
		
		while(true){
			ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
			for(ConsumerRecord<String,String> record: records)
				System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());
		}
	}

 

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics