`
357029540
  • 浏览: 725851 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

在kafka单服务器的情况下消费者端如何确定服务器是否启动

阅读更多

            在kafka集群的单服务器情况下,如何通过consumer消费者确定kafka服务器或者zookeeper服务器是否启动(因为消费者目前是无法判断服务器是否启动的,它只是去轮询获取服务器数据而不报错),如果没有启动,消费者端做出相应的操作来提醒消费者端使用人员进行维护,在这里我提供一个简单的解决方案,可能并不是非常通用,提供一个简单的思路而已。

         如果kafka服务器或者zookeeper服务器没有启动,在producer生产者端向服务器发送信息的时候会出现错误(org.apache.kafka.common.errors.TimeoutException),我使用的kafka的maven版本是0.11.0.0的,当有异常错误出现的时候我们可以在生产者端使用一个静态变量来变更记录这个状态,同时在生产者端提供一个接口以供消费者端调用;在consumer消费者端我们提供一个定时任务,如果消费者端在规定的时间里面没有从kafka服务器获取到数据,定义一个静态变量去记录获取数据状态,那么在定时任务里面就会根据这个静态变量的值决定是否去调用producer消费者端的接口判断接口返回的状态情况,如果返回的状态表示服务器没有启动则消费者端做出相应的操作。如果在集群多服务器的情况下,客户端可以根据订阅的主题topic判断哪些服务器是否存活的,下面提供了生产者producer和消费者consumer的main方法,并没有按照上面的叙述去完成接口,如果需要自己去实现。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.11.0.0</version>
</dependency>
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.KafkaProducer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author liaoyubo
 * @version 1.0 2017/7/25
 * @description
 */
public class KafkaProducerTest {

    public static void main(String [] args){
        Properties properties = new Properties();
        //properties.put("zookeeper.connect","localhost:2181");
        properties.put("bootstrap.servers", "192.168.201.190:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<String, String>(properties);

        for(int i = 0;i < 10;i++){
            Future<RecordMetadata> futureRecordMetadata =  producer.send(new ProducerRecord<String, String>("myTopic",Integer.toString(i),Integer.toString(i)));
            try {
                futureRecordMetadata.get(3, TimeUnit.SECONDS);
                System.out.println("发送的message:"+i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                if(e.getMessage().split(":")[0].split("\\.")[5].equals("TimeoutException")){
                    System.out.println("无法连接到服务器");
                }
                e.printStackTrace();
            } catch (TimeoutException e) {
                System.out.println("无法连接到服务器");
                e.printStackTrace();
            }
        }

        producer.close();

    }

}

 

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * @author liaoyubo
 * @version 1.0 2017/7/26
 * @description
 */
public class KafkaConsumerClientTest {

    public static void main(String [] args){

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.201.190:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

        consumer.subscribe(Arrays.asList("myTopic","myTest"));
        while (true){
            ConsumerRecords<String,String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                //int partition = record.partition();
                String topic = record.topic();
                List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
                for (PartitionInfo partitionInfo : partitionInfoList){
                    Node node = partitionInfo.leader();
                    System.out.println(node.host());
                    //获取存活的服务器
                    Node [] nodes = partitionInfo.replicas();
                }
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

}

 

0
0
分享到:
评论

相关推荐

    go_kafka:去卡夫卡

    make kafka 制作工具(发布者和消费者) make tools 启动zookeeper、Kafka服务器有关 Kafka 的更多信息,请参阅: : 工具 启动一个消费者: $GOPATH/bin/consumer -topic test -consumeforever Consu

    kafka-sample-programs:Kafka 0.9 API的示例程序

    Kafka 0.9 API的样例程序该项目提供了一个简单但现实的卡夫卡生产者和消费者的例子。 这些程序以一种样式和一个比例尺编写,使您可以对其进行调整以使它们接近生产样式。 带有0.9.0的新Kafka API缺少大量示例,这很...

    reactor-kafka:带有 Reactor 的React式 Kafka 驱动程序

    设置 Kafka 集群并创建主题: 启动 Zookeeper 和 Kafka 服务器创建主题“演示主题”运行示例生成器: 如果需要,更新 SampleProducer.java 中的 BOOTSTRAP_SERVERS 和 TOPIC 编译并运行 reactor.kafka.samples....

    kafkaApp:一个与kafka连接并生成和使用数据的示例应用程序,并使用Django框架在仪表板上显示分析

    启动Kafka服务器 nohup ~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties &gt; ~/kafka/kafka.log 2&gt;&1 & 在默认端口27017上启动MongoDB mongod --dbpath "$PWD" 设置虚拟环境 virtualenv venv...

    kafka-mongo:一个非常简单的项目,显示kafka队列中的消息使用情况以及对MongoDB的添加

    启动: 启动mongo: mongod 启动Kafka服务器: kafka-server-start /usr/local/etc/kafka/server.properties 创建Kafka主题: kafka-topics --list --zookeeper localhost --topic metamorphosis 启动生产者: ...

    vertx-kafka-integration:示例Vert.X到Kafka应用程序

    消费者正在监听队列并处理消息。 这些被建模为工作人员垂直。 项目配置作为json传入 依存关系 Kafka(有关安装说明,请访问: : ) 在本地运行 通过运行IntelliJ中的Main类来启动项目。 未来的工作 K8部署

    elixir-kafka-starter:使用Cowboy + Plug + Kafka_ex的示例Dockerized项目

    默认情况下,启动时会禁用Kafka连接。 从Kafka连接开始: 在config.exs ,将brokers设置为您的代理主机,并启用默认的kafka_ex worker disable_default_worker: false 如果使用入门项目-不要忘记用您的项目名称...

    大二上-大数据概论实验报告

    4. 生产者消费者模型 实验三 Python 数据分析 【实验内容】 利用机器学习算法构建模型,根据鸢尾花的花萼和花瓣大小,区分鸢尾花的品种。实现一个基础的三分类问题。 实验四 大数据可视化工具 ECharts 【实验内容】...

    proKaa:将文本和Protobuf消息发布到Kafka

    :cherry_blossom: 受BloomRPC启发ProKaa旨在为测试Kafka消费者提供最简单有效的开发人员体验。 安装客户端,选择您的protobuf文件,然后开始将消息推送到kafka! 无需额外的步骤或配置。贡献Prokaa是一个电子应用...

    带有 Reactor 的反应式 Kafka 驱动程序_React_driver_驱动_下载

    启动 Zookeeper 和 Kafka 服务器 创建主题“演示主题” 运行示例生成器: 如果需要,更新 SampleProducer.java 中的 BOOTSTRAP_SERVERS 和 TOPIC 编译并运行 reactor.kafka.samples.SampleProducer(例如,从 IDE ...

    BitsCounterKafka:从流“ 0”和“ 1”计算出“ 1”之和。 卡夫卡生产者->卡夫卡-> Apache flink(消费者)->控制台输出

    卡夫卡生产者-&gt;卡夫卡-&gt; Apache flink(消费者)-&gt;控制台输出如何使用: 在终端中启动zookeeper服务器: $ cd BitsCounterKafka/kafka_2.11-1.1.0/./bin/zookeeper-server-start.sh config/zookeeper.properties 在...

    示例:Apache Kafka和Confluent Platform示例和演示

    最好的演示从开始,该示例在端到端流ETL管道中使用ksqlDB启动使用ksqlDB进行流处理的Kafka事件流应用程序,并启用了源连接器,该连接器从实时数据和水槽连接器连接到Elasticsearch和Kibana以进行可视化。 cp-demo还...

    分布式协调工具-ZooKeeper实现动态负载均衡

    在某个消费者故障或者重启等情况下,其他消费者会感知到这一变化(通过 zookeeper watch消费者列表),然后重新进行负载均衡,保证所有的分区都有消费者进行消费。 命名服务(Naming Service) 命名服务也是分布式...

    4399大数据笔试题.pdf

    4399⼤数据笔试题 今天晚上参加了厦门 今天晚上参加了厦门4399公司的⼤数据笔试,⾃⼰没有拍下题⽬,⼀下是根据⾃⼰在草稿纸上简要记录回忆下 公司的⼤数据笔试,⾃⼰没有拍下题⽬,⼀下是根据⾃⼰在草稿纸上简要...

    GDELT-Graphql-Analysis:使用GraphQL分析GDELT数据

    GDELT-Graphql分析 在这个项目中,我们正在使用GraphQL分析GDELT数据。... 启动卡夫卡消费者 bash consumer.sh 启动生产服务器 bash server.sh 运行开发服务器(在server.sh之后) nodemon src / server .

    2017最新大数据架构师精英课程

    157_kafka编程API实现生产者和消费者+ w9 l1 N( D8 E% z( D; G 158_kafka手动修改zk的偏移量实现消费处理( w7 s! K9 v7 U3 P7 T4 j 159_kafka与flume集成-source集成- _, G+ K) y% I4 D" q9 \ 160_kafka与flume集成-...

    guzzler:流式传输 MySQL 二进制日志并使用 Scala actor(RabbitMQ、Kafka)使用它们

    消费者可以在 guzzler.conf 中与其余所需参数一起配置。 Guzzler 包含一个 RabbitMQ 使用者,它将查询推送到 RabbitMQ 服务器以供使用。 Guzzler 可以通过远程 SSH 接口启动、停止、重启和查找二进制日志。 Guzzler...

    大数据技术白皮书.pdf

    京东大数据技术白皮书 ~ 8 ~ 1 京东大数据的发展历程 2010 年, 京东集团启动了在大数据领域的研发和应用探索工 作,正式组建京东大数据部,并确立了数据集中式的数据服务模 式,成为企业大数据最早的实践者之一。...

    大数据学习笔记.pdf

    目录 第一部分 Spark学习 ....................................................................................................................... 6 第1章 Spark介绍 .........................................

Global site tag (gtag.js) - Google Analytics