版权声明:原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明,否则将追究法律责任。
1. 更强的控制自由度
2. 语义一致性
object DirectKafkaWordCount{
def main(args:Array[String]){
if(args.length <2){
System.err.println(s"""
|Usage:DirectKafkaWordCount<brokers><topics>
|<brokers> is a list of one or more Kafka brokers
|<topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(brokers, topics)= args
// Create context with 2 second batch interval
val sparkConf =newSparkConf().setAppName("DirectKafkaWordCount")
val ssc =newStreamingContext(sparkConf,Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams =Map[String,String]("metadata.broker.list"-> brokers)
val messages =KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
ssc, kafkaParams, topicsSet)
// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x =>(x,1L)).reduceByKey(_ + _)
wordCounts.print()
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
/**
* A batch-oriented interface for consuming from Kafka.
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
* @param messageHandler function for translating each message into the desired type
*/
private[kafka]
classKafkaRDD[
K:ClassTag,
V:ClassTag,
U <:Decoder[_]:ClassTag,
T <:Decoder[_]:ClassTag,
R:ClassTag]private[spark](
sc:SparkContext,
kafkaParams:Map[String,String],
val offsetRanges:Array[OffsetRange], //该RDD的数据偏移量
leaders:Map[TopicAndPartition,(String,Int)],
messageHandler:MessageAndMetadata[K, V]=> R
)extends RDD[R](sc,Nil) with Logging with HasOffsetRanges
trait HasOffsetRanges{
def offsetRanges:Array[OffsetRange]
}
inal classOffsetRangeprivate(
val topic:String,
val partition:Int,
val fromOffset:Long,
val untilOffset:Long)extendsSerializable
override def getPartitions:Array[Partition]={
offsetRanges.zipWithIndex.map {case(o, i)=>
val (host, port)= leaders(TopicAndPartition(o.topic, o.partition))
newKafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
}.toArray
}
private[kafka]
classKafkaRDDPartition(
val index:Int,
val topic:String,
val partition:Int,
val fromOffset:Long,
val untilOffset:Long,
val host:String,
val port:Int
)extendsPartition{
/** Number of messages this partition refers to */
def count():Long= untilOffset - fromOffset
}
override def compute(thePart:Partition, context:TaskContext):Iterator[R]={
val part = thePart.asInstanceOf[KafkaRDDPartition]
assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
if(part.fromOffset == part.untilOffset){
log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset "+
s"skipping ${part.topic} ${part.partition}")
Iterator.empty
}else{
newKafkaRDDIterator(part, context)
}
}
privateclassKafkaRDDIterator(
part:KafkaRDDPartition,
context:TaskContext)extendsNextIterator[R]{
context.addTaskCompletionListener{ context => closeIfNeeded()}
log.info(s"Computing topic ${part.topic}, partition ${part.partition} "+
s"offsets ${part.fromOffset} -> ${part.untilOffset}")
val kc =newKafkaCluster(kafkaParams)
val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(kc.config.props)
.asInstanceOf[Decoder[K]]
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(kc.config.props)
.asInstanceOf[Decoder[V]]
val consumer = connectLeader
var requestOffset = part.fromOffset
var iter:Iterator[MessageAndOffset]=null
val messages =KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
ssc, kafkaParams, topicsSet)
def createDirectStream[
K:ClassTag,
V:ClassTag,
KD <:Decoder[K]:ClassTag,
VD <:Decoder[V]:ClassTag](
ssc:StreamingContext,
kafkaParams:Map[String,String],
topics:Set[String]
):InputDStream[(K, V)]={
val messageHandler =(mmd:MessageAndMetadata[K, V])=>(mmd.key, mmd.message)
val kc =newKafkaCluster(kafkaParams)
val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
newDirectKafkaInputDStream[K, V, KD, VD,(K, V)](
ssc, kafkaParams, fromOffsets, messageHandler)
}
override def compute(validTime:Time):Option[KafkaRDD[K, V, U, T, R]]={
//计算最近的数据终止偏移量
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
val rdd =KafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
// Report the record number and metadata of this batch interval to InputInfoTracker.
val offsetRanges = currentOffsets.map {case(tp, fo)=>
val uo = untilOffsets(tp)
OffsetRange(tp.topic, tp.partition, fo, uo.offset)
}
val description = offsetRanges.filter { offsetRange =>
// Don't display empty ranges.
offsetRange.fromOffset != offsetRange.untilOffset
}.map { offsetRange =>
s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t"+
s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
}.mkString("\n")
// Copy offsetRanges to immutable.List to prevent from being modified by the user
val metadata =Map(
"offsets"-> offsetRanges.toList,
StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
val inputInfo =StreamInputInfo(id, rdd.count, metadata)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
Some(rdd)
}
总结:
而且KafkaRDDPartition只能属于一个topic,不能让partition跨多个topic,直接消费一个kafkatopic,topic不断进来、数据不断偏移,Offset代表kafka数据偏移量指针。
数据不断流进kafka,batchDuration假如每十秒都会从配置的topic中消费数据,每次会消费一部分直到消费完,下一个batchDuration会再流进来的数据,又可以从头开始读或上一个数据的基础上读取数据。
思考直接抓取kafka数据和receiver读取数据:
好处一:
直接抓取fakfa数据的好处,没有缓存,不会出现内存溢出等之类的问题。但是如果kafka Receiver的方式读取会存在缓存的问题,需要设置读取的频率和block interval等信息。
好处二:
采用receiver方式的话receiver默认情况需要和worker的executor绑定,不方便做分布式,当然可以配置成分布式,采用direct方式默认情况下数据会存在多个worker上的executor。Kafkardd数据默认都是分布在多个executor上的,天然数据是分布式的存在多个executor,而receiver就不方便计算。
好处三:
数据消费的问题,在实际操作的时候采用receiver的方式有个弊端,消费数据来不及处理即操作数据有deLay多才时,Spark Streaming程序有可能奔溃。但如果是direct方式访问kafka数据不会存在此类情况。因为diect方式直接读取kafka数据,如果delay就不进行下一个batchDuration读取。
好处四:
完全的语义一致性,不会重复消费数据,而且保证数据一定被消费,跟kafka进行交互,只有数据真正执行成功之后才会记录下来。
生产环境下强烈建议采用direct方式读取kafka数据。
相关推荐
Spark-2.3.1源码解读。 Spark Core源码阅读 Spark Context 阅读要点 Spark的缓存,变量,shuffle数据等清理及机制 Spark-submit关于参数及部署模式的部分解析 GroupByKey VS ReduceByKey OrderedRDDFunctions...
一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、kafka偏移量管理,web后台管理,web api启动、停止spark streaming,宕机告警、自动重启等等功能支持,用户只需要关心业务代码,无需关注繁琐的...
c)源码解析SparkStreaming数据清理的工作无论是在实际开发中,还是自己动手实践中都是会面临的,Spark Streaming中BatchDurations中会不断的产生RDD,这样会不断的有内存对象生成,其中包含元数据和数据本身。由此...
Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。
java的sparkstreaming连接kafka的例子,kafka生产者生产消息,消费者读取消息,sparkstreaming读取kafka小区并进行存储iotdb数据库。
spark Streaming和structed streaming分析,理解整个 Spark Streaming 的模块划分和代码逻辑。
sparkStreaming消费数据不丢失,sparkStreaming消费数据不丢失
随着大数据的快速发展,业务场景越来越复杂,离线式的批处理框架 MapReduce 已经不能满足业务,大量的场景需要实时的数据处理结果来进行分析,决策。例如实时的用户推荐,在 618 这样的刺激环境下普通历史数据的推荐...
1.理解Spark Streaming的工作流程。 2.理解Spark Streaming的工作原理。 3.学会使用Spark Streaming处理流式数据。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 三、实验内容 (一)...
spark之sparkStreaming 理解,总结了自己的理解,欢迎大家下载观看!
(1)利用SparkStreaming从文件目录读入日志信息,日志内容包含: ”日志级别、函数名、日志内容“ 三个字段,字段之间以空格拆分。请看数据源的文件。 (2)对读入都日志信息流进行指定筛选出日志级别为error或warn...
1.Spark Streaming整合Flume需要的安装包. 2. Spark Streaming拉取Flume数据的flume配置文件.conf 3. Flume向Spark Streaming推数据的flume配置文件.conf
spark streaming streaming
Spark源码解析迷你电子版书籍,该书籍主要解读与跟踪Spark Core与Spark Streaming部分源码,书中内容还添加tachyon部分源码解读。
写的非常好,早了好久才找到。SparkStreaming预研报告
spark Streaming的原理介绍和与storm的对比
基于 Flume+ Kafka+ Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码。 基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: ...
Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面。 Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql)。 Spark零基础思维导图(内含spark-core ,spark-streaming,...
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业和毕设...基于spark streaming+kafka+hbase的日志统计分析系统源码+项目说明.zip
Spark中的Transformation操作之Value数据类型的算子: Spark中的Transformation操作之Key-Value数据类型的算子: Spark中的Action操作: Transformation->map算子: Transformation->flatMap算子: FaltMap算子与Map...