`
m635674608
  • 浏览: 4929321 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Spark2.1.0文档:Spark Streaming 编程指南(上)

 
阅读更多

本文翻译自Spark官方文档,仅翻译了Scala API部分,目前版本为2.1.0,如有疏漏错误之处请多多指教。

原文地址:http://spark.apache.org/docs/latest/streaming-programming-guide.html

因文档篇幅较长故分为上下两篇,本文为上篇,主要包括概述、入门示例、基本概念三部分

 

概述

 

Spark Streaming是核心Spark API的扩展,可实现可扩展、高吞吐量、可容错的实时数据流处理。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等众多来源获取,并且可以使用由高级函数(如map,reduce,join和window)开发的复杂算法进行流数据处理。最后,处理后的数据可以被推送到文件系统,数据库和实时仪表板。而且,您还可以在数据流上应用Spark提供的机器学习和图处理算法。

 

在内部,它的工作原理如下。Spark Streaming接收实时输入数据流,并将数据切分成批,然后由Spark引擎对其进行处理,最后生成“批”形式的结果流。

 

Spark Streaming将连续的数据流抽象为discretizedstreamDStream。 可以从诸如Kafka,Flume和Kinesis等来源的输入数据流中创建DStream,或者通过对其他DStream应用高级操作来创建。在内部,DStream 由一个RDD序列表示。

本指南介绍如何开始利用DStreams编写Spark Streaming程序。您可以在scalaJavaPython中编写SparkStreaming程序(在Spark 1.2中引入),所有这些都在本指南中介绍。 您可以在本指南中找到标签,让您可以选择不同语言的代码段(译者注:本文内容仅翻译了Scala部分,如果想学习其他语言接口,请参阅官网)。

 

一个小例子

在我们详细介绍如何编写自己的SparkStreaming程序之前,我们先看一下一个简单的Spark Streaming程序是什么样子的。假设我们有一个数据服务器正在对一个TCP套接字进行侦听,然后需要统计接收的文本数据中的每个单词的出现频率。那么你需要这样做:

首先,我们将Spark Streaming相关的类和StreamingContext的一些隐式转换导入到我们的环境中,以便为我们需要的其他类(如DStream)添加有用的方法。StreamingContext是所有流功能的主要入口点。我们创建一个带有两个执行线程(译者注:如果要执行本例,必须确保机器cpu核心大于2)的本地StreamingContext,并且设置流数据每批的间隔为1秒。

 

[java] view plain copy
 
  1. import org.apache.spark._  
  2. import org.apache.spark.streaming._  
  3. import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3  
  4. // Create a local StreamingContext with two working thread and batch interval of 1 second.  
  5. // The master requires 2 cores to prevent from a starvation scenario.  
  6. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")  
  7. val ssc = new StreamingContext(conf, Seconds(1))  

 

使用此context,我们可以创建一个DStream,它表示来自特定主机名(例如localhost)和端口(例如9999)TCP源的流数据。

[java] view plain copy
 
  1. // Create a DStream that will connect to hostname:port, like localhost:9999  
  2. val lines = ssc.socketTextStream("localhost"9999)  

 

在这行代码中,DStream表示从数据服务器接收的数据流。此DStream中的每个记录都是一行文本。接下来,我们要将每行文本以空格符为分隔符切分成一个个单词。

[java] view plain copy
 
  1. // Split each line into words  
  2. val words = lines.flatMap(_.split(" "))  

 

flatMap是一个一对多的DStream操作,该操作通过从源DStream中的每个记录生成多个新记录来创建新的DStream。在这种情况下,每一行将被分割成多个单词,并将单词流表示为单词DStream。接下来,我们对这些单词进行计数。

[java] view plain copy
 
  1. import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3  
  2. // Count each word in each batch  
  3. val pairs = words.map(word => (word, 1))  
  4. val wordCounts = pairs.reduceByKey(_ + _)  
  5. // Print the first ten elements of each RDD generated in this DStream to the console  
  6. wordCounts.print()  

 

单词DStream进一步映射(一对一变换)到(word,1) 键值对的DStream,然后进行聚合以获得每批数据中的单词的频率。最后,wordCounts.print()将打印每秒产生的计数结果中的若干条记录。

请注意,当执行这些代码时,Spark Streaming仅是设置了预计算流程,目前为止这些计算还没有真正的开始执行。在设置好所有计算操作后,要开始真正的执行过程,我们最终需要调用如下方法:

 

[java] view plain copy
 
  1. ssc.start()             // Start the computation  
  2. ssc.awaitTermination()  // Wait for the computation to terminate  

 

 

完整的代码可以在SparkStreaming示例NetworkWordCount中找到。

如果您已经下载并构建了Spark,则可以以下面的方式运行此示例。在运行spark程序之前您将首先需要运行Netcat(大多数类Unix系统中的一个小型实用程序)作为数据服务器。

$ nc -lk 9999

然后,打开另外一个终端,键入一下命令启动示例

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

 

然后,在运行netcat服务器的终端中输入的任何行将每秒进行单词计数并打印在屏幕上。 运行效果像下面这样。

 

基本概念

 

接下来,我们将越过前面的简单示例,阐述一些Spark Streaming的基础知识。

连接

与Spark类似,Spark Streaming的相关依赖可通过Maven Central获得。 要编写自己的Spark Streaming程序,您必须将以下依赖项添加到SBT或Maven项目中。

Maven:

[html] view plain copy
 
  1. <dependency>  
  2.     <groupId>org.apache.spark</groupId>  
  3.     <artifactId>spark-streaming_2.11</artifactId>  
  4.     <version>2.1.0</version>  
  5. </dependency>  

SBT:

libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"

如果需要从SparkStreaming核心API中没有包含的来源(如Kafka,Flume和Kinesis)采集数据,您必须将相应的artifact :spark-streaming-xyz_2.11添加到依赖关系中。例如,一些常用的artifact如下所示。

Source

Artifact

Kafka

spark-streaming-kafka-0-8_2.11

Flume

spark-streaming-flume_2.11

Kinesis

spark-streaming-kinesis-asl_2.11 [Amazon Software License]

有关最新列表,请参阅Mavenrepository,获取支持的sources和artifacts的完整列表。

初始化StreamingContext

要初始化一个SparkStreaming程序,必须先创建一个StreamingContext对象,它是所有Spark Streaming方法的主要入口点。

StreamingContext对象可以从SparkConf对象中创建。

[java] view plain copy
 
  1. import org.apache.spark._  
  2. import org.apache.spark.streaming._  
  3. val conf = new SparkConf().setAppName(appName).setMaster(master)  
  4. val ssc = new StreamingContext(conf, Seconds(1))  

 

appName参数是应用程序在集群UI上显示的名称。 master是Spark,Mesos或YARN集群的URL,或者一个特殊的“local [*]”字符串来让程序以本地模式运行。在具体的实践中,当您在集群上运行程序时,不需要在程序中硬编码master参数,而是使用spark-submit提交应用程序并将master的URL以脚本参数的形式传入。但是,对于本地测试和单元测试,您可以通过“local[*]”来运行Spark Streaming程序(请确保本地系统中的cpu核心数够用)。 需要注意的是,StreamingContext会内在的创建一个SparkContext的实例(所有Spark功能的起始点),你可以通过ssc.sparkContext访问到这个实例。

批处理的时间窗口长度必须根据应用程序的延迟要求和可用的集群资源进行设置。有关详细信息,请参阅“性能调优”部分。

StreamingContext对象还可以从一个现有的SparkContext实例中创建。

[java] view plain copy
 
  1. import org.apache.spark.streaming._  
  2. val sc = ...                // existing SparkContext  
  3. val ssc = new StreamingContext(sc, Seconds(1))  

定义context后,您必须执行以下操作。

1、通过创建input DStreams的形式来定义数据的输入源。

2、通过将转换和输出操作应用于DStream来定义流式计算。

3、开始接收数据并调用streamingContext.start() 方法开始进行数据处理。

4、使用streamingContext.awaitTermination() 等待处理停止(手动停止或由错误引发)。

5、可以使用streamingContext.stop() 手动停止数据处理。

请记住以下几点:

1、 一旦一个context开始运作,就不能设置或添加新的流计算。

2、 一旦一个上下文被停止,它将无法重新启动。

3、 同一时刻,一个JVM中只能有一个StreamingContext处于活动状态。

4、 StreamingContext上的stop() 方法也会停止SparkContext。 要仅停止StreamingContext(保持SparkContext活跃),请将stop() 方法的可选参数stopSparkContext设置为false。

5、 只要前一个StreamingContext在下一个StreamingContext被创建之前停止(不停止SparkContext),SparkContext就可以被重用来创建多个StreamingContext。

离散流(DStreams)

DiscretizedStreamDStream 是Spark Streaming对流式数据的基本抽象。它表示连续的数据流,这些连续的数据流可以是从数据源接收的输入数据流,也可以是通过对输入数据流执行转换操作而生成的经处理的数据流。在内部,DStream由一系列连续的RDD表示(关于RDD的介绍参见“Spark编程指南”)。如下图所示,DStream中的每个RDD都包含一定时间间隔内的数据。

 

任何定义于DStream之上的处理操作都将被转换为对底层RDD的操作。例如,在之前的示例中,我们将一行行文本组成的流转换为单词流,具体做法为:将flatMap操作应用于名为lines的 DStream中的每个RDD上,以生成words DStream的RDD。如下图所示。

 

这些底层的RDD转换操作由Spark引擎完成计算。定义在DStream之上的操作可以隐藏大部分的底层细节,为开发人员提供更高级别的API以方便使用。 这些操作将在后面的章节中详细讨论。

输入DStreams和接收器

输入DStreams是表示从流媒体源接收的输入数据流的DStream。在第一个示例中,lines是一个输入DStream,因为它表示从netcat服务器接收的数据流。 每个输入DStream(除了文件流,本节稍后讨论这个特例)与一个名为ReceiverScala docJava doc)的对象相关联,该对象从数据源接收数据并将其存储在Spark集群的内存中等待处理。

Spark Streaming提供了两类内置的流数据源:

基本数据源(Basicsources):StreamingContext API中直接提供的数据源获取方式。例如:文件系统和套接字连接。

高级数据源(Advancedsources):如Kafka,Flume,Kinesis等,可以通过额外的实用工具类来获得。使用这些数据源需要添加额外的依赖,可以参考之前的“基本概念”章节中的第一小节。

我们将稍后对每个类别中的一些数据来源进行探讨。

请注意,如果要在流式数据处理程序中并行接收多个数据流,则可以创建多个输入DStream(这将在“性能调优”一文中进一步讨论)。这将创建多个接收器,同时接收多个数据流。但是请注意,Spark worker/executor是一个长期运行的任务,所以它会占据分配给Spark Streaming应用程序的一个cpu核心。因此,一定要记住:你必须分配给Spark Streaming应用程序足够的cpu内核(在local模式下运行时指“线程”)来对接收到的数据进行处理,以及运行数据接收器。

要记住两点:

1、当以本地模式运行Spark Streaming程序时,不要将master URL设置为“local”或“local [1]”。这两者意味着在本地只使用一个线程运行任务。如果您正在使用基于receiver(例如sockets,Kafka,Flume等)的输入DStream,则这一个线程将用于运行receiver,就没有线程来对接受到的数据进行处理了。因此,当在本地运行SparkStreaming程序时,请务必使用“local[n]”作为master URL,其中n大于要运行的接收器数量(有关如何设置master的信息,请参阅Spark Properties)。

2、将逻辑扩展到在集群上运行时,分配给Spark Streaming应用程序的核心数量必须大于接收器数量。否则系统虽然能够收到数据,但无法对数据进行处理。

基本数据源

在第一个小示例中,我们使用了ssc.socketTextStream(...)方法,该方法通过TCP套接字连接接手文本数据并基于这个数据流创建了一个DStream。除了套接字之外,StreamingContext 的API还提供了将文件作为输入源从而创建DStream的方法,下面介绍Spark Streaming支持的几种基本数据源。

1、 文件流:可以从任何支持HDFS API的文件系统(即:HDFS,S3,NFS等)中以流的方式读取数据,读取数据并创建DStream的方式如下所示:

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming将对目录dataDirectory进行监视,并处理用户在该目录中创建的任何文件(不支持读取嵌套目录中的文件)。 请注意:

1)所有文件必须具有相同的数据格式。

2)在dataDirectory目录中,文件只能通过原子性的移动或重命名(atomically moving or renaming)的方式创建。

3)文件被移动进来后,就不能再更改。因为,如果文件被不断追加内容,新追加的部分不会被读取。

对于读取简单文本文件,有一个更简单的方法streamingContext.textFileStream(dataDirectory)。 所有的文件流不需要运行接收器(receiver),因此不需要分配内核。

 

注意:fileStream在python API中不可用,你只能使用textFileStream。

2、 基于自定义接收器的流:开发者可以自定义接收器并通过自定义接收器接收的数据流来创建DStream。有关详细信息,请参阅自定义接收器指南

3、 RDD队列作为流: 如果需要利用测试数据来测试Spark Streaming应用程序,您还可以使用streamingContext.queueStream(queueOfRDDs)创建基于RDD队列的DStream。推送到队列中的每个RDD将被视为DStream中的一批数据,并像流一样进行处理。

有关从套接字和文件获取数据流的更多详细信息,请参阅StreamingContext for Scala,JavaStreamingContext for Java和Python的StreamingContext中相关函数的API文档。

高级数据源

使用这些高级的数据源需要与外部非Spark库进行连接,某些数据源还具有复杂的依赖关系(例如Kafka和Flume)。因此,为了最小化由依赖引发的版本冲突相关的问题,从这些数据源创建DStream的功能已被移动到单独的库中,你可以在需要的时候手动添加这些依赖(详见本文“连接”小节)。

请注意,这些高级源在Sparkshell中不可用,因此在shell中无法测试基于这些高级源的应用程序。如果您真的想在Spark shell中使用它们,则必须下载相应的Maven artifact的JAR及其依赖项,并将其添加到类路径中。

一些高级数据源如下所示:

Kafka:Spark Streaming 2.1.0兼容Kafka代理版本0.8.2.1或更高版本。有关详细信息,请参阅“Kafka集成指南”。

Flume:Spark Streaming 2.1.0兼容Flume 1.6.0。有关详细信息,请参阅“Flume集成指南”。

Kinesis:Spark Streaming 2.1.0与Kinesis Client Library 1.2.1兼容。有关详细信息,请参阅“Kinesis集成指南”。

自定义数据源

也可以通过自定义数据源创建输入DStream。你需要做的是实现一个用户定义(user-defined)的接收器(请参见下一部分详细了解)可以从自定义源接收数据并将其推入到Spark中。有关详细信息,请参阅自定义接收器指南

Receiver的可靠性

基于“可靠性”我们可以将数据源分为两类。一些数据源(如Kafka和Flume)允许对传输的数据进行确认。如果系统从这些可靠的数据源获取数据,并可以正确地确认接收到的数据,则可以确保任何故障都不会导致数据丢失。我们可以对接收器作如下两种分类:

1、 可靠接收器(Reliable Receiver):当数据被接受并被以多副本的形式存储在Spark中时,可靠接收器会正确地向可靠数据源发送确认信息。

2、 非可靠接收器(UnreliableReceiver非可靠接收器不会向数据源发送任何确认信息。 这可以用于不支持确认机制的数据源,或者虽然是从可靠数据源接受数据,但是不希望或不需要进行复杂的确认时,可以将该接收器用于对接可靠数据源。

“自定义接收器指南”中讨论了如何编写可靠接收器的细节。

DStreams之上的Transformation操作

与RDD的算子类似,transformation操作允许修改来自输入DStream的数据。DStreams支持许多常规Spark RDD上的transformation操作。一些常见的如下。

Transformation

Meaning

map(func)

利用方法func对源DStream中的元素分别进行处理,并返回一个新的DStream。

flatMap(func)

和map类似,不过每个输入元素可以被映射为0或多个输出元素。

filter(func)

选取被func方法计算后返回true的元素,形成新的DSteeam并返回。

repartition(numPartitions)

通过增加或减少分区数改变DStream的并行度。

union(otherStream)

将源DStream和otherDStream中所有元素取并集,形成一个新的DStream并返回。

count()

计算DStream中的每个RDD中的元素个数,每个RDD返回一个“单元素RDD”,这些单元素RDD组成新的DStream并返回。

reduce(func)

对DStream中每个RDD中的所有元素分别进行聚合,每个RDD生成一个单元素RDD,这些单元素RDD组成新的DStream并返回,func函数接受两个参数并有一个返回值,且func操作必须是associative commutative,这样才能支持并行计算。

countByValue()

对元素类型为K的DStream调用该方法,将返回类型为(K,Long)键值对的新DStream。“键”对应的“值”是该“键”在源DStream中每个RDD中的出现频率。

reduceByKey(func, [numTasks])

当对元素类型为(K, V)对的DStream调用该方法,返回(K,V)对类型的新DStream,其中使用给定的reduce函数聚合每个键的值。注意:默认情况下,它使用Spark的默认并行任务数(本地模式下为2,群集模式中的并行数由属性spark.default.parallelism指定)进行分组。您可以传递一个可选的numTasks参数来设置task的数量。

join(otherStream, [numTasks])

当源DStream类型为(K, V),otherStream类型为(K, W)时,返回一个新的类型为(K, (V,W))的DStream。

cogroup(otherStream, [numTasks])

当源DStream类型为(K, V),otherStream类型为(K, W)时,返回一个新的类型为(K, Seq[V], Seq[W])的DStream。

transform(func)

通过对源DStream的每个RDD应用RDD-to-RDD函数来返回一个新的DStream。这可以用于对DStream进行任意RDD操作。

updateStateByKey(func)

返回一个新的“state”DStream,其中通过对key的先前状态和新的values应用给定的方法func,将计算结果用来更新每个key的状态。这可以用于维护每个key的任意的状态数据。

 

下面将对上述的某些transformation算子进行深入讨论。

UpdateStateByKey算子

updateStateByKey操作允许您在使用新的信息持续更新时保持任意的状态。要使用这个操作,你将需要操作两个步骤。

1、 定义“状态(state)”,状态可以是任意的数据类型。

2、 定义状态更新函数,使用更新函数来指定如何使用先前状态和输入流中的新值更新状态。

在每个批处理中,Spark将对所有现有的key应用状态更新功能,无论它们在新批次数据中是否有对应的新的value。如果更新函数返回None,则键值对将被消除。

我们来举例说明一下。假设你想在文本数据流中对每个单词的计数持续更新。在这里,单词计数是状态,它是一个整数。我们将更新功能定义为:

[java] view plain copy
 
  1. def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {  
  2.     val newCount = ...  // add the new values with the previous running count to get the new count  
  3.     Some(newCount)  
  4. }  

这适用于包含单词的DStream(例如,在前面的示例中,包含(word, 1)键值对的名为pairs的DStream)。

valrunningCounts=pairs.updateStateByKey[Int](updateFunction_)

将为每个单词调用更新函数,其中newValues具有1的顺序(来自(word, 1)键值对)且runningCount持有先前的计数。

请注意,使用updateStateByKey需要配置检查点目录,这在检查点部分将详细讨论。

Transform操作

transform操作(以及其变体如transformWith)允许将任意RDD-to-RDD函数应用于DStream。 利用transform你可以将任何DStream API中未直接提供的RDD操作应用于DStream。例如,在DStream API中,没有提供直接将数据流中的每小批数据与其他数据集相join的功能。但是,您可以通过使用transform来执行此操作。这带来了丰富的可能性。例如,可以通过将输入数据流与预先计算出的垃圾邮件特征信息(这些垃圾邮件特征信息也可能是Spark计算出的)结合起来进行实时数据清理,然后基于这种操作实现垃圾信息的过滤。

[java] view plain copy
 
  1. val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information  
  2. val cleanedDStream = wordCounts.transform { rdd =>  
  3.   rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning  
  4.   ...  
  5. }  

请注意,提供的函数在每个批次间隔中被调用。 这允许您设计随时间变化的RDD操作,即RDD操作、分区数、广播变量等可以在批次之间被更改。

窗口操作

Spark Streaming还提供了窗口计算功能,允许您在数据的滑动窗口上应用转换操作。下图说明了滑动窗口的工作方式。



如图所示,每当窗口滑过originalDStream时,落在窗口内的源RDD被组合并被执行操作以产生windowed DStream的RDD。在上面的例子中,操作应用于最近3个时间单位的数据,并以2个时间单位滑动。这表明任何窗口操作都需要指定两个参数。

窗口长度(windowlength) - 窗口的时间长度(上图的示例中为:3)。

滑动间隔(slidinginterval) - 两次相邻的窗口操作的间隔(译者注:即每次滑动的时间长度)(上图示例中为:2)。

这两个参数必须是源DStream的批间隔的倍数(上图示例中为:1)。

 

我们以一个例子来说明窗口操作。 假设您希望对之前的单词计数的示例进行扩展,每10秒钟对过去30秒的数据进行wordcount。为此,我们必须在最近30秒的pairs DStream数据中对(word, 1)键值对应用reduceByKey操作。这是通过使用reduceByKeyAndWindow操作完成的。

[java] view plain copy
 
  1. // Reduce last 30 seconds of data, every 10 seconds  
  2. val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))  

 

一些常见的窗口操作如下表所示。所有这些操作都用到了上述两个参数 - windowLength和slideInterval。

Transformation

Meaning

window(windowLength

slideInterval)

返回基于源DStream的窗口批次计算而得到的新DStream。

countByWindow(wind

owLengthslideInterval)

返回基于滑动窗口的数据流中的元素个数。

reduceByWindow(fun

cwindowLengthslide

Interval)

使用func在滑动间隔中聚合数据流中的元素,生成一个新的“单元素”数据流并返回。该函数应该是associative and commutative,从而可以并行的执行计算。

reduceByKeyAndWin

dow(funcinvFuncwin

dowLengthslideInterv

al, [numTasks])

当包含(K,V)对的DStream进行调用时,返回包含(K,V)对的新DStream,其中每个键对应的所有值在滑动窗口的所有batch中使用给定的reduce函数func进行聚合。注意:默认情况下,它使用Spark的默认并行任务数(本地模式下为2,群集模式中的并行度由spark.default.parallelism属性确指定)进行分组。 您可以传递一个可选的numTasks参数来设置不同并行度。

reduceByKeyAndWin

dow(funcinvFuncwin

dowLengthslideInterv

al, [numTasks])

上述reduceByKeyAndWindow() 的更高效的版本,其中使用前一窗口的reduce计算结果递增地计算每个窗口的reduce值。这是通过对进入滑动窗口的新数据进行reduce操作,以及“逆减(inverse reducing)”离开窗口的旧数据来完成的。一个例子是当窗口滑动时对键对应的值进行“一加一减”操作。但是,它仅适用于“可逆减函数(invertible reduce functions)”,即具有相应“反减”功能的减函数(作为参数invFunc)。 像reduceByKeyAndWindow一样,通过可选参数可以配置reduce任务的数量。 请注意,使用此操作必须启用检查点。

countByValueAndWin

dow(windowLength,sli

deInterval, [numTasks])

当对包含(K,V)对的DStream调用时,返回(K,Long)对的新DStream,其中每个键的值是其滑动窗口内的出现频数。像reduceByKeyAndWindow一样,通过可选参数可以配置reduce任务的数量。

 

Join操作

最后,值得强调的是,您可以轻松地在Spark Streaming中执行不同类型的join操作。

Stream-stream joins

一个流可以很方便地与其他流进行join操作。

[java] view plain copy
 
  1. val stream1: DStream[String, String] = ...  
  2. val stream2: DStream[String, String] = ...  
  3. val joinedStream = stream1.join(stream2)  

 

上述代码中,在每个批间隔中,由stream1生成的RDD将与stream2生成的RDD相join。 你也可以做leftOuterJoin,rightOuterJoin,fullOuterJoin。 此外,在流的窗口上进行联接通常是非常有用的。这也很容易做到。

[java] view plain copy
 
  1. val windowedStream1 = stream1.window(Seconds(20))  
  2. val windowedStream2 = stream2.window(Minutes(1))  
  3. val joinedStream = windowedStream1.join(windowedStream2)  

 

Stream-dataset joins

这在种操作在前面解释DStream.transform操作时已经进行了展示。下面的例子是另一个join窗口流与数据集的例子。

[java] view plain copy
 
  1. val dataset: RDD[String, String] = ...  
  2. val windowedStream = stream.window(Seconds(20))...  
  3. val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }  

 

实际上,您也可以动态更改用于join操作的数据集。提供给transform的函数参数在每个批次间隔都会被evaluated,因此将使用当前dataset指向的数据集。

DStream转换操作的完整列表可在API文档中找到。有关Scala API,请参阅DStream和PairDStreamFunction。 对于Java API,请参阅JavaDStream和JavaPairDStream。 对于Python API,请参阅DStream。

DStreams的输出操作

输出操作允许将DStream的数据推送到外部系统,如数据库或文件系统。由于输出操作实际上允许外部系统消费变换后的数据,所以输出操作会触发所有DStream transformation操作的实际执行(类似于RDD的action算子)。目前,定义了以下输出操作:

Output Operation

Meaning

print()

在运行streaming应用程序的driver节点上打印DStream中每个batch的前十个元素。一般用于开发和调试。

saveAsTextFiles(prefix, [suffix])

将此DStream的内容保存到文本文件中。每个批数据对应的文件名基于prefix和suffix生成:“prefix-TIME_IN_MS [.suffix]”。

saveAsObjectFiles(prefix, [suffix])

将此DStream的内容另存为SequenceFiles格式的序列化Java对象。 每个批数据对应的文件名基于prefix和suffix生成“prefix-TIME_IN_MS [.suffix]”。

saveAsHadoopFiles(prefix, [suffix])

将此DStream的内容另存为Hadoop文件。 每个批数据对应的文件名基于prefix和suffix生成“prefix-TIME_IN_MS [.suffix]”。

foreachRDD(func)

这是流数据处理中最常用的输出操作,它可以对数据流中的每个RDD应用func方法。此方法应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或将其通过网络写入数据库。请注意,函数func在运行streaming应用程序的driver进程中执行,通常会包括RDD action算子,从而强制流式RDD数据的计算执行。

foreachRDD的设计模式

dstream.foreachRDD是一个强大的原语,它可以将数据发送到外部系统。但是,了解如何正确有效地使用这个原语很重要。 一些需要注意避免的常见错误如下。

通常向外部系统写入数据时需要创建一个连接对象(例如与远程服务器的TCP连接)并使用这个连接将数据发送到远程系统。为此,开发人员可能会尝试在Spark driver程序中创建连接对象,然后在Spark worker中使用该连接对象来保存RDD中的记录。 例如下面这种做法:(在Scala中)

[java] view plain copy
 
  1. dstream.foreachRDD { rdd =>  
  2.   val connection = createNewConnection()  // executed at the driver  
  3.   rdd.foreach { record =>  
  4.     connection.send(record) // executed at the worker  
  5.   }  
  6. }  

 

这显然是不正确的,因为该操作需要将连接对象序列化并从driver程序发送到worker。这种连接对象很少有能够跨机器传输的,此错误可能会显示为序列化错误(连接对象不可序列化),初始化错误(连接对象需要在worker初始化)等。正确的解决方案是在worker中创建连接对象。

但是,这又可能会导致另一个常见的错误 - 为每个记录创建了一个新的连接。 例如:

[java] view plain copy
 
  1. dstream.foreachRDD { rdd =>  
  2.   rdd.foreach { record =>  
  3.     val connection = createNewConnection()  
  4.     connection.send(record)  
  5.     connection.close()  
  6.   }  
  7. }  

 

通常情况下,创建连接对象会有一定的时间和资源开销。因此,为每个记录创建和销毁连接对象可能会引起不必要的高额开销,并且会显著降低系统的总体吞吐量。一个更好的解决方案是使用rdd.foreachPartition – 为每个分区创建一个连接对象,并使用该连接在RDD分区中发送所有记录。

[java] view plain copy
 
  1. dstream.foreachRDD { rdd =>  
  2.   rdd.foreachPartition { partitionOfRecords =>  
  3.     val connection = createNewConnection()  
  4.     partitionOfRecords.foreach(record => connection.send(record))  
  5.     connection.close()  
  6.   }  
  7. }  

 

这样可以在多个记录上分摊创建连接对象的开销。

 

最后,可以通过跨多个RDD 或者batch重用连接对象来进一步优化应用程序。可以维护连接对象的静态连接池,连接池中的连接对象可以在多个batch数据的RDD中得到重用,并将这些数据推送到外部系统,从而进一步减少开销。

[java] view plain copy
 
  1. dstream.foreachRDD { rdd =>  
  2.   rdd.foreachPartition { partitionOfRecords =>  
  3.     // ConnectionPool is a static, lazily initialized pool of connections  
  4.     val connection = ConnectionPool.getConnection()  
  5.     partitionOfRecords.foreach(record => connection.send(record))  
  6.     ConnectionPool.returnConnection(connection)  // return to the pool for future reuse  
  7.   }  
  8. }  

 

请注意,连接池中的连接应根据需要懒惰创建(lazily created),如果一段时间不使用,则会超时。这是实现将数据发送到外部系统最高效的方式。

其它注意点:

1、 就像普通RDD的延迟操作由action算子触发,DStreams通过输出操作触发延迟计算。具体来说,其实是DStream输出操作调用了RDD action算子强制立即处理接收到的数据。因此,如果您的应用程序中没有任何输出操作,或者虽然具有dstream.foreachRDD()这样的输出操作,但是在其中没有定义任何RDD action操作,则不会有任何操作被执行。系统将简单地接收数据并将其丢弃。

2、 默认情况下,输出操作同一时刻只能执行一个。它们按照它们在应用程序中定义的顺序执行。

DataFrame和SQL操作

您可以很方便地使用DataFrames和SQL操作来处理流数据。您必须使用当前的StreamingContext对应的SparkContext创建一个SparkSession。此外,必须这样做的另一个原因是使得应用可以在driver程序故障时得以重新启动,这是通过创建一个可以延迟实例化的单例SparkSession来实现的。在下面的示例中,我们使用DataFrames和SQL来修改之前的wordcount示例并对单词进行计数。我们将每个RDD转换为DataFrame,并注册为临时表,然后在这张表上执行SQL查询。

[java] view plain copy
 
  1. /** DataFrame operations inside your streaming program */  
  2. val words: DStream[String] = ...  
  3. words.foreachRDD { rdd =>  
  4.   // Get the singleton instance of SparkSession  
  5.   val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()  
  6.   import spark.implicits._  
  7.   // Convert RDD[String] to DataFrame  
  8.   val wordsDataFrame = rdd.toDF("word")  
  9.   // Create a temporary view  
  10.   wordsDataFrame.createOrReplaceTempView("words")  
  11.   // Do word count on DataFrame using SQL and print it  
  12.   val wordCountsDataFrame =   
  13.     spark.sql("select word, count(*) as total from words group by word")  
  14.   wordCountsDataFrame.show()  
  15. }  

 

您可以在源码的example目录下找到完整代码或点击这里

您还可以在来自不同线程的流数据(即异步运行的StreamingContext)上定义表并执行SQL查询。为使查询操作可以顺利执行,您需要确保将StreamingContext设置为记录(remember)足够数量的流数据。否则,由于StreamingContext不知道异步SQL查询的存在,可能会在查询完成之前删除旧的流数据。举个例子,如果要对最后一个batch执行查询操作,但是该查询可能执行5分钟之久,则可以调用streamingContext.remember(Minutes(5))让streamingContext留存这些数据。

有关DataFrames的更多信息,请参阅DataFrame和SQL指南。

MLlib操作

您还可以很方便的结合使用MLlib提供的机器学习算法。首先,算法包中提供了流式机器学习算法(例如流式线性回归,流式KMeans等),其可以同时从流数据中学习,并将该模型应用于流数据。除此之外,对于大型的机器学习算法,您可以离线训练模型(即使用历史数据),然后将该模型应用于实时的流数据。有关详细信息,请参阅MLlib指南。

缓存/持久化

与RDD类似,DStreams还允许开发人员将流数据保留在内存中。也就是说,在DStream上调用persist() 方法会自动将该DStream的每个RDD保留在内存中。如果DStream中的数据将被多次计算(例如,相同数据上执行多个操作),这个操作就会很有用。对于基于窗口的操作,如reduceByWindow和reduceByKeyAndWindow以及基于状态的操作,如updateStateByKey,数据会默认进行持久化。 因此,基于窗口的操作生成的DStream会自动保存在内存中,而不需要开发人员调用persist()。

对于通过网络接收数据(例如Kafka,Flume,sockets等)的输入流,默认持久化级别被设置为将数据复制到两个节点进行容错。

请注意,与RDD不同,DStreams的默认持久化级别将数据序列化保存在内存中。这在“性能调优”部分有进一步的讨论。有关不同持久化级别的更多信息,请参见“Spark编程指南”。

 

检查点支持

流数据处理程序通常都是全天候运行,因此必须对应用中逻辑无关的故障(例如,系统故障,JVM崩溃等)具有弹性。为了实现这一特性,Spark Streaming需要checkpoint足够的信息到容错存储系统,以便可以从故障中恢复。一般会对两种类型的数据使用检查点。

元数据检查点(Metadatacheckpointing) - 将定义流计算的信息保存到容错存储中(如HDFS)。这用于从运行streaming程序的driver程序的节点的故障中恢复(稍后详细讨论)。元数据包括以下几种:

1. 配置(Configuration) - 用于创建streaming应用程序的配置信息。

2. DStream操作(DStream operations) - 定义streaming应用程序的DStream操作集合。

3. 不完整的batch(Incomplete batches) - jobs还在队列中但尚未完成的batch。

数据检查点(Datacheckpointing) - 将生成的RDD保存到可靠的存储层。对于一些需要将多个批次之间的数据进行组合的stateful变换操作,设置数据检查点是必需的。在这些转换操作中,当前生成的RDD依赖于先前批次的RDD,这导致依赖链的长度随时间而不断增加,由此也会导致基于血统机制的恢复时间无限增加。为了避免这种情况,stateful转换的中间RDD将定期设置检查点并保存到到可靠的存储层(例如HDFS)以切断依赖关系链。

总而言之,元数据检查点主要用于从driver程序故障中恢复,而数据或RDD检查点在任何使用stateful转换时是必须要有的。

何时启用检查点

对于具有以下任一要求的应用程序,必须启用检查点:

使用状态转换 - 如果在应用程序中使用updateStateByKey或reduceByKeyAndWindow(具有逆函数),则必须提供检查点目录以允许定期保存RDD检查点。

从运行应用程序的driver程序的故障中恢复 - 元数据检查点用于使用进度信息进行恢复。

请注意,不包含上述的“有状态转换操作”的简单streaming应用程序无需启用检查点即可运行。当然,如果不启用检查点,driver端如果出现故障也只能进行部分恢复(一些接收但未处理的数据可能会丢失)。不过这通常是可以接受的,而且许多Spark Streaming应用程序就是以这种方式运行的。在未来,预计对非hadoop环境的支持会有所改善。

如何配置检查点

可以通过在一些可容错、高可靠的文件系统(例如,HDFS,S3等)中设置保存检查点信息的目录来启用检查点。这是通过使用streamingContext.checkpoint(checkpointDirectory)完成的。设置检查点后,您就可以使用上述的有状态转换操作。此外,如果要使应用程序从驱动程序故障中恢复,您应该重写streaming应用程序以使程序具有以下行为。

1、当程序第一次启动时,它将创建一个新的StreamingContext,设置好所有流数据源,然后调用start()方法。

2、当程序在失败后重新启动时,它将从checkpoint目录中的检查点数据重新创建一个StreamingContext。

使用StreamingContext.getOrCreate可以简化此行为。 使用方式如下。

[java] view plain copy
 
  1. // Function to create and setup a new StreamingContext  
  2. def functionToCreateContext(): StreamingContext = {  
  3.   val ssc = new StreamingContext(...)   // new context  
  4.   val lines = ssc.socketTextStream(...) // create DStreams  
  5.   ...  
  6.   ssc.checkpoint(checkpointDirectory)   // set checkpoint directory  
  7.   ssc  
  8. }  
  9.   
  10. // Get StreamingContext from checkpoint data or create a new one  
  11. val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)  
  12.   
  13. // Do additional setup on context that needs to be done,  
  14. // irrespective of whether it is being started or restarted  
  15. context. ...  
  16.   
  17. // Start the context  
  18. context.start()  
  19. context.awaitTermination()  

 

如果checkpointDirectory存在,则将从检查点数据重建上下文。如果目录不存在(即第一次运行),则将调用函数functionToCreateContext来创建新的上下文并设置DStream。请参阅Scala示例RecoverableNetworkWordCount。 此示例将网络数据的单词计数追加到一个文件中。

除了使用getOrCreate之外,还需要确保在失败时能够自动重新启动驱动程序进程。这只能由用于运行应用程序的部署基础架构来完成。这在“部署”部分进一步讨论。

 

请注意,设置RDD的检查点会带来持久化过程的开销。这可能会导致被设置RDD检查点的批数据的处理时间增加。因此,需要仔细设置检查点的时间间隔。当batchsize比较小的时候(例如1秒),为每个批次都设置检查点可能会显着降低操作吞吐量。相反,检查点太少会导致血统和任务大小增长,这也可能会产生不利的影响。对于需要RDD检查点的状态转换,默认间隔批间隔的倍数中第一个大于10秒的值(译者注:即如果批间隔为3s,因为12是大于10且为3的倍数的最小值,所以此时默认的检查点间隔为12秒)。它可以通过使用dstream.checkpoint(checkpointInterval)进行设置。通常,DStream的5到10个滑动间隔是设置检查点间隔的一个比较合适的值。

累加器,广播变量和检查点

在SparkStreaming中,无法从检查点恢复累加器和广播变量。 如果启用检查点并使用累加器或广播变量,则必须为累加器和广播变量创建延迟实例化的单例实例,以便在驱动程序从故障中重新启动后重新进行实例化。这下面的示例中进行了说明。

[java] view plain copy
 
  1. object WordBlacklist {  
  2.   @volatile private var instance: Broadcast[Seq[String]] = null  
  3.   def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {  
  4.     if (instance == null) {  
  5.       synchronized {  
  6.         if (instance == null) {  
  7.           val wordBlacklist = Seq("a""b""c")  
  8.           instance = sc.broadcast(wordBlacklist)  
  9.         }  
  10.       }  
  11.     }  
  12.     instance  
  13.   }  
  14. }  
  15. object DroppedWordsCounter {  
  16.   @volatile private var instance: LongAccumulator = null  
  17.   def getInstance(sc: SparkContext): LongAccumulator = {  
  18.     if (instance == null) {  
  19.       synchronized {  
  20.         if (instance == null) {  
  21.           instance = sc.longAccumulator("WordsInBlacklistCounter")  
  22.         }  
  23.       }  
  24.     }  
  25.     instance  
  26.   }  
  27. }  
  28. wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>  
  29.   // Get or register the blacklist Broadcast  
  30.   val blacklist = WordBlacklist.getInstance(rdd.sparkContext)  
  31.   // Get or register the droppedWordsCounter Accumulator  
  32.   val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)  
  33.   // Use blacklist to drop words and use droppedWordsCounter to count them  
  34.   val counts = rdd.filter { case (word, count) =>  
  35.     if (blacklist.value.contains(word)) {  
  36.       droppedWordsCounter.add(count)  
  37.       false  
  38.     } else {  
  39.       true  
  40.     }  
  41.   }.collect().mkString("["", ""]")  
  42.   val output = "Counts at time " + time + " " + counts  
  43. })  

 

源码中可以看到完整代码。

 

部署应用程序

本节讨论部署SparkStreaming应用程序的步骤。

要求

要运行SparkStreaming应用程序,您需要具备以下几个条件。

集群管理器集群 - 这是任何Spark应用程序的一般要求,并在部署指南中有详细讨论。

 

应用程序打包成JAR - 您必须将流应用程序编译为JAR包。如果您是使用spark-submit启动应用程序,则不需要在JAR中提供Spark和Spark Streaming的依赖jar。但是,如果您的应用程序使用高级数据源(例如Kafka,Flume),那么您将必须将他们连接的额外artifact及其依赖项打包在用于部署应用程序的JAR中。例如,使用KafkaUtils的应用程序必须在应用程序JAR中包含spark-streaming-kafka-0-8_2.11及其所有次级依赖项。

 

为executor配置足够的内存 - 由于接收到的数据必须存储在内存中,所以执行程序必须配置足够的内存来保存接收到的数据。请注意,如果您正在进行10分钟的窗口操作,系统必须至少能够将最近10分钟的数据保存到内存中。因此,应用程序的内存要求取决于其中使用的操作。

 

配置检查点 - 如果流应用程序需要设置检查点,则必须在Hadoop API兼容的可容错的存储层(例如HDFS,S3等)中配置检查点目录,并且streaming应用程序的书写编写方式也必须允许将检查点信息用于故障恢复。有关详细信息,请参阅检查点部分。

 

配置应用程序driver程序的自动重新启动 - 要实现从驱动程序故障中自动恢复,则用于运行streaming应用程序部署的基础架构必须能够监视驱动程序进程,并在驱动程序发生故障时重新启动驱动程序。不同的集群管理器有不同的工具来实现这一点。

Spark standalone- 可以用SparkStandalone模式将Spark应用提交到集群中运行(请参阅“集群部署模式”),即应用程序的驱动程序本身在其中一个工作节点上运行。然后,可以利用Standalone群集管理器来监控驱动程序,如果由于非零退出代码或者运行驱动程序的节点发生故障而导致驱动程序发生出现问题,则可以重新启动它。有关详细信息,请参阅“SparkStandalone”指南中的有关“群集模式”和“监督”的部分。

YARN – yarn也支持类似的机制来实现应用程序的自动重启。有关详细信息,请参阅YARN文档。

Mesos – 一个名Marathon的项目为已被用来在Mesos上实现这一点。

 

配置预写日志 – 从Spark 1.2开始,我们引入了预写日志来实现强大的容错保证。如果启用该功能,则从receiver接收的所有数据都将被写入检查点目录中的预写日志中。这可以防止驱动程序恢复时的数据丢失,从而确保数据零丢失(在容错语义部分中详细讨论)。可以通过将配置参数spark.streaming.receiver.writeAheadLog.enable设置为true来启用此功能。然而,这些更强的语义可能以单个receiver的吞吐量为代价。通过并行运行更多的receiver可以改善这一点,增加总吞吐量。另外,建议在启用预写日志时,如果在日志已经存储在支持复制容错的存储系统中时,禁用Spark接收到的数据的复制。这可以通过将输入流的存储级别设置为StorageLevel.MEMORY_AND_DISK_SER来完成。使用S3(或任何不支持刷新的文件系统)写入日志时,请记住启用spark.streaming.driver.writeAheadLog.closeFileAfterWrite和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。有关详细信息,请参阅SparkStreaming配置

 

设置最大接收速率 - 如果集群资源不足以使得流数据的处理速度快于流数据的接受速度,则可以通过设置 “记录/秒”的最大速率来对接收方进行速率限制。请参阅关于receiver的spark.streaming.receiver.maxRate配置参数和用于Direct Kafka方法的配置参数:spark.streaming.kafka.maxRatePerPartition。在Spark 1.5中,我们引入了一个称为“反向施压”(backpressure)的功能,该功能可以无需设置此速率限制,因为SparkStreaming会自动计算速率限制,并在处理条件发生变化时动态调整速率限制。可以通过将配置参数spark.streaming.backpressure.enabled设置为true来启用此功能。

升级应用程序代码

如果正在运行的SparkStreaming应用程序需要添加新的应用程序代码进行升级,则有两种可能的实现机制。

1、升级后的Spark Streaming应用程序与现有应用程序并行启动并运行。一旦新的程序(接收与旧的应用程序相同的数据)已经做好热身并准备好接手,旧的应用就可以被关掉。请注意,这要求数据源支持可以将数据发送到两个处理端(即较早和已升级的应用程序)。

 

2、现有应用程序正常关闭(请参阅StreamingContext.stop(...)或JavaStreamingContext.stop(...)以获取正常关闭选项),以确保已关闭的数据在关闭前已完全处理。然后可以启动升级的应用程序,这将从旧的应用程序停止的同一数据点开始处理。请注意,只有对支持源端缓冲的输入源(如Kafka和Flume)才可以进行此操作,因为数据需要在先前的应用程序关闭并且升级的应用程序尚未启动时进行缓冲。并且,你无法从旧的应用程序设置的检查点信息中重新启动,因为检查点信息基本上包含序列化的Scala / Java / Python对象,如果尝试使用新的修改的类反序列化旧类对象可能会导致错误。在这种情况下,你可以使用不同的检查点目录启动升级的应用程序,也可以删除旧的检查点目录。

监控应用程序

除了Spark的监控功能,Spark Streaming还有其他特有的功能。当使用StreamingContext时,Spark Web UI会显示一个附加的Streaming选项卡,显示有关正在运行的receiver(接收器是否活动,接收到的记录数量,接收器错误等)以及完成的批次(批处理时间,排队延迟等)的统计信息)。这可以用于监视流应用程序的进度。

 

Web UI中的以下两个指标特别重要:

 

1、 处理时间 - 处理每批数据的时间。

2、计划延迟 - 批处理在队列中等待先前批次处理完成的时间。

如果批量处理时间始终超过批次间隔或者排队延迟不断增加或者两种情况同时存在,则表示系统处理批次的速度跟不上批数据生成的速度。在这种情况下,请考虑减少批处理时间(译者注:例如简化批处理操作)。

 

Spark Streaming程序的进展也可以使用StreamingListener接口进行监控,这样可以让您获得接收者状态和处理时间。请注意,这是一个开发人员API,将来可能会有所改进。

http://blog.csdn.net/u013468917/article/details/71274433

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics