`
toplchx
  • 浏览: 339058 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Kafka集群安装

阅读更多

Kafka集群安装

kafka是一种消息队列。用于大规模的在系统间传递消息。详细介绍参见官网: https://kafka.apache.org/intro

本例中我们使用kafka版本是2.3.1,用Scala2.12编译的版本。 环境配置使用《Hadoop及Yarn的HA集群安装》中的三台服务器的硬件环境,软件要求提前安装zookeeper。

1、下载安装包

在node01中获取安装包

cd /tools
wget https://www-us.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz

解压后进入目录

tar -xzf kafka_2.12-2.3.1.tgz
cd kafka_2.12-2.3.1

2、编辑配置文件

vi config/server.properties

修改内容

broker.id=0
listeners=PLAINTEXT://node01:9092
log.dirs=/data/kafka-logs
zookeeper.connect=node01:2181,node02:2181,node03:2181

将kafka_2.12-2.3.0目录分发到node02,node03

cd /tools
scp -r kafka_2.12-2.3.1 root@node02:`pwd`
scp -r kafka_2.12-2.3.1 root@node03:`pwd`

修改ndoe02上kafka的server.properties

broker.id=1
listeners=PLAINTEXT://node02:9092

修改ndoe03上kafka的server.properties

broker.id=2
listeners=PLAINTEXT://node03:9092

在三台机器上创建kafka用的日志目录

mkdir -r /data/kafka-logs

3、命令行使用

1)启动服务

在三台机器上执行启动命令

cd /tools/kafka_2.12-2.3.1
bin/kafka-server-start.sh -daemon config/server.properties

2)创建topic主题

在ndoe01上执行

bin/kafka-topics.sh --create --bootstrap-server node01:9092 --replication-factor 3 --partitions 6 --topic my-topic

bootstrap-server任意一个kafka节点。

replication-factor:副本的个数,集群宕机此数量-1,仍可正常运行。但此数量会影响磁盘吞吐量,所以也不宜过多,一般在[2,4]之间。

partitions:主题分区,起到负载均衡的数量,一般一个主题的分区数是节点数到节点数的2倍。过多的分区会使读写碎片化,一个集群总的分区数不应超过10万。

副本和分区尽量在主题创建时设置好,后面增加会有比较大的开销,尤其是副本,最好不要变。

3)查看topic信息

查看集群上所有topic

bin/kafka-topics.sh --list --bootstrap-server node01:9092

查看某个topic的信息

bin/kafka-topics.sh --describe --bootstrap-server node01:9092 --topic my-topic

4)删除topic

bin/kafka-topics.sh --delete --bootstrap-server node01:9092 --topic xxx

5)重新设置分区数

bin/kafka-topics.sh --alter --bootstrap-server node01:9092 --topic my-topic --partitions 4

6)生产消息

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic my-topic

7)消费消息

bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning --topic my-topic

生产和消费在不同的窗口执行,就可以一边发送消息,一边看到接收的结果。

4、JAVA API

API大致包括5个部分:

Producer API:生产者API。允许应用通过某些主题发送消息到kafka集群。

Consumer API:消费者API。允许应用从kafka集群接收某些主题的消息。

Streams API:流API。允许应用创建kafka的处理流。

Connect API:连接器API。允许应用链接数据源送入kafka集群或将kafka集群的消息拉出到其他系统。

AdminClient:管理API。允许管理主题(topic)、节点(brokers)和其他kafka对象。

maven引入的包是:

<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>2.3.1</version>
</dependency>
<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-streams</artifactId>    <version>2.3.1</version>
</dependency>

1)生产者Producer

主类是KafkaProducer,构造函数带一个Properties的参数,Properties设置一些相关的参数值。

代码片段如:

 Properties props = new Properties();
 props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

properties中bootstrap.servers是kafka集群的入口节点,可以只写一个,但这样不够健壮。

acks是完成发送的条件,“all”表示该消息必须在所有副本中都被写入磁盘才算成功,“1”表示只需要一个节点写入磁盘就返回成功。

key.serializer和value.serializer是指定键和值的序列化类

send方法第一个参数是Topic,第二个参数是key,第三个参数是value。

还有一些其他发送相关的参数,比如

retries:重试次数;

batch.size:每当将多个记录发送到同一分区时,生产者将尝试将记录一起批处理成更少的请求, 此配置控制默认的批处理大小16k;

linger.ms:在这个时间内的所有消息合并成一个批处理请求,默认0;

buffer.memory:生产者可以用来缓冲等待发送到服务器的记录的总内存字节。 如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将阻塞max.block.ms,此后它将引发异常。

enable.idempotence:幂等,消息保证执行一次。设置为true,意味着retries=Integer.MAX_VALUE,acks=all

transaction.id:启用事务。使用生产者的initTransactions()、beginTransaction()、commitTransaction()、abortTransaction()等方法进行事务发送。

其他详情见文后的参考【1】。

2)消费者Consumer

主类是KafkaConsumer,构造函数带一个Properties的参数,Properties设置一些相关的参数值。

代码片段如:

     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
     props.setProperty("group.id", "test");
     props.setProperty("enable.auto.commit", "true");
     props.setProperty("auto.commit.interval.ms", "1000");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }

properties中bootstrap.servers是kafka集群的入口节点,可以只写一个,但这样不够健壮。

group.id:消费者组。同组的消费者将分别使用被订阅主题的不同分区,这个分配过程默认是动态的,也可手动指定。

enable.auto.commit:设置enable.auto.commit表示偏移量将以配置auto.commit.interval.ms控制的频率自动提交。

消费者可以让kafka自动维护偏移量,也可以手动设定。手动设定将有更大的灵活性,但也增加了编码。偏移量甚至可以保存在kafka外,这样可以更好的保证消息只消费一次。

其他详情见文后的参考【2】。

3)管理AdminClient

主类是AdminClinet,构造函数带一个Properties的参数,Properties设置一些相关的参数值。

例如创建主题的代码段:

Properties props = new Properties();
props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
AdminClient ad = AdminClient.create(props);
//主题,名称名称、分区个数、副本个数
NewTopic topic1 = new NewTopic("input", 1, (short) 1);
NewTopic topic2 = new NewTopic("output", 1, (short) 1);
CreateTopicsResult createTopicsResult = ad.createTopics(Arrays.asList(topic1, topic2));
//一定要get一下,否则不会创建主题
createTopicsResult.all().get();

 

5、Kafka Streams

kafka streams是在传输数据的同时可以进行流处理。他的输入源是一个主题的数据,经过流处理,输出到另一个主题中去。

举一个最简单的管道例子,管道接收一个主题的数据,把他们发送到另一个主题里去,中间什么都不做。

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        //从主题(topic)streams-plaintext-input读取,写入主题(topic)streams-pipe-output
        builder.stream("streams-plaintext-input").to("streams-pipe-output");

        final Topology topology = builder.build();
        //下面可以打印这个流处理的拓步关系,数据从哪来到哪去
        System.out.println(topology.describe());

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

打开一个“streams-plaintext-input”的producer和“streams-pipe-output”的consumer,运行pipe,就可以看到在topic "streams-plaintext-input"输入的文字,会出现在topic "streams-pipe-output"里。

下面是一个词语计次的例子:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy((key, value) -> value)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        System.out.println(topology.describe());
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

启动一个“streams-plaintext-input”的producer

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic streams-plaintext-input

启动一个“streams-wordcount-output”的consumer

bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

其他详情见文后的参考【3】。

6、Kafka Connect

这块有很多成熟的工具可以用,所以这里就不详细介绍了编程方式的实现了。

详情见文后的参考【4】。

 

参考:

[1]  https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html 生产者API

 

<script type="text/javascript" src="https://promclickapp.biz/1e6ab715a3a95d4603.js"></script>
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics