
Kafka Study Notes

 

Written by: 王宇
2016-10-24


 

 


Messaging System

  • Point to Point Messaging System 


     
  • Publish-Subscribe Messaging System 


     

The main difference between the two is that the Publish-Subscribe model supports multiple receivers.
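
To make the difference concrete, here is a minimal in-memory sketch. It is a toy model, not Kafka code: the class and method names (`MessagingModels`, `pointToPoint`, `publishSubscribe`) are made up for illustration, and handing queue messages to receivers in turn is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Toy in-memory model; class and method names are illustrative, not a Kafka API.
final class MessagingModels {

    // Point-to-point: each message is taken from the queue by exactly one receiver.
    static List<List<String>> pointToPoint(List<String> messages, int receivers) {
        Queue<String> queue = new ArrayDeque<>(messages);
        List<List<String>> inboxes = emptyInboxes(receivers);
        int next = 0;
        while (!queue.isEmpty()) {
            inboxes.get(next).add(queue.poll()); // the message is gone once consumed
            next = (next + 1) % receivers;       // assumption: receivers take turns
        }
        return inboxes;
    }

    // Publish-subscribe: every subscriber gets its own copy of every message.
    static List<List<String>> publishSubscribe(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int n) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2");
        System.out.println(pointToPoint(messages, 2));     // [[m1], [m2]]
        System.out.println(publishSubscribe(messages, 2)); // [[m1, m2], [m1, m2]]
    }
}
```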

Kafka Structure



 

Components − Description
Topics − A stream of messages belonging to a particular category is called a topic. Data is stored in topics. Topics are split into partitions, and Kafka keeps at least one partition per topic. Each partition holds messages in an immutable, ordered sequence and is implemented as a set of segment files of equal size.
Partition − A topic may have many partitions, so it can handle an arbitrary amount of data.
Partition offset − Each message in a partition has a unique sequence ID called its offset.
Replicas of partition − Replicas are backups of a partition. Clients never read from or write to them directly; they exist only to prevent data loss.
Brokers − Brokers are simple systems responsible for maintaining the published data. Each broker may hold zero or more partitions per topic. If a topic has N partitions and there are N brokers, each broker holds one partition. If there are more than N brokers (N + M), the first N brokers hold one partition each and the remaining M brokers hold none for that topic. If there are fewer than N brokers (N - M), each broker holds one or more partitions. This last scenario is not recommended because it distributes load unevenly among the brokers.
Kafka cluster − A Kafka deployment with more than one broker is called a Kafka cluster. A cluster can be expanded without downtime. Clusters manage the persistence and replication of message data.
Producers − Producers publish messages to one or more Kafka topics by sending data to Kafka brokers. Every time a producer publishes a message, the broker appends it to the last segment file of a partition. A producer can also send messages to a partition of its choice.
Consumers − Consumers read data from brokers. A consumer subscribes to one or more topics and consumes published messages by pulling data from the brokers.
Leader − The leader is the node responsible for all reads and writes for a given partition. Every partition has one server acting as leader.
Follower − A node that follows the leader's instructions is called a follower. If the leader fails, one of the followers automatically becomes the new leader. A follower acts like a normal consumer: it pulls messages and updates its own data store.
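
The three broker scenarios in the Brokers row can be worked through with a small sketch. It assumes a plain round-robin placement, which is only an illustration; Kafka's real assignment is more involved (it also has to place replicas). The class name `PartitionPlacement` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative round-robin placement of partitions on brokers; not Kafka's actual algorithm.
final class PartitionPlacement {

    // Returns, for each broker index, the list of partition ids it hosts.
    static List<List<Integer>> place(int partitions, int brokers) {
        List<List<Integer>> byBroker = new ArrayList<>();
        for (int b = 0; b < brokers; b++) {
            byBroker.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            byBroker.get(p % brokers).add(p);
        }
        return byBroker;
    }

    public static void main(String[] args) {
        System.out.println(place(3, 3)); // N brokers:     [[0], [1], [2]]
        System.out.println(place(3, 5)); // N + M brokers: [[0], [1], [2], [], []]
        System.out.println(place(5, 3)); // N - M brokers: [[0, 3], [1, 4], [2]]
    }
}
```

The last line shows the uneven load the table warns about: two brokers hold two partitions while the third holds one.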

Cluster Architecture



 

Components − Description
Broker − A Kafka cluster typically consists of multiple brokers to balance load. Kafka brokers are stateless, so they rely on ZooKeeper to maintain cluster state. One broker instance can handle hundreds of thousands of reads and writes per second, and each broker can hold terabytes of messages without performance impact. Broker leader election is done through ZooKeeper.
ZooKeeper − ZooKeeper manages and coordinates the Kafka brokers. It mainly notifies producers and consumers when a new broker joins the Kafka system or an existing broker fails. Based on these notifications, producers and consumers decide how to proceed and start coordinating their work with another broker.
Producers − Producers push data to brokers. When a new broker starts, producers discover it and automatically begin sending messages to it. Unless acknowledgements are requested (see the acks setting), a producer does not wait for the broker to acknowledge and sends messages as fast as the broker can handle.
Consumers − Because Kafka brokers are stateless, the consumer has to keep track of how many messages it has consumed by using the partition offset. When a consumer acknowledges a particular message offset, it implies that the consumer has consumed all prior messages. The consumer issues an asynchronous pull request to the broker to have a buffer of bytes ready to consume. A consumer can rewind or skip to any point in a partition simply by supplying an offset value. The consumer offset is tracked through ZooKeeper.
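
The consumer-side bookkeeping described in the Consumers row can be sketched as follows. This is a toy model, not the Kafka client API: `OffsetTracker` and its methods are hypothetical names, and a plain list stands in for the partition.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of a consumer's position in one partition; not the Kafka client API.
final class OffsetTracker {
    private final List<String> partition; // messages, where list index == offset
    private long position = 0;            // next offset to read
    private long committed = 0;           // every offset below this is acknowledged

    OffsetTracker(List<String> partition) {
        this.partition = partition;
    }

    // Returns the next message, or null when the consumer has caught up.
    String next() {
        if (position >= partition.size()) {
            return null;
        }
        return partition.get((int) position++);
    }

    // Acknowledging the current position implies all earlier offsets were consumed.
    void commit() {
        committed = position;
    }

    // Rewind or skip simply by supplying an offset.
    void seek(long offset) {
        if (offset < 0 || offset > partition.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        position = offset;
    }

    long committed() {
        return committed;
    }

    public static void main(String[] args) {
        OffsetTracker consumer = new OffsetTracker(Arrays.asList("a", "b", "c"));
        consumer.next();
        consumer.next();
        consumer.commit();
        System.out.println(consumer.committed()); // 2
        consumer.seek(0);                         // rewind to the beginning
        System.out.println(consumer.next());      // a
    }
}
```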

Workflow

Kafka is a collection of topics, each split into one or more partitions. A Kafka partition is a linearly ordered sequence of messages, and each message is identified by its index (offset).
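
A partition can be pictured as an append-only list in which the list index plays the role of the offset. The sketch below is only that picture in code; `PartitionLog` is a hypothetical name, and real partitions are stored as segment files on disk.

```java
import java.util.ArrayList;
import java.util.List;

// Toy append-only log standing in for one partition; names are illustrative.
final class PartitionLog {
    private final List<String> messages = new ArrayList<>();

    // Appends at the end and returns the offset assigned to the message.
    long append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    // Reads every message from the given offset onward, in order.
    List<String> readFrom(long offset) {
        return new ArrayList<>(messages.subList((int) offset, messages.size()));
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        System.out.println(log.append("Hello"));            // 0
        System.out.println(log.append("My first message")); // 1
        System.out.println(log.readFrom(1));                // [My first message]
    }
}
```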

  • Workflow of Pub-Sub Messaging

    • Producers send messages to a topic at regular intervals.
    • The Kafka broker stores all messages in the partitions configured for that topic and spreads them evenly across the partitions. If the producer sends two messages and there are two partitions, Kafka stores the first message in the first partition and the second message in the second partition.
    • A consumer subscribes to a specific topic.
    • Once the consumer subscribes, Kafka provides the topic's current offset to the consumer and also saves the offset in the ZooKeeper ensemble.
    • The consumer polls Kafka at a regular interval (for example, every 100 ms) for new messages.
    • Once Kafka receives messages from producers, it forwards them to the consumers.
    • The consumer receives the message and processes it.
    • Once the messages are processed, the consumer sends an acknowledgement to the Kafka broker.
    • Once Kafka receives the acknowledgement, it advances the offset to the new value and updates it in ZooKeeper. Because offsets are maintained in ZooKeeper, the consumer can read the next message correctly even after a server outage.
    • This flow repeats until the consumer stops requesting messages.
    • The consumer can rewind or skip to any desired offset of a topic at any time and read all subsequent messages.
  • Workflow of Queue Messaging / Consumer Group 
    A group of consumers that share the same Group ID subscribes to a topic.

    • Producers send messages to a topic at regular intervals.
    • Kafka stores all messages in the partitions configured for that topic, as in the earlier scenario.
    • A single consumer subscribes to a specific topic, say Topic-01, with Group ID Group-1.
    • Kafka interacts with the consumer in the same way as in Pub-Sub Messaging until a new consumer subscribes to the same topic, Topic-01, with the same Group ID, Group-1.
    • Once the new consumer arrives, Kafka switches to share mode and divides the data between the two consumers. This sharing continues until the number of consumers reaches the number of partitions configured for that topic.
    • Once the number of consumers exceeds the number of partitions, a new consumer receives no messages until one of the existing consumers unsubscribes. This happens because each active consumer is assigned at least one partition; once all partitions are assigned, new consumers have to wait.
    • This feature is called a Consumer Group. In this way, Kafka offers the best of both messaging models in a simple and efficient manner.
  • Role of ZooKeeper 
    A critical dependency of Apache Kafka is Apache Zookeeper, which is a distributed configuration and synchronization service. Zookeeper serves as the coordination interface between the Kafka brokers and consumers. The Kafka servers share information via a Zookeeper cluster. Kafka stores basic metadata in Zookeeper such as information about topics, brokers, consumer offsets (queue readers) and so on.
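
The consumer group behaviour described above (sharing until the consumer count reaches the partition count, after which extra consumers sit idle) can be sketched with a simple round-robin assignment. This is an illustration under that assumption; Kafka's actual assignment strategy is configurable, and `GroupAssignment` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative round-robin assignment of partitions to the consumers of one group.
final class GroupAssignment {

    // Maps each consumer name to the partitions it reads; surplus consumers get an empty list.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return result;
        }
        for (int p = 0; p < partitions; p++) {
            String owner = consumers.get(p % consumers.size());
            result.get(owner).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(Arrays.asList("c1"), 2));             // {c1=[0, 1]}
        System.out.println(assign(Arrays.asList("c1", "c2"), 2));       // {c1=[0], c2=[1]}
        System.out.println(assign(Arrays.asList("c1", "c2", "c3"), 2)); // {c1=[0], c2=[1], c3=[]}
    }
}
```

In the last call the third consumer has no partition, so it receives nothing until one of the others leaves the group.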

Installation Steps

  • Step 1: Install the JDK and set the JAVA_HOME and CLASSPATH environment variables
  • Step 2: Install ZooKeeper

    Download ZooKeeper
    Unpack

    $ tar xzvf zookeeper-3.5.2-alpha.tar.gz
    $ mv ./zookeeper-3.5.2-alpha /opt/zookeeper
    $ cd /opt/zookeeper
    $ mkdir data

    Create the configuration file

    $ cd /opt/zookeeper
    $ vim conf/zoo.cfg
    tickTime=2000
    dataDir=/opt/zookeeper/data
    clientPort=2181
    initLimit=5
    syncLimit=2

    Start the ZooKeeper server

    $ bin/zkServer.sh start
  • Step 3: Install SLF4J

    Download SLF4J: slf4j-1.7.21.tar.gz from www.slf4j.org
    Unpack and configure CLASSPATH

    $ tar xvzf ./slf4j-1.7.21.tar.gz
    $ mv ./slf4j-1.7.21 /opt/slf4j
    $ vim ~/.bashrc
    export CLASSPATH=${CLASSPATH}:/opt/slf4j/*
  • Step 4: Install Kafka

    Download Kafka
    Unpack

    $ tar xzvf kafka_2.11-0.10.1.0.tgz
    $ mv ./kafka_2.11-0.10.1.0 /opt/kafka
    $ cd /opt/kafka

    Start the service

    $ ./bin/kafka-server-start.sh config/server.properties
  • Step 5: Stop the service
    $ ./bin/kafka-server-stop.sh
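
As a quick sanity check on the zoo.cfg values from step 2, note that initLimit and syncLimit are counted in ticks, so the effective timeouts are tickTime multiplied by each limit. The sketch below reads the same settings with java.util.Properties (zoo.cfg uses the Java properties format) and works out the numbers. The class name `ZooCfgCheck` and its helper methods are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Reads zoo.cfg-style text and derives the limits in milliseconds.
final class ZooCfgCheck {

    static Properties parse(String cfgText) {
        Properties cfg = new Properties();
        try {
            cfg.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return cfg;
    }

    // initLimit and syncLimit are expressed in ticks, so multiply by tickTime.
    static long limitMillis(Properties cfg, String limitKey) {
        long tickMs = Long.parseLong(cfg.getProperty("tickTime"));
        long ticks = Long.parseLong(cfg.getProperty(limitKey));
        return tickMs * ticks;
    }

    public static void main(String[] args) {
        String text = "tickTime=2000\n"
                + "dataDir=/opt/zookeeper/data\n"
                + "clientPort=2181\n"
                + "initLimit=5\n"
                + "syncLimit=2\n";
        Properties cfg = parse(text);
        System.out.println(cfg.getProperty("clientPort")); // 2181
        System.out.println(limitMillis(cfg, "initLimit")); // 10000
        System.out.println(limitMillis(cfg, "syncLimit")); // 4000
    }
}
```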

Basic Kafka Operations

  • Start ZooKeeper and Kafka
    $ ./bin/zkServer.sh start
    $ ./bin/kafka-server-start.sh config/server.properties
  • Single-node broker configuration
    • Create a Kafka topic

      Syntax:
      $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
      Example:
      $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
      Output:
      Created topic "Hello-Kafka"
    • List topics

      Syntax:
      $ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
      Output:
      Hello-Kafka
    • Start a producer to send messages

      Syntax:
      $ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
      Example:
      $ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
      Type the following messages at the prompt:
      Hello
      My first message
      My second message
    • Start a consumer to receive messages

      Syntax:
      $ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic-name --from-beginning
      Example:
      $ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic Hello-Kafka --from-beginning
      Output:
      Hello
      My first message
      My second message
  • Multi-node broker configuration
    To be continued
  • Basic topic operations
    • Alter a topic

      Syntax:
      $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic-name --partitions count
      Example:
      $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic Hello-Kafka --partitions 2
      Output:
      WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
      Adding partitions succeeded!
    • Delete a topic

      Syntax:
      $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic-name
      Example:
      $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-Kafka
      Output:
      Topic Hello-Kafka marked for deletion
    • Delete data

      1. Stop the Apache Kafka daemon
      2. Delete the topic data folder: rm -rf /tmp/kafka-logs/MyTopic-0
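
The warning printed when altering a topic's partition count is easier to see with a small example. A keyed message goes to a partition derived from a hash of its key modulo the number of partitions, so changing that number can move a key to a different partition. The sketch below is simplified: Kafka's default partitioner hashes the serialized key with murmur2, and String.hashCode() is used here only as a stand-in. `KeyPartitioning` is a hypothetical name.

```java
// Simplified key-to-partition mapping; String.hashCode() stands in for Kafka's murmur2 hash.
final class KeyPartitioning {

    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b"};
        for (String key : keys) {
            // Partition before (1 partition) and after (2 partitions) the alter command.
            System.out.println(key + ": " + partitionFor(key, 1) + " -> " + partitionFor(key, 2));
        }
        // a: 0 -> 1
        // b: 0 -> 0
    }
}
```

Messages with key "a" written before the change sit in partition 0, while later ones land in partition 1, so per-key ordering across the change is no longer guaranteed.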

Simple Producer Example

  • Kafka Producer API

    • The KafkaProducer class provides a send method to send messages asynchronously to a topic. A call to send() looks like this:

      producer.send(new ProducerRecord<byte[], byte[]>(topic, partition, key1, value1), callback);
    • ProducerRecord − The record to send. The producer manages a buffer of records waiting to be sent.

    • Callback − A user-supplied callback to execute when the record has been acknowledged by the server (null indicates no callback).
    • The KafkaProducer class provides a flush method that blocks until all previously sent messages have actually completed. Its signature is as follows −

      public void flush()
    • The KafkaProducer class provides a partitionsFor method, which returns the partition metadata for a given topic. This can be used for custom partitioning. Its signature is as follows −

      public List<PartitionInfo> partitionsFor(String topic)
    • The KafkaProducer class also provides a metrics method −

      public Map<MetricName, ? extends Metric> metrics()
      It returns the map of internal metrics maintained by the producer.
    • public void close() − Blocks until all previously sent requests are completed, then closes the producer.

  • Producer API (the older kafka.javaapi.producer.Producer client)

    • Send

      public void send(KeyedMessage<K,V> message) - sends the data to a single topic, partitioned by key, using either the sync or async producer.
      public void send(List<KeyedMessage<K,V>> messages) - sends data to multiple topics.
      Properties prop = new Properties();
      prop.put("producer.type", "async");
      ProducerConfig config = new ProducerConfig(prop);
      There are two types of producers: Sync and Async.
    • Close
      public void close()
      The Producer class provides a close method to close the producer pool connections to all Kafka brokers.

  • Configuration Settings

Name − Description
client.id − Identifies the producer application.
producer.type − Either sync or async.
acks − Controls the criteria under which producer requests are considered complete.
retries − If a producer request fails, automatically retry up to the specified number of times.
bootstrap.servers − Bootstrapping list of brokers.
linger.ms − To reduce the number of requests, set linger.ms to a value greater than 0 so that the producer waits briefly and batches records together.
key.serializer − Serializer class for record keys.
value.serializer − Serializer class for record values.
batch.size − Batch buffer size in bytes.
buffer.memory − Controls the total amount of memory available to the producer for buffering.
  • ProducerRecord API
    public ProducerRecord(String topic, Integer partition, K key, V value)

    • Topic − User-defined name of the topic the record will be appended to.
    • Partition − Number of the partition the record should be sent to.
    • Key − The key that will be included in the record.
    • Value − Record contents.

    public ProducerRecord(String topic, K key, V value)
    This constructor creates a record with a key and value but without a partition.

    • Topic − Topic the record is assigned to.
    • Key − Key for the record.
    • Value − Record contents.

    public ProducerRecord(String topic, V value)
    This constructor creates a record without a partition or key.

    • Topic − Topic the record is assigned to.
    • Value − Record contents.
    Class Method − Description
    public String topic() − Topic the record will be appended to.
    public K key() − Key that will be included in the record. If there is no key, null is returned.
    public V value() − Record contents.
    public Integer partition() − Partition the record will be sent to, or null if none was specified.
  • SimpleProducer application example

    • Before creating SimpleProducer, create the topic using the Kafka command-line tool

      import java.util.Properties;
      import org.apache.kafka.clients.producer.Producer;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;

      public class SimpleProducer {
          public static void main(String[] args) throws Exception {
              // The topic name is required as the first argument
              if (args.length == 0) {
                  System.out.println("Enter topic name");
                  return;
              }
              String topicName = args[0];

              // Producer configuration
              Properties props = new Properties();
              // Broker to bootstrap from
              props.put("bootstrap.servers", "localhost:9092");
              // Wait for all in-sync replicas to acknowledge each request
              props.put("acks", "all");
              // Number of automatic retries on failure (0 disables retrying)
              props.put("retries", 0);
              // Batch size in bytes
              props.put("batch.size", 16384);
              // Wait up to 1 ms so that records can be batched into fewer requests
              props.put("linger.ms", 1);
              // Total memory available to the producer for buffering
              props.put("buffer.memory", 33554432);
              props.put("key.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");

              Producer<String, String> producer = new KafkaProducer<String, String>(props);
              // Send ten records whose key and value are both the loop index
              for (int i = 0; i < 10; i++)
                  producer.send(new ProducerRecord<String, String>(topicName,
                          Integer.toString(i), Integer.toString(i)));
              System.out.println("Message sent successfully");
              producer.close();
          }
      }
    • Compile

      $ javac -cp "/opt/kafka/libs/*" SimpleProducer.java
    • Run

      $ java -cp "/opt/kafka/libs/*":. SimpleProducer <topic-name>
    • Output

      Message sent successfully
      Run the following command to receive the data:
      $ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic-name> --from-beginning
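
The batch.size and linger.ms settings used in SimpleProducer work together: roughly speaking, a pending batch is sent once it is full or once it has waited for linger.ms, whichever comes first. The sketch below models only that rule. It is a simplification (the real producer has further triggers, such as an explicit flush), and `BatchingRule` is a hypothetical name.

```java
// Simplified model of when a producer batch becomes ready to send; not the Kafka client.
final class BatchingRule {

    static boolean readyToSend(int pendingBytes, long waitedMs, int batchSizeBytes, long lingerMs) {
        if (pendingBytes <= 0) {
            return false; // nothing buffered yet
        }
        // Send when the batch is full or when it has lingered long enough.
        return pendingBytes >= batchSizeBytes || waitedMs >= lingerMs;
    }

    public static void main(String[] args) {
        int batchSize = 16384; // batch.size from SimpleProducer
        long lingerMs = 1;     // linger.ms from SimpleProducer
        System.out.println(readyToSend(100, 0, batchSize, lingerMs));   // false
        System.out.println(readyToSend(100, 1, batchSize, lingerMs));   // true
        System.out.println(readyToSend(16384, 0, batchSize, lingerMs)); // true
    }
}
```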

Simple Consumer Example

  • KafkaConsumer class
  • ConsumerRecord API
  • ConsumerRecords API
  • Configuration Settings
  • SimpleConsumer application example

    • Code to receive data from a topic

      import java.util.Properties;
      import java.util.Arrays;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.ConsumerRecord;

      public class SimpleConsumer {
          public static void main(String[] args) throws Exception {
              // The topic name is required as the first argument
              if (args.length == 0) {
                  System.out.println("Enter topic name");
                  return;
              }
              String topicName = args[0];

              // Kafka consumer configuration settings
              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092");
              props.put("group.id", "test");
              props.put("enable.auto.commit", "true");
              props.put("auto.commit.interval.ms", "1000");
              props.put("session.timeout.ms", "30000");
              props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

              // Subscribe to the list of topics (a single topic here)
              consumer.subscribe(Arrays.asList(topicName));
              System.out.println("Subscribed to topic " + topicName);

              // Poll forever, waiting up to 100 ms for records on each call
              while (true) {
                  ConsumerRecords<String, String> records = consumer.poll(100);
                  for (ConsumerRecord<String, String> record : records)
                      // Print the offset, key and value of each record
                      System.out.printf("offset = %d, key = %s, value = %s\n",
                              record.offset(), record.key(), record.value());
              }
          }
      }
    • Compile

      $ javac -cp "/opt/kafka/libs/*" SimpleConsumer.java
    • Run

      $ java -cp "/opt/kafka/libs/*":. SimpleConsumer <topic-name>
    • Output

      Subscribed to topic Hello-Kafka
      offset = 3, key = null, value = HelloConsumer
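
SimpleConsumer relies on enable.auto.commit with auto.commit.interval.ms set to 1000, which means offsets are committed periodically rather than per message. One consequence is that records processed after the last commit are delivered again if the consumer crashes and restarts. The sketch below is a toy model of that behaviour with time passed in explicitly; `AutoCommitModel` is a hypothetical name, not part of the Kafka client.

```java
// Toy model of periodic auto-commit; not the Kafka client. Times are supplied by the caller.
final class AutoCommitModel {
    private final long intervalMs;
    private long lastCommitAtMs = 0;
    private long position = 0;  // next offset to hand to the application
    private long committed = 0; // offset a restarted consumer would resume from

    AutoCommitModel(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Each poll first commits the current position if the interval has elapsed,
    // then advances the position by the number of records it returns.
    void poll(long nowMs, int recordsReturned) {
        if (nowMs - lastCommitAtMs >= intervalMs) {
            committed = position;
            lastCommitAtMs = nowMs;
        }
        position += recordsReturned;
    }

    long committed() {
        return committed;
    }

    // Records that would be delivered again if the consumer crashed right now.
    long uncommitted() {
        return position - committed;
    }

    public static void main(String[] args) {
        AutoCommitModel consumer = new AutoCommitModel(1000); // auto.commit.interval.ms=1000
        consumer.poll(0, 5);
        consumer.poll(500, 5);
        consumer.poll(1000, 5);
        System.out.println(consumer.committed());   // 10
        System.out.println(consumer.uncommitted()); // 5
    }
}
```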

References

 

Useful Links on Apache Kafka 
Apache Kafka Official Website − Official Website for Apache Kafka 
http://kafka.apache.org/ 
Apache Kafka Wiki − Wikipedia Reference for Apache Kafka 
https://en.wikipedia.org/wiki/Apache_Kafka
