Spark Streaming 编程指南
注意
Spark Streaming 是 Spark 流式引擎的上一代。Spark Streaming 已不再更新,它是一个旧版项目。Spark 中有一个更新、更易于使用的流式引擎,称为 Structured Streaming。您应该为您的流式应用程序和管道使用 Spark Structured Streaming。请参阅Structured Streaming 编程指南。
概述
Spark Streaming 是核心 Spark API 的扩展,它支持对实时数据流进行可扩展、高吞吐量、容错的流处理。数据可以从 Kafka、Kinesis 或 TCP sockets 等多种源摄取,并可以使用 map
、reduce
、join
和 window
等高级函数表达的复杂算法进行处理。最后,处理后的数据可以推送到文件系统、数据库和实时仪表板。实际上,您可以在数据流上应用 Spark 的机器学习和图处理算法。
其内部工作方式如下。Spark Streaming 接收实时输入数据流,并将数据分割成批次,然后由 Spark 引擎处理这些批次,以批次形式生成最终结果流。
Spark Streaming 提供了一种称为离散化流或 DStream 的高级抽象,它表示连续的数据流。DStream 可以从 Kafka 和 Kinesis 等源的输入数据流创建,也可以通过对其他 DStream 应用高级操作来创建。在内部,DStream 被表示为一系列的RDD。
本指南将向您展示如何开始使用 DStream 编写 Spark Streaming 程序。您可以使用 Scala、Java 或 Python(在 Spark 1.2 中引入)编写 Spark Streaming 程序,所有这些都在本指南中介绍。您会在本指南中找到选项卡,让您可以在不同语言的代码片段之间进行选择。
注意: 有些 API 在 Python 中不同或不可用。在本指南中,您会看到 Python API 标签,突出显示这些差异。
一个快速示例
在我们深入了解如何编写自己的 Spark Streaming 程序之前,让我们快速看一下一个简单的 Spark Streaming 程序是什么样的。假设我们想计算从监听 TCP 套接字的数据服务器接收到的文本数据中的单词数量。您只需要这样做。
首先,我们导入 StreamingContext,它是所有流功能的主要入口点。我们创建一个具有两个执行线程且批处理间隔为 1 秒的本地 StreamingContext。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
使用此上下文,我们可以创建一个 DStream,它表示来自 TCP 源的流数据,指定为主机名(例如 localhost
)和端口(例如 9999
)。
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
这个 lines
DStream 代表将从数据服务器接收到的数据流。此 DStream 中的每条记录都是一行文本。接下来,我们想将这些行按空格分割成单词。
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
flatMap
是一种一对多的 DStream 操作,它通过从源 DStream 中的每条记录生成多条新记录来创建新的 DStream。在本例中,每行将被分割成多个单词,并且单词流表示为 words
DStream。接下来,我们想统计这些单词。
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
words
DStream 进一步被映射(一对一转换)成一个 (word, 1)
对的 DStream,然后通过归约操作获取每个批次数据中单词的频率。最后,wordCounts.pprint()
将每秒打印一些生成的计数。
请注意,当这些行执行时,Spark Streaming 只是设置了它启动后将执行的计算,但尚未开始实际处理。要在所有转换设置完成后开始处理,我们最终调用
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
完整代码可以在 Spark Streaming 示例 NetworkWordCount 中找到。
首先,我们导入 Spark Streaming 类的名称以及从 StreamingContext 到我们环境的一些隐式转换,以便向我们需要的其他类(如 DStream)添加有用的方法。StreamingContext 是所有流功能的主要入口点。我们创建一个具有两个执行线程且批处理间隔为 1 秒的本地 StreamingContext。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
使用此上下文,我们可以创建一个 DStream,它表示来自 TCP 源的流数据,指定为主机名(例如 localhost
)和端口(例如 9999
)。
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
这个 lines
DStream 代表将从数据服务器接收到的数据流。此 DStream 中的每条记录都是一行文本。接下来,我们想将这些行按空格字符分割成单词。
// Split each line into words
val words = lines.flatMap(_.split(" "))
flatMap
是一种一对多的 DStream 操作,它通过从源 DStream 中的每条记录生成多条新记录来创建新的 DStream。在本例中,每行将被分割成多个单词,并且单词流表示为 words
DStream。接下来,我们想统计这些单词。
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
words
DStream 进一步被映射(一对一转换)成一个 (word, 1)
对的 DStream,然后通过归约操作获取每个批次数据中单词的频率。最后,wordCounts.print()
将每秒打印一些生成的计数。
请注意,当这些行执行时,Spark Streaming 只是设置了它启动后将执行的计算,但尚未开始实际处理。要在所有转换设置完成后开始处理,我们最终调用
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
完整代码可以在 Spark Streaming 示例 NetworkWordCount 中找到。
首先,我们创建一个 JavaStreamingContext 对象,它是所有流功能的主要入口点。我们创建一个具有两个执行线程且批处理间隔为 1 秒的本地 StreamingContext。
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
使用此上下文,我们可以创建一个 DStream,它表示来自 TCP 源的流数据,指定为主机名(例如 localhost
)和端口(例如 9999
)。
// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
这个 lines
DStream 代表将从数据服务器接收到的数据流。此流中的每条记录都是一行文本。然后,我们想将这些行按空格分割成单词。
// Split each line into words
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
flatMap
是一种 DStream 操作,它通过从源 DStream 中的每条记录生成多条新记录来创建新的 DStream。在本例中,每行将被分割成多个单词,并且单词流表示为 words
DStream。请注意,我们使用 FlatMapFunction 对象定义了转换。正如我们将在后续发现的那样,Java API 中有许多此类便利类有助于定义 DStream 转换。
接下来,我们想统计这些单词。
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();
words
DStream 进一步被映射(一对一转换)成一个 (word, 1)
对的 DStream,使用 PairFunction 对象。然后,使用 Function2 对象将其归约以获取每个批次数据中单词的频率。最后,wordCounts.print()
将每秒打印一些生成的计数。
请注意,当这些行执行时,Spark Streaming 只是设置了它启动后将执行的计算,但尚未开始实际处理。要在所有转换设置完成后开始处理,我们最终调用 start
方法。
jssc.start(); // Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate
完整代码可以在 Spark Streaming 示例 JavaNetworkWordCount 中找到。
如果您已经下载并构建了 Spark,您可以按如下方式运行此示例。您首先需要使用以下命令运行 Netcat(大多数类 Unix 系统中都有的一个小型实用程序)作为数据服务器
$ nc -lk 9999
然后,在另一个终端中,您可以使用以下命令启动示例
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
然后,在运行 netcat 服务器的终端中输入的任何行都将每秒被计数并打印在屏幕上。它看起来会像下面这样。
|
|
基本概念
接下来,我们将超越简单示例,详细阐述 Spark Streaming 的基础知识。
链接
与 Spark 类似,Spark Streaming 可通过 Maven Central 获取。要编写您自己的 Spark Streaming 程序,您必须将以下依赖项添加到您的 SBT 或 Maven 项目中。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.13</artifactId>
<version>4.0.0</version>
<scope>provided</scope>
</dependency>
libraryDependencies += "org.apache.spark" % "spark-streaming_2.13" % "4.0.0" % "provided"
要从 Spark Streaming 核心 API 中不存在的源(如 Kafka 和 Kinesis)摄取数据,您必须将相应的 artifact spark-streaming-xyz_2.13
添加到依赖项中。例如,一些常见的如下所示。
源 | Artifact |
---|---|
Kafka | spark-streaming-kafka-0-10_2.13 |
Kinesis | spark-streaming-kinesis-asl_2.13 [亚马逊软件许可证] |
有关最新列表,请参阅 Maven 仓库,以获取支持的源和 artifact 的完整列表。
初始化 StreamingContext
要初始化 Spark Streaming 程序,必须创建一个 StreamingContext 对象,它是所有 Spark Streaming 功能的主要入口点。
可以从 StreamingContext 对象创建 SparkContext 对象。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
appName
参数是您的应用程序在集群 UI 上显示的名称。master
是一个 Spark 或 YARN 集群 URL,或者是一个特殊的 “local[*]” 字符串,用于在本地模式下运行。实际上,在集群上运行时,您不会想在程序中硬编码 master
,而是使用 spark-submit
启动应用程序并在那里接收它。然而,对于本地测试和单元测试,您可以传递“local[*]”以在进程内运行 Spark Streaming(检测本地系统中的核心数量)。
批处理间隔必须根据应用程序的延迟要求和可用的集群资源进行设置。有关更多详细信息,请参阅性能调优部分。
可以从 StreamingContext 对象创建 SparkConf 对象。
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
appName
参数是您的应用程序在集群 UI 上显示的名称。master
是一个 Spark、Kubernetes 或 YARN 集群 URL,或者是一个特殊的 “local[*]” 字符串,用于在本地模式下运行。实际上,在集群上运行时,您不会想在程序中硬编码 master
,而是使用 spark-submit
启动应用程序并在那里接收它。然而,对于本地测试和单元测试,您可以传递“local[*]”以在进程内运行 Spark Streaming(检测本地系统中的核心数量)。请注意,这会在内部创建一个SparkContext(所有 Spark 功能的起点),可以通过 ssc.sparkContext
访问。
批处理间隔必须根据应用程序的延迟要求和可用的集群资源进行设置。有关更多详细信息,请参阅性能调优部分。
也可以从现有的 SparkContext
对象创建 StreamingContext
对象。
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
可以从 JavaStreamingContext 对象创建 SparkConf 对象。
import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
appName
参数是您的应用程序在集群 UI 上显示的名称。master
是一个 Spark 或 YARN 集群 URL,或者是一个特殊的 “local[*]” 字符串,用于在本地模式下运行。实际上,在集群上运行时,您不会想在程序中硬编码 master
,而是使用 spark-submit
启动应用程序并在那里接收它。然而,对于本地测试和单元测试,您可以传递“local[*]”以在进程内运行 Spark Streaming。请注意,这会在内部创建一个JavaSparkContext(所有 Spark 功能的起点),可以通过 ssc.sparkContext
访问。
批处理间隔必须根据应用程序的延迟要求和可用的集群资源进行设置。有关更多详细信息,请参阅性能调优部分。
也可以从现有的 JavaSparkContext
创建 JavaStreamingContext
对象。
import org.apache.spark.streaming.api.java.*;
JavaSparkContext sc = ... //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
定义上下文后,您必须执行以下操作。
- 通过创建输入 DStream 来定义输入源。
- 通过对 DStream 应用转换和输出操作来定义流式计算。
- 使用
streamingContext.start()
开始接收数据并进行处理。 - 使用
streamingContext.awaitTermination()
等待处理停止(手动或由于任何错误)。 - 可以使用
streamingContext.stop()
手动停止处理。
要记住的要点
- 一旦上下文启动,就不能设置或添加新的流式计算。
- 一旦上下文停止,就不能重新启动。
- 同一个 JVM 中只能同时激活一个 StreamingContext。
- 对 StreamingContext 调用 stop() 也会停止 SparkContext。要只停止 StreamingContext,请将
stop()
的可选参数stopSparkContext
设置为 false。 - 一个 SparkContext 可以重复使用来创建多个 StreamingContext,只要在上一个 StreamingContext 停止(但不停止 SparkContext)之后再创建下一个 StreamingContext。
离散化流 (DStreams)
离散化流或 DStream 是 Spark Streaming 提供的基本抽象。它表示一个连续的数据流,可以是来自源的输入数据流,也可以是通过转换输入流生成的处理后的数据流。在内部,DStream 由一系列连续的 RDD 表示,RDD 是 Spark 对不可变分布式数据集的抽象(有关更多详细信息,请参阅Spark 编程指南)。DStream 中的每个 RDD 都包含来自某个时间间隔的数据,如下图所示。
应用于 DStream 的任何操作都会转换为对底层 RDD 的操作。例如,在前面的示例中,将行流转换为单词,flatMap
操作应用于 lines
DStream 中的每个 RDD,以生成 words
DStream 的 RDD。如下图所示。
这些底层的 RDD 转换由 Spark 引擎计算。DStream 操作隐藏了大部分细节,并为开发人员提供了更高级的 API 以方便使用。这些操作将在后续章节中详细讨论。
输入 DStream 和接收器
输入 DStream 是表示从流源接收到的输入数据流的 DStream。在快速示例中,lines
是一个输入 DStream,因为它代表从 netcat 服务器接收到的数据流。每个输入 DStream(文件流除外,本节后面会讨论)都与一个接收器(Scala 文档,Java 文档)对象关联,该对象从源接收数据并将其存储在 Spark 的内存中进行处理。
Spark Streaming 提供两类内置流源。
- 基本源:直接在 StreamingContext API 中可用的源。示例:文件系统和套接字连接。
- 高级源:Kafka、Kinesis 等源通过额外的实用程序类提供。这些需要链接额外的依赖项,如链接部分所述。
我们将在本节后面讨论每类中存在的一些源。
请注意,如果您想在流式应用程序中并行接收多个数据流,您可以创建多个输入 DStream(在性能调优部分进一步讨论)。这将创建多个接收器,它们将同时接收多个数据流。但请注意,Spark worker/executor 是一个长时间运行的任务,因此它会占用分配给 Spark Streaming 应用程序的一个核心。因此,重要的是要记住,Spark Streaming 应用程序需要分配足够的核(或线程,如果本地运行)来处理接收到的数据,以及运行接收器。
要记住的要点
-
在本地运行 Spark Streaming 程序时,不要使用“local”或“local[1]”作为 master URL。其中任何一个都意味着本地运行任务将只使用一个线程。如果您使用的是基于接收器(例如 sockets、Kafka 等)的输入 DStream,那么单个线程将用于运行接收器,没有线程用于处理接收到的数据。因此,在本地运行时,始终使用“local[n]”作为 master URL,其中 n > 要运行的接收器数量(有关如何设置 master 的信息,请参阅Spark 属性)。
-
将逻辑扩展到在集群上运行,分配给 Spark Streaming 应用程序的核心数量必须多于接收器的数量。否则系统将接收到数据,但无法处理。
基本源
我们已经在快速示例中查看了 ssc.socketTextStream(...)
,它从通过 TCP 套接字连接接收的文本数据创建 DStream。除了套接字,StreamingContext API 还提供了从文件创建 DStream 作为输入源的方法。
文件流
要从与 HDFS API 兼容的任何文件系统(即 HDFS、S3、NFS 等)上的文件读取数据,可以通过 StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]
创建 DStream。
文件流不需要运行接收器,因此无需为接收文件数据分配任何核心。
对于简单的文本文件,最简单的方法是 StreamingContext.textFileStream(dataDirectory)
。
fileStream
在 Python API 中不可用;只有 textFileStream
可用。
streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
对于文本文件
streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
对于文本文件
streamingContext.textFileStream(dataDirectory);
如何监控目录
Spark Streaming 将监控目录 dataDirectory
并处理在该目录中创建的任何文件。
- 可以监控一个简单目录,例如
"hdfs://namenode:8040/logs/"
。在此路径下的所有文件都将在发现后立即处理。 - 可以提供一个POSIX glob 模式,例如
"hdfs://namenode:8040/logs/2017/*"
。在这里,DStream 将由匹配该模式的目录中的所有文件组成。也就是说:它是一个目录模式,而不是目录中文件的模式。 - 所有文件必须采用相同的数据格式。
- 文件是根据其修改时间而不是创建时间来确定属于哪个时间段的。
- 一旦处理,当前窗口内对文件的更改不会导致文件被重新读取。也就是说:更新将被忽略。
- 目录下的文件越多,扫描更改所需的时间就越长——即使没有文件被修改。
- 如果使用通配符来标识目录,例如
"hdfs://namenode:8040/logs/2016-*"
,将整个目录重命名以匹配路径将把该目录添加到受监控目录列表中。只有目录中修改时间在当前窗口内的文件才会被包含在流中。 - 调用
FileSystem.setTimes()
来修复时间戳是一种在后续窗口中获取文件的方法,即使其内容未更改。
使用对象存储作为数据源
像 HDFS 这样的“完整”文件系统通常在输出流创建后立即设置其文件的修改时间。当文件打开时,即使数据尚未完全写入,它也可能被包含在 DStream
中——此后对同一窗口内文件的更新将被忽略。也就是说:更改可能会被遗漏,数据可能会从流中省略。
为确保在一个窗口中捕获更改,请将文件写入未受监控的目录,然后,在输出流关闭后立即,将其重命名到目标目录。如果重命名后的文件在其创建窗口期间出现在扫描的目标目录中,则新数据将被捕获。
相比之下,Amazon S3 和 Azure Storage 等对象存储通常重命名操作缓慢,因为数据实际上是被复制的。此外,重命名后的对象可能将其 rename()
操作的时间作为其修改时间,因此可能不被视为原始创建时间所隐含的窗口的一部分。
需要对目标对象存储进行仔细测试,以验证该存储的时间戳行为与 Spark Streaming 预期的一致。通过选定的对象存储进行流式传输数据时,直接写入目标目录可能是一种合适的策略。
有关此主题的更多详细信息,请查阅Hadoop 文件系统规范。
基于自定义接收器的流
DStream 可以通过自定义接收器接收的数据流创建。有关更多详细信息,请参阅自定义接收器指南。
将 RDD 队列作为流
要使用测试数据测试 Spark Streaming 应用程序,还可以使用 streamingContext.queueStream(queueOfRDDs)
基于 RDD 队列创建 DStream。推入队列的每个 RDD 都将被视为 DStream 中的一批数据,并像流一样处理。
有关来自套接字和文件的流的更多详细信息,请参阅 Python StreamingContext、Scala StreamingContext 和 Java JavaStreamingContext 中相关函数的 API 文档。
高级源
Python API 截至 Spark 4.0.0,在这些源中,Kafka 和 Kinesis 在 Python API 中可用。
这类源需要与外部非 Spark 库进行接口,其中一些具有复杂的依赖关系(例如 Kafka)。因此,为了最大限度地减少与依赖项版本冲突相关的问题,从这些源创建 DStream 的功能已移至单独的库,这些库可以在必要时显式链接。
请注意,这些高级源在 Spark shell 中不可用,因此基于这些高级源的应用程序无法在 shell 中测试。如果您确实想在 Spark shell 中使用它们,您将不得不下载相应的 Maven artifact 的 JAR 及其依赖项,并将其添加到 classpath 中。
其中一些高级源如下所示。
-
Kafka: Spark Streaming 4.0.0 与 Kafka broker 0.10 或更高版本兼容。有关更多详细信息,请参阅Kafka 集成指南。
-
Kinesis: Spark Streaming 4.0.0 与 Kinesis Client Library 1.2.1 兼容。有关更多详细信息,请参阅Kinesis 集成指南。
自定义源
Python API Python 暂不支持此功能。
输入 DStream 也可以从自定义数据源创建。您所要做的就是实现一个用户定义的接收器(请参阅下一节了解其含义),它可以从自定义源接收数据并将其推送到 Spark 中。有关详细信息,请参阅自定义接收器指南。
接收器可靠性
根据可靠性,数据源可以分为两种。某些源(如 Kafka)允许对传输的数据进行确认。如果从这些可靠源接收数据的系统正确确认了接收到的数据,则可以确保不会因任何类型的故障而丢失数据。这导致了两种接收器
- 可靠接收器 - 可靠接收器在数据被接收并以复制方式存储在 Spark 中时,会正确地向可靠源发送确认。
- 不可靠接收器 - 不可靠接收器不向源发送确认。这可用于不支持确认的源,甚至可用于可靠源,当用户不想或不需要处理确认的复杂性时。
有关如何编写可靠接收器的详细信息,请参阅自定义接收器指南。
DStream 上的转换操作
与 RDD 类似,转换允许修改输入 DStream 中的数据。DStream 支持正常 Spark RDD 上的许多可用转换。一些常见的如下所示。
转换 | 含义 |
---|---|
map(func) | 通过将源 DStream 的每个元素传递给函数 func 来返回一个新的 DStream。 |
flatMap(func) | 类似于 map,但每个输入项可以映射到 0 个或更多输出项。 |
filter(func) | 通过仅选择源 DStream 中 func 返回 true 的记录来返回新的 DStream。 |
repartition(numPartitions) | 通过创建更多或更少的分区来改变此 DStream 的并行度。 |
union(otherStream) | 返回一个包含源 DStream 和 otherDStream 中元素并集的新 DStream。 |
count() | 通过计数源 DStream 中每个 RDD 的元素数量,返回一个由单元素 RDD 组成的新 DStream。 |
reduce(func) | 通过使用函数 func(该函数接受两个参数并返回一个)聚合源 DStream 中每个 RDD 的元素,返回一个由单元素 RDD 组成的新 DStream。该函数应该是结合律和交换律的,以便可以并行计算。 |
countByValue() | 当在类型为 K 的 DStream 元素上调用时,返回一个 (K, Long) 对的新 DStream,其中每个键的值是其在源 DStream 的每个 RDD 中的频率。 |
reduceByKey(func, [numTasks]) | 当在 (K, V) 对的 DStream 上调用时,返回一个 (K, V) 对的新 DStream,其中每个键的值使用给定的 reduce 函数进行聚合。注意: 默认情况下,这使用 Spark 的默认并行任务数(本地模式为 2,集群模式下数量由配置属性 spark.default.parallelism 决定)进行分组。您可以传递一个可选的 numTasks 参数来设置不同的任务数量。 |
join(otherStream, [numTasks]) | 当在两个 (K, V) 和 (K, W) 对的 DStream 上调用时,返回一个 (K, (V, W)) 对的新 DStream,其中包含每个键的所有元素对。 |
cogroup(otherStream, [numTasks]) | 当在 (K, V) 和 (K, W) 对的 DStream 上调用时,返回一个 (K, Seq[V], Seq[W]) 元组的新 DStream。 |
transform(func) | 通过对源 DStream 的每个 RDD 应用 RDD 到 RDD 的函数来返回新的 DStream。这可用于在 DStream 上执行任意 RDD 操作。 |
updateStateByKey(func) | 返回一个新的“状态”DStream,其中每个键的状态通过对键的先前状态和键的新值应用给定函数来更新。这可用于为每个键维护任意状态数据。 |
其中一些转换值得更详细地讨论。
UpdateStateByKey 操作
updateStateByKey
操作允许您维护任意状态,同时不断用新信息更新它。要使用它,您需要执行两个步骤。
- 定义状态 - 状态可以是任意数据类型。
- 定义状态更新函数 - 使用一个函数指定如何使用前一个状态和输入流中的新值来更新状态。
在每个批次中,Spark 将对所有现有键应用状态更新函数,无论它们在批次中是否有新数据。如果更新函数返回 None
,则键值对将被消除。
让我们用一个例子来说明这一点。假设您想维护文本数据流中每个单词的运行计数。在这里,运行计数是状态,它是一个整数。我们将更新函数定义为
def updateFunction(newValues, runningCount):
if runningCount is None:
runningCount = 0
return sum(newValues, runningCount) # add the new values with the previous running count to get the new count
这应用于包含单词的 DStream(例如,前面的示例中包含 (word, 1)
对的 pairs
DStream)。
runningCounts = pairs.updateStateByKey(updateFunction)
更新函数将为每个单词调用,其中 newValues
包含一系列 1(来自 (word, 1)
对),runningCount
包含前一个计数。有关完整的 Python 代码,请查看示例 stateful_network_wordcount.py。
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}
这应用于包含单词的 DStream(例如,前面的示例中包含 (word, 1)
对的 pairs
DStream)。
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
更新函数将为每个单词调用,其中 newValues
包含一系列 1(来自 (word, 1)
对),runningCount
包含前一个计数。
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
(values, state) -> {
Integer newSum = ... // add the new values with the previous running count to get the new count
return Optional.of(newSum);
};
这应用于包含单词的 DStream(例如,快速示例中包含 (word, 1)
对的 pairs
DStream)。
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
更新函数将为每个单词调用,其中 newValues
包含一系列 1(来自 (word, 1)
对),runningCount
包含前一个计数。有关完整的 Java 代码,请查看示例 JavaStatefulNetworkWordCount.java。
请注意,使用 updateStateByKey
需要配置检查点目录,这将在检查点部分详细讨论。
Transform 操作
transform
操作(及其变体如 transformWith
)允许在 DStream 上应用任意 RDD 到 RDD 的函数。它可用于应用 DStream API 中未公开的任何 RDD 操作。例如,将数据流中的每个批次与另一个数据集连接的功能在 DStream API 中没有直接公开。但是,您可以轻松使用 transform
来实现这一点。这带来了非常强大的可能性。例如,可以通过将输入数据流与预计算的垃圾邮件信息(也可能是用 Spark 生成的)连接,然后基于它进行过滤来执行实时数据清理。
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
}
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
...
});
请注意,提供的函数在每个批次间隔都会被调用。这允许您执行时变 RDD 操作,也就是说,RDD 操作、分区数量、广播变量等可以在批次之间更改。
窗口操作
Spark Streaming 还提供了窗口计算,它允许您对数据滑动窗口应用转换。下图说明了此滑动窗口。
如图所示,每次窗口在源 DStream 上滑动时,落在窗口内的源 RDDs 会被组合并进行操作,以生成窗口化 DStream 的 RDDs。在这个特定情况下,操作应用于最后 3 个时间单位的数据,并滑动 2 个时间单位。这表明任何窗口操作都需要指定两个参数。
- 窗口长度 - 窗口的持续时间(图中为 3)。
- 滑动间隔 - 执行窗口操作的间隔(图中为 2)。
这两个参数必须是源 DStream 批处理间隔(图中为 1)的倍数。
让我们用一个例子来说明窗口操作。假设您想扩展前面的示例,每 10 秒生成过去 30 秒数据的单词计数。为此,我们必须对 pairs
DStream 中过去 30 秒数据的 (word, 1)
对应用 reduceByKey
操作。这通过使用 reduceByKeyAndWindow
操作完成。
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));
一些常见的窗口操作如下。所有这些操作都接受前面提到的两个参数——windowLength 和 slideInterval。
转换 | 含义 |
---|---|
window(windowLength, slideInterval) | 返回一个基于源 DStream 的窗口化批次计算的新 DStream。 |
countByWindow(windowLength, slideInterval) | 返回流中元素的滑动窗口计数。 |
reduceByWindow(func, windowLength, slideInterval) | 返回一个新单元素流,通过使用 func 在滑动间隔内聚合流中的元素来创建。该函数应是结合律和交换律的,以便可以并行正确计算。 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 当在 (K, V) 对的 DStream 上调用时,返回一个 (K, V) 对的新 DStream,其中每个键的值使用给定的 reduce 函数 func 在滑动窗口中的批次上进行聚合。注意: 默认情况下,这使用 Spark 的默认并行任务数(本地模式为 2,集群模式下数量由配置属性 spark.default.parallelism 决定)进行分组。您可以传递一个可选的 numTasks 参数来设置不同的任务数量。 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) |
这是上面 |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 当在 (K, V) 对的 DStream 上调用时,返回一个 (K, Long) 对的新 DStream,其中每个键的值是其在滑动窗口中的频率。与 reduceByKeyAndWindow 类似,归约任务的数量可以通过可选参数进行配置。 |
Join 操作
最后,值得强调的是,在 Spark Streaming 中执行不同类型的连接是多么容易。
流-流连接
流可以非常容易地与其他流连接。
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);
在这里,在每个批处理间隔中,由 stream1
生成的 RDD 将与由 stream2
生成的 RDD 进行连接。您还可以执行 leftOuterJoin
、rightOuterJoin
、fullOuterJoin
。此外,对流的窗口进行连接通常非常有用。这也很容易实现。
windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
流-数据集连接
这在前面解释 DStream.transform
操作时已经展示过。这是将窗口流与数据集连接的另一个示例。
dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));
实际上,您还可以动态更改要连接的数据集。提供给 transform
的函数在每个批处理间隔都会进行评估,因此它将使用 dataset
引用指向的当前数据集。
DStream 转换的完整列表可在 API 文档中找到。对于 Python API,请参阅 DStream。对于 Scala API,请参阅 DStream 和 PairDStreamFunctions。对于 Java API,请参阅 JavaDStream 和 JavaPairDStream。
DStream 上的输出操作
输出操作允许将 DStream 的数据推送到外部系统,如数据库或文件系统。由于输出操作实际上允许外部系统使用转换后的数据,它们会触发所有 DStream 转换的实际执行(类似于 RDD 的动作)。目前,定义了以下输出操作
输出操作 | 含义 |
---|---|
print() | 在运行流式应用程序的驱动程序节点上打印 DStream 中每批数据的前十个元素。这对于开发和调试很有用。 Python API 在 Python API 中,这被称为 pprint()。 |
saveAsTextFiles(prefix, [suffix]) | 将此 DStream 的内容保存为文本文件。每个批次间隔的文件名根据 prefix 和 suffix 生成:"prefix-TIME_IN_MS[.suffix]"。 |
saveAsObjectFiles(prefix, [suffix]) | 将此 DStream 的内容保存为序列化 Java 对象的 SequenceFiles 。每个批次间隔的文件名根据 prefix 和 suffix 生成:"prefix-TIME_IN_MS[.suffix]"。Python API Python API 不支持此功能。 |
saveAsHadoopFiles(prefix, [suffix]) | 将此 DStream 的内容保存为 Hadoop 文件。每个批次间隔的文件名根据 prefix 和 suffix 生成:"prefix-TIME_IN_MS[.suffix]"。 Python API Python API 不支持此功能。 |
foreachRDD(func) | 最通用的输出操作符,将函数 func 应用于从流生成的每个 RDD。此函数应将每个 RDD 中的数据推送到外部系统,例如将 RDD 保存到文件,或通过网络写入数据库。请注意,函数 func 在运行流式应用程序的驱动程序进程中执行,并且通常包含 RDD 动作,这些动作将强制计算流式 RDD。 |
使用 foreachRDD 的设计模式
dstream.foreachRDD
是一个强大的原语,允许将数据发送到外部系统。然而,了解如何正确高效地使用此原语非常重要。一些常见的要避免的错误如下。
通常,将数据写入外部系统需要创建一个连接对象(例如到远程服务器的 TCP 连接)并使用它将数据发送到远程系统。为此,开发人员可能会无意中尝试在 Spark 驱动程序上创建连接对象,然后尝试在 Spark worker 中使用它来保存 RDD 中的记录。例如(在 Scala 中),
def sendRecord(rdd):
connection = createNewConnection() # executed at the driver
rdd.foreach(lambda record: connection.send(record))
connection.close()
dstream.foreachRDD(sendRecord)
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}
dstream.foreachRDD(rdd -> {
Connection connection = createNewConnection(); // executed at the driver
rdd.foreach(record -> {
connection.send(record); // executed at the worker
});
});
这是不正确的,因为它需要将连接对象序列化并从驱动程序发送到 worker。此类连接对象很少能在机器之间传输。此错误可能表现为序列化错误(连接对象不可序列化)、初始化错误(连接对象需要在 worker 上初始化)等。正确的解决方案是在 worker 上创建连接对象。
然而,这可能导致另一个常见错误——为每条记录创建一个新连接。例如,
def sendRecord(record):
connection = createNewConnection()
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
dstream.foreachRDD(rdd -> {
rdd.foreach(record -> {
Connection connection = createNewConnection();
connection.send(record);
connection.close();
});
});
通常,创建连接对象会产生时间和资源开销。因此,为每条记录创建和销毁连接对象可能会导致不必要的高开销,并显著降低系统的整体吞吐量。一个更好的解决方案是使用 rdd.foreachPartition
——创建一个单独的连接对象,并使用该连接发送 RDD 分区中的所有记录。
def sendPartition(iter):
connection = createNewConnection()
for record in iter:
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
Connection connection = createNewConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
connection.close();
});
});
这分摊了连接创建开销,使其作用于多条记录。
最后,这可以通过在多个 RDD/批次之间重用连接对象来进一步优化。可以维护一个连接对象的静态池,当多个批次的 RDD 被推送到外部系统时,这些连接对象可以被重用,从而进一步减少开销。
def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
// ConnectionPool is a static, lazily initialized pool of connections
Connection connection = ConnectionPool.getConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
ConnectionPool.returnConnection(connection); // return to the pool for future reuse
});
});
请注意,池中的连接应按需延迟创建,如果一段时间未使用则超时。这样可以实现向外部系统发送数据的最高效率。
其他要记住的要点
-
DStream 由输出操作惰性执行,就像 RDD 由 RDD 动作惰性执行一样。具体来说,DStream 输出操作内部的 RDD 动作会强制处理接收到的数据。因此,如果您的应用程序没有任何输出操作,或者有像
dstream.foreachRDD()
这样的输出操作但其内部没有任何 RDD 动作,那么什么都不会被执行。系统将只是接收数据并丢弃它。 -
默认情况下,输出操作是逐个执行的。并且它们按照在应用程序中定义的顺序执行。
DataFrame 和 SQL 操作
您可以轻松地在流数据上使用DataFrames 和 SQL 操作。您必须使用 StreamingContext 正在使用的 SparkContext 来创建 SparkSession。此外,必须以可以在驱动程序故障时重新启动的方式完成此操作。这通过创建 SparkSession 的惰性实例化单例来实现。以下示例展示了这一点。它修改了之前的单词计数示例,使用 DataFrames 和 SQL 生成单词计数。每个 RDD 都转换为 DataFrame,注册为临时表,然后使用 SQL 查询。
# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
globals()["sparkSessionSingletonInstance"] = SparkSession \
.builder \
.config(conf=sparkConf) \
.getOrCreate()
return globals()["sparkSessionSingletonInstance"]
...
# DataFrame operations inside your streaming program
words = ... # DStream of strings
def process(time, rdd):
print("========= %s =========" % str(time))
try:
# Get the singleton instance of SparkSession
spark = getSparkSessionInstance(rdd.context.getConf())
# Convert RDD[String] to RDD[Row] to DataFrame
rowRdd = rdd.map(lambda w: Row(word=w))
wordsDataFrame = spark.createDataFrame(rowRdd)
# Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words")
# Do word count on table using SQL and print it
wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
except:
pass
words.foreachRDD(process)
查看完整的源代码。
/** DataFrame operations inside your streaming program */
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Convert RDD[String] to DataFrame
val wordsDataFrame = rdd.toDF("word")
// Create a temporary view
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on DataFrame using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
查看完整的源代码。
/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
private String word;
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
}
...
/** DataFrame operations inside your streaming program */
JavaDStream<String> words = ...
words.foreachRDD((rdd, time) -> {
// Get the singleton instance of SparkSession
SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();
// Convert RDD[String] to RDD[case class] to DataFrame
JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
JavaRow record = new JavaRow();
record.setWord(word);
return record;
});
DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words");
// Do word count on table using SQL and print it
DataFrame wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word");
wordCountsDataFrame.show();
});
查看完整的源代码。
您还可以在不同线程(即与运行中的 StreamingContext 异步)上对流数据上定义的表运行 SQL 查询。只需确保您将 StreamingContext 设置为记住足够多的流数据,以便查询可以运行。否则,StreamingContext(它不知道任何异步 SQL 查询)将在查询完成之前删除旧的流数据。例如,如果您想查询最后一个批次,但您的查询可能需要 5 分钟才能运行,那么请调用 streamingContext.remember(Minutes(5))
(在 Scala 中,或其他语言中的等效方法)。
请参阅DataFrames 和 SQL 指南以了解更多关于 DataFrames 的信息。
MLlib 操作
您还可以轻松使用 MLlib 提供的机器学习算法。首先,有流式机器学习算法(例如流式线性回归、流式 KMeans 等),它们可以同时从流数据中学习并应用于流数据上的模型。除此之外,对于更广泛的机器学习算法类别,您可以在离线(即使用历史数据)学习一个学习模型,然后在线将其应用于流数据。有关更多详细信息,请参阅MLlib 指南。
缓存 / 持久化
与 RDD 类似,DStream 也允许开发人员将流的数据持久化到内存中。也就是说,在 DStream 上使用 persist()
方法将自动将该 DStream 的每个 RDD 持久化到内存中。如果 DStream 中的数据将被多次计算(例如,对相同数据执行多个操作),这将很有用。对于基于窗口的操作,如 reduceByWindow
和 reduceByKeyAndWindow
,以及基于状态的操作,如 updateStateByKey
,情况隐式如此。因此,由基于窗口的操作生成的 DStream 会自动持久化到内存中,而无需开发人员调用 persist()
。
对于通过网络接收数据的输入流(如 Kafka、套接字等),默认的持久化级别设置为将数据复制到两个节点以实现容错。
请注意,与 RDD 不同,DStream 的默认持久化级别将数据序列化后保存在内存中。这将在性能调优部分进一步讨论。有关不同持久化级别的更多信息,请参阅Spark 编程指南。
检查点
流式应用程序必须 24/7 运行,因此必须能够抵御与应用程序逻辑无关的故障(例如系统故障、JVM 崩溃等)。为了实现这一点,Spark Streaming 需要将足够的信息检查点到容错存储系统,以便能够从故障中恢复。有两种类型的数据会进行检查点。
- 元数据检查点 - 将定义流式计算的信息保存到 HDFS 等容错存储中。这用于从运行流式应用程序驱动程序的节点的故障中恢复(稍后详细讨论)。元数据包括
- 配置 - 用于创建流式应用程序的配置。
- DStream 操作 - 定义流式应用程序的 DStream 操作集。
- 未完成的批次 - 任务已排队但尚未完成的批次。
- 数据检查点 - 将生成的 RDD 保存到可靠存储中。这在一些跨多个批次组合数据的有状态转换中是必要的。在这种转换中,生成的 RDD 依赖于前一个批次的 RDD,这导致依赖链的长度随时间不断增加。为了避免恢复时间(与依赖链成比例)的这种无限制增长,有状态转换的中间 RDD 会定期检查点到可靠存储(例如 HDFS)以切断依赖链。
总而言之,元数据检查点主要用于从驱动程序故障中恢复,而数据或 RDD 检查点即使在使用了有状态转换时也是基本功能所必需的。
何时启用检查点
满足以下任何要求的应用程序都必须启用检查点
- 使用有状态转换 - 如果应用程序中使用了
updateStateByKey
或reduceByKeyAndWindow
(带逆函数),则必须提供检查点目录以允许周期性 RDD 检查点。 - 从运行应用程序的驱动程序故障中恢复 - 元数据检查点用于恢复进度信息。
请注意,没有上述有状态转换的简单流式应用程序可以在不启用检查点的情况下运行。在这种情况下,从驱动程序故障中恢复也将是部分恢复(一些已接收但未处理的数据可能会丢失)。这通常是可以接受的,许多人以这种方式运行 Spark Streaming 应用程序。对非 Hadoop 环境的支持预计将在未来得到改善。
如何配置检查点
通过在容错、可靠的文件系统(例如 HDFS、S3 等)中设置一个目录来启用检查点,检查点信息将保存到该目录中。这通过使用 streamingContext.checkpoint(checkpointDirectory)
完成。这将允许您使用上述有状态转换。此外,如果您希望应用程序从驱动程序故障中恢复,您应该重写您的流式应用程序以具有以下行为。
- 当程序第一次启动时,它将创建一个新的 StreamingContext,设置所有流,然后调用 start()。
- 当程序在故障后重新启动时,它将从检查点目录中的检查点数据重新创建 StreamingContext。
通过使用 StreamingContext.getOrCreate
,此行为变得简单。其用法如下。
# Function to create and setup a new StreamingContext
def functionToCreateContext():
sc = SparkContext(...) # new context
ssc = StreamingContext(...)
lines = ssc.socketTextStream(...) # create DStreams
...
ssc.checkpoint(checkpointDirectory) # set checkpoint directory
return ssc
# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...
# Start the context
context.start()
context.awaitTermination()
如果 checkpointDirectory
存在,则上下文将从检查点数据重新创建。如果目录不存在(即第一次运行),则将调用函数 functionToCreateContext
来创建新上下文并设置 DStream。请参阅 Python 示例 recoverable_network_wordcount.py。此示例将网络数据的单词计数追加到文件中。
您还可以从检查点数据显式创建 StreamingContext
,并通过使用 StreamingContext.getOrCreate(checkpointDirectory, None)
启动计算。
通过使用 StreamingContext.getOrCreate
,此行为变得简单。其用法如下。
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
如果 checkpointDirectory
存在,则上下文将从检查点数据重新创建。如果目录不存在(即第一次运行),则将调用函数 functionToCreateContext
来创建新上下文并设置 DStream。请参阅 Scala 示例 RecoverableNetworkWordCount。此示例将网络数据的单词计数追加到文件中。
通过使用 JavaStreamingContext.getOrCreate
,此行为变得简单。其用法如下。
// Create a factory object that can create and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
@Override public JavaStreamingContext create() {
JavaStreamingContext jssc = new JavaStreamingContext(...); // new context
JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams
...
jssc.checkpoint(checkpointDirectory); // set checkpoint directory
return jssc;
}
};
// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start();
context.awaitTermination();
如果 checkpointDirectory
存在,则上下文将从检查点数据重新创建。如果目录不存在(即第一次运行),则将调用函数 contextFactory
来创建新上下文并设置 DStream。请参阅 Java 示例 JavaRecoverableNetworkWordCount。此示例将网络数据的单词计数追加到文件中。
除了使用 getOrCreate
之外,还需要确保驱动程序进程在故障时自动重新启动。这只能由用于运行应用程序的部署基础设施来完成。这将在部署部分进一步讨论。
请注意,RDD 的检查点操作会产生保存到可靠存储的成本。这可能会导致那些 RDD 进行检查点的批次的 processing time 增加。因此,检查点间隔需要仔细设置。在小批次大小(例如 1 秒)下,每批次都进行检查点可能会显著降低操作吞吐量。相反,检查点过于不频繁会导致血缘和任务大小增长,这可能会产生有害影响。对于需要 RDD 检查点的有状态转换,默认间隔是批次间隔的倍数,至少为 10 秒。可以通过使用 dstream.checkpoint(checkpointInterval)
进行设置。通常,将 DStream 的检查点间隔设置为 5 到 10 个滑动间隔是一个不错的尝试。
累加器、广播变量和检查点
在 Spark Streaming 中,累加器和广播变量无法从检查点恢复。如果您启用了检查点并同时使用了累加器或广播变量,则必须为累加器和广播变量创建惰性实例化的单例实例,以便它们在驱动程序故障重启后可以重新实例化。以下示例展示了这一点。
def getWordExcludeList(sparkContext):
if ("wordExcludeList" not in globals()):
globals()["wordExcludeList"] = sparkContext.broadcast(["a", "b", "c"])
return globals()["wordExcludeList"]
def getDroppedWordsCounter(sparkContext):
if ("droppedWordsCounter" not in globals()):
globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
return globals()["droppedWordsCounter"]
def echo(time, rdd):
# Get or register the excludeList Broadcast
excludeList = getWordExcludeList(rdd.context)
# Get or register the droppedWordsCounter Accumulator
droppedWordsCounter = getDroppedWordsCounter(rdd.context)
# Use excludeList to drop words and use droppedWordsCounter to count them
def filterFunc(wordCount):
if wordCount[0] in excludeList.value:
droppedWordsCounter.add(wordCount[1])
False
else:
True
counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
wordCounts.foreachRDD(echo)
查看完整的源代码。
object WordExcludeList {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordExcludeList = Seq("a", "b", "c")
instance = sc.broadcast(wordExcludeList)
}
}
}
instance
}
}
object DroppedWordsCounter {
@volatile private var instance: LongAccumulator = null
def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("DroppedWordsCounter")
}
}
}
instance
}
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the excludeList Broadcast
val excludeList = WordExcludeList.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use excludeList to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (excludeList.value.contains(word)) {
droppedWordsCounter.add(count)
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
})
查看完整的源代码。
class JavaWordExcludeList {
private static volatile Broadcast<List<String>> instance = null;
public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaWordExcludeList.class) {
if (instance == null) {
List<String> wordExcludeList = Arrays.asList("a", "b", "c");
instance = jsc.broadcast(wordExcludeList);
}
}
}
return instance;
}
}
class JavaDroppedWordsCounter {
private static volatile LongAccumulator instance = null;
public static LongAccumulator getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaDroppedWordsCounter.class) {
if (instance == null) {
instance = jsc.sc().longAccumulator("DroppedWordsCounter");
}
}
}
return instance;
}
}
wordCounts.foreachRDD((rdd, time) -> {
// Get or register the excludeList Broadcast
Broadcast<List<String>> excludeList = JavaWordExcludeList.getInstance(new JavaSparkContext(rdd.context()));
// Get or register the droppedWordsCounter Accumulator
LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
// Use excludeList to drop words and use droppedWordsCounter to count them
String counts = rdd.filter(wordCount -> {
if (excludeList.value().contains(wordCount._1())) {
droppedWordsCounter.add(wordCount._2());
return false;
} else {
return true;
}
}).collect().toString();
String output = "Counts at time " + time + " " + counts;
}
查看完整的源代码。
部署应用程序
本节讨论部署 Spark Streaming 应用程序的步骤。
要求
要运行 Spark Streaming 应用程序,您需要具备以下条件。
-
带有集群管理器的集群 - 这是任何 Spark 应用程序的通用要求,在部署指南中详细讨论。
-
打包应用程序 JAR - 您必须将您的流式应用程序编译成一个 JAR。如果您使用
spark-submit
来启动应用程序,那么您无需在 JAR 中提供 Spark 和 Spark Streaming。然而,如果您的应用程序使用高级源(例如 Kafka),那么您将不得不将它们链接到的额外 artifact 及其依赖项打包到用于部署应用程序的 JAR 中。例如,使用KafkaUtils
的应用程序将必须在应用程序 JAR 中包含spark-streaming-kafka-0-10_2.13
及其所有传递依赖项。 -
为执行器配置足够的内存 - 由于接收到的数据必须存储在内存中,因此必须为执行器配置足够的内存来保存接收到的数据。请注意,如果您正在执行 10 分钟的窗口操作,系统必须在内存中至少保留过去 10 分钟的数据。因此,应用程序的内存需求取决于其中使用的操作。
-
配置检查点 - 如果流式应用程序需要,则必须在与 Hadoop API 兼容的容错存储(例如 HDFS、S3 等)中配置一个目录作为检查点目录,并且流式应用程序的编写方式应使其检查点信息可用于故障恢复。有关更多详细信息,请参阅检查点部分。
- 配置应用程序驱动程序的自动重启 - 为了从驱动程序故障中自动恢复,用于运行流式应用程序的部署基础设施必须监控驱动程序进程,并在驱动程序失败时重新启动它。不同的集群管理器有不同的工具来实现这一点。
- Spark Standalone - Spark 应用程序驱动程序可以提交到 Spark Standalone 集群中运行(参阅集群部署模式),也就是说,应用程序驱动程序本身在其中一个 worker 节点上运行。此外,可以指示 Standalone 集群管理器监督驱动程序,并在驱动程序因非零退出代码或运行驱动程序的节点故障而失败时重新启动它。有关更多详细信息,请参阅Spark Standalone 指南中的集群模式和监督。
- YARN - YARN 支持类似的机制来自动重启应用程序。请参阅 YARN 文档了解更多详细信息。
-
配置预写日志 - 自 Spark 1.2 以来,我们引入了预写日志以实现强大的容错保证。如果启用,从接收器接收到的所有数据都会写入配置的检查点目录中的预写日志。这可以防止驱动程序恢复时数据丢失,从而确保零数据丢失(在容错语义部分详细讨论)。这可以通过将配置参数
spark.streaming.receiver.writeAheadLog.enable
设置为true
来启用。然而,这些更强的语义可能会以牺牲单个接收器的接收吞吐量为代价。这可以通过并行运行更多接收器来增加总吞吐量来纠正。此外,建议在启用预写日志时禁用 Spark 内接收数据的复制,因为日志已经存储在复制存储系统中。这可以通过将输入流的存储级别设置为StorageLevel.MEMORY_AND_DISK_SER
来完成。在使用 S3(或任何不支持 flushing 的文件系统)作为预写日志时,请记住启用spark.streaming.driver.writeAheadLog.closeFileAfterWrite
和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite
。有关更多详细信息,请参阅Spark Streaming 配置。请注意,当 I/O 加密启用时,Spark 不会对写入预写日志的数据进行加密。如果需要对预写日志数据进行加密,则应将其存储在原生支持加密的文件系统中。 - 设置最大接收速率 - 如果集群资源不足以让流式应用程序以与接收速度一样快的速度处理数据,则可以通过设置记录/秒的最大速率限制来限制接收器的速率。请参阅接收器的配置参数
spark.streaming.receiver.maxRate
和 Direct Kafka 方法的spark.streaming.kafka.maxRatePerPartition
。在 Spark 1.5 中,我们引入了一个名为反压的功能,它消除了设置此速率限制的需要,因为 Spark Streaming 会自动找出速率限制并在处理条件发生变化时动态调整它们。可以通过将配置参数spark.streaming.backpressure.enabled
设置为true
来启用此反压。
升级应用程序代码
如果正在运行的 Spark Streaming 应用程序需要使用新的应用程序代码进行升级,则有两种可能的机制。
-
升级后的 Spark Streaming 应用程序与现有应用程序并行启动和运行。一旦新的应用程序(接收与旧应用程序相同的数据)预热并准备就绪,旧的应用程序就可以停止。请注意,这适用于支持将数据发送到两个目的地(即早期应用程序和升级后的应用程序)的数据源。
-
现有应用程序将优雅地关闭(参见
StreamingContext.stop(...)
或JavaStreamingContext.stop(...)
以了解优雅关闭选项),这确保已接收的数据在关闭前完全处理。然后可以启动升级后的应用程序,它将从先前应用程序停止的点开始处理。请注意,这只能与支持源端缓冲(如 Kafka)的输入源一起完成,因为在先前应用程序停止且升级后的应用程序尚未启动时,数据需要进行缓冲。并且无法从升级前代码的早期检查点信息重新启动。检查点信息本质上包含序列化的 Scala/Java/Python 对象,尝试使用新的、修改过的类反序列化对象可能会导致错误。在这种情况下,要么使用不同的检查点目录启动升级后的应用程序,要么删除先前的检查点目录。
监控应用程序
除了 Spark 的监控功能之外,还有一些特定于 Spark Streaming 的附加功能。当使用 StreamingContext 时,Spark Web UI 会显示一个额外的 Streaming
选项卡,其中显示有关运行中的接收器(接收器是否处于活动状态、接收到的记录数、接收器错误等)和已完成批次(批处理时间、排队延迟等)的统计信息。这可用于监控流式应用程序的进度。
Web UI 中以下两个指标尤其重要
- 处理时间 - 处理每批数据所需的时间。
- 调度延迟 - 批次在队列中等待前一批次处理完成的时间。
如果批处理时间持续超过批处理间隔和/或排队延迟不断增加,则表明系统无法像生成批次那样快速处理批次,并且正在落后。在这种情况下,请考虑减少批处理时间。
Spark Streaming 程序的进度也可以使用 StreamingListener 接口进行监控,该接口允许您获取接收器状态和处理时间。请注意,这是一个开发人员 API,未来有望得到改进(即报告更多信息)。
性能调优
在集群上获得 Spark Streaming 应用程序的最佳性能需要进行一些调优。本节解释了一些可以调整的参数和配置,以提高应用程序的性能。从宏观上看,您需要考虑两件事
-
通过高效利用集群资源,减少每批数据的处理时间。
-
设置正确的批次大小,使数据批次能够以与接收速度一样快的速度进行处理(即数据处理跟上数据摄取)。
减少批处理时间
在 Spark 中可以进行许多优化来最小化每个批次的处理时间。这些已在调优指南中详细讨论。本节重点介绍其中一些最重要的。
数据接收中的并行度
通过网络(如Kafka、socket等)接收数据需要将数据反序列化并存储到Spark中。如果数据接收成为系统的瓶颈,则考虑并行化数据接收。请注意,每个输入DStream都会创建一个接收器(运行在工作机器上),它接收单个数据流。因此,可以通过创建多个输入DStream并配置它们以从源接收数据流的不同分区来实现接收多个数据流。例如,一个接收两个主题数据的Kafka输入DStream可以分成两个Kafka输入流,每个流只接收一个主题。这将运行两个接收器,从而实现并行接收数据,提高整体吞吐量。这些多个DStream可以合并在一起,创建一个单一的DStream。然后,之前应用于单个输入DStream的转换可以应用于这个统一的流。操作如下。
numStreams = 5
kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
另一个需要考虑的参数是接收器的块间隔,它由配置参数 spark.streaming.blockInterval
决定。对于大多数接收器,接收到的数据在存储到Spark内存中之前会合并成数据块。每个批次中的块数量决定了在类似map的转换中处理接收到的数据时将使用的任务数量。每个接收器每个批次的任务数量大约为(批处理间隔 / 块间隔)。例如,200毫秒的块间隔将在每2秒的批次中创建10个任务。如果任务数量过低(即,小于每台机器的核心数量),则效率会很低,因为并非所有可用核心都将用于处理数据。为了增加给定批处理间隔的任务数量,请减小块间隔。然而,建议的块间隔最小值为约50毫秒,低于此值可能会出现任务启动开销问题。
接收数据的另一种方法是使用多个输入流/接收器,或者明确地对输入数据流进行重新分区(使用inputStream.repartition(<number of partitions>)
)。这会在进一步处理之前,将接收到的数据批次分发到集群中指定数量的机器上。
对于直接流,请参考Spark Streaming + Kafka 集成指南
数据处理的并行度
如果计算的任何阶段中使用的并行任务数量不够高,集群资源可能会利用不足。例如,对于像reduceByKey
和reduceByKeyAndWindow
这样的分布式归约操作,默认的并行任务数量由spark.default.parallelism
配置属性控制。您可以将并行度作为参数传递(参见PairDStreamFunctions
文档),或者设置spark.default.parallelism
配置属性来更改默认值。
数据序列化
可以通过调整序列化格式来减少数据序列化的开销。在流式处理中,有两种类型的数据正在被序列化。
-
输入数据:默认情况下,通过接收器接收的输入数据以StorageLevel.MEMORY_AND_DISK_SER_2存储在执行器的内存中。也就是说,数据被序列化为字节以减少GC开销,并进行复制以容忍执行器故障。此外,数据首先保留在内存中,只有当内存不足以容纳流式计算所需的所有输入数据时,才会溢写到磁盘。这种序列化显然有开销——接收器必须反序列化接收到的数据,然后使用Spark的序列化格式重新序列化。
-
流式操作生成的持久化RDD:流式计算生成的RDD可能会持久化在内存中。例如,窗口操作会将数据持久化在内存中,因为它们会被多次处理。然而,与Spark Core默认的StorageLevel.MEMORY_ONLY不同,流式计算生成的持久化RDD默认以StorageLevel.MEMORY_ONLY_SER(即序列化)持久化,以最大程度地减少GC开销。
在这两种情况下,使用Kryo序列化都可以减少CPU和内存开销。有关更多详细信息,请参见Spark调优指南。对于Kryo,请考虑注册自定义类,并禁用对象引用跟踪(参见配置指南中与Kryo相关的配置)。
在特定情况下,如果流应用程序需要保留的数据量不大,则可以将数据(两种类型)作为反序列化对象进行持久化,而不会产生过多的GC开销。例如,如果您使用几秒的批处理间隔且没有窗口操作,那么您可以尝试通过显式设置存储级别来禁用持久化数据中的序列化。这将减少序列化导致的CPU开销,从而可能在没有太多GC开销的情况下提高性能。
设置合适的批处理间隔
为了使运行在集群上的Spark Streaming应用程序保持稳定,系统应该能够以与接收数据一样快的速度处理数据。换句话说,数据批次应该以生成它们的速度一样快地被处理。对于一个应用程序来说,这是否属实可以通过监控流式Web UI中的处理时间来发现,其中批处理时间应该小于批处理间隔。
根据流式计算的性质,所使用的批处理间隔可能会对应用程序在固定集群资源集上可维持的数据速率产生显著影响。例如,让我们考虑之前的WordCountNetwork示例。对于特定的数据速率,系统可能能够每2秒报告一次单词计数(即,批处理间隔为2秒),但无法每500毫秒报告一次。因此,需要设置批处理间隔,以使生产中的预期数据速率能够持续维持。
确定应用程序正确批次大小的一个好方法是使用保守的批处理间隔(例如,5-10秒)和较低的数据速率进行测试。为了验证系统是否能够跟上数据速率,您可以检查每个已处理批次所经历的端到端延迟值(可以查看Spark驱动器log4j日志中的“Total delay”,或使用StreamingListener接口)。如果延迟保持与批次大小相当,则系统是稳定的。否则,如果延迟持续增加,则表示系统无法跟上,因此不稳定。一旦您对稳定配置有了概念,就可以尝试增加数据速率和/或减小批次大小。请注意,由于临时数据速率增加而导致的瞬时延迟增加可能是可以接受的,只要延迟随后降低到较低值(即,小于批次大小)。
内存调优
调优指南中已详细讨论了Spark应用程序的内存使用和GC行为。强烈建议您阅读该部分。在本节中,我们将专门讨论Spark Streaming应用程序上下文中的一些调优参数。
Spark Streaming应用程序所需的集群内存量在很大程度上取决于所使用的转换类型。例如,如果您想对过去10分钟的数据使用窗口操作,那么您的集群应该有足够的内存来容纳10分钟的数据。或者,如果您想对大量键使用updateStateByKey
,那么所需的内存将很高。相反,如果您只想执行简单的map-filter-store操作,那么所需的内存将很低。
通常,由于通过接收器接收的数据以StorageLevel.MEMORY_AND_DISK_SER_2存储,不适合内存的数据将溢写到磁盘。这可能会降低流应用程序的性能,因此建议根据您的流应用程序的需要提供足够的内存。最好是先小规模尝试并查看内存使用情况,然后进行相应估算。
内存调优的另一个方面是垃圾回收。对于需要低延迟的流应用程序,由JVM垃圾回收引起的大暂停是不可取的。
有几个参数可以帮助您调优内存使用和GC开销
-
DStream的持久化级别:如数据序列化部分所述,输入数据和RDD默认以序列化字节形式持久化。与反序列化持久化相比,这减少了内存使用和GC开销。启用Kryo序列化进一步减少了序列化大小和内存使用。通过压缩(参见Spark配置
spark.rdd.compress
)可以进一步减少内存使用,但这会牺牲CPU时间。 -
清除旧数据:默认情况下,所有输入数据和由DStream转换生成的持久化RDD都会被自动清除。Spark Streaming根据使用的转换来决定何时清除数据。例如,如果您使用10分钟的窗口操作,那么Spark Streaming将保留最近10分钟的数据,并主动丢弃旧数据。通过设置
streamingContext.remember
,数据可以保留更长时间(例如,交互式查询旧数据)。 -
其他提示:为了进一步减少GC开销,这里有一些可以尝试的更多提示。
- 使用
OFF_HEAP
存储级别持久化RDD。更多详细信息请参见Spark编程指南。 - 使用更多执行器,但堆大小更小。这将减少每个JVM堆内的GC压力。
- 使用
重要注意事项
-
一个DStream与一个接收器相关联。为了实现读取并行性,需要创建多个接收器,即多个DStream。一个接收器运行在一个执行器内部。它占用一个核心。确保在接收器槽位预定后有足够的可用核心用于处理,即
spark.cores.max
应考虑接收器槽位。接收器以轮询方式分配给执行器。 -
当数据从流源接收时,接收器会创建数据块。每
blockInterval
毫秒生成一个新的数据块。在batchInterval
期间会创建N个数据块,其中N =batchInterval
/blockInterval
。这些块由当前执行器的BlockManager分发给其他执行器的块管理器。之后,运行在驱动器上的Network Input Tracker会收到关于块位置的通知,以进行进一步处理。 -
对于在
batchInterval
期间创建的块,驱动器上会创建一个RDD。在batchInterval
期间生成的块是RDD的分区。每个分区都是spark中的一个任务。blockInterval
==batchInterval
意味着只创建一个分区,并且可能在本地处理。 -
块上的map任务在拥有这些块的执行器中(接收块的执行器以及复制块的执行器)进行处理,而不管块间隔如何,除非非本地调度启动。更大的块间隔意味着更大的块。较高的
spark.locality.wait
值会增加在本地节点处理块的机会。需要在这两个参数之间找到一个平衡点,以确保更大的块能够在本地处理。 -
除了依赖
batchInterval
和blockInterval
之外,您可以通过调用inputDstream.repartition(n)
来定义分区数量。这会将RDD中的数据随机重洗以创建n个分区。是的,为了更大的并行度。但这会带来shuffle的开销。RDD的处理由驱动器的jobscheduler作为一个作业进行调度。在给定时间点,只有一个作业是活动的。因此,如果一个作业正在执行,其他作业将排队。 -
如果您有两个dstream,将形成两个RDD并创建两个作业,它们将一个接一个地调度。为避免这种情况,您可以合并两个dstream。这将确保为两个dstream的RDD形成一个单一的unionRDD。然后这个unionRDD被视为一个单一作业。但是,RDD的分区不受影响。
-
如果批处理时间超过批处理间隔,那么接收器的内存显然会开始填满,并最终抛出异常(很可能是BlockNotFoundException)。目前,无法暂停接收器。可以使用SparkConf配置
spark.streaming.receiver.maxRate
来限制接收器的速率。
容错语义
本节我们将讨论Spark Streaming应用程序在发生故障时的行为。
背景
为了理解Spark Streaming提供的语义,让我们回顾一下Spark RDDs的基本容错语义。
- RDD是一个不可变、可确定性地重新计算的分布式数据集。每个RDD都记住用于从容错输入数据集创建它的确定性操作的血缘。
- 如果RDD的任何分区因工作节点故障而丢失,则可以使用操作血缘从原始容错数据集中重新计算该分区。
- 假设所有的RDD转换都是确定性的,那么最终转换的RDD中的数据将始终相同,无论Spark集群中发生何种故障。
Spark在HDFS或S3等容错文件系统上操作数据。因此,所有从容错数据生成的RDD也都是容错的。然而,Spark Streaming并非如此,因为大多数情况下数据是通过网络接收的(除非使用fileStream
)。为了为所有生成的RDD实现相同的容错属性,接收到的数据在集群中工作节点上的多个Spark执行器之间进行复制(默认复制因子为2)。这导致系统中有两种数据需要在发生故障时进行恢复:
- 已接收并复制的数据 - 这些数据在单个工作节点故障时仍然存在,因为它的副本存在于其他节点上。
- 已接收但缓冲等待复制的数据 - 由于这些数据未复制,恢复这些数据的唯一方法是再次从源获取。
此外,我们需要关注两种故障:
- 工作节点故障 - 任何运行执行器的工作节点都可能发生故障,并且这些节点上的所有内存数据都将丢失。如果任何接收器在故障节点上运行,那么它们的缓冲数据也将丢失。
- 驱动器节点故障 - 如果运行Spark Streaming应用程序的驱动器节点发生故障,那么SparkContext显然会丢失,并且所有执行器及其内存数据也将丢失。
有了这些基本知识,让我们理解Spark Streaming的容错语义。
定义
流系统语义通常通过系统处理每个记录的次数来捕获。在所有可能的操作条件下(尽管有故障等),系统可以提供三种类型的保证:
- 最多一次:每个记录将被处理一次或根本不处理。
- 至少一次:每个记录将被处理一次或多次。这比“最多一次”更强,因为它确保不会丢失数据。但可能会有重复。
- 恰好一次:每个记录将被精确处理一次——不会丢失数据,也不会多次处理数据。这显然是三者中最强的保证。
基本语义
在任何流处理系统中,广义上讲,数据处理有三个步骤。
-
接收数据:使用接收器或其他方式从源接收数据。
-
转换数据:使用DStream和RDD转换来转换接收到的数据。
-
输出数据:将最终转换的数据推送到外部系统,如文件系统、数据库、仪表板等。
如果流应用程序要实现端到端恰好一次的保证,那么每个步骤都必须提供恰好一次的保证。也就是说,每个记录必须恰好接收一次,恰好转换一次,并恰好推送到下游系统一次。让我们在Spark Streaming的上下文中理解这些步骤的语义。
-
接收数据:不同的输入源提供不同的保证。这将在下一小节中详细讨论。
-
转换数据:由于RDD提供的保证,所有已接收的数据将恰好处理一次。即使发生故障,只要接收到的输入数据可访问,最终转换的RDD将始终具有相同的内容。
-
输出数据:输出操作默认确保至少一次语义,因为它取决于输出操作的类型(幂等或非幂等)以及下游系统的语义(是否支持事务)。但用户可以实现自己的事务机制以实现恰好一次语义。这将在本节稍后进行更详细的讨论。
接收数据的语义
不同的输入源提供不同的保证,范围从至少一次到恰好一次。阅读更多详细信息。
使用文件
如果所有输入数据已存在于HDFS等容错文件系统中,Spark Streaming始终可以从任何故障中恢复并处理所有数据。这提供了恰好一次语义,意味着无论发生什么故障,所有数据都将被恰好处理一次。
使用基于接收器的源
对于基于接收器的输入源,容错语义取决于故障场景和接收器类型。正如我们之前讨论的,接收器有两种类型:
- 可靠接收器 - 这些接收器只有在确保接收到的数据已被复制后,才会向可靠源发送确认。如果此类接收器发生故障,源将不会收到缓冲(未复制)数据的确认。因此,如果接收器重新启动,源将重新发送数据,并且不会因故障而丢失数据。
- 不可靠接收器 - 此类接收器不发送确认,因此当它们因工作节点或驱动器故障而失败时,可能会丢失数据。
根据使用的接收器类型,我们实现以下语义。如果工作节点发生故障,则使用可靠接收器不会丢失数据。使用不可靠接收器时,已接收但未复制的数据可能会丢失。如果驱动器节点发生故障,除了这些损失之外,所有已接收并复制到内存中的历史数据都将丢失。这将影响有状态转换的结果。
为避免这种历史接收数据丢失,Spark 1.2引入了预写日志,它将接收到的数据保存到容错存储中。启用预写日志并使用可靠接收器后,数据零丢失。在语义方面,它提供了至少一次的保证。
下表总结了故障下的语义
部署场景 | 工作节点故障 | 驱动器故障 |
---|---|---|
Spark 1.1 或更早版本, 或 Spark 1.2 或更高版本,没有预写日志 |
使用不可靠接收器时缓冲数据丢失 使用可靠接收器时数据零丢失 至少一次语义 |
使用不可靠接收器时缓冲数据丢失 所有接收器丢失历史数据 未定义语义 |
Spark 1.2 或更高版本,有预写日志 | 使用可靠接收器时数据零丢失 至少一次语义 |
使用可靠接收器和文件时数据零丢失 至少一次语义 |
使用Kafka Direct API
在Spark 1.3中,我们引入了新的Kafka Direct API,它可以确保Spark Streaming恰好一次地接收所有Kafka数据。此外,如果您实现恰好一次的输出操作,您可以实现端到端恰好一次的保证。这种方法在Kafka集成指南中进一步讨论。
输出操作的语义
输出操作(如foreachRDD
)具有至少一次语义,即在工作节点故障时,转换后的数据可能会多次写入外部实体。虽然这对于使用saveAs***Files
操作保存到文件系统是可接受的(因为文件只会用相同的数据被覆盖),但可能需要额外的努力才能实现恰好一次的语义。有两种方法:
-
幂等更新:多次尝试总是写入相同的数据。例如,
saveAs***Files
总是将相同的数据写入生成的文件。 -
事务性更新:所有更新都是事务性进行的,以便更新恰好一次地原子执行。一种实现方式如下:
- 使用批处理时间(可在
foreachRDD
中获取)和RDD的分区索引来创建标识符。此标识符唯一标识流应用程序中的一个数据块。 -
使用此标识符以事务性方式(即,恰好一次,原子地)更新外部系统。也就是说,如果标识符尚未提交,则原子地提交分区数据和标识符。否则,如果已提交,则跳过更新。
dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // use this uniqueId to transactionally commit the data in partitionIterator } }
- 使用批处理时间(可在