转载请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/5443789.html
最近由于使用sparkstreaming的同学需要对接到部门内部的的kafka集群,由于官方的spark-streaming-kafka包和现有公司的kafka集群权限系统无法对接,需要研究下spark-streaming-kafka包原有代码以便改造,本文研究的代码版本为spark在github的tag的v1.6.1版本。
官方给出的JavaKafkaWordCount以及KafkaWordCount代码里产生kafka-streaming消费流数据的调用代码分别如下
1
2
3
4
5
|
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, args[ 0 ], args[ 1 ], topicMap);
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map( _ . _ 2 )
|
可以看到无论是java还是scala调用的都是KafkaUtils内重载实现的createStream方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
object KafkaUtils {
/**
* Create an input stream that pulls messages from Kafka Brokers.
* @param ssc StreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
* @return DStream of (Kafka message key, Kafka message value)
*/
def createStream(
ssc : StreamingContext,
zkQuorum : String,
groupId : String,
topics : Map[String, Int],
storageLevel : StorageLevel = StorageLevel.MEMORY _ AND _ DISK _ SER _ 2
) : ReceiverInputDStream[(String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000" )
createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics, storageLevel)
}
/**
* Create an input stream that pulls messages from Kafka Brokers.
* @param ssc StreamingContext object
* @param kafkaParams Map of kafka configuration parameters,
* see http://kafka.apache.org/08/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam U type of Kafka message key decoder
* @tparam T type of Kafka message value decoder
* @return DStream of (Kafka message key, Kafka message value)
*/
def createStream[K : ClassTag, V : ClassTag, U < : Decoder[ _ ] : ClassTag, T < : Decoder[ _ ] : ClassTag](
ssc : StreamingContext,
kafkaParams : Map[String, String],
topics : Map[String, Int],
storageLevel : StorageLevel
) : ReceiverInputDStream[(K, V)] = {
val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
}
/**
* Create an input stream that pulls messages from Kafka Brokers.
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread
* @return DStream of (Kafka message key, Kafka message value)
*/
def createStream(
jssc : JavaStreamingContext,
zkQuorum : String,
groupId : String,
topics : JMap[String, JInt]
) : JavaPairReceiverInputDStream[String, String] = {
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues( _ .intValue()).toSeq : _ *))
}
/**
* Create an input stream that pulls messages from Kafka Brokers.
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
* @return DStream of (Kafka message key, Kafka message value)
*/
def createStream(
jssc : JavaStreamingContext,
zkQuorum : String,
groupId : String,
topics : JMap[String, JInt],
storageLevel : StorageLevel
) : JavaPairReceiverInputDStream[String, String] = {
createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues( _ .intValue()).toSeq : _ *),
storageLevel)
}
/**
* Create an input stream that pulls messages from Kafka Brokers.
* @param jssc JavaStreamingContext object
* @param keyTypeClass Key type of DStream
* @param valueTypeClass value type of Dstream
* @param keyDecoderClass Type of kafka key decoder
* @param valueDecoderClass Type of kafka value decoder
* @param kafkaParams Map of kafka configuration parameters,
* see http://kafka.apache.org/08/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread
* @param storageLevel RDD storage level.
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam U type of Kafka message key decoder
* @tparam T type of Kafka message value decoder
* @return DStream of (Kafka message key, Kafka message value)
*/
def createStream[K, V, U < : Decoder[ _ ], T < : Decoder[ _ ]](
jssc : JavaStreamingContext,
keyTypeClass : Class[K],
valueTypeClass : Class[V],
keyDecoderClass : Class[U],
valueDecoderClass : Class[T],
kafkaParams : JMap[String, String],
topics : JMap[String, JInt],
storageLevel : StorageLevel
) : JavaPairReceiverInputDStream[K, V] = {
implicit val keyCmt : ClassTag[K] = ClassTag(keyTypeClass)
implicit val valueCmt : ClassTag[V] = ClassTag(valueTypeClass)
implicit val keyCmd : ClassTag[U] = ClassTag(keyDecoderClass)
implicit val valueCmd : ClassTag[T] = ClassTag(valueDecoderClass)
createStream[K, V, U, T](
jssc.ssc,
kafkaParams.asScala.toMap,
Map(topics.asScala.mapValues( _ .intValue()).toSeq : _ *),
storageLevel)
}
|
其中java相关的第三个和第四个createStream调用了第一个createStream,而第一个createStream最后调用的是第二个createStream,所以所有的rdd数据流都是从下面这句代码产生的:
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
查看KafkaInputDStream类定义,发现获取receiver有两种类型:KafkaReceiver和ReliableKafkaReceiver。
1
2
3
4
5
6
7
|
def getReceiver() : Receiver[(K, V)] = {
if (!useReliableReceiver) {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
} else {
new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
}
} |
其中,KafkaReceiver实现比较简单,调用的是kafka的high level api产生数据流,产生的每个线程的数据流都被放到一个线程池由单独的线程来消费
val topicMessageStreams = consumerConnector.createMessageStreams( topics, keyDecoder, valueDecoder)
ReliableKafkaReceiver是结合了spark的预写日志(Write Ahead Logs)功能,开启这个功能需要设置sparkconf属性 spark.streaming.receiver.writeAheadLog.enable为真(默认值是假)
这个receiver会把收到的kafka数据首先存储到日志上,然后才会向kafka提交offset,这样保证了在driver程序出现问题的时候不会丢失kafka数据。
相关推荐
基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip ...
基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip...
该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! <项目介绍> 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! ...
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业和毕设...基于spark streaming+kafka+hbase的日志统计分析系统源码+项目说明.zip
1、资源内容:基于spark streaming+flume+kafka+hbase的实时日志处理分析系统(分为控制台版本和基于springboot、Echarts等的Web UI可视化版本) 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便...
1、资源内容:基于spark streaming和kafka,hbase的日志统计分析系统+源代码+文档说明 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便更改、代码编程思路清晰、注释明细,都经过测试运行成功,...
毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 ...
1、资源内容:基于Spark的实时日志分析及异常检测系统 Flume + Kafka + Hbase + Spark-Streaming + Scala 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便更改、代码编程思路清晰、注释明细,都...
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子...基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分为控制台版本和Web UI可视化版本).zip
基于spark开发的完整项目算法源码,可用于毕业设计、课程设计、练手学习等
毕业设计,课程设计,项目源码均经过助教老师测试,运行无误,欢迎下载交流 ----- 下载后请首先打开README.md文件(如有)
streaming与Kafka结合起来处理分析; 5.其它模块持续更新中... -------- 该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! <项目介绍> 1、该资源内...
Sparkstreaming,Kafka,Node.js,Socket.IO和Highcharts构建实时分析Dashboard。电子商务门户希望构建一个实时分析仪表盘,对每分钟发货的订单数量做到可视化,从而优化物流的效率。解决方案之前,先快速看看我们将...
该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! <项目介绍> 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! ...
本文来自于网络,本文首先从Kafka的架构着手,先了解下Kafka的基本原理,然后通过对kakfa的存储机制、复制原理、同步原理、可靠性和持久性保证等等一步步对其可靠性进行分析,最后通过benchmark来增强对Kafka高可靠...
3,综合运用HttpClient+Jsoup+Kafka+SparkStreaming+StructuredStreaming+SpringBoot+Echarts等多种实用技术 适用人群 1、对大数据感兴趣的在校生及应届毕业生。 2、对目前职业有进一步提升要求,希望从事大数据...