liyonghui160com

kafka_2.9.2-0.8.1.1 distributed cluster setup and code development example


Prepare three virtual machines running RHEL64 Server.
1) Configure /etc/hosts on every machine as follows:

$ cat /etc/hosts

    # zookeeper hostnames: 
    192.168.8.182       zk1 
    192.168.8.183       zk2 
    192.168.8.184       zk3 


2) Install the JDK, ZooKeeper, and Kafka on every machine, then set the environment variables as follows:

$ vi /etc/profile
     
    # jdk, zookeeper, kafka (JAVA_HOME is assumed to be set already)
    export KAFKA_HOME=/usr/local/lib/kafka/kafka_2.9.2-0.8.1.1
    export ZK_HOME=/usr/local/lib/zookeeper/zookeeper-3.4.6 
    export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar 
    export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$KAFKA_HOME/bin:$ZK_HOME/bin:$PATH 

3) On every machine, run:

$ source /etc/profile

$ mkdir -p /var/lib/zookeeper

$ cd $ZK_HOME/conf

$ cp zoo_sample.cfg zoo.cfg

$ vi zoo.cfg
     
    dataDir=/var/lib/zookeeper 
     
    # the port at which the clients will connect 
    clientPort=2181 
     
    # zookeeper cluster 
    server.1=zk1:2888:3888 
    server.2=zk2:2888:3888 
    server.3=zk3:2888:3888 

4) Create the myid file on every machine; the number must match that machine's server.N entry in zoo.cfg:

zk1:

$ echo "1" > /var/lib/zookeeper/myid

zk2:

$ echo "2" > /var/lib/zookeeper/myid

zk3:

$ echo "3" > /var/lib/zookeeper/myid
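
A myid value that does not match the host's server.N line in zoo.cfg is a common reason an ensemble fails to form. The sketch below is only an illustration of that rule, not part of ZooKeeper: it parses zoo.cfg-style text and checks that a given myid points at the expected host. The class and method names (`MyIdCheck`, `serverIds`, `isValidMyId`) are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: not part of ZooKeeper, it only illustrates the myid rule.
final class MyIdCheck {

    // Collect "server.N=host:peerPort:electionPort" lines as N -> host.
    static Map<Integer, String> serverIds(String zooCfg) {
        Map<Integer, String> ids = new TreeMap<Integer, String>();
        for (String raw : zooCfg.split("\n")) {
            String line = raw.trim();
            if (!line.startsWith("server.")) {
                continue; // skips comments, blank lines, and other settings
            }
            int eq = line.indexOf('=');
            if (eq < 0) {
                continue;
            }
            int id = Integer.parseInt(line.substring("server.".length(), eq).trim());
            String host = line.substring(eq + 1).trim().split(":")[0];
            ids.put(id, host);
        }
        return ids;
    }

    // True when the myid file content names the server.N entry of expectedHost.
    static boolean isValidMyId(String zooCfg, String myIdContent, String expectedHost) {
        int myId = Integer.parseInt(myIdContent.trim());
        return expectedHost.equals(serverIds(zooCfg).get(myId));
    }

    public static void main(String[] args) {
        String cfg = "dataDir=/var/lib/zookeeper\n"
                + "clientPort=2181\n"
                + "server.1=zk1:2888:3888\n"
                + "server.2=zk2:2888:3888\n"
                + "server.3=zk3:2888:3888\n";
        System.out.println(serverIds(cfg));                 // {1=zk1, 2=zk2, 3=zk3}
        System.out.println(isValidMyId(cfg, "2\n", "zk2")); // true
        System.out.println(isValidMyId(cfg, "2\n", "zk3")); // false
    }
}
```
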
5) On every machine, run setup and disable the firewall (leave the box unchecked):

Firewall:

[   ] enabled
6) Start ZooKeeper on every machine:

$ zkServer.sh start

Check the status (one node should report Mode: leader and the other two Mode: follower):

$ zkServer.sh status


1) Download Kafka:

    $ wget http://apache.fayea.com/apache-mirror/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz

For installation and configuration, see the previous article:

http://blog.csdn.net/ubuntu64fan/article/details/26678877

2) Configure $KAFKA_HOME/config/server.properties

We install three brokers, one on each of the three VMs (zk1, zk2, zk3). Each broker needs a unique broker.id:

zk1:

$ vi /etc/sysconfig/network

    NETWORKING=yes 
    HOSTNAME=zk1 


$ vi $KAFKA_HOME/config/server.properties

    broker.id=0 
    port=9092 
    host.name=zk1 
    advertised.host.name=zk1 
    ... 
    num.partitions=2 
    ... 
    zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

zk2:

$ vi /etc/sysconfig/network

    NETWORKING=yes 
    HOSTNAME=zk2 


$ vi $KAFKA_HOME/config/server.properties

    broker.id=1 
    port=9092 
    host.name=zk2 
    advertised.host.name=zk2 
    ... 
    num.partitions=2 
    ... 
    zookeeper.connect=zk1:2181,zk2:2181,zk3:2181


zk3:

$ vi /etc/sysconfig/network

    NETWORKING=yes 
    HOSTNAME=zk3 


$ vi $KAFKA_HOME/config/server.properties

    broker.id=2 
    port=9092 
    host.name=zk3 
    advertised.host.name=zk3 
    ... 
    num.partitions=2 
    ... 
    zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
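
The three server.properties files above differ only in broker.id and the two host name settings. As a rough illustration, the following sketch derives those per-broker values from a host list using only `java.util.Properties`. The class name `BrokerConfigSketch` and the rule "broker.id = position in the list" are assumptions that mirror the values shown above; the ZooKeeper key is written as zookeeper.connect, which is the name Kafka 0.8 brokers read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kafka.
final class BrokerConfigSketch {

    // [zk1, zk2, zk3] with port 2181 -> "zk1:2181,zk2:2181,zk3:2181"
    static String hostPortList(List<String> hosts, int port) {
        StringBuilder sb = new StringBuilder();
        for (String host : hosts) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(host).append(':').append(port);
        }
        return sb.toString();
    }

    // One Properties object per host; broker.id is the host's index (assumption).
    static List<Properties> brokerConfigs(List<String> hosts) {
        String zkConnect = hostPortList(hosts, 2181);
        List<Properties> configs = new ArrayList<Properties>();
        for (int i = 0; i < hosts.size(); i++) {
            Properties p = new Properties();
            p.setProperty("broker.id", String.valueOf(i)); // must be unique per broker
            p.setProperty("port", "9092");
            p.setProperty("host.name", hosts.get(i));
            p.setProperty("advertised.host.name", hosts.get(i));
            p.setProperty("num.partitions", "2");
            p.setProperty("zookeeper.connect", zkConnect);
            configs.add(p);
        }
        return configs;
    }

    public static void main(String[] args) {
        for (Properties p : brokerConfigs(Arrays.asList("zk1", "zk2", "zk3"))) {
            System.out.println(p.getProperty("host.name")
                    + " -> broker.id=" + p.getProperty("broker.id"));
        }
    }
}
```
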


3) Start the ZooKeeper service (if it is not already running) by running the following on zk1, zk2, and zk3:

$ zkServer.sh start
4) Start the Kafka service by running the following on zk1, zk2, and zk3:

$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
5) Create a topic (replication-factor = number of brokers):

$ kafka-topics.sh --create --topic test --replication-factor 3 --partitions 2 --zookeeper zk1:2181
6) On zk2, open a terminal and send messages to Kafka (zk2 acts as the producer):

$ kafka-console-producer.sh --broker-list zk1:9092 --sync --topic test

In the producer terminal, type: Hello Kafka

7) On zk3, open a terminal and watch the messages being consumed (zk3 acts as the consumer):

$ kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning
The consumer terminal displays: Hello Kafka

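Preparing the project for development

The project is built with Maven. The Kafka Java client is, frankly, awkward to work with, and setting up the build environment runs into a lot of trouble. Use the pom.xml below as a reference; the versions of all the dependencies must be consistent with one another. If the Kafka client version does not match the Kafka server version, many exceptions occur, such as "broker id not exists", because the protocol between client and server changed when Kafka moved from 0.7 to 0.8. Note that the 2.8.x in an artifact name such as kafka_2.8.2 is the Scala version the client was built for, not the Kafka version, and scala-library must use the same Scala version. For the kafka_2.9.2-0.8.1.1 cluster set up above, the matching coordinates would be artifactId kafka_2.9.2 with version 0.8.1.1, together with scala-library 2.9.2.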

  

    <dependencies>
        <dependency> 
            <groupId>log4j</groupId> 
            <artifactId>log4j</artifactId> 
            <version>1.2.14</version> 
        </dependency> 
        <dependency> 
            <groupId>org.apache.kafka</groupId> 
            <artifactId>kafka_2.8.2</artifactId> 
            <version>0.8.0</version> 
            <exclusions> 
                <exclusion> 
                    <groupId>log4j</groupId> 
                    <artifactId>log4j</artifactId> 
                </exclusion> 
            </exclusions> 
        </dependency> 
        <dependency> 
            <groupId>org.scala-lang</groupId> 
            <artifactId>scala-library</artifactId> 
            <version>2.8.2</version> 
        </dependency> 
        <dependency> 
            <groupId>com.yammer.metrics</groupId> 
            <artifactId>metrics-core</artifactId> 
            <version>2.2.0</version> 
        </dependency> 
        <dependency> 
            <groupId>com.101tec</groupId> 
            <artifactId>zkclient</artifactId> 
            <version>0.3</version> 
        </dependency> 
    </dependencies>  
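
Kafka 0.8 artifacts carry the Scala version in the artifactId (kafka_<scalaVersion>), and the scala-library dependency has to use that same version. The small sketch below shows how that consistency rule could be checked mechanically; `ArtifactCheck` and its methods are hypothetical names and are not part of Maven or Kafka.

```java
// Hypothetical helper for illustration; not part of Maven or Kafka.
final class ArtifactCheck {

    // "kafka_2.8.2" -> "2.8.2"; returns null when there is no "_<version>" suffix.
    static String scalaVersionOf(String artifactId) {
        int idx = artifactId.lastIndexOf('_');
        if (idx < 0 || idx == artifactId.length() - 1) {
            return null;
        }
        return artifactId.substring(idx + 1);
    }

    // True when scala-library uses the Scala version named in the Kafka artifactId.
    static boolean scalaLibraryMatches(String kafkaArtifactId, String scalaLibraryVersion) {
        String scalaVersion = scalaVersionOf(kafkaArtifactId);
        return scalaVersion != null && scalaVersion.equals(scalaLibraryVersion);
    }

    public static void main(String[] args) {
        System.out.println(scalaVersionOf("kafka_2.8.2"));               // 2.8.2
        System.out.println(scalaLibraryMatches("kafka_2.8.2", "2.8.2")); // true
        System.out.println(scalaLibraryMatches("kafka_2.9.2", "2.8.2")); // false
    }
}
```
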

 


Producer code

    1) kafka-producer.properties: place this file in the /resources directory (the file name must match the location field of the class below)


  

    #partitioner.class=
    ## The broker list may be a subset of the Kafka servers; the producer only uses it to fetch metadata.
    ## Any broker can serve metadata, but listing all brokers here is still recommended.
    ## This value can also be injected through Spring.
    ##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
    ## Synchronous sending is used here; async is recommended.
    producer.type=sync
    compression.codec=0
    serializer.class=kafka.serializer.StringEncoder
    ## Only effective when producer.type=async
    #batch.num.messages=100
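
The comments in this file say that metadata.broker.list may be left out and injected later. The pattern is to load the defaults from the file and then overlay the injected value. Here is a minimal sketch of that pattern using only `java.util.Properties`; it reads from a string rather than the classpath so that it runs on its own. `ProducerPropsSketch` is a hypothetical name, and the broker addresses are taken from the cluster set up earlier.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper for illustration; it does not touch Kafka itself.
final class ProducerPropsSketch {

    // Load file defaults, then let an injected broker list override them.
    static Properties load(String fileContent, String injectedBrokerList) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileContent)); // lines starting with # are comments
        } catch (IOException e) {
            throw new IllegalStateException("cannot read producer properties", e);
        }
        if (injectedBrokerList != null) {
            props.setProperty("metadata.broker.list", injectedBrokerList);
        }
        // Fail early with a clear message instead of failing later inside the client.
        if (props.getProperty("metadata.broker.list") == null) {
            throw new IllegalStateException("metadata.broker.list is required");
        }
        return props;
    }

    public static void main(String[] args) {
        String file = "##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093\n"
                + "producer.type=sync\n"
                + "serializer.class=kafka.serializer.StringEncoder\n";
        Properties props = load(file, "zk1:9092,zk2:9092,zk3:9092");
        System.out.println(props.getProperty("metadata.broker.list")); // zk1:9092,zk2:9092,zk3:9092
        System.out.println(props.getProperty("producer.type"));        // sync
    }
}
```
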

 

    2) KafkaProducerClient.java sample code


  

    import java.util.ArrayList;
    import java.util.Collection; 
    import java.util.List; 
    import java.util.Properties; 
     
    import kafka.javaapi.producer.Producer; 
    import kafka.producer.KeyedMessage; 
    import kafka.producer.ProducerConfig; 
     
    public class KafkaProducerClient { 
     
        private Producer<String, String> inner; 
         
        private String brokerList;//for metadata discovery,spring setter 
        private String location = "kafka-producer.properties";//spring setter 
         
        private String defaultTopic;//spring setter 
     
        public void setBrokerList(String brokerList) { 
            this.brokerList = brokerList; 
        } 
     
        public void setLocation(String location) { 
            this.location = location; 
        } 
     
        public void setDefaultTopic(String defaultTopic) { 
            this.defaultTopic = defaultTopic; 
        } 
     
        public KafkaProducerClient(){} 
         
        public void init() throws Exception { 
            Properties properties = new Properties(); 
            properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location)); 
             
             
            if(brokerList != null) { 
                properties.put("metadata.broker.list", brokerList); 
            } 
     
            ProducerConfig config = new ProducerConfig(properties); 
            inner = new Producer<String, String>(config); 
        } 
     
        public void send(String message){ 
            send(defaultTopic,message); 
        } 
         
        public void send(Collection<String> messages){ 
            send(defaultTopic,messages); 
        } 
         
        public void send(String topicName, String message) { 
            if (topicName == null || message == null) { 
                return; 
            } 
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message); 
            inner.send(km); 
        } 
     
        public void send(String topicName, Collection<String> messages) { 
            if (topicName == null || messages == null) { 
                return; 
            } 
            if (messages.isEmpty()) { 
                return; 
            } 
            List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(); 
            int i= 0; 
            for (String entry : messages) { 
                KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry); 
                kms.add(km); 
                i++; 
                if(i % 20 == 0){ 
                    inner.send(kms); 
                    kms.clear(); 
                } 
            } 
             
            if(!kms.isEmpty()){ 
                inner.send(kms); 
            } 
        } 
     
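        public void close() {
            if (inner != null) { // init() may not have run
                inner.close();
            }
        }

        /**
         * @param args
         */
        public static void main(String[] args) {
            KafkaProducerClient producer = null;
            try {
                producer = new KafkaProducerClient();
                // Brokers of the cluster set up above; required because
                // metadata.broker.list is commented out in the properties file.
                producer.setBrokerList("zk1:9092,zk2:9092,zk3:9092");
                producer.init(); // creates the underlying Producer
                int i = 0;
                while (true) {
                    producer.send("test-topic", "this is a sample" + i);
                    i++;
                    Thread.sleep(2000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (producer != null) {
                    producer.close();
                }
            }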
     
        } 
     
    } 
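
The collection-based send() in the class above flushes every 20 messages so that a single request never grows without bound. The same chunking logic is shown below, separated from Kafka so that it can be run alone. `BatchSketch.chunk` is a hypothetical helper, and 20 is simply the batch size used in the class above.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical helper for illustration; it only demonstrates the batching step.
final class BatchSketch {

    // Split items into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> chunk(Collection<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<List<T>>();
        List<T> current = new ArrayList<T>();
        for (T item : items) {
            current.add(item);
            if (current.size() == batchSize) {
                batches.add(current);         // a full batch is ready to send
                current = new ArrayList<T>(); // start a fresh batch
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);             // remainder smaller than batchSize
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<String>();
        for (int i = 0; i < 45; i++) {
            messages.add("this is a sample" + i);
        }
        for (List<String> batch : chunk(messages, 20)) {
            System.out.println(batch.size()); // prints 20, 20, 5 on separate lines
        }
    }
}
```

In the producer, each batch would be wrapped in KeyedMessage objects and passed to a single send() call.
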

 

 
Consumer

     1) kafka-consumer.properties: this file lives in the /resources directory (the file name must match the location field of the class below)

    ## This value can be set here or injected through Spring.
    ##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
    # timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=1000000
    #consumer group id
    group.id=test-group
    #consumer timeout
    #consumer.timeout.ms=5000
    auto.commit.enable=true
    auto.commit.interval.ms=60000

 

    2) KafkaConsumerClient.java sample code


  

    package com.test.kafka;
    import java.nio.ByteBuffer; 
    import java.nio.CharBuffer; 
    import java.nio.charset.Charset; 
    import java.util.HashMap; 
    import java.util.List; 
    import java.util.Map; 
    import java.util.Properties; 
    import java.util.concurrent.ExecutorService; 
    import java.util.concurrent.Executors; 
     
    import kafka.consumer.Consumer; 
    import kafka.consumer.ConsumerConfig; 
    import kafka.consumer.ConsumerIterator; 
    import kafka.consumer.KafkaStream; 
    import kafka.javaapi.consumer.ConsumerConnector; 
    import kafka.message.Message; 
    import kafka.message.MessageAndMetadata; 
     
    public class KafkaConsumerClient { 
     
        private String groupid; // can be set by Spring
        private String zkConnect; // can be set by Spring
        private String location = "kafka-consumer.properties"; // config file location
        private String topic; 
        private int partitionsNum = 1; 
        private MessageExecutor executor; //message listener 
        private ExecutorService threadPool; 
         
        private ConsumerConnector connector; 
         
        private Charset charset = Charset.forName("utf8"); 
     
        public void setGroupid(String groupid) { 
            this.groupid = groupid; 
        } 
     
        public void setZkConnect(String zkConnect) { 
            this.zkConnect = zkConnect; 
        } 
     
        public void setLocation(String location) { 
            this.location = location; 
        } 
     
        public void setTopic(String topic) { 
            this.topic = topic; 
        } 
     
        public void setPartitionsNum(int partitionsNum) { 
            this.partitionsNum = partitionsNum; 
        } 
     
        public void setExecutor(MessageExecutor executor) { 
            this.executor = executor; 
        } 
     
        public KafkaConsumerClient() {} 
     
        //init consumer,and start connection and listener 
        public void init() throws Exception { 
            if(executor == null){
                throw new RuntimeException("KafkaConsumerClient: executor must not be null");
            }
            Properties properties = new Properties();
            properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));

            if(groupid != null){
                properties.put("group.id", groupid); // the 0.8 consumer key is group.id
            }
            if(zkConnect != null){
                properties.put("zookeeper.connect", zkConnect);
            }
            ConsumerConfig config = new ConsumerConfig(properties); 
     
            connector = Consumer.createJavaConsumerConnector(config); 
            Map<String, Integer> topics = new HashMap<String, Integer>(); 
            topics.put(topic, partitionsNum); 
            Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics); 
            List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic); 
            threadPool = Executors.newFixedThreadPool(partitionsNum * 2); 
             
            //start 
            for (KafkaStream<byte[], byte[]> partition : partitions) { 
                threadPool.execute(new MessageRunner(partition)); 
            } 
        } 
     
        public void close() {
            try {
                if (threadPool != null) { // init() may have failed early
                    threadPool.shutdownNow();
                }
            } finally {
                if (connector != null) {
                    connector.shutdown();
                }
            }

        }
     
        class MessageRunner implements Runnable { 
            private KafkaStream<byte[], byte[]> partition; 
     
            MessageRunner(KafkaStream<byte[], byte[]> partition) { 
                this.partition = partition; 
            } 
     
            public void run() { 
                ConsumerIterator<byte[], byte[]> it = partition.iterator(); 
                while (it.hasNext()) { 
                    // connector.commitOffsets(); commits offsets manually; use it when auto.commit.enable=false
                    MessageAndMetadata<byte[], byte[]> item = it.next();
                    try{
                        executor.execute(new String(item.message(),charset)); // decode as UTF-8
                    }catch(Exception e){
                        // Do not swallow failures silently; at least report them.
                        e.printStackTrace();
                    }
                } 
            } 
             
            public String getContent(Message message){ 
                ByteBuffer buffer = message.payload(); 
                if (buffer.remaining() == 0) { 
                    return null; 
                } 
                CharBuffer charBuffer = charset.decode(buffer); 
                return charBuffer.toString(); 
            } 
        } 
     
        public static interface MessageExecutor { 
     
            public void execute(String message); 
        } 
     
        /**
         * @param args
         */ 
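        public static void main(String[] args) {
            final KafkaConsumerClient consumer = new KafkaConsumerClient();
            try {
                MessageExecutor executor = new MessageExecutor() {

                    public void execute(String message) {
                        System.out.println(message);
                    }
                };
                // ZooKeeper ensemble of the cluster above; required because
                // zookeeper.connect is commented out in the properties file.
                consumer.setZkConnect("zk1:2181,zk2:2181,zk3:2181");
                consumer.setTopic("test-topic");
                consumer.setPartitionsNum(2);
                consumer.setExecutor(executor);
                consumer.init();
                // init() returns immediately, so close on JVM exit rather than in a
                // finally block, which would shut the consumer down at once.
                Runtime.getRuntime().addShutdownHook(new Thread() {
                    public void run() {
                        consumer.close();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }

        }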
     
    }  

 

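    Note that the KafkaConsumerClient class above pays little attention to error handling; in particular, you must decide how to handle an exception thrown from MessageExecutor.execute().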
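
    One way to handle a throwing execute() is to wrap the listener so that a failure is recorded and consumption continues. The sketch below re-declares a MessageExecutor interface with the same shape as the one above so that it compiles on its own. `SafeExecutor` and its failure counter are hypothetical, and whether to skip, retry, or stop on a failure depends on the application.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical wrapper for illustration; not part of Kafka.
final class SafeExecutorSketch {

    // Same shape as the MessageExecutor interface in KafkaConsumerClient.
    interface MessageExecutor {
        void execute(String message);
    }

    // Delegates to the real listener and records failures instead of rethrowing.
    static final class SafeExecutor implements MessageExecutor {
        private final MessageExecutor delegate;
        private final AtomicLong failures = new AtomicLong();

        SafeExecutor(MessageExecutor delegate) {
            this.delegate = delegate;
        }

        public void execute(String message) {
            try {
                delegate.execute(message);
            } catch (RuntimeException e) {
                failures.incrementAndGet();
                System.err.println("failed to process message: " + message + " (" + e + ")");
            }
        }

        long failureCount() {
            return failures.get();
        }
    }

    public static void main(String[] args) {
        SafeExecutor safe = new SafeExecutor(new MessageExecutor() {
            public void execute(String message) {
                if (message.isEmpty()) {
                    throw new IllegalArgumentException("empty message");
                }
                System.out.println(message);
            }
        });
        safe.execute("Hello Kafka"); // processed normally
        safe.execute("");            // fails, is counted, and does not propagate
        System.out.println(safe.failureCount()); // 1
    }
}
```
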

    When testing, start the consumer first and then the producer, so that new messages can be observed as they arrive.

 

For detailed Java API usage, see:

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

 

Recommended, more detailed articles:

http://blog.csdn.net/zhongwen7710/article/details/41252649

http://blog.csdn.net/lizhitao/article/details/39499283

 

 
