Spark Streaming 编程指南

注意

Spark Streaming 是 Spark 上一代流处理引擎。Spark Streaming 不再进行更新,是一个遗留项目。Spark 中有一个更新、更易于使用的流处理引擎,称为结构化流处理。你应该使用 Spark 结构化流处理来开发你的流处理应用程序和管道。参见结构化流处理编程指南

概述

Spark Streaming 是核心 Spark API 的扩展,能够对实时数据流进行可扩展、高吞吐量、容错的流处理。 可以从 Kafka、Kinesis 或 TCP 套接字等多种来源提取数据,并且可以使用高级函数(如 mapreducejoinwindow)表示的复杂算法进行处理。 最后,可以将处理后的数据推送到文件系统、数据库和实时仪表板。 事实上,您可以在数据流上应用 Spark 的 机器学习图处理 算法。

Spark Streaming

从内部来看,它的工作方式如下。 Spark Streaming 接收实时输入数据流并将数据划分为批次,然后由 Spark 引擎处理这些批次以生成最终的批处理结果流。

Spark Streaming

Spark Streaming 提供了一个高级抽象,称为离散化数据流DStream,它表示连续的数据流。 DStream 可以从来自 Kafka 和 Kinesis 等来源的输入数据流创建,也可以通过对其他 DStream 应用高级操作来创建。 在内部,DStream 表示为 RDD 的序列。

本指南向您展示如何开始使用 DStream 编写 Spark Streaming 程序。 您可以使用 Scala、Java 或 Python(在 Spark 1.2 中引入)编写 Spark Streaming 程序,所有这些都在本指南中介绍。 您将在本指南中找到选项卡,让您可以在不同语言的代码段之间进行选择。

注意: Python 中有一些 API 不同或不可用。 在本指南中,您将找到带有 Python API 标签,突出显示这些差异。


快速示例

在我们详细介绍如何编写自己的 Spark Streaming 程序之前,让我们快速了解一下一个简单的 Spark Streaming 程序是什么样的。 假设我们要统计从 TCP 套接字上侦听的数据服务器接收到的文本数据中的单词数。 你所需要做的如下。

首先,我们导入 StreamingContext,它是所有流功能的入口点。 我们创建一个带有两个执行线程的本地 StreamingContext,以及 1 秒的批处理间隔。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

使用此上下文,我们可以创建一个 DStream,它表示来自 TCP 源的流数据,指定为主机名(例如 localhost)和端口(例如 9999)。

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

这个 lines DStream 表示将从数据服务器接收的数据流。 此 DStream 中的每个记录都是一行文本。 接下来,我们想按空格将行拆分为单词。

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

flatMap 是一种一对多的 DStream 操作,它通过从源 DStream 中的每个记录生成多个新记录来创建新的 DStream。 在这种情况下,每行将被拆分为多个单词,并且单词流表示为 words DStream。 接下来,我们想统计这些单词。

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

words DStream 进一步映射(一对一转换)到 (word, 1) 对的 DStream,然后进行归约以获得每个批处理数据中单词的频率。 最后,wordCounts.pprint() 将每秒打印一些生成的计数。

请注意,当执行这些行时,Spark Streaming 仅设置在启动时将执行的计算,并且尚未开始真正的处理。 为了在设置完所有转换后开始处理,我们最终调用

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

完整的代码可以在 Spark Streaming 示例 NetworkWordCount 中找到。

首先,我们导入 Spark Streaming 类的名称以及来自 StreamingContext 的一些隐式转换到我们的环境中,以便向我们需要(如 DStream)的其他类添加有用的方法。 StreamingContext 是所有流功能的入口点。 我们创建一个带有两个执行线程的本地 StreamingContext,以及 1 秒的批处理间隔。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

使用此上下文,我们可以创建一个 DStream,它表示来自 TCP 源的流数据,指定为主机名(例如 localhost)和端口(例如 9999)。

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

这个 lines DStream 表示将从数据服务器接收的数据流。 此 DStream 中的每个记录都是一行文本。 接下来,我们想按空格字符将行拆分为单词。

// Split each line into words
val words = lines.flatMap(_.split(" "))

flatMap 是一种一对多的 DStream 操作,它通过从源 DStream 中的每个记录生成多个新记录来创建新的 DStream。 在这种情况下,每行将被拆分为多个单词,并且单词流表示为 words DStream。 接下来,我们想统计这些单词。

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

words DStream 进一步映射(一对一转换)到 (word, 1) 对的 DStream,然后进行归约以获得每个批处理数据中单词的频率。 最后,wordCounts.print() 将每秒打印一些生成的计数。

请注意,当执行这些行时,Spark Streaming 仅设置在启动时将执行的计算,并且尚未开始真正的处理。 为了在设置完所有转换后开始处理,我们最终调用

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

完整的代码可以在 Spark Streaming 示例 NetworkWordCount 中找到。

首先,我们创建一个 JavaStreamingContext 对象,它是所有流功能的入口点。 我们创建一个带有两个执行线程的本地 StreamingContext,以及 1 秒的批处理间隔。

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

使用此上下文,我们可以创建一个 DStream,它表示来自 TCP 源的流数据,指定为主机名(例如 localhost)和端口(例如 9999)。

// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

这个 lines DStream 表示将从数据服务器接收的数据流。 此流中的每个记录都是一行文本。 然后,我们想按空格将行拆分为单词。

// Split each line into words
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

flatMap 是一种 DStream 操作,它通过从源 DStream 中的每个记录生成多个新记录来创建新的 DStream。 在这种情况下,每行将被拆分为多个单词,并且单词流表示为 words DStream。 请注意,我们使用 FlatMapFunction 对象定义转换。 正如我们将一路发现的那样,Java API 中有许多这样的便利类,可帮助定义 DStream 转换。

接下来,我们想统计这些单词。

// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();

words DStream 进一步映射(一对一转换)到 (word, 1) 对的 DStream,使用 PairFunction 对象。 然后,使用 Function2 对象对其进行归约以获得每个批处理数据中单词的频率。 最后,wordCounts.print() 将每秒打印一些生成的计数。

请注意,当执行这些行时,Spark Streaming 仅设置将在启动后执行的计算,并且尚未开始真正的处理。 为了在设置完所有转换后开始处理,我们最终调用 start 方法。

jssc.start();              // Start the computation
jssc.awaitTermination();   // Wait for the computation to terminate

完整的代码可以在 Spark Streaming 示例 JavaNetworkWordCount 中找到。

如果您已经下载构建了 Spark,您可以按如下方式运行此示例。 您首先需要使用以下命令运行 Netcat(在大多数类 Unix 系统中找到的小实用程序)作为数据服务器

$ nc -lk 9999

然后,在不同的终端中,您可以使用以下命令启动示例

$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999

然后,在运行 netcat 服务器的终端中键入的任何行将被计数并每秒在屏幕上打印。 它看起来像这样。

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world



...
# TERMINAL 2: RUNNING network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
...
-------------------------------------------
Time: 2014-10-14 15:25:21
-------------------------------------------
(hello,1)
(world,1)
...
# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
# TERMINAL 2: RUNNING JavaNetworkWordCount

$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...


基本概念

接下来,我们超越简单的例子,详细说明 Spark Streaming 的基础知识。

链接

与 Spark 类似,Spark Streaming 通过 Maven Central 提供。 要编写自己的 Spark Streaming 程序,您必须将以下依赖项添加到您的 SBT 或 Maven 项目。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.5.5</version>
    <scope>provided</scope>
</dependency>
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.5.5" % "provided"

对于从 Kafka 和 Kinesis 等 Spark Streaming 核心 API 中不存在的源提取数据,您必须将相应的工件 spark-streaming-xyz_2.12 添加到依赖项中。 例如,一些常见的如下。

来源工件
Kafkaspark-streaming-kafka-0-10_2.12
Kinesis
spark-streaming-kinesis-asl_2.12 [Amazon Software License]

有关最新列表,请参阅 Maven 仓库,以获取支持的源和工件的完整列表。


初始化 StreamingContext

要初始化 Spark Streaming 程序,必须创建一个 StreamingContext 对象,它是所有 Spark Streaming 功能的主要入口点。

可以从 SparkContext 对象创建一个 StreamingContext 对象。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)

appName 参数是应用程序的名称,用于在集群 UI 上显示。master 是一个 Spark、Mesos 或 YARN 集群 URL,或者一个特殊的 “local[*]” 字符串,用于在本地模式下运行。实际上,在集群上运行时,您不希望在程序中硬编码 master,而是使用 spark-submit 启动应用程序并在那里接收它。但是,对于本地测试和单元测试,您可以传递 “local[*]” 以在进程内运行 Spark Streaming(检测本地系统中的核心数量)。

必须根据应用程序的延迟要求和可用的集群资源设置批处理间隔。有关更多详细信息,请参阅 性能调优 部分。

可以从 SparkConf 对象创建一个 StreamingContext 对象。

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

appName 参数是应用程序的名称,用于在集群 UI 上显示。master 是一个 Spark、Mesos、Kubernetes 或 YARN 集群 URL,或者一个特殊的 “local[*]” 字符串,用于在本地模式下运行。实际上,在集群上运行时,您不希望在程序中硬编码 master,而是使用 spark-submit 启动应用程序并在那里接收它。但是,对于本地测试和单元测试,您可以传递 “local[*]” 以在进程内运行 Spark Streaming(检测本地系统中的核心数量)。请注意,这会在内部创建一个 SparkContext(所有 Spark 功能的起点),可以作为 ssc.sparkContext 访问。

必须根据应用程序的延迟要求和可用的集群资源设置批处理间隔。有关更多详细信息,请参阅 性能调优 部分。

也可以从现有的 SparkContext 对象创建一个 StreamingContext 对象。

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

可以从 SparkConf 对象创建一个 JavaStreamingContext 对象。

import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

appName 参数是应用程序的名称,用于在集群 UI 上显示。master 是一个 Spark、Mesos 或 YARN 集群 URL,或者一个特殊的 “local[*]” 字符串,用于在本地模式下运行。实际上,在集群上运行时,您不希望在程序中硬编码 master,而是使用 spark-submit 启动应用程序并在那里接收它。但是,对于本地测试和单元测试,您可以传递 “local[*]” 以在进程内运行 Spark Streaming。请注意,这会在内部创建一个 JavaSparkContext(所有 Spark 功能的起点),可以作为 ssc.sparkContext 访问。

必须根据应用程序的延迟要求和可用的集群资源设置批处理间隔。有关更多详细信息,请参阅 性能调优 部分。

也可以从现有的 JavaSparkContext 创建一个 JavaStreamingContext 对象。

import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

定义上下文后,您必须执行以下操作。

  1. 通过创建输入 DStream 来定义输入源。
  2. 通过将转换和输出操作应用于 DStream 来定义流式计算。
  3. 使用 streamingContext.start() 开始接收数据并进行处理。
  4. 使用 streamingContext.awaitTermination() 等待处理停止(手动停止或由于任何错误)。
  5. 可以使用 streamingContext.stop() 手动停止处理。
要记住的要点

离散化数据流 (DStream)

离散化流DStream 是 Spark Streaming 提供的基本抽象。它表示连续的数据流,无论是从源接收的输入数据流,还是通过转换输入流生成的处理后的数据流。在内部,DStream 由一系列连续的 RDD 表示,RDD 是 Spark 对不可变的分布式数据集的抽象(有关更多详细信息,请参阅 Spark 编程指南)。DStream 中的每个 RDD 都包含来自特定间隔的数据,如下图所示。

Spark Streaming

应用于 DStream 的任何操作都会转换为对底层 RDD 的操作。例如,在将行流转换为单词的前面的示例中,flatMap 操作应用于 lines DStream 中的每个 RDD,以生成 words DStream 的 RDD。如下图所示。

Spark Streaming

这些底层 RDD 转换由 Spark 引擎计算。DStream 操作隐藏了这些细节中的大部分,并为开发人员提供了更高级别的 API 以方便使用。这些操作将在后面的章节中详细讨论。


输入 DStream 和接收器

输入 DStream 是表示从流源接收的输入数据流的 DStream。在快速示例中,lines 是一个输入 DStream,因为它表示从 netcat 服务器接收的数据流。每个输入 DStream(文件流除外,稍后将在本节中讨论)都与一个 ReceiverScala 文档Java 文档)对象相关联,该对象从源接收数据并将其存储在 Spark 的内存中以进行处理。

Spark Streaming 提供了两类内置流源。

我们将在本节的后面部分讨论每种类别中的一些源。

请注意,如果要在流应用程序中并行接收多个数据流,则可以创建多个输入 DStream(在性能调优部分中进一步讨论)。这将创建多个接收器,这些接收器将同时接收多个数据流。但请注意,Spark worker/executor 是一个长期运行的任务,因此它会占用分配给 Spark Streaming 应用程序的一个核心。因此,重要的是要记住,Spark Streaming 应用程序需要分配足够的核心(或线程,如果在本地运行)来处理接收到的数据,以及运行接收器。

要记住的要点

基本源

我们已经在快速示例中看过了 ssc.socketTextStream(...),它从通过 TCP 套接字连接接收的文本数据创建一个 DStream。除了套接字之外,StreamingContext API 还提供了用于从文件创建 DStream 作为输入源的方法。

文件流

对于从与 HDFS API 兼容的任何文件系统(即 HDFS、S3、NFS 等)上的文件读取数据,可以通过 StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass] 创建 DStream。

文件流不需要运行接收器,因此无需分配任何核心来接收文件数据。

对于简单的文本文件,最简单的方法是 StreamingContext.textFileStream(dataDirectory)

fileStream 在 Python API 中不可用;只有 textFileStream 可用。

streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

对于文本文件

streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);

对于文本文件

streamingContext.textFileStream(dataDirectory);
如何监视目录

Spark Streaming 将监视目录 dataDirectory 并处理在该目录中创建的任何文件。

使用对象存储作为数据源

诸如 HDFS 之类的“完整”文件系统倾向于在创建输出流后立即设置其文件的修改时间。打开文件时,即使在数据尚未完全写入之前,它也可能包含在 DStream 中 - 之后对同一窗口内的文件的更新将被忽略。也就是说:可能会错过更改,并且从流中省略数据。

为了保证在窗口中拾取更改,请将文件写入未监视的目录,然后,在输出流关闭后立即将其重命名为目标目录。如果在其创建窗口期间,重命名的文件出现在扫描的目标目录中,则将拾取新数据。

相比之下,诸如 Amazon S3 和 Azure Storage 之类的对象存储通常具有缓慢的重命名操作,因为实际上复制了数据。此外,重命名的对象的修改时间可能是 rename() 操作的时间,因此可能不被视为原始创建时间暗示的窗口的一部分。

需要针对目标对象存储进行仔细的测试,以验证该存储的时间戳行为与 Spark Streaming 期望的行为一致。直接写入目标目录可能是通过所选对象存储传输流数据的适当策略。

有关此主题的更多详细信息,请参阅Hadoop 文件系统规范

基于自定义接收器的流

DStream 可以通过自定义接收器接收到的数据流来创建。有关更多详细信息,请参阅自定义接收器指南

RDD 队列作为流

为了使用测试数据测试 Spark Streaming 应用程序,还可以使用 streamingContext.queueStream(queueOfRDDs) 基于 RDD 队列创建一个 DStream。推送到队列中的每个 RDD 将被视为 DStream 中的一批数据,并像流一样进行处理。

有关来自套接字和文件的流的更多详细信息,请参阅 Scala 中的 StreamingContext、Java 中的 JavaStreamingContext 和 Python 中的 StreamingContext 中相关函数的 API 文档。

高级来源

Python API 从 Spark 3.5.5 开始,在这些来源中,Kafka 和 Kinesis 在 Python API 中可用。

此类源需要与外部非 Spark 库进行交互,其中一些库具有复杂的依赖关系(例如,Kafka)。因此,为了最大限度地减少与依赖项的版本冲突相关的问题,从这些源创建 DStream 的功能已移至单独的库,这些库可以在必要时显式链接

请注意,这些高级源在 Spark shell 中不可用,因此基于这些高级源的应用程序无法在 shell 中进行测试。如果确实想在 Spark shell 中使用它们,则必须下载相应的 Maven 工件的 JAR 及其依赖项,并将其添加到类路径中。

以下是一些高级来源。

自定义来源

Python API Python 尚不支持此功能。

输入 DStream 也可以从自定义数据源创建。您所要做的就是实现一个用户定义的 接收器(请参阅下一节以了解其含义),该接收器可以从自定义来源接收数据并将其推送到 Spark 中。有关详细信息,请参阅自定义接收器指南

接收器可靠性

根据其可靠性,可以有两种数据源。源(如 Kafka)允许确认传输的数据。如果从这些可靠源接收数据的系统正确确认接收到的数据,则可以确保不会因任何类型的故障而丢失数据。 这导致了两种接收器

  1. 可靠接收器 - 当数据已被接收并以复制的方式存储在 Spark 中时,可靠接收器会正确地向可靠源发送确认。
  2. 不可靠接收器 - 不可靠接收器向源发送确认。 这可以用于不支持确认的源,甚至可以用于可靠的源,当人们不想要或不需要处理确认的复杂性时。

如何在自定义接收器指南中讨论了编写可靠接收器的详细信息。


DStream 上的转换

与 RDD 类似,转换允许修改来自输入 DStream 的数据。DStream 支持许多在普通 Spark RDD 上可用的转换。以下是一些常见的转换。

转换含义
map(func) 通过将源 DStream 的每个元素传递给函数 func 来返回一个新的 DStream。
flatMap(func) 类似于 map,但每个输入项可以映射到 0 个或多个输出项。
filter(func) 通过仅选择源 DStream 中 func 返回 true 的记录来返回一个新的 DStream。
repartition(numPartitions) 通过创建更多或更少的分区来更改此 DStream 中的并行级别。
union(otherStream) 返回一个新的 DStream,其中包含源 DStream 和 otherDStream 中元素的并集。
count() 通过计算源 DStream 的每个 RDD 中的元素数量,返回一个新的单元素 RDD 的 DStream。
reduce(func) 通过使用函数 func(它接受两个参数并返回一个)聚合源 DStream 的每个 RDD 中的元素,返回一个新的单元素 RDD 的 DStream。该函数应该是结合律和交换律的,以便可以并行计算。
countByValue() 在类型 K 的元素的 DStream 上调用时,返回一个新的 (K, Long) 对的 DStream,其中每个键的值是其在源 DStream 的每个 RDD 中的频率。
reduceByKey(func, [numTasks]) 在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, V) 对的 DStream,其中每个键的值使用给定的 reduce 函数进行聚合。注意:默认情况下,这使用 Spark 的默认并行任务数(本地模式为 2,群集模式下的数量由配置属性 spark.default.parallelism 确定)来进行分组。您可以传递一个可选的 numTasks 参数来设置不同的任务数。
join(otherStream, [numTasks]) 在 (K, V) 和 (K, W) 对的两个 DStream 上调用时,返回一个新的 (K, (V, W)) 对的 DStream,其中包含每个键的所有元素对。
cogroup(otherStream, [numTasks]) 在 (K, V) 和 (K, W) 对的 DStream 上调用时,返回一个新的 (K, Seq[V], Seq[W]) 元组的 DStream。
transform(func) 通过将 RDD 到 RDD 的函数应用于源 DStream 的每个 RDD 来返回一个新的 DStream。这可以用于对 DStream 执行任意 RDD 操作。
updateStateByKey(func) 返回一个新的“状态”DStream,其中每个键的状态通过将给定的函数应用于该键的先前状态和该键的新值来更新。这可以用于维护每个键的任意状态数据。

其中一些转换值得更详细地讨论。

UpdateStateByKey 操作

updateStateByKey 操作允许您维护任意状态,同时不断地使用新信息更新它。要使用此操作,您需要执行两个步骤。

  1. 定义状态 - 状态可以是任意数据类型。
  2. 定义状态更新函数 - 使用函数指定如何使用先前的状态和来自输入流的新值来更新状态。

在每个批次中,Spark 将对所有现有键应用状态更新函数,无论它们在一个批次中是否有新数据。如果更新函数返回 None,则键值对将被删除。

让我们用一个例子来说明这一点。假设您想维护文本数据流中每个单词的运行计数。在这里,运行计数是状态,它是一个整数。我们将更新函数定义为

def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count

这应用于包含单词的 DStream(例如,前面的示例中包含 (word, 1) 对的 pairs DStream)。

runningCounts = pairs.updateStateByKey(updateFunction)

将为每个单词调用更新函数,其中 newValues 包含 1 的序列(来自 (word, 1) 对),runningCount 包含先前的计数。有关完整的 Python 代码,请查看示例 stateful_network_wordcount.py

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

这应用于包含单词的 DStream(例如,前面的示例中包含 (word, 1) 对的 pairs DStream)。

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

将为每个单词调用更新函数,其中 newValues 包含 1 的序列(来自 (word, 1) 对),runningCount 包含先前的计数。

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
  (values, state) -> {
    Integer newSum = ...  // add the new values with the previous running count to get the new count
    return Optional.of(newSum);
  };

这应用于包含单词的 DStream(例如,快速示例中包含 (word, 1) 对的 pairs DStream)。

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

将为每个单词调用更新函数,其中 newValues 包含 1 的序列(来自 (word, 1) 对),runningCount 包含先前的计数。有关完整的 Java 代码,请查看示例 JavaStatefulNetworkWordCount.java

请注意,使用 updateStateByKey 需要配置检查点目录,这在检查点部分中进行了详细讨论。

Transform 操作

transform 操作(及其变体,如 transformWith)允许将任意 RDD 到 RDD 的函数应用于 DStream。它可以用于应用 DStream API 中未公开的任何 RDD 操作。例如,在数据流中将每个批次与另一个数据集连接的功能未直接在 DStream API 中公开。但是,您可以轻松地使用 transform 来执行此操作。这提供了非常强大的可能性。例如,可以通过将输入数据流与预先计算的垃圾邮件信息(也可能由 Spark 生成)连接,然后根据它进行过滤来执行实时数据清理。

spamInfoRDD = sc.pickleFile(...)  # RDD containing spam information

# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
  rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
  ...
});

请注意,提供的函数会在每个批处理间隔中调用。这允许您执行时变 RDD 操作,即,可以在批处理之间更改 RDD 操作、分区数、广播变量等。

窗口操作

Spark Streaming 还提供窗口计算,允许您对数据的滑动窗口应用转换。下图说明了此滑动窗口。

Spark Streaming

如图所示,每次窗口在源 DStream 上滑动时,窗口内的源 RDD 都会被组合并对其进行操作,以生成窗口化 DStream 的 RDD。在这种特定情况下,该操作应用于最后 3 个时间单位的数据,并滑动 2 个时间单位。这表明任何窗口操作都需要指定两个参数。

这两个参数必须是源 DStream 的批处理间隔的倍数(在图中为 1)。

让我们用一个例子来说明窗口操作。假设您想扩展之前的例子,在最近 30 秒的数据上每 10 秒生成一次单词计数。为此,我们必须对 pairs DStream 的 (word, 1) 对应用 reduceByKey 操作,这个 DStream 包含最近 30 秒的数据。这可以使用 reduceByKeyAndWindow 操作来完成。

# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));

以下是一些常见的窗口操作。所有这些操作都接受两个参数 - windowLengthslideInterval

转换含义
window(windowLength, slideInterval) 返回一个新的 DStream,该 DStream 基于源 DStream 的窗口化批次计算。
countByWindow(windowLength, slideInterval) 返回流中元素的滑动窗口计数。
reduceByWindow(func, windowLength, slideInterval) 返回一个新的单元素流,通过使用 func 聚合滑动间隔内的流中的元素来创建。该函数应该是关联的和可交换的,以便它可以并行地正确计算。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 当在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, V) 对的 DStream,其中每个键的值使用给定的 reduce 函数 func 在滑动窗口中的批次上聚合。注意:默认情况下,这使用 Spark 的默认并行任务数(本地模式为 2,集群模式下的数量由配置属性 spark.default.parallelism 确定)来进行分组。您可以传递一个可选的 numTasks 参数来设置不同的任务数。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

上述 reduceByKeyAndWindow() 的更高效版本,其中每个窗口的 reduce 值使用前一个窗口的 reduce 值增量计算。这是通过减少进入滑动窗口的新数据,以及“反向减少”离开窗口的旧数据来完成的。一个例子是在窗口滑动时“添加”和“减去”键的计数。但是,它仅适用于“可逆 reduce 函数”,即那些具有相应的“反向 reduce”函数(作为参数 invFunc)的 reduce 函数。与 reduceByKeyAndWindow 中一样,reduce 任务的数量可以通过可选参数配置。请注意,必须启用检查点才能使用此操作。

countByValueAndWindow(windowLength, slideInterval, [numTasks]) 当在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, Long) 对的 DStream,其中每个键的值是其在滑动窗口内的频率。与 reduceByKeyAndWindow 中一样,reduce 任务的数量可以通过可选参数配置。

连接操作

最后,值得强调的是,您可以在 Spark Streaming 中多么容易地执行不同类型的连接。

流-流连接

流可以很容易地与其他流连接。

stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);

在这里,在每个批次间隔中,由 stream1 生成的 RDD 将与由 stream2 生成的 RDD 连接。您也可以执行 leftOuterJoinrightOuterJoinfullOuterJoin。此外,在流的窗口上执行连接通常非常有用。这也非常容易。

windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
流-数据集连接

这已经在前面解释 DStream.transform 操作时展示过了。这是将窗口化流与数据集连接的另一个示例。

dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));

事实上,您还可以动态地更改要连接的数据集。提供给 transform 的函数在每个批次间隔中都会被评估,因此将使用 dataset 引用指向的当前数据集。

DStream 转换的完整列表可在 API 文档中找到。 对于 Scala API,请参阅 DStreamPairDStreamFunctions。 对于 Java API,请参阅 JavaDStreamJavaPairDStream。 对于 Python API,请参阅 DStream


DStream 上的输出操作

输出操作允许将 DStream 的数据推送到外部系统,如数据库或文件系统。 由于输出操作实际上允许外部系统使用转换后的数据,因此它们会触发所有 DStream 转换的实际执行(类似于 RDD 的操作)。 目前,定义了以下输出操作

输出操作含义
print() 在运行流式应用程序的驱动程序节点上打印 DStream 中每个数据批次的前十个元素。 这对于开发和调试非常有用。
Python API 在 Python API 中,这称为 pprint()
saveAsTextFiles(prefix, [suffix]) 将此 DStream 的内容保存为文本文件。 每个批次间隔的文件名基于 prefixsuffix 生成:"prefix-TIME_IN_MS[.suffix]"
saveAsObjectFiles(prefix, [suffix]) 将此 DStream 的内容保存为序列化的 Java 对象的 SequenceFiles。 每个批次间隔的文件名基于 prefixsuffix 生成:"prefix-TIME_IN_MS[.suffix]"
Python API 这在 Python API 中不可用。
saveAsHadoopFiles(prefix, [suffix]) 将此 DStream 的内容保存为 Hadoop 文件。 每个批次间隔的文件名基于 prefixsuffix 生成:"prefix-TIME_IN_MS[.suffix]"
Python API 这在 Python API 中不可用。
foreachRDD(func) 最通用的输出运算符,将函数 func 应用于从流生成的每个 RDD。 此函数应将每个 RDD 中的数据推送到外部系统,例如将 RDD 保存到文件,或通过网络将其写入数据库。 请注意,函数 func 在运行流式应用程序的驱动程序进程中执行,并且通常包含 RDD 操作,这些操作将强制计算流式 RDD。

使用 foreachRDD 的设计模式

dstream.foreachRDD 是一个强大的原语,允许将数据发送到外部系统。 但是,重要的是要了解如何正确有效地使用此原语。 要避免的一些常见错误如下。

通常,将数据写入外部系统需要创建一个连接对象(例如,到远程服务器的 TCP 连接)并使用它将数据发送到远程系统。 为此,开发人员可能会无意中尝试在 Spark 驱动程序上创建连接对象,然后尝试在 Spark 工作人员中使用它来保存 RDD 中的记录。 例如(在 Scala 中),

def sendRecord(rdd):
    connection = createNewConnection()  # executed at the driver
    rdd.foreach(lambda record: connection.send(record))
    connection.close()

dstream.foreachRDD(sendRecord)
dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}
dstream.foreachRDD(rdd -> {
  Connection connection = createNewConnection(); // executed at the driver
  rdd.foreach(record -> {
    connection.send(record); // executed at the worker
  });
});

这是不正确的,因为这需要序列化连接对象并将其从驱动程序发送到工作人员。 这种连接对象很少能在机器之间传输。 此错误可能会表现为序列化错误(连接对象不可序列化)、初始化错误(需要在工作人员处初始化连接对象)等。 正确的解决方案是在工作人员处创建连接对象。

但是,这可能会导致另一个常见的错误 - 为每个记录创建一个新连接。 例如,

def sendRecord(record):
    connection = createNewConnection()
    connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}
dstream.foreachRDD(rdd -> {
  rdd.foreach(record -> {
    Connection connection = createNewConnection();
    connection.send(record);
    connection.close();
  });
});

通常,创建连接对象具有时间和资源开销。 因此,为每个记录创建和销毁连接对象可能会导致不必要的高开销,并显着降低系统的整体吞吐量。 更好的解决方案是使用 rdd.foreachPartition - 创建单个连接对象,并使用该连接发送 RDD 分区中的所有记录。

def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    Connection connection = createNewConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    connection.close();
  });
});

这分摊了许多记录的连接创建开销。

最后,可以通过在多个 RDD/批次之间重用连接对象来进一步优化这一点。 可以维护一个静态的连接对象池,在将多个批次的 RDD 推送到外部系统时可以重用这些连接对象,从而进一步减少开销。

def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    // ConnectionPool is a static, lazily initialized pool of connections
    Connection connection = ConnectionPool.getConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    ConnectionPool.returnConnection(connection); // return to the pool for future reuse
  });
});

请注意,池中的连接应按需延迟创建,如果一段时间内未使用则超时。 这实现了将数据发送到外部系统的最有效方式。

需要记住的其他要点

DataFrame 和 SQL 操作

您可以轻松地在流式数据上使用DataFrames 和 SQL 操作。 您必须使用 StreamingContext 正在使用的 SparkContext 创建一个 SparkSession。 此外,必须这样做,以便可以在驱动程序故障时重新启动它。 这是通过创建 SparkSession 的延迟实例化的单例实例来完成的。 以下示例显示了这一点。 它修改了之前的单词计数示例,以使用 DataFrames 和 SQL 生成单词计数。 每个 RDD 都转换为 DataFrame,注册为临时表,然后使用 SQL 查询。

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

...

# DataFrame operations inside your streaming program

words = ... # DStream of strings

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass

words.foreachRDD(process)

请参阅完整的源代码

/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame =
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

请参阅完整的源代码

/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
  private String word;

  public String getWord() {
    return word;
  }

  public void setWord(String word) {
    this.word = word;
  }
}

...

/** DataFrame operations inside your streaming program */

JavaDStream<String> words = ...

words.foreachRDD((rdd, time) -> {
  // Get the singleton instance of SparkSession
  SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();

  // Convert RDD[String] to RDD[case class] to DataFrame
  JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
    JavaRow record = new JavaRow();
    record.setWord(word);
    return record;
  });
  DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);

  // Creates a temporary view using the DataFrame
  wordsDataFrame.createOrReplaceTempView("words");

  // Do word count on table using SQL and print it
  DataFrame wordCountsDataFrame =
    spark.sql("select word, count(*) as total from words group by word");
  wordCountsDataFrame.show();
});

请参阅完整的源代码

您还可以在从不同线程(即,与正在运行的 StreamingContext 异步)的流式数据上定义的表上运行 SQL 查询。只需确保您将 StreamingContext 设置为记住足够多的流式数据,以便可以运行查询。否则,StreamingContext(不知道任何异步 SQL 查询)将在查询完成之前删除旧的流式数据。例如,如果您想查询最后一个批次,但您的查询可能需要 5 分钟才能运行,则调用 streamingContext.remember(Minutes(5)) (在 Scala 中,或在其他语言中等效的调用)。

请参阅 DataFrames and SQL 指南,了解有关 DataFrames 的更多信息。


MLlib 操作

您还可以轻松地使用 MLlib 提供的机器学习算法。首先,有流式机器学习算法(例如 流式线性回归流式 KMeans 等),它们可以同时从流式数据中学习并将模型应用于流式数据。除此之外,对于更大类的机器学习算法,您可以离线学习学习模型(即,使用历史数据),然后在线将该模型应用于流式数据。有关更多详细信息,请参阅 MLlib 指南。


缓存/持久化

与 RDD 类似,DStream 也允许开发人员将流的数据保存在内存中。也就是说,在 DStream 上使用 persist() 方法会自动将该 DStream 的每个 RDD 保存在内存中。如果 DStream 中的数据将被多次计算(例如,对同一数据的多个操作),这将非常有用。对于基于窗口的操作(如 reduceByWindowreduceByKeyAndWindow)和基于状态的操作(如 updateStateByKey),这都是隐含的。因此,由基于窗口的操作生成的 DStream 会自动保存在内存中,而无需开发人员调用 persist()

对于通过网络接收数据的输入流(例如 Kafka、sockets 等),默认持久化级别设置为将数据复制到两个节点以实现容错。

请注意,与 RDD 不同,DStream 的默认持久化级别将数据序列化保存在内存中。这将在 性能调优 部分中进一步讨论。有关不同持久化级别的更多信息,请参见 Spark 编程指南


检查点

流式应用程序必须 24/7 全天候运行,因此必须能够应对与应用程序逻辑无关的故障(例如,系统故障、JVM 崩溃等)。为了实现这一点,Spark Streaming 需要将足够的信息checkpoint到容错存储系统中,以便它可以从故障中恢复。有两种类型的数据会被 checkpoint。

总而言之,元数据 checkpoint 主要用于从驱动程序故障中恢复,而数据或 RDD checkpoint 对于使用有状态转换的基本功能也是必要的。

何时启用 Checkpointing

对于具有以下任何要求的应用程序,必须启用 Checkpointing

请注意,如果没有上述有状态转换的简单流式应用程序可以在不启用 checkpoint 的情况下运行。在这种情况下,从驱动程序故障中的恢复也将是部分的(一些已接收但未处理的数据可能会丢失)。这通常是可以接受的,并且许多人以这种方式运行 Spark Streaming 应用程序。对非 Hadoop 环境的支持有望在未来得到改善。

如何配置 Checkpointing

可以通过在容错、可靠的文件系统(例如 HDFS、S3 等)中设置一个目录来启用 Checkpointing,checkpoint 信息将保存到该目录中。这是通过使用 streamingContext.checkpoint(checkpointDirectory) 完成的。这将允许您使用上述有状态转换。此外,如果您想使应用程序从驱动程序故障中恢复,则应重写您的流式应用程序以具有以下行为。

通过使用 StreamingContext.getOrCreate 可以简化此行为。它使用如下。

# Function to create and setup a new StreamingContext
def functionToCreateContext():
    sc = SparkContext(...)  # new context
    ssc = StreamingContext(...)
    lines = ssc.socketTextStream(...)  # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
    return ssc

# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...

# Start the context
context.start()
context.awaitTermination()

如果 checkpointDirectory 存在,则将从 checkpoint 数据重新创建上下文。如果该目录不存在(即,第一次运行),则将调用函数 functionToCreateContext 以创建一个新上下文并设置 DStream。请参阅 Python 示例 recoverable_network_wordcount.py。此示例将网络数据的单词计数附加到文件中。

您还可以通过使用 StreamingContext.getOrCreate(checkpointDirectory, None) 从 checkpoint 数据显式创建一个 StreamingContext 并启动计算。

通过使用 StreamingContext.getOrCreate 可以简化此行为。它使用如下。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

如果 checkpointDirectory 存在,则将从 checkpoint 数据重新创建上下文。如果该目录不存在(即,第一次运行),则将调用函数 functionToCreateContext 以创建一个新上下文并设置 DStream。请参阅 Scala 示例 RecoverableNetworkWordCount。此示例将网络数据的单词计数附加到文件中。

通过使用 JavaStreamingContext.getOrCreate 可以简化此行为。它使用如下。

// Create a factory object that can create and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
  @Override public JavaStreamingContext create() {
    JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
    JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
    ...
    jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
    return jssc;
  }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start();
context.awaitTermination();

如果 checkpointDirectory 存在,则将从 checkpoint 数据重新创建上下文。如果该目录不存在(即,第一次运行),则将调用函数 contextFactory 以创建一个新上下文并设置 DStream。请参阅 Java 示例 JavaRecoverableNetworkWordCount。此示例将网络数据的单词计数附加到文件中。

除了使用 getOrCreate 之外,还需要确保驱动程序进程在发生故障时自动重新启动。这只能由用于运行应用程序的部署基础结构来完成。这将在 部署 部分中进一步讨论。

请注意,RDD 的 checkpoint 会产生保存到可靠存储的成本。这可能会导致 RDD 进行 checkpoint 的那些批次的处理时间增加。因此,需要仔细设置 checkpoint 的间隔。在小批量大小(例如 1 秒)时,对每个批次进行 checkpoint 可能会显着降低操作吞吐量。相反,过于频繁的 checkpoint 会导致 lineage 和任务大小增加,这可能会产生不利影响。对于需要 RDD checkpoint 的有状态转换,默认间隔是批处理间隔的倍数,至少为 10 秒。可以使用 dstream.checkpoint(checkpointInterval) 设置它。通常,DStream 的 5 - 10 个滑动间隔的 checkpoint 间隔是一个很好的尝试设置。


累加器、广播变量和检查点

累加器广播变量 无法从 Spark Streaming 中的 checkpoint 恢复。 如果您启用 checkpoint 并同时使用 累加器广播变量,则必须为 累加器广播变量 创建延迟实例化的单例实例,以便在驱动程序在发生故障后重新启动时可以重新实例化它们。 以下示例显示了这一点。

def getWordExcludeList(sparkContext):
    if ("wordExcludeList" not in globals()):
        globals()["wordExcludeList"] = sparkContext.broadcast(["a", "b", "c"])
    return globals()["wordExcludeList"]

def getDroppedWordsCounter(sparkContext):
    if ("droppedWordsCounter" not in globals()):
        globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
    return globals()["droppedWordsCounter"]

def echo(time, rdd):
    # Get or register the excludeList Broadcast
    excludeList = getWordExcludeList(rdd.context)
    # Get or register the droppedWordsCounter Accumulator
    droppedWordsCounter = getDroppedWordsCounter(rdd.context)

    # Use excludeList to drop words and use droppedWordsCounter to count them
    def filterFunc(wordCount):
        if wordCount[0] in excludeList.value:
            droppedWordsCounter.add(wordCount[1])
            False
        else:
            True

    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())

wordCounts.foreachRDD(echo)

请参阅完整的 源代码

object WordExcludeList {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordExcludeList = Seq("a", "b", "c")
          instance = sc.broadcast(wordExcludeList)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("DroppedWordsCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // Get or register the excludeList Broadcast
  val excludeList = WordExcludeList.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use excludeList to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (excludeList.value.contains(word)) {
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  val output = "Counts at time " + time + " " + counts
})

请参阅完整的 源代码

class JavaWordExcludeList {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordExcludeList.class) {
        if (instance == null) {
          List<String> wordExcludeList = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordExcludeList);
        }
      }
    }
    return instance;
  }
}

class JavaDroppedWordsCounter {

  private static volatile LongAccumulator instance = null;

  public static LongAccumulator getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.sc().longAccumulator("DroppedWordsCounter");
        }
      }
    }
    return instance;
  }
}

wordCounts.foreachRDD((rdd, time) -> {
  // Get or register the excludeList Broadcast
  Broadcast<List<String>> excludeList = JavaWordExcludeList.getInstance(new JavaSparkContext(rdd.context()));
  // Get or register the droppedWordsCounter Accumulator
  LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
  // Use excludeList to drop words and use droppedWordsCounter to count them
  String counts = rdd.filter(wordCount -> {
    if (excludeList.value().contains(wordCount._1())) {
      droppedWordsCounter.add(wordCount._2());
      return false;
    } else {
      return true;
    }
  }).collect().toString();
  String output = "Counts at time " + time + " " + counts;
}

请参阅完整的 源代码


部署应用程序

本节讨论部署 Spark Streaming 应用程序的步骤。

要求

要运行 Spark Streaming 应用程序,您需要具备以下条件。

升级应用程序代码

如果需要使用新的应用程序代码升级正在运行的 Spark Streaming 应用程序,则有两种可能的机制。


监控应用程序

除了 Spark 的监控功能之外,还有特定于 Spark Streaming 的其他功能。当使用 StreamingContext 时,Spark Web UI 会显示一个额外的 Streaming 选项卡,该选项卡显示有关正在运行的接收器(接收器是否处于活动状态、接收的记录数、接收器错误等)和已完成批次(批处理时间、排队延迟等)的统计信息。 这可用于监控流式应用程序的进度。

Web UI 中的以下两个指标特别重要

如果批处理时间始终大于批处理间隔,并且/或排队延迟不断增加,则表明系统无法以生成批处理的速度处理批处理,并且正在落后。 在这种情况下,请考虑减少批处理时间。

还可以使用StreamingListener 接口来监控 Spark Streaming 程序的进度,该接口允许您获取接收器状态和处理时间。 请注意,这是一个开发人员 API,将来可能会对其进行改进(即,报告更多信息)。



性能调优

要使 Spark Streaming 应用程序在集群上获得最佳性能,需要进行一些调整。 本节介绍许多可以调整的参数和配置,以提高应用程序的性能。 从高层次上讲,您需要考虑两件事

  1. 通过有效地使用集群资源来减少每个数据批处理的处理时间。

  2. 设置正确的批处理大小,以便可以尽快处理数据批次(即,数据处理与数据接收保持同步)。

减少批处理时间

可以在 Spark 中进行许多优化,以最大程度地减少每个批次的处理时间。 这些已在调整指南中进行了详细讨论。 本节重点介绍一些最重要的。

数据接收中的并行度

通过网络(如 Kafka、socket 等)接收数据需要将数据反序列化并存储在 Spark 中。 如果数据接收成为系统中的瓶颈,请考虑并行化数据接收。 请注意,每个输入 DStream 都会创建一个接收器(在 worker 机器上运行),该接收器接收单个数据流。 因此,可以通过创建多个输入 DStream 并将它们配置为从源接收数据流的不同分区来实现接收多个数据流。 例如,接收两个数据主题的单个 Kafka 输入 DStream 可以拆分为两个 Kafka 输入流,每个流仅接收一个主题。 这将运行两个接收器,从而允许并行接收数据,从而提高整体吞吐量。 这些多个 DStream 可以联合在一起以创建单个 DStream。 然后,可以将应用于单个输入 DStream 的转换应用于统一的流。 这样做如下。

numStreams = 5
kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();

另一个应该考虑的参数是接收器的块间隔,它由配置参数 spark.streaming.blockInterval 确定。 对于大多数接收器,接收到的数据在存储到 Spark 内存中之前会合并到数据块中。 每个批次中的块数决定了将用于在类似映射的转换中处理接收到的数据的任务数。 每个批次每个接收器的任务数将大约为(批次间隔/块间隔)。 例如,200 毫秒的块间隔将为 2 秒的批次创建 10 个任务。 如果任务数太低(即,小于每台机器的内核数),这将是低效的,因为所有可用的内核将不会用于处理数据。 要增加给定批次间隔的任务数,请减少块间隔。 但是,建议的最小块间隔值约为 50 毫秒,低于此值,任务启动开销可能会成为问题。

使用多个输入流/接收器接收数据的替代方法是显式地重新分区输入数据流(使用 inputStream.repartition(<number of partitions>))。 这会在进一步处理之前将接收到的数据批次分发到集群中指定数量的机器上。

对于直接流,请参阅Spark Streaming + Kafka 集成指南

数据处理中的并行度

如果在计算的任何阶段中使用的并行任务数量不够多,集群资源可能无法得到充分利用。例如,对于像 reduceByKeyreduceByKeyAndWindow 这样的分布式 reduce 操作,默认的并行任务数量由 spark.default.parallelism 配置属性 控制。 您可以将并行度作为参数传递(请参阅 PairDStreamFunctions 文档),或者设置 spark.default.parallelism 配置属性 来更改默认值。

数据序列化

可以通过调整序列化格式来减少数据序列化的开销。在流处理中,有两种类型的数据会被序列化。

在这两种情况下,使用 Kryo 序列化都可以减少 CPU 和内存开销。有关更多详细信息,请参阅 Spark 调优指南。对于 Kryo,请考虑注册自定义类并禁用对象引用跟踪(请参阅 配置指南 中与 Kryo 相关的配置)。

在某些特定情况下,如果流应用程序需要保留的数据量不大,则可以考虑将数据(两种类型)作为反序列化的对象持久化,而不会产生过多的 GC 开销。例如,如果您使用几秒钟的批处理间隔且没有窗口操作,则可以尝试通过显式设置存储级别来禁用持久化数据中的序列化。这将减少由于序列化导致的 CPU 开销,从而有可能在不产生太多 GC 开销的情况下提高性能。

任务启动开销

如果每秒启动的任务数量很高(例如,每秒 50 个或更多),那么将任务发送到 executor 的开销可能会很大,并且难以实现亚秒级延迟。可以通过以下更改来减少开销

这些更改可能会将批处理时间减少数百毫秒,从而使亚秒级批处理大小可行。


设置正确的批处理间隔

为了使在集群上运行的 Spark Streaming 应用程序稳定,系统应该能够以与接收数据相同的速度处理数据。换句话说,应该以与生成数据相同的速度处理数据批次。可以通过监控流式 Web UI 中的处理时间来找到应用程序是否属于这种情况,其中批处理时间应小于批处理间隔。

根据流计算的性质,使用的批处理间隔可能会对应用程序在固定集群资源上可以维持的数据速率产生重大影响。例如,让我们考虑之前的 WordCountNetwork 示例。对于特定的数据速率,系统可能能够跟上每 2 秒报告一次单词计数(即,批处理间隔为 2 秒),但不能每 500 毫秒报告一次。因此,需要设置批处理间隔,以便可以维持生产中预期的数据速率。

确定应用程序正确批处理大小的一个好方法是使用保守的批处理间隔(例如,5-10 秒)和低数据速率对其进行测试。要验证系统是否能够跟上数据速率,您可以检查每个已处理批次经历的端到端延迟值(在 Spark 驱动程序 log4j 日志中查找“Total delay”,或者使用 StreamingListener 接口)。如果延迟保持在与批处理大小相当的水平,则系统是稳定的。否则,如果延迟持续增加,则表示系统无法跟上,因此不稳定。一旦您了解了稳定的配置,就可以尝试提高数据速率和/或减小批处理大小。请注意,由于临时数据速率增加导致的延迟暂时增加可能是可以接受的,只要延迟降回较低的值(即,小于批处理大小)。


内存调优

调优指南 中,已经非常详细地讨论了 Spark 应用程序的内存使用情况和 GC 行为的调整。强烈建议您阅读该指南。在本节中,我们将专门讨论 Spark Streaming 应用程序中的一些调优参数。

Spark Streaming 应用程序所需的集群内存量很大程度上取决于所使用的转换类型。例如,如果您想对最近 10 分钟的数据使用窗口操作,那么您的集群应该有足够的内存来保存 10 分钟的数据。或者,如果您想将 updateStateByKey 与大量键一起使用,那么所需的内存将很高。相反,如果您想执行简单的 map-filter-store 操作,那么所需的内存将很低。

通常,由于通过 receiver 接收的数据以 StorageLevel.MEMORY_AND_DISK_SER_2 存储,因此不适合内存的数据将溢出到磁盘。这可能会降低流应用程序的性能,因此建议根据流应用程序的需要提供足够的内存。最好尝试查看小规模的内存使用情况并进行相应的估算。

内存调优的另一个方面是垃圾回收。对于需要低延迟的流应用程序,不希望出现由 JVM 垃圾回收引起的大量暂停。

以下是一些可以帮助您调整内存使用情况和 GC 开销的参数


需要记住的要点


容错语义

在本节中,我们将讨论 Spark Streaming 应用程序在发生故障时的行为。

背景

要理解 Spark Streaming 提供的语义,让我们回顾一下 Spark RDD 的基本容错语义。

  1. RDD 是一个不可变的、可确定性重新计算的、分布式数据集。 每个 RDD 都会记住应用于容错输入数据集以创建它的确定性操作的血缘关系。
  2. 如果 RDD 的任何分区由于 worker 节点故障而丢失,则可以使用操作的血缘关系从原始容错数据集重新计算该分区。
  3. 假设所有 RDD 转换都是确定性的,则无论 Spark 集群中发生什么故障,最终转换后的 RDD 中的数据始终相同。

Spark 在 HDFS 或 S3 等容错文件系统中的数据上运行。 因此,从容错数据生成的所有 RDD 也是容错的。 但是,对于 Spark Streaming 来说,情况并非如此,因为在大多数情况下,数据是通过网络接收的(除非使用 fileStream)。 为了为所有生成的 RDD 实现相同的容错属性,接收的数据在集群中 worker 节点上的多个 Spark 执行器之间复制(默认复制因子为 2)。 这导致系统中存在两种需要在发生故障时恢复的数据

  1. 已接收和复制的数据 - 此数据可承受单个 worker 节点的故障,因为它在其他节点之一上存在副本。
  2. 已接收但缓冲以进行复制的数据 - 由于此数据未复制,因此恢复此数据的唯一方法是从源再次获取它。

此外,我们应该关注两种故障

  1. Worker 节点故障 - 运行执行器的任何 worker 节点都可能发生故障,并且这些节点上的所有内存数据都将丢失。 如果任何接收器在发生故障的节点上运行,则其缓冲的数据将丢失。
  2. Driver 节点故障 - 如果运行 Spark Streaming 应用程序的 driver 节点发生故障,那么很明显,SparkContext 将丢失,并且所有带有内存数据的执行器都将丢失。

有了这些基本知识,让我们了解 Spark Streaming 的容错语义。

定义

流处理系统的语义通常以每个记录可被系统处理多少次来捕获。 在所有可能的运行条件下(尽管发生故障等),系统可以提供三种类型的保证

  1. 最多一次:每个记录将被处理一次或根本不处理。
  2. 至少一次:每个记录将被处理一次或多次。 这比最多一次更强,因为它确保不会丢失任何数据。 但可能存在重复项。
  3. 恰好一次:每个记录将被处理恰好一次 - 不会丢失任何数据,也不会多次处理数据。 显然,这是三种保证中最强的。

基本语义

在任何流处理系统中,广义上讲,处理数据有三个步骤。

  1. 接收数据:使用接收器或其他方式从源接收数据。

  2. 转换数据:使用 DStream 和 RDD 转换来转换接收到的数据。

  3. 推送数据:最终转换的数据被推送到外部系统,例如文件系统、数据库、仪表板等。

如果流应用程序必须实现端到端恰好一次的保证,那么每个步骤都必须提供恰好一次的保证。 也就是说,每个记录必须被恰好一次接收、恰好一次转换,并恰好一次推送到下游系统。 让我们了解一下 Spark Streaming 中这些步骤的语义。

  1. 接收数据:不同的输入源提供不同的保证。 这将在下一小节中详细讨论。

  2. 转换数据:由于 RDD 提供的保证,所有已接收的数据都将被恰好一次处理。 即使发生故障,只要接收到的输入数据可访问,最终转换后的 RDD 将始终具有相同的内容。

  3. 推送数据:默认情况下,输出操作确保至少一次语义,因为它取决于输出操作的类型(幂等的还是非幂等的)以及下游系统的语义(支持事务还是不支持事务)。 但是用户可以实现自己的事务机制来实现恰好一次语义。 这将在本节稍后详细讨论。

接收数据的语义

不同的输入源提供不同的保证,范围从至少一次恰好一次。 阅读以获取更多详细信息。

使用文件

如果所有输入数据已经存在于 HDFS 等容错文件系统中,则 Spark Streaming 始终可以从任何故障中恢复并处理所有数据。 这给出了恰好一次语义,这意味着无论发生什么故障,所有数据都将被恰好一次处理。

使用基于接收器的源

对于基于接收器的输入源,容错语义取决于故障场景和接收器的类型。 正如我们前面讨论的,有两种类型的接收器

  1. 可靠接收器 - 这些接收器仅在确保接收到的数据已复制后才确认可靠的源。 如果此类接收器发生故障,则源将不会收到已缓冲(未复制)数据的确认。 因此,如果重新启动接收器,则源将重新发送数据,并且不会因故障而丢失任何数据。
  2. 不可靠接收器 - 此类接收器发送确认,因此可能在因 worker 或 driver 故障而发生故障时丢失数据。

根据使用的接收器类型,我们可以实现以下语义。 如果 worker 节点发生故障,则使用可靠接收器时不会丢失数据。 对于不可靠的接收器,接收到但未复制的数据可能会丢失。 如果 driver 节点发生故障,那么除了这些损失之外,所有已接收并复制到内存中的过去数据都将丢失。 这将影响有状态转换的结果。

为了避免丢失过去接收到的数据,Spark 1.2 引入了预写日志,它将接收到的数据保存到容错存储中。 启用 预写日志并使用可靠的接收器,则不会丢失任何数据。 在语义方面,它提供了至少一次的保证。

下表总结了故障情况下的语义

部署场景 Worker 故障 Driver 故障
Spark 1.1 或更早版本,
没有预写日志的 Spark 1.2 或更高版本
使用不可靠接收器时缓冲的数据丢失
使用可靠接收器时不会丢失任何数据
至少一次语义
使用不可靠接收器时缓冲的数据丢失
使用所有接收器时过去的数据丢失
未定义的语义
使用预写日志的 Spark 1.2 或更高版本 使用可靠接收器时不会丢失任何数据
至少一次语义
使用可靠的接收器和文件时不会丢失任何数据
至少一次语义

使用 Kafka Direct API

在 Spark 1.3 中,我们引入了一个新的 Kafka Direct API,它可以确保 Spark Streaming 恰好一次接收所有 Kafka 数据。 除此之外,如果您实现恰好一次的输出操作,则可以实现端到端恰好一次的保证。 这种方法在Kafka 集成指南中进一步讨论。

输出操作的语义

输出操作(如 foreachRDD)具有至少一次语义,也就是说,在发生 worker 故障时,转换后的数据可能会多次写入外部实体。 虽然这对于使用 saveAs***Files 操作保存到文件系统是可以接受的(因为文件将被相同的数据简单地覆盖),但可能需要额外的努力来实现恰好一次的语义。 有两种方法。



下一步