SocketServer is Kafka's NIO socket server. A single Acceptor thread accepts incoming socket connections and distributes them evenly (round-robin) across multiple Processor threads; each Processor then services the NIO reads (requests) and writes (responses) for its connections.
On the request side a Processor does no real processing itself: it only places the parsed request into the RequestChannel's request queue, where the handler threads of KafkaRequestHandlerPool pick it up.
On the response side, each Processor is registered as a response listener on the RequestChannel; when a response is ready for one of its connections, the Processor is woken up and writes the response back to the client.
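The "distribute connections evenly" step is plain round-robin: the acceptor keeps a cyclic counter over per-processor queues. A minimal Java sketch of that dispatch logic, assuming a generic `RoundRobinDispatcher` class (illustrative name, not Kafka's; Kafka's Acceptor does the equivalent with a `currentProcessor` counter):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch: the acceptor thread pushes each newly accepted connection onto the
// next processor's queue, cycling through processors for an even spread.
class RoundRobinDispatcher<C> {
    private final List<ConcurrentLinkedQueue<C>> processorQueues;
    private int next = 0;

    RoundRobinDispatcher(List<ConcurrentLinkedQueue<C>> processorQueues) {
        this.processorQueues = processorQueues;
    }

    // Called by the acceptor thread for each accepted connection.
    void dispatch(C connection) {
        processorQueues.get(next).add(connection);
        next = (next + 1) % processorQueues.size(); // cyclic, even assignment
    }
}
```

Each processor drains its own queue inside its selector loop, so the acceptor never blocks on a slow processor.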
```scala
/**
 * Start the socket server
 */
def startup() {
  val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
  for(i <- 0 until numProcessorThreads) {
    processors(i) = new Processor(i,
                                  time,
                                  maxRequestSize,
                                  aggregateIdleMeter,
                                  newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
                                  numProcessorThreads,
                                  requestChannel,
                                  quotas,
                                  connectionsMaxIdleMs)
    // Each processor handles network reads and writes: reads are enqueued into
    // the RequestChannel's queue; writes send responses back to clients.
    Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
  }

  newGauge("ResponsesBeingSent", new Gauge[Int] {
    def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
  })

  // register the processor threads for notification of responses
  // Register each processor's wakeup() as a listener on the RequestChannel, keyed by processor id
  requestChannel.addResponseListener((id: Int) => processors(id).wakeup())

  // start accepting connections
  // The Acceptor accepts NIO connections and hands each one to a processor's
  // new-connections list; that processor then handles all subsequent requests and responses.
  this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
  Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
  acceptor.awaitStartup
  info("Started")
}
```
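The `addResponseListener` wiring above is a simple observer pattern: when a handler enqueues a response destined for processor `id`, the channel notifies listeners so that processor can wake its selector. A minimal Java sketch, assuming a hypothetical `ResponseNotifier` class (the real RequestChannel also holds per-processor response queues, omitted here):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Sketch: handlers call sendResponse(processorId); every registered listener
// is invoked with that id, letting the owning processor call selector.wakeup().
class ResponseNotifier {
    private final List<IntConsumer> listeners = new ArrayList<>();

    void addResponseListener(IntConsumer listener) {
        listeners.add(listener);
    }

    void sendResponse(int processorId) {
        for (IntConsumer l : listeners) l.accept(processorId);
    }
}
```

The wakeup is needed because the processor may be parked in `select()`; without it, a queued response would wait until the next selector timeout or I/O event.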
Read path: receive a request from the socket and put it on the queue, where it waits for a handler thread.
```scala
/*
 * Process reads from ready sockets
 */
def read(key: SelectionKey) {
  lruConnections.put(key, currentTimeNanos)
  val socketChannel = channelFor(key)
  var receive = key.attachment.asInstanceOf[Receive]
  if(key.attachment == null) {
    receive = new BoundedByteBufferReceive(maxRequestSize)
    key.attach(receive)
  }
  val read = receive.readFrom(socketChannel)
  val address = socketChannel.socket.getRemoteSocketAddress()
  trace(read + " bytes read from " + address)
  if(read < 0) {
    close(key)
  } else if(receive.complete) {
    val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
    requestChannel.sendRequest(req) // enqueue the request; a handler thread will process it
    key.attach(null)
    // explicitly reset interest ops to not READ, no need to wake up the selector just yet
    key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
  } else {
    // more reading to be done
    trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
    key.interestOps(SelectionKey.OP_READ)
    wakeup()
  }
}
```
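The hand-off in `requestChannel.sendRequest(req)` is a bounded blocking queue between network threads and handler threads. A minimal Java sketch of that pattern, assuming a hypothetical `RequestQueue` wrapper (Kafka's RequestChannel wraps an `ArrayBlockingQueue` in essentially this way):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: the network thread only enqueues a completed request; a handler
// thread blocks on take(). A bounded queue gives natural back-pressure.
class RequestQueue {
    private final BlockingQueue<String> queue;

    RequestQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Called by a processor (network) thread; blocks when the queue is full.
    void sendRequest(String request) throws InterruptedException {
        queue.put(request);
    }

    // Called by a handler thread; blocks when the queue is empty.
    String receiveRequest() throws InterruptedException {
        return queue.take();
    }
}
```

Because the queue is bounded, a flood of requests eventually blocks the network threads instead of exhausting heap, which is why the processor also drops `OP_READ` interest until the response cycle completes.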
Write path: send the response back to the client.
```scala
/*
 * Process writes to ready sockets
 */
def write(key: SelectionKey) {
  val socketChannel = channelFor(key)
  val response = key.attachment().asInstanceOf[RequestChannel.Response]
  val responseSend = response.responseSend
  if(responseSend == null)
    throw new IllegalStateException("Registered for write interest but no response attached to key.")
  val written = responseSend.writeTo(socketChannel) // write the response back to the client
  trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key)
  if(responseSend.complete) {
    response.request.updateRequestMetrics()
    key.attach(null)
    trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress())
    key.interestOps(SelectionKey.OP_READ)
  } else {
    trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress())
    key.interestOps(SelectionKey.OP_WRITE)
    wakeup()
  }
}
```
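Both `read()` and `write()` toggle selector interest ops with bit arithmetic: `interestOps & ~OP_READ` removes read interest, `| OP_WRITE` adds write interest. A small Java sketch of that arithmetic, using a hypothetical `InterestOps` helper over the real `SelectionKey` constants:

```java
import java.nio.channels.SelectionKey;

// Sketch: interest ops are a bit mask, so removing an op is "and-not"
// and adding one is "or", exactly as in the Scala code above.
class InterestOps {
    static int withoutRead(int ops) {
        return ops & ~SelectionKey.OP_READ;   // stop selecting for reads
    }

    static int withWrite(int ops) {
        return ops | SelectionKey.OP_WRITE;   // start selecting for writes
    }

    static boolean wantsRead(int ops) {
        return (ops & SelectionKey.OP_READ) != 0;
    }
}
```

Dropping `OP_READ` while a request is in flight is what gives each connection one-request-at-a-time semantics: no new bytes are read from that socket until its response has been fully written and `OP_READ` is restored.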