Problem:
Failed to update metadata after 3000 ms.
When the Sender sends data, it calls:

    List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now); // NIO send

Inside NetworkClient.poll(), the client checks whether the metadata needs updating, in the following method:
    /**
     * Add a metadata request to the list of sends if we can make one
     */
    private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        // The best currently usable node
        Node node = this.leastLoadedNode(now);
        if (node == null) {
            log.debug("Give up sending metadata request since no node is available");
            // mark the timestamp for no node available to connect
            this.lastNoNodeAvailableMs = now;
            return;
        }

        log.debug("Trying to send metadata request to node {}", node.id());
        if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
            Set<String> topics = metadata.topics();
            this.metadataFetchInProgress = true;
            // Build the metadata request and add it to the sends queue
            ClientRequest metadataRequest = metadataRequest(now, node.id(), topics);
            log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
            sends.add(metadataRequest.request());
            this.inFlightRequests.add(metadataRequest);
        } else if (connectionStates.canConnect(node.id(), now)) {
            // we don't have a connection to this node right now, make one
            log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id());
            initiateConnect(node, now);
            // If initiateConnect failed immediately, this node will be put into blackout and we
            // should allow immediately retrying in case there is another candidate node. If it
            // is still connecting, the worst case is that we end up setting a longer timeout
            // on the next round and then wait for the response.
        } else {
            // connected, but can't send more OR connecting
            // In either case, we just need to wait for a network event to let us know the selected
            // connection might be usable again.
            this.lastNoNodeAvailableMs = now;
        }
    }
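The timeout in the error above is the producer-side metadata deadline: if leastLoadedNode(now) keeps returning null (no broker reachable, or every broker in its reconnect backoff window), send() eventually fails with this exception. A minimal sketch of the producer settings that govern this, assuming the standard kafka-clients configuration keys (the broker host names are placeholders):

```java
import java.util.Properties;

public class MetadataTimeoutConfig {
    public static Properties producerProps() {
        Properties props = new Properties();
        // List several brokers so metadata can still be fetched if one is down
        // (host names here are placeholders).
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        // How long send() may block waiting for metadata before failing with
        // "Failed to update metadata after ... ms". The default is 60000 ms in
        // newer clients; a value like the 3000 ms in the error suggests it was
        // lowered. (Very old clients used metadata.fetch.timeout.ms instead.)
        props.put("max.block.ms", "10000");
        // Force a metadata refresh at least this often, even without errors.
        props.put("metadata.max.age.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("max.block.ms"));
    }
}
```

These Properties would then be passed to the KafkaProducer constructor; checking that bootstrap.servers actually resolves and is reachable is usually the first step when this error appears.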
It picks the host with the fewest outstanding requests and a usable connection state as the host to fetch metadata from. Here is the selection method:
    /**
     * Choose the node with the fewest outstanding requests which is at least eligible for connection. This method will
     * prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a
     * connection if all existing connections are in use. This method will never choose a node for which there is no
     * existing connection and from which we have disconnected within the reconnect backoff period.
     *
     * @return The node with the fewest in-flight requests.
     */
    public Node leastLoadedNode(long now) {
        List<Node> nodes = this.metadata.fetch().nodes();
        int inflight = Integer.MAX_VALUE;
        Node found = null;
        for (int i = 0; i < nodes.size(); i++) {
            int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size());
            Node node = nodes.get(idx);
            int currInflight = this.inFlightRequests.inFlightRequestCount(node.id());
            if (currInflight == 0 && this.connectionStates.isConnected(node.id())) {
                // if we find an established connection with no in-flight requests we can stop right away
                return node;
            } else if (!this.connectionStates.isBlackedOut(node.id(), now) && currInflight < inflight) {
                // otherwise if this is the best we have found so far, record that
                inflight = currInflight;
                found = node;
            }
        }
        return found;
    }
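The selection rule above can be illustrated with a small standalone sketch. FakeNode is a hypothetical stand-in for the real Node plus its connection state, and the round-robin nodeIndexOffset starting point is omitted for brevity:

```java
import java.util.Arrays;
import java.util.List;

public class LeastLoadedSketch {
    // Minimal stand-in for a broker node; "blackedOut" models the
    // reconnect-backoff exclusion in the real client.
    static final class FakeNode {
        final int id;
        final int inFlight;
        final boolean connected;
        final boolean blackedOut;
        FakeNode(int id, int inFlight, boolean connected, boolean blackedOut) {
            this.id = id; this.inFlight = inFlight;
            this.connected = connected; this.blackedOut = blackedOut;
        }
    }

    /** Same rule as leastLoadedNode: an idle established connection wins outright;
     *  otherwise pick the non-blacked-out node with the fewest in-flight requests. */
    static FakeNode leastLoaded(List<FakeNode> nodes) {
        int best = Integer.MAX_VALUE;
        FakeNode found = null;
        for (FakeNode n : nodes) {
            if (n.inFlight == 0 && n.connected) return n; // idle connection: stop right away
            if (!n.blackedOut && n.inFlight < best) {
                best = n.inFlight;
                found = n;
            }
        }
        return found; // null => every node is inside its backoff window
    }

    public static void main(String[] args) {
        List<FakeNode> nodes = Arrays.asList(
            new FakeNode(0, 3, true, false),
            new FakeNode(1, 1, true, false),
            new FakeNode(2, 5, true, true));
        System.out.println(leastLoaded(nodes).id); // node 1: fewest in-flight, not blacked out
    }
}
```

The null case is exactly what triggers the error path in maybeUpdateMetadata(): no eligible node, lastNoNodeAvailableMs is stamped, and the producer keeps waiting until max.block.ms expires.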