0. Kafka server configuration
One broker,
one topic,
and a single partition in that topic.
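With that layout, the test topic used below (learn.topic) can be created with the standard CLI tool. This is a sketch assuming a 0.8.1+ distribution, where kafka-topics.sh takes a --zookeeper flag; the addresses come from this article's environment, so adjust paths and hosts to yours:

```shell
# Create the single-partition, single-replica topic used by the examples below.
bin/kafka-topics.sh --create \
  --zookeeper 192.168.26.140:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic learn.topic
```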
1. Producer:
package kafka.examples.producers;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducer {
    private static Producer<Integer, String> producer;
    private static final Properties props = new Properties();

    // Note: ProducerConfig has no ZooKeeper-related settings; the producer
    // talks to the brokers directly.
    static {
        /* metadata.broker.list is for bootstrapping and the producer will only
         * use it for getting metadata (topics, partitions and replicas). The
         * socket connections for sending the actual data will be established
         * based on the broker information returned in the metadata. The format
         * is host1:port1,host2:port2, and the list can be a subset of brokers
         * or a VIP pointing to a subset of brokers. */
        props.put("metadata.broker.list", "192.168.26.140:9092");
        /* The serializer class for messages. The default encoder
         * (kafka.serializer.DefaultEncoder) takes a byte[] and returns the
         * same byte[]. */
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        producer = new Producer<Integer, String>(new ProducerConfig(props));
    }

    public static void main(String[] args) {
        String topic = "learn.topic";
        String messageStr = "This is a simple message from JavaAPI Producer2";
        // No key is supplied here, so the key stays null and the partitioner
        // chooses a partition for us (see KeyedMessage below).
        KeyedMessage<Integer, String> data =
                new KeyedMessage<Integer, String>(topic, messageStr);
        producer.send(data);
        producer.close();
    }
}
About request.required.acks:
This value controls when a produce request is considered completed. Specifically, how many other brokers must have committed the data to their log and acknowledged this to the leader? Typical values are:
- 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
- 1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
- -1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the greatest level of durability. However, it does not completely eliminate the risk of message loss because the number of in-sync replicas may, in rare cases, shrink to 1. If you want to ensure that some minimum number of replicas (typically a majority) receive a write, then you must set the topic-level min.insync.replicas setting. Please read the Replication section of the design documentation for a more in-depth discussion.
About KeyedMessage:
/**
 * A topic, key, and value.
 * If a partition key is provided it will override the key for the purpose of
 * partitioning but will not be stored.
 */
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
  if (topic == null)
    throw new IllegalArgumentException("Topic cannot be null.")

  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

  def this(topic: String, key: K, message: V) = this(topic, key, key, message)

  // Partitioning key: fall back to the message key when no explicit partition
  // key was given; null means the producer picks a partition itself.
  def partitionKey = {
    if (partKey != null)
      partKey
    else if (hasKey)
      key
    else
      null
  }

  def hasKey = key != null
}
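The fallback above, and the modulo-hash scheme the 0.8 default partitioner (kafka.producer.DefaultPartitioner) applies to a non-null key, can be sketched in plain Java. The class and method names here are illustrative stand-ins, not Kafka's own API:

```java
// Illustrative re-implementation of KeyedMessage.partitionKey's fallback and
// of the hash-modulo partition choice made for a non-null key.
public class PartitionKeyDemo {
    // partKey wins; otherwise the message key; otherwise null
    // (null tells the producer to pick a partition itself).
    static Object partitionKey(Object key, Object partKey) {
        if (partKey != null) return partKey;
        if (key != null) return key;
        return null;
    }

    // Non-negative hash modulo the partition count, in the spirit of
    // Utils.abs(key.hashCode) % numPartitions.
    static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Explicit partition key overrides the message key.
        System.out.println(partitionKey("msg-key", "part-key")); // part-key
        // No partition key: the message key is used for partitioning.
        System.out.println(partitionKey("msg-key", null));       // msg-key
        // Neither: null, i.e. the producer picks a partition.
        System.out.println(partitionKey(null, null));            // null
        // With one partition (this article's setup) every key maps to 0.
        System.out.println(partitionFor("any-key", 1));          // 0
    }
}
```

With a single partition, as in this setup, the keying makes no observable difference; it only matters once a topic has several partitions.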
2. Consumer
package kafka.examples.consumers;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class SimpleHLConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
        // Consumer configuration
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        // Consumer group id
        props.put("group.id", groupId);
        /* ZooKeeper session timeout. If the server fails to heartbeat to
         * ZooKeeper within this period of time it is considered dead. If you
         * set this too low the server may be falsely considered dead; if you
         * set it too high it may take too long to recognize a truly dead
         * server. Default: 6000. */
        props.put("zookeeper.session.timeout.ms", "500");
        // How far a ZK follower can be behind a ZK leader. Default: 2000.
        props.put("zookeeper.sync.time.ms", "250");
        // Interval at which consumed offsets are auto-committed.
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void doConsume() {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        // Use a single thread for the topic.
        topicCount.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumer.createMessageStreams(topicCount);
        // A KafkaStream is backed by a BlockingQueue; you get one stream per
        // requested thread.
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            /* An iterator that blocks until a value can be read from the
             * supplied queue. The iterator takes a shutdownCommand object
             * which can be added to the queue to trigger a shutdown. */
            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
            // hasNext() blocks until the next message arrives.
            while (consumerIte.hasNext()) {
                System.out.println("Message from Single Topic :: "
                        + new String(consumerIte.next().message()));
            }
        }
        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        String topic = "learn.topic";
        // The consumer group (learn.topic.consumers.group) need not be defined
        // in advance; it is recorded in ZooKeeper on first use.
        SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer(
                "192.168.26.140:2181", "learn.topic.consumers.group", topic);
        simpleHLConsumer.doConsume();
    }
}
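The blocking behavior of ConsumerIterator can be pictured with a plain BlockingQueue from the JDK. This is a stand-in sketch, not Kafka code; the class, method, and sentinel names are invented for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Stand-in for KafkaStream/ConsumerIterator: take() blocks until a message
// arrives, mirroring how hasNext() blocks; a sentinel value plays the role
// of the shutdownCommand mentioned in the javadoc above.
public class BlockingStreamDemo {
    static final String SHUTDOWN = "__shutdown__"; // illustrative sentinel

    // Drain the queue until the sentinel is seen, collecting the messages.
    static List<String> consumeUntilShutdown(BlockingQueue<String> queue)
            throws InterruptedException {
        List<String> seen = new ArrayList<String>();
        while (true) {
            String msg = queue.take(); // blocks, like hasNext()
            if (SHUTDOWN.equals(msg)) return seen;
            seen.add(msg);
        }
    }

    public static void main(String[] args) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        // Feed the queue from another thread, as a broker-backed stream would be.
        new Thread(new Runnable() {
            public void run() {
                queue.add("first message");
                queue.add("second message");
                queue.add(SHUTDOWN);
            }
        }).start();
        try {
            System.out.println(consumeUntilShutdown(queue));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

This also shows why the loop in doConsume() never falls through on its own: without a shutdown trigger, the iterator just keeps blocking for the next message.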
3. Things to watch out for:
Because the Kafka broker and the producer/consumer run on different machines, the ZooKeeper connection strings and the host.name entry in Kafka's server.properties must use a concrete IP address; localhost will not work.
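For this setup, the relevant server.properties entries would look roughly like the following (values taken from the addresses used above; substitute your own hosts):

```properties
# server.properties: bind/advertise a concrete IP, not localhost,
# when clients connect from other machines.
host.name=192.168.26.140
zookeeper.connect=192.168.26.140:2181
```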