环境:ubuntu10 2台(32位)+JDK1.8(32位)+kafka2.11+Intellij15
目标:Java启动一个Producer,启动一个Consumer,Linux启动一个Consumer.
观察3者是否能相互通信。
注意到,Java的Producer和Consumer全是用maven构建的,父项目是kafka_demo,他们两个是module.
1、Java Producer Demo:
import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties; /** * Created by Germmy on 2016/7/10. */ public class KafkaProducer { private final Producer<String,String> producer; public final static String TOPIC="TEST-TOPIC"; private KafkaProducer(){ Properties props=new Properties(); props.put("metadata.broker.list","192.168.200.129:9092"); props.put("serializer.class","kafka.serializer.StringEncoder"); props.put("key.serializer.class","kafka.serializer.StringEncoder"); props.put("request.required.acks","-1"); producer=new Producer<String, String>(new ProducerConfig(props)) ; } void produce(){ int messageNo=1000; final int COUNT=10000; while(messageNo<COUNT){ String key=String.valueOf(messageNo); String data="hello kafka message"+key; producer.send(new KeyedMessage<String,String>(TOPIC,key,data)); System.out.println(data); messageNo++; } } public static void main(String[] args){ new KafkaProducer().produce(); } }
2、Java Consumer Demo:
import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; /** * Created by Germmy on 2016/7/10. */ public class KafkaConsumer { private final ConsumerConnector consumer; public final static String TOPIC="TEST-TOPIC"; private KafkaConsumer(){ Properties props=new Properties(); props.put("zookeeper.connect","192.168.200.129:2181"); props.put("group.id","jd-group");//消费组是什么概念? props.put("zookeeper.session.timeout.ms","60000"); props.put("zookeeper.sync.time.ms","200"); props.put("auto.commit.interval.ms","1000"); props.put("auto.offset.reset","smallest"); props.put("serializer.class","kafka.serializer.StringEncoder"); ConsumerConfig config=new ConsumerConfig(props); consumer=kafka.consumer.Consumer.createJavaConsumerConnector(config); } void consume(){ Map<String,Integer> topicCountMap=new HashMap<String, Integer>(); topicCountMap.put(KafkaConsumer.TOPIC,new Integer(1)); StringDecoder keyDecoder=new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder=new StringDecoder(new VerifiableProperties()); Map<String,List<KafkaStream<String,String>>> consumerMap= consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder); KafkaStream<String,String> stream=consumerMap.get(KafkaConsumer.TOPIC).get(0); ConsumerIterator<String,String> it=stream.iterator(); while(it.hasNext()){ System.out.println(it.next().message()); } } public static void main(String[] args){ new KafkaConsumer().consume(); } }
注意到,Consumer连接zookeeper的超时时间需要设置长一点,之前的版本是4秒,会报连接超时异常。我这里设置的是60S,参考链接
3、父POM:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.germmy</groupId> <artifactId>kafkademo</artifactId> <packaging>pom</packaging> <version>1.0-SNAPSHOT</version> <modules> <module>kafka_producer</module> <module>kafka_consumer</module> </modules> <dependencies> <!--<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency>--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.0</version> <exclusions> <exclusion> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> </exclusion> <exclusion> <groupId>com.sun.jmx</groupId> <artifactId>jmxri</artifactId> </exclusion> <exclusion> <groupId>com.sun.jdmk</groupId> <artifactId>jmxtools</artifactId> </exclusion> </exclusions> </dependency> </dependencies> </project>
注意到,这里引入的client客户端要去除对jms的依赖,否则会报如下错:
Could not transfer artifact com.sun.jdmk:jmxtools:jar:1.2.1 from/to java.net (https://maven-repository.dev.java.net/nonav/repository): No connector available to access repository java.net (https://maven-repository.dev.java.net/nonav/repository) of type legacy using the available factories
至此,demo已经成功运行。
4、遗留的问题如下:
* Producer用mvn打包失败,具体原因待查。
* props.put("group.id","jd-group");//消费组的概念待明确。
* intellij启动多个main后,console是直接TAB在一起的,根本不要显式切换。
* alt+1,直接打开projectViews.
* 点击左下角的正方形,可以打开或者关闭所有的侧边栏。
* 关于用0.8的client报jms的错误问题,有人说是因为它去maven2的仓库中了,所以要将repositories设置指向maven3的仓库。参考链接。
* 用intellij有时maven不会自动下载依赖,此时可以用cmd直接敲mvn compile命令试试。
* 在git命令中已经和远程仓库关联,但是在intellij中还不知道如何关联。
-------------------------------------------------------------------------------------------------
相关推荐
kafka-java-demo 基于java的kafka生产消费者示例。 mvn
kafka-java-demo 基于java的kafka生产消费者的例子。。。
可以运行的Spirng整合Kafka的Demo , 需要本机配置Kafka
最最最简单的SpringBoot中使用Kafka的Demo,就一个传递参数发送消息接受消息
KafkaConsumerDemo.java
使用java客户端, kafka-producer, kafka-consumer进行kafka的连接 注: 0.10 版本之后, 连接kafka只需要brokerip即可, 不需要zookeeper的信息
apache-kafka-1.0.0 java Demo(附jar),只是简单的实现,没有使用连接池。
多次整理精简,得出基于maven 的springmvc 框架搭载 :多线程(线程池式)和 kafka(集群下生产者消费者);demo 下载导入改下kafka集群IP地址即可使用。
scala通过logstash发送日志到kafka的Demo 请注意设置好kafka的2个host.name参数,否则同样无法连接 请注意修改logback.xml文件的服务器地址与端口
本kafkaDemo,是为了简单部署kafka利用JAVA代码进行生产者生产消息和消费者消费消息进行更好的理解kafka
kafka java 生产消费程序 demo 示例 kafka 是吞吐量巨大的一个消息系统,它是用 scala 写的,和普通的消息的生产消费还有所不同,写了个 demo 程序供大家参考。kafka 的安装请参考官方文档。 首先我们需要新建一个 ...
记录一下,防止忘记记录一下,
这是使用java操作kafka consumer api的一个demo,欢迎下载交流,博客地址:https://blog.csdn.net/qq_26803795
java kafka 生产者/消费者demo。。。。。。。。。。。。
java-kafka-demo 请提前建造好zookeeper和kafka 分别启动目录里的java项目和go项目 然后通过即可使用kafka交换消息
#Kafka Java API案例Producer可选配置,如果不配置,则使用默认的partitioner根据key值value映射到指定的Parition props.put("partitioner.class", "kafka.demo.PartitionerDemo");ConsumerString key=(String)obj; ...
卡夫卡示例SSL代码,阿里云连接demo示例
Apache Kafka入门demo,用来理解Apache Kafka的基本原理,附带必要代码注释。基于maven的用Java api编写的Producer和Consumer;
kafka版本:kafka_2.11-0.9.0.1 kafka jar包版本:0.9.0.1 kafka集群:192.168.1.101,192.168.1.102,192.168.1.103 partition分发策略主要是自定义Partitioner的实现类,通过根据key和分区数量来实现
使用springboot整合kafka,并进行基于kafka的发布订阅消息队列模式的消息发布与消费测试。