背景介绍:
我们公司的实时流项目现在用的spark streaming比较多,这里在介绍下版本:
spark streaming2.1.0
kafka 0.9.0.0
spark streaming如果想要集成kafka使用时,必须得使用spark官网提供的依赖包,目前有两种大的kafka分支依赖集成包,他们的一些信息如下:
描述信息 | spark-streaming-kafka-0-8|spark-streaming-kafka-0-10
---|---|---
kafka版本 | 0.8.2.1 or higher|0.10.0 or higher
稳定程度 | Stable|Experimental
语言支持 |Scala, Java, Python|Scala, Java|
Receiver DStream|Yes|No|
Direct DStream|Yes|Yes|
SSL TLS Support|No|Yes|
Offset Commit Api |No|Yes|
Dynamic Topic Subscription|No|Yes|
从上面的表格可以看出
spark-streaming-kafka-0-8目前是支持版本大于或等于0.8.2.1时需要用到的,因为我们生产环境的kafka的版本是0.9.0.0所以只能选择spark-streaming-kafka-0-8_2.11这个依赖,然后spark streaming流程序跑起来,通过一定间隔不断从kafka消费数据,实时处理,整个流程是没有问题的,后来因为需要统一收集流程序的log中转到kafka中,最后通过logstash再发送到ElasticSearch中方便查看和检索日志,所以给项目集成了
kafka-log4j-appender-0.9.0.0,其功能是把log4j打印的日志给发送到kafka,配置完成之后再次启动项目,发现log也能收集到kafka中了,但通过后台发现时不时的会出现几条下面的log:
[2017-11-28 23:49:37,176] [WARN] [kafka-producer-network-thread | producer-2] Error in I/O with 192.168.10.160
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)
但并不影响正常功能使用,从log里面能够看出来是生产者的问题,也就是说发送消息到kafka的server时出现连接中断了,导致抛出EOF异常。
那么为什么会中断连接呢?经查资料发现,这是由于kafka的版本不一致导致的,也就是说用0.8.2.1的kafka client向kafka0.9.0.0的server端发送数据,如果在经过了一定时间内,连接还没断开,那么服务端会主动断开这个连接,如果都是0.9.0.0的版本,服务端主动断开连接,客户端是不会抛出异常的,但由于版本不一样,在服务端主动中断的时候,就出现了上面的异常。
如何模拟重现?
(1)搭建一套0.9.0.0的kafka集群,为了方便重现,将server.properties里面的加上这个空闲连接关闭参数connections.max.idle.ms为30秒,默认不设置是10分钟,然后启动集群。
(2)在java项目里面使用0.8.2.1的client作为生产者,并使用生产者发送一条数据后,程序主动sleep40秒。
(3)然后观察等到30秒的时候就会抛出这个异常,但是主程序还是会等到40秒后结束,因为kafka发送消息是起的单独的线程所以抛出这个log时候主线程是不会受到影响的。
如何解决:
(1)最简单的办法就是升级client和server版本一致
(2)网上有朋友建议调大connections.max.idle.ms这个参数,减少抛出异常的次数,算是治标不治本吧,不建议这么干。
那么可能有朋友疑问,如果客户端一直不关闭空闲连接,必须得10分钟后由服务端强制关闭,那么会不会出现这个时间内kafka的连接资源被耗尽的情况呢?答案是几乎不可能,因为kafka允许每台主机默认的连接数为Int.MaxValue差不多21亿多吧。只要10分钟内每台主机的连接数达不到这个量级,程序就不会有问题。而实际情况生产者也不能出现这么多连接,所以我们的一些生产者程序一旦启动起来基本上不会调用close方法,除非在手动停止程序时,可以通过钩子函数来触发资源关闭,其他情况的空闲连接,可以由服务端进行管理通过超时关闭。注意如果是一直被占用的连接,服务端是不会主动关闭的,另外经过测试发现消费者就算版本不一致也不存在这个问题,目前来看只会版本不一致 而且是在生产者的程序中才会出现这个问题。
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。
分享到:
相关推荐
大数据 消息队列 Kafka 2.9 版本 jdk1.7+ 解压后直接使用
适合搭建kafka环境所需要的jdk1.8linux版本,以及kafka3.2版本。
springboot中使用kafka,含安装包
Kafka Tool是一个用于管理和使用Apache Kafka集群的GUI应用程序。它提供了一个直观的UI,允许用户快速查看Kafka群集中的对象以及存储在群集主题中的消息
Kafka .Net Framework4.0 版本 有完整的依赖包 Visual C++ Redistributable Packages for Visual Studio 2013
kafka kafka kafka kafka kafka
ZooKeeper是一个开放源码的分布式应用程序协调服务,它包含一个简单的原语集,分布式应用程序可以基于它实现同步服务,配置维护和命名服务等。 Kafka目前主要作为一个分布式的发布订阅式的消息系统使用 本资源包括...
包括kafka-eagle1.1.0和1.1.3两个版本,由于官网下载慢,顾提供
kafka
kafka-manager-1.3.3.18,已经使用sbt编译好,可以直接部署使用 kafka版本支持:kafka-0.8.1.1 到 kafka-1.1.0
kafka kafka kafka
kafka连接工具
Kafka 2.xx所有版本下载的百度网盘链接。包括: kafka_2.11-2.0.0.tgz, kafka_2.11-2.0.1.tgz, kafka_2.11-2.1.0.tgz, kafka_2.11-2.1.1.tgz, kafka_2.11-2.2.0.tgz, kafka_2.11-2.2.1.tgz, kafka_2.11-2.2.2.tgz, ...
Kafka Test
kafka 知识要点,基于0.9、 0.10版本,很全面
Kafka默认位移提交方式是自动提交,但它不是在你每消费一次数据之后就提交一次位移,而是每隔5秒将拉取到的每个分区中的最大的消费位移进行提交。自动位移提交在正常情况下不会发生消息丢失或重复消费的现象,唯一...
kafka java简单实现 kafka java小例子 kafka java小例子
kafka_2.12-2.2.2版本,由于有时从官网下载比较慢,把kafka的安装包放上来,方便大家使用的时候,可以进行下载,在我的文章系列中,也有kafka在windows系统中的安装使用教程,可以具体查看
如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体 表现为,Kafka Source可以正常从指定的Topic中读取数据,但在Kafka Sink中配置的目标Topic不起作用,...
开源软件kafka_2.13-3.5.1版本,仅供学习安装,不可用做其他用途,建议直接在官网进行下载,版本为3.5.1,Scala版本2.13;开源软件kafka_2.13-3.5.1版本,仅供学习安装,不可用做其他用途,建议直接在官网进行下载,...