kafka获得partition下标,需要用到kafka的simpleconsumer
import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.TreeMap; import java.util.Map.Entry; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.javaapi.OffsetResponse; import kafka.javaapi.PartitionMetadata; import kafka.javaapi.TopicMetadata; import kafka.javaapi.TopicMetadataRequest; import kafka.javaapi.consumer.ConsumerConnector; import kafka.javaapi.consumer.SimpleConsumer; public class KafkaOffsetTools { public static void main(String[] args) { // 读取kafka最新数据 // Properties props = new Properties(); // props.put("zookeeper.connect", // "192.168.6.18:2181,192.168.6.20:2181,192.168.6.44:2181,192.168.6.237:2181,192.168.6.238:2181/kafka-zk"); // props.put("zk.connectiontimeout.ms", "1000000"); // props.put("group.id", "dirk_group"); // // ConsumerConfig consumerConfig = new ConsumerConfig(props); // ConsumerConnector connector = // Consumer.createJavaConsumerConnector(consumerConfig); String topic = "dirkz"; String seed = "118.26.148.18"; int port = 9092; if (args.length >= 3) { topic = args[0]; seed = args[1]; port = Integer.valueOf(args[2]); } List<String> seeds = new ArrayList<String>(); seeds.add(seed); KafkaOffsetTools kot = new KafkaOffsetTools(); TreeMap<Integer,PartitionMetadata> metadatas = kot.findLeader(seeds, port, topic); int sum = 0; for (Entry<Integer,PartitionMetadata> entry : metadatas.entrySet()) { int partition = entry.getKey(); String leadBroker = entry.getValue().leader().host(); String clientName = "Client_" + topic + "_" + partition; SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName); long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName); sum += readOffset; System.out.println(partition+":"+readOffset); if(consumer!=null)consumer.close(); } System.out.println("总和:"+sum); } public KafkaOffsetTools() { // m_replicaBrokers = new ArrayList<String>(); } // private List<String> m_replicaBrokers = new ArrayList<String>(); public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo( whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { System.out .println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); return 0; } long[] offsets = response.offsets(topic, partition); // long[] offsets2 = response.offsets(topic, 3); return offsets[0]; } private TreeMap<Integer,PartitionMetadata> findLeader(List<String> a_seedBrokers, int a_port, String a_topic) { TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>(); loop: for (String seed : a_seedBrokers) { SimpleConsumer consumer = null; try { consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"+new Date().getTime()); List<String> topics = Collections.singletonList(a_topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List<TopicMetadata> metaData = resp.topicsMetadata(); for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) { map.put(part.partitionId(), part); // if (part.partitionId() == a_partition) { // returnMetaData = part; // break loop; // } } } } catch (Exception e) { System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", ] Reason: " + e); } finally { if (consumer != null) consumer.close(); } } // if (returnMetaData != null) { // m_replicaBrokers.clear(); // for (kafka.cluster.Broker replica : returnMetaData.replicas()) { // m_replicaBrokers.add(replica.host()); // } // } return map; } }
相关推荐
kafka监控工具KafkaOffsetMonitor afkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,我们可以浏览当前的消费者组,并且每个Topic的所有...
Kafka Eagle 用于监控 Kafka 集群中 Topic 被消费的情况。包含 Lag 的产生,Offset 的变动,Partition 的分布,Owner ,Topic 被创建的时间和修改的时间等信息
你可以设置消息的键(key)和值(value),并选择性地指定分区(partition)和时间戳(timestamp)。 在消费者方面,你可以编写代码来订阅一个或多个Kafka主题,并实现对接收到的消息的处理逻辑。你可以选择使用注解或编程...
接着,文档深入探讨了Kafka中重要的机制,包括Partition副本选举、消费者消费消息的Offset记录机制以及消费者Rebalance机制。特别地,对于Kafka的生产者和消费者客户端行为进行了详细分析,包括消息的发布机制、消息...
KafkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,我们可以浏览当前的消费者组,并且每个Topic的所有Partition的消费情况都可以一目了然
发送消息(支持向指定的topic和partition发送字符串消息) 延迟消息(通过扩展使kafka支持18个级别的延迟消息) 如您需要在企业网络中使用 kafka-map ,建议先征求 IT 管理员的同意。下载、使用或分发 kafka-map 前...
发送消息(支持向指定的topic和partition发送字符串消息) 延迟消息(通过扩展使kafka支持18个级别的延迟消息) 截图 添加集群 集群管理 broker 主题管理 消费组 查看消费组已订阅主题 消费组详情 topic详情——分区...
Kafka是由LinkedIn公司用Scala语言开发的,一个分布式、分区的、多副本的、多订阅者的,基于Zookeeper协调的分布式日志系统(也可做MQ系统)。主要初衷目标是构建一个用来处理海量日志,用户行为和网站运营统计等的...
pip install --user kafka-python==1.4.3 如果报错压缩相关的错尝试安装下面的依赖 yum install snappy-devel yum install lz4-devel pip install python-snappy pip install lz4 2.生产者 #!/usr/bin/env python...
KafkaOffsetMonitor是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,我们可以浏览当前的消费者组,并且每个Topic的所有Partition的消费情况都可以一目了然。
Kafka是一个分布式流处理平台,被广泛用于构建实时数据管道,允许你流式地处理数据。...4.分区(Partition):主题的子集,数据分布于多个分区。 消息(Message):传送的数据。 6.偏移量(Offset):分区中的消息位置。
topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己...
为了做到水平扩展,一个 Topic 实际是由多个 Partition 组成的,遇到瓶颈时,可以通过增加 Partition 的数量来进行横向扩容。单个 Parition 内是保证消息有序。 每新写一条消息,Kafka 就是在对应的文件 append 写,...
Kafka 文章目录Kafka架构名次解释Producer(生产者)命令使用脚本常用参数举例分区策略发送返回值幂等性Consumer(消费者)命令使用脚本常用参数举例...Partition:消息分区。消息物理存储上概念 Offset:偏移量 Repli
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata from consumers.db_util import * from consumers.json_dispose import * from collections import OrderedDict threads = []
#Kafka Java API案例Producer可选配置,如果不配置,则使用默认的partitioner根据key值value映射到指定的Parition props... if(offset>0){ partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;}
KafkaOffsetMonitor(版本0.2.1)是Kafka的一款客户端消费监控工具,用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,我们可以浏览当前的消费者组,并且每个Topic的所有Partition的消费情况都...
基本概念:broker:Kafka服务器,负责消息存储和转发topic:消息类别,Kafka按照topic来分类消息partition:topic的分区,一个topic可以包含多个partition,topic消息保存在各个partition上offset:消息在日志中
Kafka是一个Pub-Sub的消息系统,无论是发布还是订阅,都...在逻辑上,可以把一个Partition当作一个非常长的数组,可通过这个“数组”的索引(offset)去访问其数据。一方面,由于不同Partition可位于不同机器,因此可以