结构化流式编程指南

概述

结构化流式处理是一个构建在 Spark SQL 引擎上的可扩展且容错的流处理引擎。您可以像表达静态数据上的批处理计算一样表达您的流式计算。 Spark SQL 引擎将负责以增量和连续的方式运行它,并在流数据不断到达时更新最终结果。您可以使用 Scala、Java、Python 或 R 中的 Dataset/DataFrame API 来表达流式聚合、事件时间窗口、流到批 Join 等。该计算在相同的优化 Spark SQL 引擎上执行。最后,系统通过检查点和预写日志确保端到端的一次性容错保证。简而言之,结构化流式处理提供快速、可扩展、容错、端到端的一次性流处理,而无需用户考虑流式处理。

在内部,默认情况下,结构化流式查询使用微批处理引擎进行处理,该引擎将数据流处理为一系列小型批处理作业,从而实现低至 100 毫秒的端到端延迟和一次性容错保证。但是,自 Spark 2.3 以来,我们引入了一种名为连续处理的新型低延迟处理模式,该模式可以实现低至 1 毫秒的端到端延迟,并具有至少一次的保证。无需更改查询中的 Dataset/DataFrame 操作,您就可以根据您的应用程序需求选择模式。

在本指南中,我们将引导您了解编程模型和 API。我们将主要使用默认的微批处理模型来解释这些概念,然后稍后讨论连续处理模型。首先,让我们从一个结构化流式查询的简单示例开始 - 流式字数统计。

快速示例

假设您想维护从监听 TCP 套接字的数据服务器接收到的文本数据的运行字数统计。让我们看看如何使用结构化流式处理来表达这一点。您可以在 Scala/Java/Python/R 中查看完整代码。如果您下载 Spark,您可以直接运行该示例。无论如何,让我们逐步了解该示例并了解其工作原理。首先,我们必须导入必要的类并创建一个本地 SparkSession,这是与 Spark 相关的所有功能的起点。

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();
sparkR.session(appName = "StructuredNetworkWordCount")

接下来,让我们创建一个流式 DataFrame,它表示从监听 localhost:9999 的服务器接收到的文本数据,并转换 DataFrame 以计算字数。

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

这个 lines DataFrame 表示包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,并且流式文本数据中的每一行都成为该表中的一行。请注意,由于我们只是设置转换,尚未启动它,因此目前没有接收任何数据。接下来,我们使用了两个内置 SQL 函数 - split 和 explode,将每一行拆分为多个包含一个单词的行。此外,我们使用函数 alias 将新列命名为“word”。最后,我们通过按 Dataset 中的唯一值分组并计算它们来定义 wordCounts DataFrame。请注意,这是一个流式 DataFrame,它表示流的运行字数统计。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

这个 lines DataFrame 表示包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,并且流式文本数据中的每一行都成为该表中的一行。请注意,由于我们只是设置转换,尚未启动它,因此目前没有接收任何数据。接下来,我们使用 .as[String] 将 DataFrame 转换为 String 的 Dataset,以便我们可以应用 flatMap 操作将每一行拆分为多个单词。生成的 words Dataset 包含所有单词。最后,我们通过按 Dataset 中的唯一值分组并计算它们来定义 wordCounts DataFrame。请注意,这是一个流式 DataFrame,它表示流的运行字数统计。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();

这个 lines DataFrame 表示包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,并且流式文本数据中的每一行都成为该表中的一行。请注意,由于我们只是设置转换,尚未启动它,因此目前没有接收任何数据。接下来,我们使用 .as(Encoders.STRING()) 将 DataFrame 转换为 String 的 Dataset,以便我们可以应用 flatMap 操作将每一行拆分为多个单词。生成的 words Dataset 包含所有单词。最后,我们通过按 Dataset 中的唯一值分组并计算它们来定义 wordCounts DataFrame。请注意,这是一个流式 DataFrame,它表示流的运行字数统计。

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines <- read.stream("socket", host = "localhost", port = 9999)

# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")

# Generate running word count
wordCounts <- count(group_by(words, "word"))

这个 lines SparkDataFrame 表示包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,并且流式文本数据中的每一行都成为该表中的一行。请注意,由于我们只是设置转换,尚未启动它,因此目前没有接收任何数据。接下来,我们使用一个带有两个 SQL 函数的 SQL 表达式 - split 和 explode,将每一行拆分为多个包含一个单词的行。此外,我们将新列命名为“word”。最后,我们通过按 SparkDataFrame 中的唯一值分组并计算它们来定义 wordCounts SparkDataFrame。请注意,这是一个流式 SparkDataFrame,它表示流的运行字数统计。

我们现在已经设置了流式数据的查询。剩下的就是实际开始接收数据并计算计数。为此,我们将其设置为将完整的计数集(由 outputMode("complete") 指定)打印到控制台,每次更新时都会打印。然后使用 start() 启动流式计算。

 # Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")

awaitTermination(query)

执行此代码后,流式计算将在后台启动。query 对象是指向该活动流式查询的句柄,我们已决定使用 awaitTermination() 等待查询终止,以防止进程在查询活动期间退出。

要实际执行此示例代码,您可以将代码编译到您自己的 Spark 应用程序中,或者只需在下载 Spark 后 运行该示例。我们展示后者。您首先需要使用以下命令运行 Netcat(一种在大多数类 Unix 系统中找到的小型实用程序)作为数据服务器:

$ nc -lk 9999

然后,在不同的终端中,您可以使用以下命令启动该示例:

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

然后,在运行 netcat 服务器的终端中输入的任何行都将被计数,并且每秒钟都会在屏幕上打印出来。它看起来像下面这样。

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999
apache spark
apache hadoop



















...
# TERMINAL 2: RUNNING structured_network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING StructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING JavaStructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING structured_network_wordcount.R

$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...

编程模型

结构化流处理的关键思想是将实时数据流视为一个不断附加数据的表。这导致了一种新的流处理模型,该模型与批量处理模型非常相似。您将把流式计算表达为对静态表的标准类批量查询,而 Spark 将其作为对无界输入表的增量查询来运行。让我们更详细地了解这个模型。

基本概念

将输入数据流视为“输入表”。流中到达的每个数据项都像附加到输入表的新行。

Stream as a Table

对输入的查询将生成“结果表”。每个触发间隔(例如,每 1 秒),新行会被附加到输入表,这最终会更新结果表。每当结果表更新时,我们都希望将更改后的结果行写入到外部接收器。

Model

“输出”被定义为写入到外部存储的内容。输出可以用不同的模式定义

请注意,每种模式都适用于某些类型的查询。这将在稍后详细讨论。

为了说明此模型的使用,让我们在 快速示例 的上下文中理解该模型。第一个 lines DataFrame 是输入表,而最终的 wordCounts DataFrame 是结果表。请注意,对流式 lines DataFrame 生成 wordCounts 的查询与静态 DataFrame 的查询完全相同。但是,当此查询启动时,Spark 将持续检查来自套接字连接的新数据。如果有新数据,Spark 将运行一个“增量”查询,该查询将先前的运行计数与新数据相结合,以计算更新的计数,如下所示。

Model

请注意,结构化流处理不会具体化整个表。它从流式数据源读取最新的可用数据,以增量方式处理它以更新结果,然后丢弃源数据。它仅保留更新结果所需的最小中间状态数据(例如,前面示例中的中间计数)。

此模型与许多其他流处理引擎显着不同。许多流式处理系统要求用户自己维护运行聚合,因此必须考虑容错和数据一致性(至少一次、至多一次或恰好一次)。在此模型中,当有新数据时,Spark 负责更新结果表,从而使用户免于考虑它。例如,让我们看看此模型如何处理基于事件时间的处理和延迟到达的数据。

处理事件时间和迟到数据

事件时间是嵌入在数据本身中的时间。对于许多应用程序,您可能希望基于此事件时间进行操作。例如,如果您想获取 IoT 设备每分钟生成的事件数,那么您可能想使用生成数据的时间(即数据中的事件时间),而不是 Spark 接收它们的时间。此事件时间在此模型中非常自然地表达 – 来自设备的每个事件都是表中的一行,而事件时间是行中的一个列值。这允许基于窗口的聚合(例如,每分钟的事件数)仅仅是事件时间列上的特殊类型的分组和聚合 – 每个时间窗口是一个组,并且每一行可以属于多个窗口/组。因此,这种基于事件时间窗口的聚合查询可以在静态数据集(例如,来自收集的设备事件日志)以及数据流上一致地定义,从而使用户的生活更加轻松。

此外,此模型自然地处理了基于其事件时间晚于预期到达的数据。由于 Spark 正在更新结果表,因此它可以完全控制在存在延迟数据时更新旧聚合,以及清理旧聚合以限制中间状态数据的大小。自 Spark 2.1 起,我们支持水印,允许用户指定延迟数据的阈值,并允许引擎相应地清理旧状态。这些将在窗口操作部分中稍后更详细地解释。

容错语义

提供端到端恰好一次的语义是结构化流处理设计背后的关键目标之一。为了实现这一点,我们设计了结构化流处理源、接收器和执行引擎,以可靠地跟踪处理的准确进度,以便它可以处理通过重新启动和/或重新处理进行的任何类型的故障。假定每个流式源都具有偏移量(类似于 Kafka 偏移量或 Kinesis 序列号)来跟踪流中的读取位置。引擎使用检查点和预写日志来记录在每个触发器中处理的数据的偏移量范围。流式接收器被设计为幂等的,用于处理重新处理。总之,通过使用可重放的源和幂等的接收器,结构化流处理可以确保在任何故障下都具有端到端恰好一次的语义

使用 Dataset 和 DataFrame 的 API

自 Spark 2.0 以来,DataFrames 和 Datasets 可以表示静态、有界的数据,以及流式、无界的数据。与静态 Datasets/DataFrames 类似,您可以使用公共入口点 SparkSessionScala/Java/Python/R 文档)从流式源创建流式 DataFrames/Datasets,并将与静态 DataFrames/Datasets 相同的操作应用于它们。如果您不熟悉 Datasets/DataFrames,强烈建议您使用 DataFrame/Dataset 编程指南熟悉它们。

创建流式 DataFrame 和流式 Dataset

可以通过 DataStreamReader 接口(Scala/Java/Python 文档)创建流式 DataFrames,该接口由 SparkSession.readStream() 返回。在 R 中,使用 read.stream() 方法。与用于创建静态 DataFrame 的读取接口类似,您可以指定源的详细信息 – 数据格式、模式、选项等。

输入源

有一些内置的源。

某些源不是容错的,因为它们不保证在失败后可以使用检查点偏移量重播数据。请参阅前面关于 容错语义 的章节。以下是 Spark 中所有源的详细信息。

选项 容错 备注
文件源 path:输入目录的路径,所有文件格式通用。
maxFilesPerTrigger:每次触发器中要考虑的最大新文件数(默认值:无最大值)
latestFirst:是否首先处理最新的新文件,当存在大量文件积压时很有用(默认值:false)
fileNameOnly:是否仅根据文件名而不是完整路径检查新文件(默认值:false)。如果将其设置为 `true`,则以下文件将被视为同一文件,因为它们的文件名“dataset.txt”相同
"file:///dataset.txt"
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"
maxFileAge:在忽略之前,可以在此目录中找到的文件的最大存在时间。对于第一批,所有文件将被视为有效。如果将 latestFirst 设置为 `true` 并且设置了 maxFilesPerTrigger,则将忽略此参数,因为可能忽略有效且应处理的旧文件。最大期限是相对于最新文件的时间戳,而不是当前系统的时间戳指定的。(默认值:1 周)
cleanSource:用于清理处理完成后的文件的选项。
可用选项为“archive”、“delete”、“off”。如果未提供该选项,则默认值为“off”。
如果提供“archive”,则还必须提供附加选项 sourceArchiveDir。“sourceArchiveDir”的值在深度上(从根目录开始的目录数)不得与源模式匹配,其中深度是两个路径上的最小深度。这将确保存档文件永远不会包含为新的源文件。
例如,假设你提供 '/hello?/spark/*' 作为源模式,则 '/hello1/spark/archive/dir' 不能用作 "sourceArchiveDir" 的值,因为 '/hello?/spark/*' 和 '/hello1/spark/archive' 将匹配。'/hello1/spark' 也不能用作 "sourceArchiveDir" 的值,因为 '/hello?/spark' 和 '/hello1/spark' 将匹配。'/archived/here' 将是可以的,因为它不匹配。
Spark 将按照源文件自己的路径移动源文件。例如,如果源文件的路径是 /a/b/dataset.txt,而存档目录的路径是 /archived/here,则该文件将被移动到 /archived/here/a/b/dataset.txt
注意:通过移动或删除已完成的文件进行存档,都会在每个微批次中引入开销(减慢速度,即使它发生在单独的线程中),因此在启用此选项之前,你需要了解文件系统中每个操作的成本。另一方面,启用此选项将降低列出源文件的成本,这可能是一项昂贵的操作。
已完成的文件清理器中使用的线程数可以使用 spark.sql.streaming.fileSource.cleaner.numThreads 进行配置(默认值:1)。
注意 2:启用此选项时,不应从多个源或查询中使用源路径。同样,你必须确保源路径与文件流接收器的输出目录中的任何文件都不匹配。
注意 3:删除和移动操作都是尽力而为。未能删除或移动文件不会使流式查询失败。在某些情况下,Spark 可能无法清理某些源文件 - 例如,应用程序没有正常关闭,或者有太多的文件排队等待清理。

有关特定于文件格式的选项,请参阅 DataStreamReader 中的相关方法(Scala/Java/Python/R)。例如,对于 "parquet" 格式选项,请参阅 DataStreamReader.parquet()

此外,还有一些会影响某些文件格式的会话配置。有关更多详细信息,请参阅 SQL 编程指南。例如,对于 "parquet",请参阅 Parquet 配置 部分。
支持 glob 路径,但不支持多个逗号分隔的路径/glob。
Socket 源 host:要连接到的主机,必须指定
port:要连接到的端口,必须指定
速率源 rowsPerSecond(例如 100,默认值:1):每秒应生成多少行。

rampUpTime(例如 5s,默认值:0s):在生成速度达到 rowsPerSecond 之前要预热多长时间。使用比秒更精细的粒度将被截断为整数秒。

numPartitions(例如 10,默认值:Spark 的默认并行度):生成行的分区数。

该源将尽最大努力达到 rowsPerSecond,但查询可能会受到资源限制,并且可以调整 numPartitions 以帮助达到所需的速度。
每个微批次的速率源(格式:rate-micro-batch rowsPerBatch(例如 100):每个微批次应生成多少行。

numPartitions(例如 10,默认值:Spark 的默认并行度):生成行的分区数。

startTimestamp(例如 1000,默认值:0):生成时间的起始值。

advanceMillisPerBatch(例如 1000,默认值:1000):每个微批次中生成的时间提前量。

Kafka 源 请参阅 Kafka 集成指南

以下是一些示例。

spark = SparkSession. ...

# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

socketDF.isStreaming()    # Returns True for DataFrames that have streaming sources

socketDF.printSchema()

# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")
val spark: SparkSession = ...

// Read text from socket
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.isStreaming    // Returns True for DataFrames that have streaming sources

socketDF.printSchema

// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
SparkSession spark = ...

// Read text from socket
Dataset<Row> socketDF = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

socketDF.isStreaming();    // Returns True for DataFrames that have streaming sources

socketDF.printSchema();

// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset<Row> csvDF = spark
  .readStream()
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")
sparkR.session(...)

# Read text from socket
socketDF <- read.stream("socket", host = hostname, port = port)

isStreaming(socketDF)    # Returns TRUE for SparkDataFrames that have streaming sources

printSchema(socketDF)

# Read all the csv files written atomically in a directory
schema <- structType(structField("name", "string"),
                     structField("age", "integer"))
csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";")

这些示例生成的是非类型化的流式 DataFrame,这意味着 DataFrame 的模式不会在编译时进行检查,仅在提交查询时在运行时进行检查。某些操作(如 mapflatMap 等)需要在编译时知道类型。要执行这些操作,你可以使用与静态 DataFrame 相同的方法将这些非类型化的流式 DataFrame 转换为类型化的流式 Dataset。有关更多详细信息,请参阅 SQL 编程指南。此外,本文档稍后将讨论有关受支持的流式源的更多详细信息。

自 Spark 3.1 起,你还可以使用 DataStreamReader.table() 从表中创建流式 DataFrame。 有关更多详细信息,请参见 流式表 API

流式 DataFrame/Dataset 的模式推断和分区

默认情况下,来自基于文件的源的结构化流需要你指定模式,而不是依赖 Spark 自动推断它。此限制确保将为流式查询使用一致的模式,即使在发生故障的情况下也是如此。 对于临时用例,你可以通过将 spark.sql.streaming.schemaInference 设置为 true 来重新启用模式推断。

当存在名为 /key=value/ 的子目录时,会发生分区发现,并且列表将自动递归到这些目录中。 如果这些列出现在用户提供的模式中,则 Spark 将根据读取文件的路径填写它们。 构成分区方案的目录必须在查询开始时存在,并且必须保持静态。 例如,当 /data/year=2015/ 存在时,添加 /data/year=2016/ 是可以的,但更改分区列(即,通过创建目录 /data/date=2016-04-17/)是无效的。

流式 DataFrame/Dataset 上的操作

你可以在流式 DataFrame/Dataset 上应用各种操作 - 从非类型化、类似 SQL 的操作(例如,selectwheregroupBy)到类型化的类似 RDD 的操作(例如,mapfilterflatMap)。 有关更多详细信息,请参阅 SQL 编程指南。 让我们看一下你可以使用的一些示例操作。

基本操作 - 选择、投影、聚合

DataFrame/Dataset 上的大多数常见操作都支持流式处理。 此部分稍后将讨论一些不支持的操作。

df = ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.javalang.typed;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;

public class DeviceData {
  private String device;
  private String deviceType;
  private Double signal;
  private java.sql.Date time;
  ...
  // Getter and setter methods for each field
}

Dataset<Row> df = ...;    // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10"); // using untyped APIs
ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
  .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());

// Running count of the number of updates for each device type
df.groupBy("deviceType").count(); // using untyped API

// Running average signal for each device type
ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
  .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
df <- ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
select(where(df, "signal > 10"), "device")

# Running count of the number of updates for each device type
count(groupBy(df, "deviceType"))

你还可以将流式 DataFrame/Dataset 注册为临时视图,然后在其上应用 SQL 命令。

df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  # returns another streaming DF
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  // returns another streaming DF
df.createOrReplaceTempView("updates");
spark.sql("select count(*) from updates");  // returns another streaming DF
createOrReplaceTempView(df, "updates")
sql("select count(*) from updates")

请注意,你可以使用 df.isStreaming 识别 DataFrame/Dataset 是否具有流式数据。

df.isStreaming()
df.isStreaming
df.isStreaming()
isStreaming(df)

你可能需要检查查询的查询计划,因为 Spark 可能会在针对流式数据集解释 SQL 语句期间注入有状态操作。 一旦有状态操作被注入到查询计划中,你可能需要根据有状态操作的注意事项检查你的查询。 (例如,输出模式、水印、状态存储大小维护等)

事件时间上的窗口操作

使用结构化流式处理,在滑动事件时间窗口上的聚合非常简单,并且与分组聚合非常相似。 在分组聚合中,为用户指定的分组列中的每个唯一值维护聚合值(例如,计数)。 对于基于窗口的聚合,为行事件时间所属的每个窗口维护聚合值。 让我们用一个图示来理解这一点。

假设我们的 快速示例 被修改,并且流现在包含行以及生成行的时间。 我们不想运行字数统计,而是想在 10 分钟的窗口内统计字数,每 5 分钟更新一次。 也就是说,在 10 分钟窗口 12:00 - 12:10、12:05 - 12:15、12:10 - 12:20 等之间收到的单词的字数统计。 请注意,12:00 - 12:10 表示在 12:00 之后但在 12:10 之前到达的数据。 现在,考虑一个在 12:07 收到的单词。 此单词应将对应于两个窗口 12:00 - 12:10 和 12:05 - 12:15 的计数递增。 因此,计数将按分组键(即,单词)和窗口(可以从事件时间计算)进行索引。

结果表如下所示。

Window Operations

由于此窗口与分组类似,因此在代码中,你可以使用 groupBy()window() 操作来表示窗口聚合。 你可以在 Scala/Java/Python 中查看以下示例的完整代码。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();
words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts <- count(
                    groupBy(
                      words,
                      window(words$timestamp, "10 minutes", "5 minutes"),
                      words$word))

处理迟到数据和水印

现在考虑如果其中一个事件延迟到达应用程序会发生什么。 例如,假设在 12:04(即事件时间)生成的单词,应用程序在 12:11 收到。 应用程序应使用 12:04 而不是 12:11 来更新 12:00 - 12:10 窗口的旧计数。 这在我们基于窗口的分组中自然发生——Structured Streaming 可以长时间维护部分聚合的中间状态,以便延迟数据可以正确更新旧窗口的聚合,如下所示。

Handling Late Data

但是,要运行此查询几天,系统必须限制其累积的内存中间状态量。 这意味着系统需要知道何时可以从内存状态中删除旧的聚合,因为应用程序将不再收到该聚合的延迟数据。 为了实现这一点,在 Spark 2.1 中,我们引入了水印 (watermarking),它使引擎能够自动跟踪数据中的当前事件时间,并尝试相应地清理旧状态。 您可以通过指定事件时间列和数据预期延迟的时间阈值来定义查询的水印。 对于在时间 T 结束的特定窗口,引擎将维护状态并允许延迟数据更新状态,直到 (引擎看到的最大事件时间 - 延迟阈值 > T)。 换句话说,在阈值范围内的延迟数据将被聚合,但晚于阈值的数据将开始被丢弃(有关确切的保证,请参阅本节后面的语义保证)。 让我们用一个例子来理解这一点。 我们可以使用下面的 withWatermark() 轻松地在之前的示例中定义水印。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),
        col("word"))
    .count();
words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group

words <- withWatermark(words, "timestamp", "10 minutes")
windowedCounts <- count(
                    groupBy(
                      words,
                      window(words$timestamp, "10 minutes", "5 minutes"),
                      words$word))

在此示例中,我们正在定义查询水印的列的值为 “timestamp”,并且还定义了 “10 分钟” 作为允许的数据延迟的阈值。 如果此查询在更新输出模式下运行(稍后在输出模式部分中讨论),引擎将继续更新结果表中窗口的计数,直到窗口比水印旧,水印滞后于列 “timestamp” 中的当前事件时间 10 分钟。 这是一个说明。

Watermarking in Update Mode

如图所示,引擎跟踪的最大事件时间是蓝色虚线,并且在每次触发开始时设置为 (最大事件时间 - '10 分钟') 的水印是红线。 例如,当引擎观察到数据 (12:14, dog) 时,它将下一个触发器的水印设置为 12:04。 此水印允许引擎维护额外的 10 分钟的中间状态,以允许对延迟数据进行计数。 例如,数据 (12:09, cat) 是无序且延迟的,它落在窗口 12:00 - 12:1012:05 - 12:15 中。 由于它仍然领先于触发器中的水印 12:04,因此引擎仍然维护中间计数作为状态,并正确更新相关窗口的计数。 但是,当水印更新为 12:11 时,窗口 (12:00 - 12:10) 的中间状态被清除,所有后续数据(例如 (12:04, donkey))都被认为是“太晚”因此被忽略。 请注意,在每次触发后,更新的计数(即紫色行)将作为触发器输出写入接收器 (sink),如更新模式所指定的那样。

某些接收器(例如文件)可能不支持更新模式所需的细粒度更新。 为了与它们一起工作,我们还支持追加模式,其中只有最终计数写入接收器。 如下图所示。

请注意,在非流式 Dataset 上使用 withWatermark 是无操作。 由于水印不应以任何方式影响任何批处理查询,因此我们将直接忽略它。

Watermarking in Append Mode

与之前的更新模式类似,引擎维护每个窗口的中间计数。 但是,部分计数不会更新到结果表,也不会写入接收器。 引擎等待“10 分钟”以计算延迟日期,然后删除水印 < 窗口的中间状态,并将最终计数附加到结果表/接收器。 例如,只有在水印更新为 12:11 后,窗口 12:00 - 12:10 的最终计数才会附加到结果表。

时间窗口的类型

Spark 支持三种类型的时间窗口:滚动(固定)、滑动和会话。

The types of time windows

滚动窗口是一系列固定大小、非重叠和连续的时间间隔。 一个输入只能绑定到一个窗口。

滑动窗口类似于滚动窗口,它们都是“固定大小”的,但如果滑动的持续时间小于窗口的持续时间,则窗口可以重叠,在这种情况下,一个输入可以绑定到多个窗口。

滚动和滑动窗口使用 window 函数,该函数已在上述示例中描述。

与前两种类型相比,会话窗口具有不同的特征。 会话窗口具有动态的窗口长度,具体取决于输入。 会话窗口以输入开始,如果在间隔持续时间内收到后续输入,则会自行扩展。 对于静态间隔持续时间,如果在收到最新输入后间隔持续时间内没有收到任何输入,则会话窗口关闭。

会话窗口使用 session_window 函数。 该函数的使用与 window 函数类似。

events = ...  # streaming DataFrame of schema { timestamp: Timestamp, userId: String }

# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window(events.timestamp, "5 minutes"),
        events.userId) \
    .count()
import spark.implicits._

val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        session_window($"timestamp", "5 minutes"),
        $"userId")
    .count()
Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

// Group the data by session window and userId, and compute the count of each group
Dataset<Row> sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        session_window(col("timestamp"), "5 minutes"),
        col("userId"))
    .count();

除了静态值之外,我们还可以提供一个表达式来根据输入行动态指定间隔持续时间。 请注意,间隔持续时间为负数或零的行将从聚合中过滤掉。

使用动态间隔持续时间时,会话窗口的关闭不再取决于最新的输入。 会话窗口的范围是所有事件范围的并集,这些范围由事件开始时间和查询执行期间评估的间隔持续时间决定。

from pyspark.sql import functions as sf

events = ...  # streaming DataFrame of schema { timestamp: Timestamp, userId: String }

session_window = session_window(events.timestamp, \
    sf.when(events.userId == "user1", "5 seconds") \
    .when(events.userId == "user2", "20 seconds").otherwise("5 minutes"))

# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window,
        events.userId) \
    .count()
import spark.implicits._

val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

val sessionWindow = session_window($"timestamp", when($"userId" === "user1", "5 seconds")
  .when($"userId" === "user2", "20 seconds")
  .otherwise("5 minutes"))

// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        Column(sessionWindow),
        $"userId")
    .count()
Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

SessionWindow sessionWindow = session_window(col("timestamp"), when(col("userId").equalTo("user1"), "5 seconds")
  .when(col("userId").equalTo("user2"), "20 seconds")
  .otherwise("5 minutes"))

// Group the data by session window and userId, and compute the count of each group
Dataset<Row> sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        new Column(sessionWindow),
        col("userId"))
    .count();

请注意,在流式查询中使用会话窗口时,存在一些限制,如下所示

对于批处理查询,支持全局窗口(仅在分组键中具有 session_window)。

默认情况下,Spark 不对会话窗口聚合执行部分聚合,因为它需要在分组之前在本地分区中进行额外的排序。 对于每个本地分区的同一组键中只有少量输入行的情况,它效果更好,但对于本地分区中存在大量具有相同组键的输入行的情况,尽管需要额外的排序,但执行部分聚合仍然可以显着提高性能。

您可以启用 spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition 以指示 Spark 执行部分聚合。

时间窗口的时间表示

在某些用例中,需要提取时间窗口的时间表示,以便将需要时间戳的操作应用于时间窗口数据。 一个例子是链式时间窗口聚合,用户想要针对时间窗口定义另一个时间窗口。 例如,有人想要将 5 分钟的时间窗口聚合为 1 小时的滚动时间窗口。

有两种方法可以实现这一点,如下所示

  1. 使用带有时间窗口列作为参数的 window_time SQL 函数
  2. 使用带有时间窗口列作为参数的 window SQL 函数

window_time 函数将生成一个时间戳,表示时间窗口的时间。 用户可以将结果传递给 window 函数(或任何需要时间戳的地方)的参数,以执行需要时间戳的时间窗口的操作。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window(window_time($"window"), "1 hour"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();

// Group the windowed data by another window and word and compute the count of each group
Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
  functions.window(functions.window_time("window"), "1 hour"),
  windowedCounts.col("word")
).count();

window 函数不仅接受时间戳列,还接受时间窗口列。 这对于用户想要应用链式时间窗口聚合的情况特别有用。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(windowedCounts.window, "1 hour"),
    windowedCounts.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();

// Group the windowed data by another window and word and compute the count of each group
Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
  functions.window("window", "1 hour"),
  windowedCounts.col("word")
).count();
水印清理聚合状态的条件

重要的是要注意,必须满足以下条件,水印才能清理聚合查询中的状态(截至 Spark 2.1.1,将来可能会更改)

带有水印的聚合的语义保证

Join 操作

Structured Streaming 支持将流式 Dataset/DataFrame 与静态 Dataset/DataFrame 以及另一个流式 Dataset/DataFrame 连接。 流式连接的结果会以增量方式生成,类似于上一节中流式聚合的结果。 在本节中,我们将探讨在上述情况下支持哪些类型的连接(即内部、外部、半连接等)。 请注意,在所有支持的连接类型中,与流式 Dataset/DataFrame 连接的结果与使用包含流中相同数据的静态 Dataset/DataFrame 连接的结果完全相同。

流-静态 Join

自 Spark 2.0 引入以来,Structured Streaming 一直支持流式和静态 DataFrame/Dataset 之间的连接(内部连接和某些类型的外部连接)。 这是一个简单的例子。

staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  # left outer join with a static DF
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  // left outer join with a static DF
Dataset<Row> staticDf = spark.read(). ...;
Dataset<Row> streamingDf = spark.readStream(). ...;
streamingDf.join(staticDf, "type");         // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer");  // left outer join with a static DF
staticDf <- read.df(...)
streamingDf <- read.stream(...)
joined <- merge(streamingDf, staticDf, sort = FALSE)  # inner equi-join with a static DF
joined <- join(
            streamingDf,
            staticDf,
            streamingDf$value == staticDf$value,
            "left_outer")  # left outer join with a static DF

请注意,流式-静态连接是无状态的,因此不需要状态管理。 但是,某些类型的流式-静态外部连接尚未受支持。 这些列在本连接部分的末尾

流-流 Join

在 Spark 2.3 中,我们添加了对流式-流式连接的支持,也就是说,您可以连接两个流式 Datasets/DataFrames。 在两个数据流之间生成连接结果的挑战在于,在任何时间点,数据集的视图对于连接的双方都是不完整的,这使得在输入之间找到匹配项变得更加困难。 从一个输入流接收的任何行都可以与来自另一个输入流的任何未来的、尚未接收的行匹配。 因此,对于两个输入流,我们将过去的输入缓冲为流式状态,以便我们可以将每个未来的输入与过去的输入进行匹配,并相应地生成连接的结果。 此外,与流式聚合类似,我们会自动处理延迟的、乱序的数据,并且可以使用水印限制状态。 让我们讨论不同类型的支持的流式-流式连接以及如何使用它们。

带有可选水印的 Inner Join

支持任何类型的列以及任何类型的连接条件上的内部连接。但是,随着流的运行,流式状态的大小会无限增长,因为所有过去的输入都必须保存,以便任何新的输入都可以与过去的任何输入匹配。为避免无界状态,您必须定义额外的连接条件,以便无限旧的输入无法与未来的输入匹配,因此可以从状态中清除。换句话说,您需要在连接中执行以下额外步骤。

  1. 在两个输入上定义水印延迟,以便引擎知道输入的延迟程度(类似于流式聚合)

  2. 在两个输入之间定义事件时间约束,以便引擎可以确定何时不需要一个输入的旧行(即,不会满足时间约束)与另一个输入进行匹配。此约束可以通过以下两种方式之一定义。

    1. 时间范围连接条件(例如,...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),

    2. 基于事件时间窗口的连接(例如,...JOIN ON leftTimeWindow = rightTimeWindow)。

让我们通过一个例子来理解这一点。

假设我们想将广告展示流(广告何时展示)与用户点击广告的另一个流连接起来,以关联展示何时导致可盈利的点击。为了允许在此流-流连接中进行状态清理,您必须按如下方式指定水印延迟和时间约束。

  1. 水印延迟:假设展示和相应的点击在事件时间中最多可以延迟/乱序 2 小时和 3 小时。

  2. 事件时间范围条件:假设点击可以在相应展示后的 0 秒到 1 小时的时间范围内发生。

代码如下所示。

from pyspark.sql.functions import expr

impressions = spark.readStream. ...
clicks = spark.readStream. ...

# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

# Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
import org.apache.spark.sql.functions.expr

val impressions = spark.readStream. ...
val clicks = spark.readStream. ...

// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
import static org.apache.spark.sql.functions.expr

Dataset<Row> impressions = spark.readStream(). ...
Dataset<Row> clicks = spark.readStream(). ...

// Apply watermarks on event-time columns
Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour ")
);
impressions <- read.stream(...)
clicks <- read.stream(...)

# Apply watermarks on event-time columns
impressionsWithWatermark <- withWatermark(impressions, "impressionTime", "2 hours")
clicksWithWatermark <- withWatermark(clicks, "clickTime", "3 hours")

# Join with event-time constraints
joined <- join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    paste(
      "clickAdId = impressionAdId AND",
      "clickTime >= impressionTime AND",
      "clickTime <= impressionTime + interval 1 hour"
)))
带有水印的流-流内部连接的语义保证

这类似于水印在聚合上提供的保证。“2 小时”的水印延迟保证引擎永远不会丢弃任何延迟小于 2 小时的数据。但是,延迟超过 2 小时的数据可能会也可能不会被处理。

带有水印的 Outer Join

虽然水印 + 事件时间约束对于内部连接是可选的,但对于外部连接,它们必须指定。这是因为为了在外部连接中生成 NULL 结果,引擎必须知道何时输入行将不会与未来中的任何内容匹配。因此,必须指定水印 + 事件时间约束才能生成正确的结果。因此,带有外部连接的查询看起来很像之前的广告盈利示例,除了会有一个额外的参数指定它是外部连接。

impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
 )
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour "),
  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);
joined <- join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    paste(
      "clickAdId = impressionAdId AND",
      "clickTime >= impressionTime AND",
      "clickTime <= impressionTime + interval 1 hour"),
  "left_outer"                 # can be "inner", "left_outer", "right_outer", "full_outer", "left_semi"
))
带有水印的流-流外部连接的语义保证

外部连接与内部连接在水印延迟以及数据是否会被丢弃方面具有相同的保证。

注意事项

关于如何生成外部结果,需要注意一些重要的特征。

带有水印的 Semi Join

半连接返回关系的左侧与右侧匹配的值。它也称为左半连接。与外部连接类似,必须为半连接指定水印 + 事件时间约束。这是为了驱逐左侧不匹配的输入行,引擎必须知道左侧的输入行何时不会在将来与右侧的任何内容匹配。

带有水印的流-流半连接的语义保证

半连接与内部连接在水印延迟以及数据是否会被丢弃方面具有相同的保证。

流式查询中 Join 的支持矩阵
左侧输入 右侧输入 连接类型
静态 静态 所有类型 支持,因为它不是在流式数据上,即使它可能存在于流式查询中
静态 内部 支持,无状态
左外部 支持,无状态
右外部 不支持
完全外部 不支持
左半 支持,无状态
静态 内部 支持,无状态
左外部 不支持
右外部 支持,无状态
完全外部 不支持
左半 不支持
内部 支持,可选择指定两侧的水印 + 用于状态清理的时间约束
左外部 有条件支持,必须指定右侧的水印 + 用于正确结果的时间约束,可选择指定左侧的水印以进行所有状态清理
右外部 有条件支持,必须指定左侧的水印 + 用于正确结果的时间约束,可选择指定右侧的水印以进行所有状态清理
完全外部 有条件支持,必须指定一侧的水印 + 用于正确结果的时间约束,可选择指定另一侧的水印以进行所有状态清理
左半 有条件支持,必须指定右侧的水印 + 用于正确结果的时间约束,可选择指定左侧的水印以进行所有状态清理

有关支持的连接的更多详细信息

在 append 输出模式下,您可以构建一个具有非 map 类操作的查询,例如在连接之前/之后进行聚合、去重、流-流连接。

例如,这是一个在两个流中进行时间窗口聚合,然后进行带有事件时间窗口的流-流连接的例子

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")
Dataset<Row> clicksWindow = clicksWithWatermark
  .groupBy(functions.window(clicksWithWatermark.col("clickTime"), "1 hour"))
  .count();

Dataset<Row> impressionsWindow = impressionsWithWatermark
  .groupBy(functions.window(impressionsWithWatermark.col("impressionTime"), "1 hour"))
  .count();

clicksWindow.join(impressionsWindow, "window", "inner");
clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

这是另一个带有时间范围连接条件的流-流连接,然后是时间窗口聚合的例子

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()
Dataset<Row> joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour "),
  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);

joined
  .groupBy(joined.col("clickAdId"), functions.window(joined.col("clickTime"), "1 hour"))
  .count();
joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

流式去重

您可以使用事件中的唯一标识符来对数据流中的记录进行去重。这与使用唯一标识符列对静态数据进行去重完全相同。查询将存储来自先前记录的必要数量的数据,以便它可以过滤重复的记录。与聚合类似,您可以使用带有或不带有水印的去重。

streamingDf = spark.readStream. ...

# Without watermark using guid column
streamingDf.dropDuplicates("guid")

# With watermark using guid and eventTime columns
streamingDf \
  .withWatermark("eventTime", "10 seconds") \
  .dropDuplicates("guid", "eventTime")
val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")
Dataset<Row> streamingDf = spark.readStream(). ...;  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid");

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime");
streamingDf <- read.stream(...)

# Without watermark using guid column
streamingDf <- dropDuplicates(streamingDf, "guid")

# With watermark using guid and eventTime columns
streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")

特别是对于流式处理,您可以使用事件中的唯一标识符,在水印的时间范围内对数据流中的记录进行去重。例如,如果您将水印的延迟阈值设置为“1 小时”,则在 1 小时内发生的重复事件可以被正确地去重。(有关更多详细信息,请参阅dropDuplicatesWithinWatermark的 API 文档。)

这可以用于处理事件时间列不能作为唯一标识符的一部分的用例,主要是由于相同记录的事件时间以某种方式不同的情况。(例如,非幂等写入器,其中发出事件时间发生在写入时)

建议用户将水印的延迟阈值设置得长于重复事件之间的最大时间戳差异。

此功能要求在流式 DataFrame/Dataset 中设置带有延迟阈值的水印。

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
streamingDf \
  .withWatermark("eventTime", "10 hours") \
  .dropDuplicatesWithinWatermark("guid")
val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
Dataset<Row> streamingDf = spark.readStream(). ...;  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid");

处理多个水印的策略

一个流式查询可以有多个联合或连接在一起的输入流。每个输入流可以具有不同的延迟数据阈值,这些阈值需要为有状态操作容忍。您可以使用每个输入流上的withWatermarks("eventTime", delay)来指定这些阈值。例如,考虑一个在inputStream1inputStream2之间进行流-流连接的查询。

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

在执行查询时,Structured Streaming 会单独跟踪每个输入流中看到的最大事件时间,根据相应的延迟计算水印,并选择一个全局水印与它们一起使用于有状态操作。默认情况下,选择最小值作为全局水印,因为它确保如果其中一个流落后于其他流(例如,由于上游故障导致其中一个流停止接收数据),则不会意外地将任何数据作为过晚而丢弃。换句话说,全局水印将安全地以最慢的流的速度移动,并且查询输出将相应地延迟。

但是,在某些情况下,即使这意味着丢弃来自最慢流的数据,您可能也希望获得更快的结果。从 Spark 2.4 开始,您可以通过将 SQL 配置spark.sql.streaming.multipleWatermarkPolicy设置为max(默认值为min)来设置多水印策略以选择最大值作为全局水印。这使得全局水印以最快的流的速度移动。但是,作为副作用,来自较慢流的数据将被积极地丢弃。因此,请谨慎使用此配置。

任意有状态操作

许多用例需要比聚合更高级的有状态操作。例如,在许多用例中,您必须跟踪来自事件数据流的会话。要进行此类会话化,您必须将任意类型的数据保存为状态,并在每次触发时使用数据流事件对状态执行任意操作。从 Spark 2.2 开始,可以使用操作mapGroupsWithState和更强大的操作flatMapGroupsWithState来完成。这两个操作都允许您将用户定义的代码应用于分组的 Datasets,以更新用户定义的状态。有关更具体的详细信息,请查看 API 文档(Scala/Java)和示例(Scala/Java)。

虽然 Spark 无法检查和强制执行,但状态函数应该根据输出模式的语义来实现。例如,在 Update 模式下,Spark 不希望状态函数发出比当前水印加上允许的延迟记录延迟更旧的行,而在 Append 模式下,状态函数可以发出这些行。

不支持的操作

有一些 DataFrame/Dataset 操作不支持流式 DataFrames/Datasets。其中一些如下。

此外,一些 Dataset 方法不能在流式 Dataset 上使用。 它们是会立即运行查询并返回结果的 action,这在流式 Dataset 上没有意义。 相反,这些功能可以通过显式启动流式查询来完成(请参阅下一节)。

如果你尝试任何这些操作,你将看到一个 AnalysisException,如 “operation XYZ is not supported with streaming DataFrames/Datasets”。 虽然其中一些操作可能会在 Spark 的未来版本中得到支持,但其他一些操作从根本上来说很难在流式数据上高效地实现。 例如,不支持对输入流进行排序,因为它需要跟踪流中接收到的所有数据。 因此,从根本上来说,很难有效地执行此操作。

状态存储

状态存储是一个版本化的键值存储,提供读取和写入操作。 在 Structured Streaming 中,我们使用状态存储提供程序来处理跨批次的有状态操作。 有两种内置的状态存储提供程序实现。 最终用户也可以通过扩展 StateStoreProvider 接口来实现他们自己的状态存储提供程序。

HDFS 状态存储提供程序

HDFS 后端状态存储提供程序是 [[StateStoreProvider]] 和 [[StateStore]] 的默认实现,其中所有数据首先存储在内存映射中,然后由 HDFS 兼容文件系统中的文件备份。 对存储的所有更新必须以事务方式成组完成,并且每组更新都会增加存储的版本。 这些版本可用于在正确的存储版本上重新执行更新(通过 RDD 操作中的重试),并重新生成存储版本。

RocksDB 状态存储实现

从 Spark 3.2 开始,我们添加了一个新的内置状态存储实现,RocksDB 状态存储提供程序。

如果你的流式查询中有状态操作(例如,流式聚合、流式 dropDuplicates、流-流 joins、mapGroupsWithState 或 flatMapGroupsWithState),并且你希望在状态中维护数百万个键,那么你可能会面临与大型 JVM 垃圾回收 (GC) 暂停相关的问题,从而导致微批处理时间出现很大的变化。 这是因为,通过 HDFSBackedStateStore 的实现,状态数据保存在 executors 的 JVM 内存中,并且大量状态对象给 JVM 造成内存压力,从而导致高 GC 暂停。

在这种情况下,你可以选择使用基于 RocksDB 的更优化的状态管理解决方案。 此解决方案不是将状态保存在 JVM 内存中,而是使用 RocksDB 有效地管理本机内存和本地磁盘中的状态。 此外,Structured Streaming 会自动将对此状态的任何更改保存到你提供的检查点位置,从而提供完全的容错保证(与默认状态管理相同)。

要启用新的内置状态存储实现,请将 spark.sql.streaming.stateStore.providerClass 设置为 org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

以下是关于状态存储提供程序的 RocksDB 实例的配置

配置名称 描述 默认值
spark.sql.streaming.stateStore.rocksdb.compactOnCommit 我们是否对提交操作的 RocksDB 实例执行范围压缩 False
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled 在 RocksDB StateStore 提交期间是否上传变更日志而不是快照 False
spark.sql.streaming.stateStore.rocksdb.blockSizeKB 每个块中打包的用户数据的大致大小(以 KB 为单位),用于 RocksDB BlockBasedTable,它是 RocksDB 的默认 SST 文件格式。 4
spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB 块缓存的容量大小(以 MB 为单位)。 8
spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs 在 RocksDB 实例的加载操作中,获取锁的等待时间(以毫秒为单位)。 60000
spark.sql.streaming.stateStore.rocksdb.maxOpenFiles RocksDB 实例可以使用的打开文件数。 值 -1 表示始终保持打开状态的文件。 如果达到打开文件限制,RocksDB 将从打开文件缓存中逐出条目,关闭这些文件描述符,并从缓存中删除这些条目。 -1
spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad 我们是否在加载时重置 RocksDB 的所有 ticker 和 histogram 统计信息。 True
spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows 我们是否跟踪状态存储中的总行数。 请参阅 性能方面注意事项 中的详细信息。 True
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB RocksDB 中 MemTable 的最大大小。 值 -1 表示将使用 RocksDB 内部默认值 -1
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber RocksDB 中 MemTable 的最大数量,包括活动和不可变的。 值 -1 表示将使用 RocksDB 内部默认值 -1
spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage 单个节点上 RocksDB 状态存储实例的总内存使用量是否有限制。 false
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB 单个节点上 RocksDB 状态存储实例的总内存限制(以 MB 为单位)。 500
spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio 写缓冲区占用的总内存,作为使用 maxMemoryUsageMB 在单个节点上分配给所有 RocksDB 实例的内存的一部分。 0.5
spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio 高优先级池中块占用的总内存,作为使用 maxMemoryUsageMB 在单个节点上分配给所有 RocksDB 实例的内存的一部分。 0.1
RocksDB 状态存储内存管理

RocksDB 为不同的对象分配内存,例如 memtable、块缓存和过滤器/索引块。 如果不加以限制,多个实例的 RocksDB 内存使用量可能会无限增长,并可能导致 OOM(内存不足)问题。 RocksDB 提供了一种通过使用写缓冲区管理器功能来限制运行在单个节点上的所有 DB 实例的内存使用量的方法。 如果你想限制 Spark Structured Streaming 部署中的 RocksDB 内存使用量,可以通过将 spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage 配置设置为 true 来启用此功能。 你还可以通过将 spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB 值设置为静态数字或节点上可用物理内存的一部分来确定 RocksDB 实例的最大允许内存。 还可以通过将 spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMBspark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber 设置为所需值来配置单个 RocksDB 实例的限制。 默认情况下,这些设置使用 RocksDB 内部默认值。

RocksDB 状态存储变更日志检查点

在较新版本的 Spark 中,为 RocksDB 状态存储引入了变更日志检查点。 RocksDB State Store 的传统检查点机制是增量快照检查点,其中 RocksDB 实例的 manifest 文件和新生成的 RocksDB SST 文件被上传到持久存储。 变更日志检查点不是上传 RocksDB 实例的数据文件,而是上传自上次检查点以来对状态所做的更改以实现持久性。 为了实现可预测的故障恢复和变更日志修剪,定期在后台持久保存快照。 变更日志检查点避免了捕获和上传 RocksDB 实例快照的成本,并显著降低了流式查询延迟。

默认情况下,变更日志检查点已禁用。 你可以通过将 spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled 配置设置为 true 来启用 RocksDB State Store 变更日志检查点。 变更日志检查点旨在与传统检查点机制向后兼容。 RocksDB 状态存储提供程序为两种检查点机制之间的双向转换提供无缝支持。 这使你可以利用变更日志检查点的性能优势,而无需丢弃旧的状态检查点。 在支持变更日志检查点的 Spark 版本中,你可以通过在 spark 会话中启用变更日志检查点,将流式查询从旧版本的 Spark 迁移到变更日志检查点。 反之亦然,你可以在较新版本的 Spark 中安全地禁用变更日志检查点,然后任何已经运行变更日志检查点的查询将切换回传统检查点。 你需要重新启动流式查询才能应用检查点机制中的更改,但在此过程中不会观察到任何性能下降。

性能方面注意事项
  1. 你可能想要禁用对总行数的跟踪,以实现 RocksDB 状态存储的更好性能。

跟踪行数会在写入操作上带来额外的查找 - 我们建议你尝试在调整 RocksDB 状态存储时关闭该配置,特别是状态运算符的指标值很大时 - numRowsUpdatednumRowsRemoved

你可以在重启查询期间更改配置,这使你可以更改“可观察性与性能”之间的权衡决策。 如果禁用该配置,则状态中的行数 (numTotalStateRows) 将报告为 0。

状态存储和任务本地性

有状态操作会为执行器状态存储中的事件存储状态。状态存储占用内存和磁盘空间等资源来存储状态。因此,让状态存储提供者在不同的流式处理批次中在同一个执行器中运行会更有效率。更改状态存储提供者的位置需要加载检查点状态的额外开销。从检查点加载状态的开销取决于外部存储和状态的大小,这往往会损害微批处理的延迟。对于某些用例,例如处理非常大的状态数据,从检查点状态加载新的状态存储提供者可能会非常耗时且效率低下。

Structured Streaming 查询中的有状态操作依赖于 Spark 的 RDD 的首选位置特性,以便在同一个执行器上运行状态存储提供者。如果在下一个批次中,对应的状态存储提供者再次被调度到该执行器上,它可以重用之前的状态,并节省加载检查点状态的时间。

然而,通常首选位置不是硬性要求,Spark 仍然有可能将任务调度到首选执行器之外的执行器。在这种情况下,Spark 将从新的执行器上的检查点状态加载状态存储提供者。在前一个批次中运行的状态存储提供者不会立即卸载。Spark 运行一个维护任务,该任务检查并卸载执行器上不活跃的状态存储提供者。

通过更改与任务调度相关的 Spark 配置,例如 spark.locality.wait,用户可以配置 Spark 等待启动数据本地任务的时间。对于 Structured Streaming 中的有状态操作,它可以用于让状态存储提供者在不同的批次中在同一个执行器上运行。

特别是对于内置的 HDFS 状态存储提供者,用户可以检查状态存储指标,例如 loadedMapCacheHitCountloadedMapCacheMissCount。理想情况下,最好是尽量减少缓存未命中计数,这意味着 Spark 不会在加载检查点状态上浪费太多时间。用户可以增加 Spark 本地性等待配置,以避免在不同的批次中在不同的执行器上加载状态存储提供者。

启动流式查询

一旦您定义了最终结果 DataFrame/Dataset,剩下的就是启动流式计算。为此,您必须使用通过 Dataset.writeStream() 返回的 DataStreamWriter (Scala/Java/Python docs)。您将需要在此接口中指定以下一个或多个内容。

输出模式

有几种类型的输出模式。

不同类型的流式查询支持不同的输出模式。这是一个兼容性矩阵。

查询类型 支持的输出模式 备注
带有聚合的查询 具有水印的事件时间聚合 追加、更新、完整 追加模式使用水印来删除旧的聚合状态。但是窗口聚合的输出会延迟在 withWatermark() 中指定的延迟阈值,因为按照模式语义,只有在行被最终确定后(即,在跨越水印之后),才能将行添加到结果表中。有关更多详细信息,请参阅 延迟数据部分。

更新模式使用水印来删除旧的聚合状态。

完整模式不删除旧的聚合状态,因为根据定义,此模式会保留结果表中的所有数据。
其他聚合 完整、更新 由于未定义水印(仅在其他类别中定义),因此不会删除旧的聚合状态。

不支持追加模式,因为聚合可以更新,从而违反此模式的语义。
带有 mapGroupsWithState 的查询 更新 不允许在带有 mapGroupsWithState 的查询中进行聚合。
带有 flatMapGroupsWithState 的查询 追加操作模式 追加 允许在 flatMapGroupsWithState 之后进行聚合。
更新操作模式 更新 不允许在带有 flatMapGroupsWithState 的查询中进行聚合。
带有 joins 的查询 追加 尚不支持更新和完整模式。 有关支持的连接类型的更多详细信息,请参阅 连接操作部分中的支持矩阵
其他查询 追加、更新 不支持完整模式,因为将所有未聚合的数据保留在结果表中是不可行的。

输出接收器

有几种类型的内置输出接收器。

writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()
writeStream
    .foreach(...)
    .start()
writeStream
    .format("console")
    .start()
writeStream
    .format("memory")
    .queryName("tableName")
    .start()

某些接收器不是容错的,因为它们不保证输出的持久性,仅用于调试目的。请参阅前面关于 容错语义的部分。以下是 Spark 中所有接收器的详细信息。

接收器 支持的输出模式 选项 容错 备注
文件接收器 追加 path: 输出目录的路径,必须指定。
retention: 输出文件的生存时间 (TTL)。 批处理提交早于 TTL 的输出文件最终将在元数据日志中排除。 这意味着读取接收器输出目录的读取器查询可能不会处理它们。 您可以提供时间字符串格式的值。 (例如 "12h"、"7d" 等) 默认情况下,此选项已禁用。

有关文件格式特定的选项,请参阅 DataFrameWriter 中的相关方法 (Scala/Java/Python/R)。 例如,对于“parquet”格式选项,请参阅 DataFrameWriter.parquet()
是(精确一次) 支持写入分区表。按时间分区可能很有用。
Kafka 接收器 追加、更新、完整 请参阅 Kafka 集成指南 是(至少一次) 更多详细信息请参阅 Kafka 集成指南
Foreach 接收器 追加、更新、完整 是(至少一次) 更多详细信息请参阅 下一节
ForeachBatch 接收器 追加、更新、完整 取决于实现 更多详细信息请参阅 下一节
控制台接收器 追加、更新、完整 numRows: 每次触发时要打印的行数 (默认值: 20)
truncate: 如果输出太长是否截断输出 (默认值: true)
内存接收器 追加、完整 否。但在完整模式下,重新启动的查询将重新创建完整的表。 表名是查询名称。

请注意,您必须调用 start() 才能实际开始执行查询。这将返回一个 StreamingQuery 对象,该对象是连续运行的执行的句柄。您可以使用此对象来管理查询,我们将在下一小节中讨论。现在,让我们通过几个示例来理解所有这些。

# ========== DF with no aggregations ==========
noAggDF = deviceDataDf.select("device").where("signal > 10")

# Print new data to console
noAggDF \
    .writeStream \
    .format("console") \
    .start()

# Write new data to Parquet files
noAggDF \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .option("path", "path/to/destination/dir") \
    .start()

# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()

# Print updated aggregations to console
aggDF \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF \
    .writeStream \
    .queryName("aggregates") \
    .outputMode("complete") \
    .format("memory") \
    .start()

spark.sql("select * from aggregates").show()   # interactively query in-memory table
// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")

// Print new data to console
noAggDF
  .writeStream
  .format("console")
  .start()

// Write new data to Parquet files
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()

// Print updated aggregations to console
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Have all the aggregates in an in-memory table
aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()   // interactively query in-memory table
// ========== DF with no aggregations ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");

// Print new data to console
noAggDF
  .writeStream()
  .format("console")
  .start();

// Write new data to Parquet files
noAggDF
  .writeStream()
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start();

// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();

// Print updated aggregations to console
aggDF
  .writeStream()
  .outputMode("complete")
  .format("console")
  .start();

// Have all the aggregates in an in-memory table
aggDF
  .writeStream()
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start();

spark.sql("select * from aggregates").show();   // interactively query in-memory table
# ========== DF with no aggregations ==========
noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")

# Print new data to console
write.stream(noAggDF, "console")

# Write new data to Parquet files
write.stream(noAggDF,
             "parquet",
             path = "path/to/destination/dir",
             checkpointLocation = "path/to/checkpoint/dir")

# ========== DF with aggregation ==========
aggDF <- count(groupBy(df, "device"))

# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")

# Have all the aggregates in an in memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")

# Interactively query in-memory table
head(sql("select * from aggregates"))
使用 Foreach 和 ForeachBatch

foreachforeachBatch 操作允许您对流式查询的输出应用任意操作和写入逻辑。 它们具有略微不同的用例 - 虽然 foreach 允许对每一行进行自定义写入逻辑,但 foreachBatch 允许对每个微批处理的输出进行任意操作和自定义逻辑。 让我们更详细地了解它们的用法。

ForeachBatch

foreachBatch(...) 允许您指定一个函数,该函数在流式查询的每个微批处理的输出数据上执行。 从 Spark 2.4 开始,Scala、Java 和 Python 中都支持此功能。 它接受两个参数:一个 DataFrame 或 Dataset,其中包含微批处理的输出数据,以及微批处理的唯一 ID。

def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    pass

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // Transform and write batchDF
}.start()
streamingDatasetOfString.writeStream().foreachBatch(
  new VoidFunction2<Dataset<String>, Long>() {
    public void call(Dataset<String> dataset, Long batchId) {
      // Transform and write batchDF
    }
  }
).start();

尚不支持 R。

使用 foreachBatch,您可以执行以下操作。

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

注意

Foreach

如果 foreachBatch 不是一个选项(例如,不存在相应的批处理数据写入器,或者使用连续处理模式),那么您可以使用 foreach 来表达您的自定义写入器逻辑。 具体来说,您可以通过将数据写入逻辑划分为三个方法来表达:openprocessclose。 从 Spark 2.4 开始,foreach 在 Scala、Java 和 Python 中可用。

在 Python 中,您可以通过两种方式调用 foreach:在函数中或在对象中。 函数提供了一种表达处理逻辑的简单方法,但是当故障导致某些输入数据的重新处理时,它不允许您对生成的数据进行重复数据删除。 对于这种情况,您必须在对象中指定处理逻辑。

  • 首先,该函数将一行作为输入。
def process_row(row):
    # Write row to storage
    pass

query = streamingDF.writeStream.foreach(process_row).start()
  • 其次,该对象具有一个 process 方法和可选的 open 和 close 方法
class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection. This method is optional in Python.
        pass

    def process(self, row):
        # Write row to connection. This method is NOT optional in Python.
        pass

    def close(self, error):
        # Close the connection. This method in optional in Python.
        pass

query = streamingDF.writeStream.foreach(ForeachWriter()).start()

在 Scala 中,您必须扩展类 ForeachWriter (文档)。

streamingDatasetOfString.writeStream.foreach(
  new ForeachWriter[String] {

    def open(partitionId: Long, version: Long): Boolean = {
      // Open connection
    }

    def process(record: String): Unit = {
      // Write string to connection
    }

    def close(errorOrNull: Throwable): Unit = {
      // Close the connection
    }
  }
).start()

在 Java 中,您必须扩展类 ForeachWriter (文档)。

streamingDatasetOfString.writeStream().foreach(
  new ForeachWriter<String>() {

    @Override public boolean open(long partitionId, long version) {
      // Open connection
    }

    @Override public void process(String record) {
      // Write string to connection
    }

    @Override public void close(Throwable errorOrNull) {
      // Close the connection
    }
  }
).start();

尚不支持 R。

执行语义 启动流式查询时,Spark 以下列方式调用该函数或对象的方法

流式表 API

从 Spark 3.1 开始,您还可以使用 DataStreamReader.table() 将表读取为流式 DataFrames,并使用 DataStreamWriter.toTable() 将流式 DataFrames 写入为表

spark = ...  # spark session

# Create a streaming DataFrame
df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Write the streaming DataFrame to a table
df.writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .toTable("myTable")

# Check the table result
spark.read.table("myTable").show()

# Transform the source dataset and write to a new table
spark.readStream \
    .table("myTable") \
    .select("value") \
    .writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .format("parquet") \
    .toTable("newTable")

# Check the new table result
spark.read.table("newTable").show()
val spark: SparkSession = ...

// Create a streaming DataFrame
val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

// Write the streaming DataFrame to a table
df.writeStream
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .toTable("myTable")

// Check the table result
spark.read.table("myTable").show()

// Transform the source dataset and write to a new table
spark.readStream
  .table("myTable")
  .select("value")
  .writeStream
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .format("parquet")
  .toTable("newTable")

// Check the new table result
spark.read.table("newTable").show()
SparkSession spark = ...

// Create a streaming DataFrame
Dataset<Row> df = spark.readStream()
  .format("rate")
  .option("rowsPerSecond", 10)
  .load();

// Write the streaming DataFrame to a table
df.writeStream()
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .toTable("myTable");

// Check the table result
spark.read().table("myTable").show();

// Transform the source dataset and write to a new table
spark.readStream()
  .table("myTable")
  .select("value")
  .writeStream()
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .format("parquet")
  .toTable("newTable");

// Check the new table result
spark.read().table("newTable").show();

在 R 中不可用。

有关更多详细信息,请查看 DataStreamReader(Scala/Java/Python 文档)和 DataStreamWriter(Scala/Java/Python 文档)的文档。

触发器

流式查询的触发器设置定义了流式数据处理的时序,无论该查询将作为具有固定批处理间隔的微批处理查询还是作为连续处理查询执行。 以下是支持的不同类型的触发器。

触发器类型 描述
未指定(默认) 如果未显式指定触发器设置,则默认情况下,查询将以微批处理模式执行,其中微批处理将在上一个微批处理完成处理后立即生成。
固定间隔微批处理 该查询将以微批处理模式执行,其中微批处理将以用户指定的时间间隔启动。
  • 如果上一个微批处理在间隔内完成,则引擎将等到间隔结束后再启动下一个微批处理。
  • 如果上一个微批处理花费的时间超过间隔才能完成(即,如果错过了间隔边界),则下一个微批处理将在上一个微批处理完成后立即启动(即,它将不会等待下一个间隔边界)。
  • 如果没有新的数据可用,则不会启动任何微批处理。
一次性微批处理(已弃用) 查询将执行一个微批处理以处理所有可用数据,然后自行停止。 这在您想要定期启动集群、处理自上一周期以来可用的所有内容,然后关闭集群的场景中非常有用。 在某些情况下,这可能会带来显着的成本节省。 请注意,此触发器已弃用,建议用户迁移到立即可用微批处理,因为它提供了更好的处理保证、更精细的批处理规模,以及更好地逐步处理水印推进(包括无数据批处理)。
立即可用微批处理 与查询的一次性微批处理触发器类似,查询将处理所有可用数据,然后自行停止。 不同之处在于,它将根据源选项(例如,文件源的 maxFilesPerTrigger)在(可能)多个微批处理中处理数据,这将带来更好的查询可伸缩性。
  • 此触发器提供了强大的处理保证:无论上一次运行中剩余多少批次,它都确保执行时所有可用数据在终止之前得到处理。 所有未提交的批次将首先被处理。
  • 水印会针对每个批次进行推进,并且如果在最后一个批次推进水印,则在终止之前会执行无数据批处理。 这有助于保持更小且可预测的状态大小,并减少有状态运算符输出的延迟。
具有固定检查点间隔的连续模式
(实验性)
查询将在新的低延迟连续处理模式下执行。 在下面的连续处理部分中阅读有关此内容的更多信息。

以下是一些代码示例。

# Default trigger (runs micro-batch as soon as it can)
df.writeStream \
  .format("console") \
  .start()

# ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream \
  .format("console") \
  .trigger(processingTime='2 seconds') \
  .start()

# One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream \
  .format("console") \
  .trigger(once=True) \
  .start()

# Available-now trigger
df.writeStream \
  .format("console") \
  .trigger(availableNow=True) \
  .start()

# Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(continuous='1 second')
  .start()
import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start()

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()

// Available-now trigger
df.writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start();

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start();

// One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start();

// Available-now trigger
df.writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start();

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start();
# Default trigger (runs micro-batch as soon as it can)
write.stream(df, "console")

# ProcessingTime trigger with two-seconds micro-batch interval
write.stream(df, "console", trigger.processingTime = "2 seconds")

# One-time trigger
write.stream(df, "console", trigger.once = TRUE)

# Continuous trigger is not yet supported

管理流式查询

在启动查询时创建的 StreamingQuery 对象可用于监视和管理查询。

query = df.writeStream.format("console").start()   # get the query object

query.id()          # get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId()       # get the unique id of this run of the query, which will be generated at every start/restart

query.name()        # get the name of the auto-generated or user-specified name

query.explain()   # print detailed explanations of the query

query.stop()      # stop the query

query.awaitTermination()   # block until query is terminated, with stop() or with error

query.exception()       # the exception if the query has been terminated with error

query.recentProgress  # a list of the most recent progress updates for this query

query.lastProgress    # the most recent progress update of this streaming query
val query = df.writeStream.format("console").start()   // get the query object

query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId       // get the unique id of this run of the query, which will be generated at every start/restart

query.name        // get the name of the auto-generated or user-specified name

query.explain()   // print detailed explanations of the query

query.stop()      // stop the query

query.awaitTermination()   // block until query is terminated, with stop() or with error

query.exception       // the exception if the query has been terminated with error

query.recentProgress  // an array of the most recent progress updates for this query

query.lastProgress    // the most recent progress update of this streaming query
StreamingQuery query = df.writeStream().format("console").start();   // get the query object

query.id();          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId();       // get the unique id of this run of the query, which will be generated at every start/restart

query.name();        // get the name of the auto-generated or user-specified name

query.explain();   // print detailed explanations of the query

query.stop();      // stop the query

query.awaitTermination();   // block until query is terminated, with stop() or with error

query.exception();       // the exception if the query has been terminated with error

query.recentProgress();  // an array of the most recent progress updates for this query

query.lastProgress();    // the most recent progress update of this streaming query
query <- write.stream(df, "console")  # get the query object

queryName(query)          # get the name of the auto-generated or user-specified name

explain(query)            # print detailed explanations of the query

stopQuery(query)          # stop the query

awaitTermination(query)   # block until query is terminated, with stop() or with error

lastProgress(query)       # the most recent progress update of this streaming query

您可以在单个 SparkSession 中启动任意数量的查询。 它们将并发运行,共享集群资源。 您可以使用 sparkSession.streams() 来获取 StreamingQueryManager (Scala/Java/Python 文档),该管理器可用于管理当前活动的查询。

spark = ...  # spark session

spark.streams.active  # get the list of currently active streaming queries

spark.streams.get(id)  # get a query object by its unique id

spark.streams.awaitAnyTermination()  # block until any one of them terminates
val spark: SparkSession = ...

spark.streams.active    // get the list of currently active streaming queries

spark.streams.get(id)   // get a query object by its unique id

spark.streams.awaitAnyTermination()   // block until any one of them terminates
SparkSession spark = ...

spark.streams().active();    // get the list of currently active streaming queries

spark.streams().get(id);   // get a query object by its unique id

spark.streams().awaitAnyTermination();   // block until any one of them terminates
Not available in R.

监控流式查询

有多种方法可以监视活动的流式查询。 您可以使用 Spark 的 Dropwizard Metrics 支持将指标推送到外部系统,或者以编程方式访问它们。

交互式读取指标

您可以使用 streamingQuery.lastProgress()streamingQuery.status() 直接获取活动查询的当前状态和指标。 lastProgress()ScalaJava 中返回一个 StreamingQueryProgress 对象,并在 Python 中返回一个具有相同字段的字典。 它具有有关流的最后一个触发器中取得的进展的所有信息 - 处理了哪些数据,处理速率,延迟等等。 还有 streamingQuery.recentProgress,它返回最近几个进展的数组。

此外,streamingQuery.status()ScalaJava 中返回一个 StreamingQueryStatus 对象,并在 Python 中返回一个具有相同字段的字典。 它提供有关查询正在立即执行的操作的信息 - 触发器是否处于活动状态,数据是否正在处理等等。

以下是一些示例。

query = ...  # a StreamingQuery
print(query.lastProgress)

'''
Will print something like the following.

{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
'''

print(query.status)
'''
Will print something like the following.

{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
'''
val query: StreamingQuery = ...

println(query.lastProgress)

/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


println(query.status)

/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
StreamingQuery query = ...

System.out.println(query.lastProgress());
/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


System.out.println(query.status());
/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
query <- ...  # a StreamingQuery
lastProgress(query)

'''
Will print something like the following.

{
  "id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9",
  "runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16",
  "name" : null,
  "timestamp" : "2017-04-26T08:27:28.835Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 1
  },
  "stateOperators" : [ {
    "numRowsTotal" : 4,
    "numRowsUpdated" : 0
  } ],
  "sources" : [ {
    "description" : "TextSocketSource[host: localhost, port: 9999]",
    "startOffset" : 1,
    "endOffset" : 1,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531"
  }
}
'''

status(query)
'''
Will print something like the following.

{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
'''

使用异步 API 以编程方式报告指标

您还可以通过附加 StreamingQueryListener (Scala/Java/Python 文档) 异步监视与 SparkSession 关联的所有查询。 一旦您将自定义 StreamingQueryListener 对象与 sparkSession.streams.addListener() 关联,您将在查询启动和停止时以及活动查询取得进展时获得回调。 这是一个例子,

spark = ...

class Listener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("Query started: " + queryStarted.id)

    def onQueryProgress(self, event):
        print("Query made progress: " + queryProgress.progress)

    def onQueryTerminated(self, event):
    	print("Query terminated: " + queryTerminated.id)


spark.streams.addListener(Listener())
val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})
SparkSession spark = ...

spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        System.out.println("Query started: " + queryStarted.id());
    }
    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        System.out.println("Query terminated: " + queryTerminated.id());
    }
    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        System.out.println("Query made progress: " + queryProgress.progress());
    }
});
Not available in R.

使用 Dropwizard 报告指标

Spark 支持使用 Dropwizard Library 报告指标。 要使结构化流式查询的指标也一起报告,您必须在 SparkSession 中显式启用配置 spark.sql.streaming.metricsEnabled

spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
# or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf().set("spark.sql.streaming.metricsEnabled", "true");
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true");
sql("SET spark.sql.streaming.metricsEnabled=true")

启用此配置后,在 SparkSession 中启动的所有查询都将通过 Dropwizard 将指标报告给已配置的任何接收器(例如,Ganglia,Graphite,JMX 等)。

通过检查点从故障中恢复

如果发生故障或有意关闭,您可以恢复先前查询的先前进度和状态,并从中断的地方继续。 这是通过检查点和预写日志完成的。 您可以使用检查点位置配置查询,并且查询会将所有进度信息(即,每个触发器中处理的偏移量范围)和正在运行的聚合(例如,快速示例中的字数统计)保存到检查点位置。 此检查点位置必须是 HDFS 兼容文件系统中的路径,并且可以在 启动查询时将其设置为 DataStreamWriter 中的一个选项。

aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()
aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()
aggDF
  .writeStream()
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start();
write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")

流式查询更改后的恢复语义

从同一检查点位置重新启动时,允许对流式查询进行的更改类型存在限制。 以下是一些不允许的更改类型,或者更改的效果未明确定义。 对于所有这些

更改类型

异步进度跟踪

它是什么?

异步进度跟踪允许流式查询异步并行地检查进度,而不需要像微批处理中那样在实际数据处理中进行,从而减少与维护偏移日志和提交日志相关的延迟。

Async Progress Tracking

它是如何工作的?

Structured Streaming 依赖于持久化和管理偏移量作为查询处理的进度指标。偏移量管理操作直接影响处理延迟,因为在这些操作完成之前,无法进行任何数据处理。异步进度跟踪使流式查询能够在不受这些偏移量管理操作影响的情况下检查进度。

如何使用它?

下面的代码片段提供了一个如何使用此功能的示例

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()
val query = stream.writeStream
     .format("kafka")
	.option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
	.option("asyncProgressTrackingEnabled", "true")
     .start()

下表描述了此功能的配置以及与其相关的默认值。

选项 默认值 描述
asyncProgressTrackingEnabled true/false false 启用或禁用异步进度跟踪
asyncProgressTrackingCheckpointIntervalMs 毫秒 1000 我们提交偏移量和完成提交的间隔

限制

此功能的初始版本具有以下限制

关闭设置

关闭异步进度跟踪可能会导致抛出以下异常

java.lang.IllegalStateException: batch x doesn't exist

此外,以下错误消息可能会打印在驱动程序日志中

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

这是因为启用异步进度跟踪后,框架不会像不使用异步进度跟踪那样为每个批处理检查进度。要解决此问题,只需重新启用“asyncProgressTrackingEnabled”并将“asyncProgressTrackingCheckpointIntervalMs”设置为 0,然后运行流式查询,直到至少处理了两个微批处理。现在可以安全地禁用异步进度跟踪,并且重新启动查询应该可以正常进行。

连续处理

[实验性]

连续处理是 Spark 2.3 中引入的一种新的实验性流式执行模式,它支持低延迟(约 1 毫秒)的端到端处理,并具有至少一次的容错保证。与默认的微批处理引擎相比,后者可以实现完全一次的保证,但延迟最多只能达到约 100 毫秒。对于某些类型的查询(如下所述),您可以选择哪种模式来执行它们,而无需修改应用程序逻辑(即,无需更改 DataFrame/Dataset 操作)。

要在连续处理模式下运行受支持的查询,您只需要指定一个连续触发器,并将所需的检查点间隔作为参数。例如,

spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load() \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .trigger(continuous="1 second") \     # only change in query
  .start()
import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start()
import org.apache.spark.sql.streaming.Trigger;

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start();

1 秒的检查点间隔意味着连续处理引擎将每秒记录查询的进度。生成的检查点与微批处理引擎兼容,因此任何查询都可以使用任何触发器重新启动。例如,以微批处理模式启动的受支持查询可以在连续模式下重新启动,反之亦然。请注意,每次切换到连续模式时,您都将获得至少一次的容错保证。

支持的查询

截至 Spark 2.4,只有以下类型的查询在连续处理模式下受支持。

有关更多详细信息,请参见输入源输出接收器部分。虽然控制台接收器适合测试,但使用 Kafka 作为源和接收器可以最好地观察到端到端的低延迟处理,因为这允许引擎处理数据并在输入数据在输入主题中可用后的几毫秒内使结果在输出主题中可用。

注意事项

附加信息

备注

进一步阅读

演讲

迁移指南

迁移指南现在已存档在此页面上