结构化流式编程指南

概述

结构化流式是一个基于 Spark SQL 引擎构建的可扩展且容错的流式处理引擎。您可以像在静态数据上表达批处理计算一样表达流式计算。Spark SQL 引擎将负责以增量和连续的方式运行它,并在流式数据不断到达时更新最终结果。您可以使用 Scala、Java、Python 或 R 中的 Dataset/DataFrame API 来表达流式聚合、事件时间窗口、流到批连接等。计算在相同的优化 Spark SQL 引擎上执行。最后,该系统通过检查点和预写日志确保端到端完全一次的容错保证。简而言之,结构化流式提供了快速、可扩展、容错、端到端完全一次的流式处理,而无需用户考虑流式处理。

在内部,默认情况下,结构化流式查询使用微批处理引擎进行处理,该引擎将数据流作为一系列小批处理作业进行处理,从而实现低至 100 毫秒的端到端延迟和完全一次的容错保证。但是,从 Spark 2.3 开始,我们引入了一种新的低延迟处理模式,称为连续处理,它可以实现低至 1 毫秒的端到端延迟,并提供至少一次的保证。无需更改查询中的 Dataset/DataFrame 操作,您就可以根据应用程序需求选择模式。

在本指南中,我们将引导您了解编程模型和 API。我们将主要使用默认的微批处理模型来解释概念,然后稍后讨论连续处理模型。首先,让我们从一个简单的结构化流式查询示例开始 - 流式词频统计。

快速示例

假设您想维护从监听 TCP 套接字的数据服务器接收的文本数据的运行词频统计。让我们看看如何使用结构化流式来表达这一点。您可以在 Scala/Java/Python/R 中查看完整代码。如果您下载 Spark,您可以直接运行示例。无论如何,让我们逐步了解示例,并了解它是如何工作的。首先,我们必须导入必要的类并创建一个本地 SparkSession,它是所有与 Spark 相关的功能的起点。

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();
sparkR.session(appName = "StructuredNetworkWordCount")

接下来,让我们创建一个流式 DataFrame,它表示从监听 localhost:9999 的服务器接收的文本数据,并将 DataFrame 转换为计算词频统计。

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

lines DataFrame 表示包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,流式文本数据中的每一行都成为表中的一行。请注意,由于我们只是在设置转换,尚未启动它,因此目前没有接收任何数据。接下来,我们使用了两个内置的 SQL 函数 - split 和 explode,将每一行拆分为多个包含一个单词的行。此外,我们使用函数 alias 将新列命名为“word”。最后,我们通过对 Dataset 中的唯一值进行分组并对其进行计数来定义 wordCounts DataFrame。请注意,这是一个流式 DataFrame,它表示流的运行词频统计。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

lines DataFrame 表示包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,流式文本数据中的每一行都成为表中的一行。请注意,由于我们只是在设置转换,尚未启动它,因此目前没有接收任何数据。接下来,我们使用 .as[String] 将 DataFrame 转换为 String 的 Dataset,以便我们可以应用 flatMap 操作将每一行拆分为多个单词。生成的 words Dataset 包含所有单词。最后,我们通过对 Dataset 中的唯一值进行分组并对其进行计数来定义 wordCounts DataFrame。请注意,这是一个流式 DataFrame,它表示流的运行词频统计。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();

lines DataFrame 表示包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,流式文本数据中的每一行都成为表中的一行。请注意,由于我们只是在设置转换,尚未启动它,因此目前没有接收任何数据。接下来,我们使用 .as(Encoders.STRING()) 将 DataFrame 转换为 String 的 Dataset,以便我们可以应用 flatMap 操作将每一行拆分为多个单词。生成的 words Dataset 包含所有单词。最后,我们通过对 Dataset 中的唯一值进行分组并对其进行计数来定义 wordCounts DataFrame。请注意,这是一个流式 DataFrame,它表示流的运行词频统计。

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines <- read.stream("socket", host = "localhost", port = 9999)

# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")

# Generate running word count
wordCounts <- count(group_by(words, "word"))

lines SparkDataFrame 表示包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,流式文本数据中的每一行都成为表中的一行。请注意,由于我们只是在设置转换,尚未启动它,因此目前没有接收任何数据。接下来,我们有一个 SQL 表达式,其中包含两个 SQL 函数 - split 和 explode,将每一行拆分为多个包含一个单词的行。此外,我们将新列命名为“word”。最后,我们通过对 SparkDataFrame 中的唯一值进行分组并对其进行计数来定义 wordCounts SparkDataFrame。请注意,这是一个流式 SparkDataFrame,它表示流的运行词频统计。

我们现在已经设置了流式数据的查询。剩下的就是实际开始接收数据并计算计数。为此,我们将其设置为在每次更新计数时将完整的计数集(由 outputMode("complete") 指定)打印到控制台。然后使用 start() 启动流式计算。

 # Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")

awaitTermination(query)

执行此代码后,流式计算将在后台启动。 query 对象是该活动流式查询的句柄,我们已决定使用 awaitTermination() 等待查询终止,以防止进程在查询处于活动状态时退出。

要实际执行此示例代码,您可以将其编译到您自己的 Spark 应用程序 中,或者在您下载 Spark 后简单地运行示例。我们将展示后者。您首先需要运行 Netcat(大多数类 Unix 系统中都包含的一个小型实用程序)作为数据服务器,方法是使用

$ nc -lk 9999

然后,在另一个终端中,您可以使用以下命令启动示例:

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

然后,在运行 netcat 服务器的终端中键入的任何行都将被计数,并在每秒钟打印到屏幕上。它看起来类似于以下内容。

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999
apache spark
apache hadoop



















...
# TERMINAL 2: RUNNING structured_network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING StructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING JavaStructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING structured_network_wordcount.R

$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...

编程模型

结构化流的核心思想是将实时数据流视为一个不断追加的表。这导致了一种新的流处理模型,与批处理模型非常相似。您将以标准的批处理查询方式表达您的流计算,就像在静态表上一样,Spark 将其作为对无界输入表的增量查询运行。让我们更详细地了解这个模型。

基本概念

将输入数据流视为“输入表”。到达流上的每个数据项都像是追加到输入表的新行。

Stream as a Table

对输入的查询将生成“结果表”。每个触发间隔(例如,每 1 秒),新行将被追加到输入表,最终更新结果表。每当结果表更新时,我们都希望将更改的结果行写入外部接收器。

Model

“输出”定义为写入外部存储的内容。输出可以在不同的模式下定义

请注意,每种模式都适用于某些类型的查询。这将在后面详细讨论。

为了说明此模型的使用,让我们在上面的快速示例的上下文中了解该模型。第一个lines DataFrame 是输入表,最终的wordCounts DataFrame 是结果表。请注意,对流式lines DataFrame 的查询以生成wordCounts与静态 DataFrame 完全相同。但是,当此查询启动时,Spark 将不断检查套接字连接是否有新数据。如果有新数据,Spark 将运行一个“增量”查询,将之前的运行计数与新数据结合起来计算更新后的计数,如下所示。

Model

请注意,结构化流不会物化整个表。它从流数据源读取最新的可用数据,增量地处理它以更新结果,然后丢弃源数据。它只保留更新结果所需的最小中间状态数据(例如,前面示例中的中间计数)。

此模型与许多其他流处理引擎有很大不同。许多流式系统要求用户自己维护运行中的聚合,因此必须考虑容错和数据一致性(至少一次、最多一次或完全一次)。在此模型中,Spark 负责在有新数据时更新结果表,从而减轻用户对它的推理负担。例如,让我们看看此模型如何处理基于事件时间的处理和延迟到达的数据。

处理事件时间和延迟数据

事件时间是数据本身嵌入的时间。对于许多应用程序,您可能希望对该事件时间进行操作。例如,如果您想获取每分钟由物联网设备生成的事件数,那么您可能希望使用数据生成的时间(即数据中的事件时间),而不是 Spark 接收它们的时间。此事件时间在此模型中非常自然地表达 - 来自设备的每个事件都是表中的一行,事件时间是行中的列值。这允许基于窗口的聚合(例如,每分钟的事件数)成为事件时间列上的特殊类型的分组和聚合 - 每个时间窗口都是一个组,每行可以属于多个窗口/组。因此,这种基于事件时间窗口的聚合查询可以在静态数据集(例如,来自收集的设备事件日志)和数据流上一致地定义,从而使用户的生活更加轻松。

此外,此模型自然地处理根据其事件时间比预期晚到达的数据。由于 Spark 正在更新结果表,因此它可以完全控制在有延迟数据时更新旧的聚合,以及清理旧的聚合以限制中间状态数据的大小。从 Spark 2.1 开始,我们支持水印,允许用户指定延迟数据的阈值,并允许引擎相应地清理旧状态。这些将在窗口操作部分中更详细地解释。

容错语义

提供端到端完全一次语义是结构化流设计背后的关键目标之一。为了实现这一点,我们设计了结构化流源、接收器和执行引擎,以可靠地跟踪处理的精确进度,以便它可以通过重新启动和/或重新处理来处理任何类型的故障。假设每个流源都有偏移量(类似于 Kafka 偏移量或 Kinesis 序列号)来跟踪流中的读取位置。引擎使用检查点和预写日志来记录每个触发器中正在处理的数据的偏移量范围。流式接收器被设计为幂等的,以处理重新处理。共同使用可重放的源和幂等的接收器,结构化流可以在任何故障下确保端到端完全一次语义

使用 Dataset 和 DataFrame 的 API

从 Spark 2.0 开始,DataFrame 和 Dataset 可以表示静态的、有界的数据,以及流式的、无界的数据。与静态 Dataset/DataFrame 类似,您可以使用通用入口点SparkSessionScala/Java/Python/R 文档)从流源创建流式 DataFrame/Dataset,并对它们应用与静态 DataFrame/Dataset 相同的操作。如果您不熟悉 Dataset/DataFrame,强烈建议您使用DataFrame/Dataset 编程指南来熟悉它们。

创建流式 DataFrame 和流式 Dataset

流式 DataFrame 可以通过DataStreamReader 接口(Scala/Java/Python 文档)创建,该接口由SparkSession.readStream() 返回。在R 中,使用read.stream() 方法。与用于创建静态 DataFrame 的读取接口类似,您可以指定源的详细信息 - 数据格式、模式、选项等。

输入源

有一些内置源。

某些源不是容错的,因为它们不能保证在发生故障后使用检查点偏移量重放数据。请参阅前面关于容错语义的部分。以下是 Spark 中所有源的详细信息。

选项 容错 备注
文件源 path: 输入目录的路径,对所有文件格式都通用。
maxFilesPerTrigger: 每个触发器中要考虑的新文件的最大数量(默认:无最大值)
latestFirst: 是否首先处理最新的新文件,在文件积压量很大的情况下很有用(默认:false)
fileNameOnly: 是否仅根据文件名而不是完整路径来检查新文件(默认:false)。如果将此设置为 `true`,则以下文件将被视为同一个文件,因为它们的文件名“dataset.txt”相同
"file:///dataset.txt"
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"
maxFileAge: 文件在被忽略之前,可以在此目录中找到的最大文件年龄。对于第一批,所有文件都将被视为有效。如果将latestFirst设置为true,并且设置了maxFilesPerTrigger,则将忽略此参数,因为可能忽略了有效且应处理的旧文件。最大年龄是相对于最新文件的 时间戳指定的,而不是相对于当前系统的 时间戳。(默认:1 周)
cleanSource: 处理后清理已完成文件的选项。
可用选项为“archive”、“delete”、“off”。如果未提供选项,则默认值为“off”。
当提供“archive”时,还必须提供附加选项sourceArchiveDir。 “sourceArchiveDir”的值不能与源模式在深度(从根目录到目录的目录数)上匹配,其中深度是两个路径的最小深度。这将确保归档文件永远不会被包含为新的源文件。
例如,假设您提供“/hello?/spark/*”作为源模式,“/hello1/spark/archive/dir”不能用作“sourceArchiveDir”的值,因为“/hello?/spark/*”和“/hello1/spark/archive”将匹配。“/hello1/spark”也不能用作“sourceArchiveDir”的值,因为“/hello?/spark”和“/hello1/spark”将匹配。“/archived/here”可以,因为它不匹配。
Spark 将移动源文件,尊重它们自己的路径。例如,如果源文件的路径为/a/b/dataset.txt,并且归档目录的路径为/archived/here,则文件将被移动到/archived/here/a/b/dataset.txt
注意:归档(通过移动)或删除已完成文件都会在每个微批次中引入开销(减慢速度,即使它是在单独的线程中发生的),因此您需要在启用此选项之前了解文件系统中每个操作的成本。另一方面,启用此选项将降低列出源文件的成本,这可能是一个昂贵的操作。
已完成文件清理器中使用的线程数可以通过spark.sql.streaming.fileSource.cleaner.numThreads配置(默认:1)。
注意 2:启用此选项时,不应从多个源或查询中使用源路径。同样,您必须确保源路径与文件流接收器输出目录中的任何文件不匹配。
注意 3:删除和移动操作都是尽力而为的。无法删除或移动文件不会导致流式查询失败。在某些情况下,Spark 可能无法清理某些源文件 - 例如,应用程序没有正常关闭,有太多文件排队等待清理。

有关特定于文件格式的选项,请参阅DataStreamReader 中的相关方法(Scala/Java/Python/R)。例如,有关“parquet”格式选项,请参阅DataStreamReader.parquet()

此外,还有一些会话配置会影响某些文件格式。有关更多详细信息,请参阅SQL 编程指南。例如,对于“parquet”,请参阅Parquet 配置部分。
支持 glob 路径,但不支持多个逗号分隔的路径/glob。
套接字源 host: 要连接的主机,必须指定
port: 要连接的端口,必须指定
速率源 rowsPerSecond(例如 100,默认:1):每秒应生成多少行。

rampUpTime(例如 5 秒,默认:0 秒):在生成速度达到rowsPerSecond之前要花费多长时间。使用比秒更精细的粒度将被截断为整数秒。

numPartitions(例如 10,默认:Spark 的默认并行度):为生成的 行分配的 分区数。

源将尽力达到rowsPerSecond,但查询可能受资源限制,并且可以调整numPartitions以帮助达到所需的速度。
每微批次速率源(格式:rate-micro-batch rowsPerBatch(例如 100):每个微批次应生成多少行。

numPartitions(例如 10,默认:Spark 的默认并行度):为生成的 行分配的 分区数。

startTimestamp(例如 1000,默认:0):生成的 时间的 起始值。

advanceMillisPerBatch(例如 1000,默认:1000):每个微批次在生成的 时间中推进的时间量。

Kafka 源 请参阅Kafka 集成指南

以下是一些示例。

spark = SparkSession. ...

# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

socketDF.isStreaming()    # Returns True for DataFrames that have streaming sources

socketDF.printSchema()

# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")
val spark: SparkSession = ...

// Read text from socket
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.isStreaming    // Returns True for DataFrames that have streaming sources

socketDF.printSchema

// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
SparkSession spark = ...

// Read text from socket
Dataset<Row> socketDF = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

socketDF.isStreaming();    // Returns True for DataFrames that have streaming sources

socketDF.printSchema();

// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset<Row> csvDF = spark
  .readStream()
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")
sparkR.session(...)

# Read text from socket
socketDF <- read.stream("socket", host = hostname, port = port)

isStreaming(socketDF)    # Returns TRUE for SparkDataFrames that have streaming sources

printSchema(socketDF)

# Read all the csv files written atomically in a directory
schema <- structType(structField("name", "string"),
                     structField("age", "integer"))
csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";")

这些示例生成未类型化的流式 DataFrame,这意味着 DataFrame 的模式在编译时不会被检查,而是在提交查询时在运行时被检查。一些操作(如mapflatMap 等)需要在编译时知道类型。要执行这些操作,您可以使用与静态 DataFrame 相同的方法将这些未类型化的流式 DataFrame 转换为类型化的流式 Dataset。有关更多详细信息,请参阅SQL 编程指南。此外,本文档后面将详细讨论支持的流式源。

从 Spark 3.1 开始,您还可以使用DataStreamReader.table()从表创建流式 DataFrame。有关更多详细信息,请参阅流式表 API

流式 DataFrame/Dataset 的模式推断和分区

默认情况下,从基于文件的源进行的结构化流式传输要求您指定模式,而不是依赖 Spark 自动推断模式。此限制确保即使在发生故障的情况下,也会为流式查询使用一致的模式。对于临时使用情况,您可以通过将spark.sql.streaming.schemaInference设置为true来重新启用模式推断。

当存在名为/key=value/ 的子目录时,将发生分区发现,并且列表将自动递归到这些目录中。如果这些列出现在用户提供的模式中,Spark 将根据正在读取的文件的路径填充它们。构成分区方案的目录必须在查询启动时存在,并且必须保持静态。例如,当/data/year=2015/ 存在时,添加/data/year=2016/ 是可以的,但更改分区列(例如,通过创建目录/data/date=2016-04-17/)是无效的。

流式 DataFrame/Dataset 上的操作

您可以在流式 DataFrame/Dataset 上应用各种操作 - 从未类型化的、类似 SQL 的操作(例如selectwheregroupBy)到类型化的类似 RDD 的操作(例如mapfilterflatMap)。有关更多详细信息,请参阅SQL 编程指南。让我们看一下您可以使用的一些示例操作。

基本操作 - 选择、投影、聚合

DataFrame/Dataset 上的大多数常见操作都支持流式传输。本节后面将讨论一些不支持的操作。

df = ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.javalang.typed;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;

public class DeviceData {
  private String device;
  private String deviceType;
  private Double signal;
  private java.sql.Date time;
  ...
  // Getter and setter methods for each field
}

Dataset<Row> df = ...;    // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10"); // using untyped APIs
ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
  .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());

// Running count of the number of updates for each device type
df.groupBy("deviceType").count(); // using untyped API

// Running average signal for each device type
ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
  .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
df <- ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
select(where(df, "signal > 10"), "device")

# Running count of the number of updates for each device type
count(groupBy(df, "deviceType"))

您还可以将流式 DataFrame/Dataset 注册为临时视图,然后对其应用 SQL 命令。

df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  # returns another streaming DF
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  // returns another streaming DF
df.createOrReplaceTempView("updates");
spark.sql("select count(*) from updates");  // returns another streaming DF
createOrReplaceTempView(df, "updates")
sql("select count(*) from updates")

请注意,您可以使用df.isStreaming来确定 DataFrame/Dataset 是否包含流式数据。

df.isStreaming()
df.isStreaming
df.isStreaming()
isStreaming(df)

您可能需要检查查询计划,因为 Spark 可能会在解释针对流式数据集的 SQL 语句时注入有状态操作。一旦在查询计划中注入了有状态操作,您可能需要在有状态操作中考虑您的查询。(例如,输出模式、水印、状态存储大小维护等)

事件时间上的窗口操作

使用结构化流式传输,在滑动事件时间窗口上进行聚合非常简单,并且与分组聚合非常相似。在分组聚合中,为用户指定的每个唯一分组列值维护聚合值(例如计数)。在基于窗口的聚合中,为每个事件时间行所在的窗口维护聚合值。让我们通过一个例子来理解这一点。

想象一下,我们的快速示例被修改了,并且流现在包含行以及生成行的 时间。我们不希望运行词频统计,而是希望在 10 分钟的窗口内统计词频,每 5 分钟更新一次。也就是说,在 10 分钟窗口 12:00 - 12:10、12:05 - 12:15、12:10 - 12:20 等中接收到的词频统计。请注意,12:00 - 12:10 表示在 12:00 之后但在 12:10 之前到达的数据。现在,考虑一个在 12:07 接收到的词。此词应该增加对应于两个窗口 12:00 - 12:10 和 12:05 - 12:15 的计数。因此,计数将按分组键(即词)和窗口(可以从事件时间计算)索引。

结果表将类似于以下内容。

Window Operations

由于此窗口化类似于分组,因此在代码中,您可以使用groupBy()window() 操作来表达基于窗口的聚合。您可以在Scala/Java/Python中看到以下示例的完整代码。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();
words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts <- count(
                    groupBy(
                      words,
                      window(words$timestamp, "10 minutes", "5 minutes"),
                      words$word))

处理延迟数据和水印

现在考虑一下,如果其中一个事件延迟到达应用程序会发生什么。例如,假设在 12:04(即事件时间)生成的词可能在 12:11 被应用程序接收。应用程序应该使用时间 12:04 而不是 12:11 来更新窗口12:00 - 12:10 的旧计数。这在我们基于窗口的分组中自然发生 - 结构化流式传输可以长时间维护部分聚合的中间状态,以便延迟数据可以正确更新旧窗口的聚合,如下所示。

Handling Late Data

但是,要运行此查询几天,系统需要限制其累积的中间内存状态量。这意味着系统需要知道何时可以从内存状态中删除旧的聚合,因为应用程序不再接收该聚合的延迟数据。为了实现这一点,在 Spark 2.1 中,我们引入了**水印**,它允许引擎自动跟踪数据中的当前事件时间,并尝试相应地清理旧状态。您可以通过指定事件时间列和数据在事件时间方面预期延迟的阈值来定义查询的水印。对于在时间 T 结束的特定窗口,引擎将维护状态并允许延迟数据更新状态,直到 (引擎看到的最大事件时间 - 延迟阈值 > T)。换句话说,阈值内的延迟数据将被聚合,但晚于阈值的数据将开始被丢弃(有关确切保证,请参阅本节后面的 内容)。让我们通过一个例子来理解这一点。我们可以使用 withWatermark() 在前面的示例中轻松定义水印,如下所示。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),
        col("word"))
    .count();
words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group

words <- withWatermark(words, "timestamp", "10 minutes")
windowedCounts <- count(
                    groupBy(
                      words,
                      window(words$timestamp, "10 minutes", "5 minutes"),
                      words$word))

在此示例中,我们正在定义查询在“timestamp”列值上的水印,并将“10 分钟”定义为允许数据延迟的阈值。如果此查询在 Update 输出模式下运行(稍后在 输出模式 部分中讨论),引擎将继续更新结果表中窗口的计数,直到窗口比水印旧,水印在“timestamp”列中的当前事件时间后面延迟 10 分钟。以下是说明。

Watermarking in Update Mode

如插图所示,引擎跟踪的最大事件时间是蓝色虚线,在每次触发开始时设置为 (最大事件时间 - '10 分钟') 的水印是红线。例如,当引擎观察到数据 (12:14, dog) 时,它将下一个触发的水印设置为 12:04。此水印允许引擎维护中间状态 10 分钟,以允许延迟数据被计数。例如,数据 (12:09, cat) 是乱序且延迟的,它落在窗口 12:00 - 12:1012:05 - 12:15 中。由于它仍然领先于触发器中的水印 12:04,因此引擎仍然维护中间计数作为状态,并正确更新相关窗口的计数。但是,当水印更新为 12:11 时,窗口 (12:00 - 12:10) 的中间状态将被清除,所有后续数据(例如 (12:04, donkey))将被视为“太晚”,因此被忽略。请注意,在每次触发后,更新的计数(即紫色行)将根据 Update 模式写入接收器作为触发器输出。

某些接收器(例如文件)可能不支持 Update 模式所需的细粒度更新。为了与它们一起使用,我们还支持 Append 模式,其中仅将最终计数写入接收器。如下所示。

请注意,在非流式 Dataset 上使用 withWatermark 是无操作的。由于水印不应该以任何方式影响任何批处理查询,因此我们将直接忽略它。

Watermarking in Append Mode

与之前的 Update 模式类似,引擎为每个窗口维护中间计数。但是,部分计数不会更新到结果表,也不会写入接收器。引擎等待“10 分钟”以计算延迟日期,然后删除小于水印的窗口的中间状态,并将最终计数追加到结果表/接收器。例如,窗口 12:00 - 12:10 的最终计数仅在水印更新为 12:11 后追加到结果表。

时间窗口的类型

Spark 支持三种类型的时间窗口:滚动(固定)、滑动和会话。

The types of time windows

滚动窗口是一系列固定大小、不重叠且连续的时间间隔。输入只能绑定到单个窗口。

滑动窗口类似于滚动窗口,从“固定大小”的角度来看,但如果滑动的持续时间小于窗口的持续时间,则窗口可以重叠,在这种情况下,输入可以绑定到多个窗口。

滚动和滑动窗口使用 window 函数,该函数已在上面的示例中描述。

会话窗口与前两种类型相比具有不同的特征。会话窗口的窗口长度大小是动态的,取决于输入。会话窗口从一个输入开始,如果在间隙持续时间内收到后续输入,则会扩展自身。对于静态间隙持续时间,当在收到最新输入后在间隙持续时间内没有收到输入时,会话窗口将关闭。

会话窗口使用 session_window 函数。该函数的使用类似于 window 函数。

events = ...  # streaming DataFrame of schema { timestamp: Timestamp, userId: String }

# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window(events.timestamp, "5 minutes"),
        events.userId) \
    .count()
import spark.implicits._

val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        session_window($"timestamp", "5 minutes"),
        $"userId")
    .count()
Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

// Group the data by session window and userId, and compute the count of each group
Dataset<Row> sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        session_window(col("timestamp"), "5 minutes"),
        col("userId"))
    .count();

除了静态值之外,我们还可以提供一个表达式,根据输入行动态指定间隙持续时间。请注意,间隙持续时间为负或零的行将从聚合中过滤掉。

使用动态间隙持续时间,会话窗口的关闭不再依赖于最新输入。会话窗口的范围是所有事件范围的并集,这些范围由事件开始时间和查询执行期间评估的间隙持续时间确定。

from pyspark.sql import functions as sf

events = ...  # streaming DataFrame of schema { timestamp: Timestamp, userId: String }

session_window = session_window(events.timestamp, \
    sf.when(events.userId == "user1", "5 seconds") \
    .when(events.userId == "user2", "20 seconds").otherwise("5 minutes"))

# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window,
        events.userId) \
    .count()
import spark.implicits._

val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

val sessionWindow = session_window($"timestamp", when($"userId" === "user1", "5 seconds")
  .when($"userId" === "user2", "20 seconds")
  .otherwise("5 minutes"))

// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        Column(sessionWindow),
        $"userId")
    .count()
Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

SessionWindow sessionWindow = session_window(col("timestamp"), when(col("userId").equalTo("user1"), "5 seconds")
  .when(col("userId").equalTo("user2"), "20 seconds")
  .otherwise("5 minutes"))

// Group the data by session window and userId, and compute the count of each group
Dataset<Row> sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        new Column(sessionWindow),
        col("userId"))
    .count();

请注意,在流式查询中使用会话窗口时存在一些限制,如下所示

对于批处理查询,支持全局窗口(分组键中只有 session_window)。

默认情况下,Spark 不会对会话窗口聚合执行部分聚合,因为它需要在分组之前对本地分区进行额外的排序。对于每个本地分区中具有相同分组键的输入行数量很少的情况,它效果更好,但是对于本地分区中具有相同分组键的大量输入行的这种情况,即使有额外的排序,执行部分聚合仍然可以显着提高性能。

您可以启用 spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition 来指示 Spark 执行部分聚合。

时间窗口的时间表示

在某些用例中,有必要提取时间窗口的时间表示,以便对时间窗口数据应用需要时间戳的操作。一个例子是链式时间窗口聚合,用户希望针对时间窗口定义另一个时间窗口。例如,有人希望将 5 分钟时间窗口聚合为 1 小时滚动时间窗口。

有两种方法可以实现这一点,如下所示

  1. 使用 window_time SQL 函数,并将时间窗口列作为参数
  2. 使用 window SQL 函数,并将时间窗口列作为参数

window_time 函数将生成一个时间戳,表示时间窗口的时间。用户可以将结果传递给 window 函数的参数(或任何需要时间戳的地方)以执行需要时间戳的时间窗口操作。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window(window_time($"window"), "1 hour"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();

// Group the windowed data by another window and word and compute the count of each group
Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
  functions.window(functions.window_time("window"), "1 hour"),
  windowedCounts.col("word")
).count();

window 函数不仅接受时间戳列,还接受时间窗口列。这对于用户希望应用链式时间窗口聚合的情况特别有用。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(windowedCounts.window, "1 hour"),
    windowedCounts.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();

// Group the windowed data by another window and word and compute the count of each group
Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
  functions.window("window", "1 hour"),
  windowedCounts.col("word")
).count();
水印清理聚合状态的条件

重要的是要注意,以下条件必须满足才能使水印清理聚合查询中的状态(截至 Spark 2.1.1,将来可能会更改)

使用水印的聚合的语义保证

连接操作

结构化流式处理支持将流式 Dataset/DataFrame 与静态 Dataset/DataFrame 以及另一个流式 Dataset/DataFrame 连接。流式连接的结果是增量生成的,类似于上一节中流式聚合的结果。在本节中,我们将探讨在上述情况下支持哪些类型的连接(即内部连接、外部连接、半连接等)。请注意,在所有支持的连接类型中,与流式 Dataset/DataFrame 连接的结果与使用包含流中相同数据的静态 Dataset/DataFrame 连接的结果完全相同。

流-静态连接

自 Spark 2.0 引入以来,结构化流式处理已支持流式和静态 DataFrame/Dataset 之间的连接(内部连接和某些类型的外部连接)。以下是一个简单的示例。

staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  # left outer join with a static DF
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  // left outer join with a static DF
Dataset<Row> staticDf = spark.read(). ...;
Dataset<Row> streamingDf = spark.readStream(). ...;
streamingDf.join(staticDf, "type");         // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer");  // left outer join with a static DF
staticDf <- read.df(...)
streamingDf <- read.stream(...)
joined <- merge(streamingDf, staticDf, sort = FALSE)  # inner equi-join with a static DF
joined <- join(
            streamingDf,
            staticDf,
            streamingDf$value == staticDf$value,
            "left_outer")  # left outer join with a static DF

请注意,流式-静态连接不是有状态的,因此不需要状态管理。但是,一些类型的流式-静态外部连接尚不支持。这些将在 本连接部分的末尾 列出。

流-流连接

在 Spark 2.3 中,我们添加了对流-流连接的支持,也就是说,您可以连接两个流式 Dataset/DataFrame。在两个数据流之间生成连接结果的挑战在于,在任何时间点,连接双方的数据集视图都是不完整的,这使得在输入之间找到匹配项变得更加困难。从一个输入流接收到的任何行都可能与来自另一个输入流的任何未来的、尚未接收到的行匹配。因此,对于这两个输入流,我们将过去的输入缓冲为流式状态,以便我们可以将每个未来的输入与过去的输入匹配,并相应地生成连接结果。此外,与流式聚合类似,我们自动处理延迟的、乱序的数据,并可以使用水印限制状态。让我们讨论支持的不同类型的流-流连接以及如何使用它们。

带有可选水印的内连接

支持对任何类型的列以及任何类型的连接条件进行内连接。但是,随着流的运行,流式状态的大小将无限增长,因为所有过去的输入都必须保存,因为任何新的输入都可能与过去的任何输入匹配。为了避免无界状态,您必须定义额外的连接条件,以便无限旧的输入不能与未来的输入匹配,因此可以从状态中清除。换句话说,您将需要在连接中执行以下额外步骤。

  1. 在两个输入上定义水印延迟,以便引擎知道输入可以延迟多长时间(类似于流式聚合)

  2. 在两个输入之间定义一个事件时间约束,以便引擎可以确定一个输入的旧行何时不再需要(即不满足时间约束)与另一个输入匹配。此约束可以通过以下两种方式之一定义。

    1. 时间范围连接条件(例如,...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),

    2. 在事件时间窗口上连接(例如,...JOIN ON leftTimeWindow = rightTimeWindow)。

让我们用一个例子来理解这一点。

假设我们想将广告展示(广告何时显示)流与用户点击广告的另一个流连接起来,以关联何时展示导致可获利的点击。为了允许在此流-流连接中清理状态,您将必须指定水印延迟和时间约束,如下所示。

  1. 水印延迟:假设展示和相应的点击在事件时间上最多可以延迟 2 小时和 3 小时。

  2. 事件时间范围条件:假设点击可以在对应展示后的 0 秒到 1 小时的时间范围内发生。

代码将如下所示。

from pyspark.sql.functions import expr

impressions = spark.readStream. ...
clicks = spark.readStream. ...

# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

# Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
import org.apache.spark.sql.functions.expr

val impressions = spark.readStream. ...
val clicks = spark.readStream. ...

// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
import static org.apache.spark.sql.functions.expr

Dataset<Row> impressions = spark.readStream(). ...
Dataset<Row> clicks = spark.readStream(). ...

// Apply watermarks on event-time columns
Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour ")
);
impressions <- read.stream(...)
clicks <- read.stream(...)

# Apply watermarks on event-time columns
impressionsWithWatermark <- withWatermark(impressions, "impressionTime", "2 hours")
clicksWithWatermark <- withWatermark(clicks, "clickTime", "3 hours")

# Join with event-time constraints
joined <- join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    paste(
      "clickAdId = impressionAdId AND",
      "clickTime >= impressionTime AND",
      "clickTime <= impressionTime + interval 1 hour"
)))
带有水印的流-流内连接的语义保证

这类似于聚合上水印提供的保证。“2 小时”的水印延迟保证引擎永远不会丢弃延迟少于 2 小时的任何数据。但延迟超过 2 小时的数据可能会被处理,也可能不会被处理。

带有水印的外连接

虽然水印 + 事件时间约束对于内连接是可选的,但对于外连接,必须指定它们。这是因为为了在外连接中生成 NULL 结果,引擎必须知道何时一个输入行不会与未来的任何内容匹配。因此,必须指定水印 + 事件时间约束才能生成正确的结果。因此,带有外连接的查询将非常类似于之前的广告货币化示例,只是将有一个额外的参数指定它是一个外连接。

impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
 )
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour "),
  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);
joined <- join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    paste(
      "clickAdId = impressionAdId AND",
      "clickTime >= impressionTime AND",
      "clickTime <= impressionTime + interval 1 hour"),
  "left_outer"                 # can be "inner", "left_outer", "right_outer", "full_outer", "left_semi"
))
带有水印的流-流外连接的语义保证

外连接与内连接具有相同的保证,关于水印延迟以及数据是否会被丢弃。

注意事项

关于如何生成外部结果,有一些重要的特征需要注意。

带有水印的半连接

半连接返回关系左侧具有与右侧匹配项的值。它也称为左半连接。与外连接类似,必须为半连接指定水印 + 事件时间约束。这是为了驱逐左侧的不匹配输入行,引擎必须知道左侧的输入行何时不会与右侧的任何内容在将来匹配。

带有水印的流-流半连接的语义保证

半连接与内连接具有相同的保证,关于水印延迟以及数据是否会被丢弃。

流式查询中连接的支持矩阵
左输入 右输入 连接类型
静态 静态 所有类型 支持,因为它不在流式数据上,即使它可能存在于流式查询中
静态 内部 支持,无状态
左外 支持,无状态
右外 不支持
全外 不支持
左半 支持,无状态
静态 内部 支持,无状态
左外 不支持
右外 支持,无状态
全外 不支持
左半 不支持
内部 支持,可以选择在两侧指定水印 + 时间约束以进行状态清理
左外 有条件地支持,必须在右侧指定水印 + 时间约束以获得正确的结果,可以选择在左侧指定水印以进行所有状态清理
右外 有条件地支持,必须在左侧指定水印 + 时间约束以获得正确的结果,可以选择在右侧指定水印以进行所有状态清理
全外 有条件地支持,必须在一侧指定水印 + 时间约束以获得正确的结果,可以选择在另一侧指定水印以进行所有状态清理
左半 有条件地支持,必须在右侧指定水印 + 时间约束以获得正确的结果,可以选择在左侧指定水印以进行所有状态清理

有关支持连接的更多详细信息

在追加输出模式下,您可以构建一个查询,该查询在连接之前/之后具有非映射类操作,例如聚合、去重、流-流连接。

例如,以下是一个在两个流中进行时间窗口聚合,然后使用事件时间窗口进行流-流连接的示例

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")
Dataset<Row> clicksWindow = clicksWithWatermark
  .groupBy(functions.window(clicksWithWatermark.col("clickTime"), "1 hour"))
  .count();

Dataset<Row> impressionsWindow = impressionsWithWatermark
  .groupBy(functions.window(impressionsWithWatermark.col("impressionTime"), "1 hour"))
  .count();

clicksWindow.join(impressionsWindow, "window", "inner");
clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

以下是一个使用时间范围连接条件进行流-流连接,然后进行时间窗口聚合的另一个示例

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()
Dataset<Row> joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour "),
  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);

joined
  .groupBy(joined.col("clickAdId"), functions.window(joined.col("clickTime"), "1 hour"))
  .count();
joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

流式去重

您可以使用事件中的唯一标识符对数据流中的记录进行去重。这与使用唯一标识符列对静态数据进行去重完全相同。查询将存储来自先前记录的必要数据量,以便它可以过滤重复记录。与聚合类似,您可以使用带或不带水印的去重。

streamingDf = spark.readStream. ...

# Without watermark using guid column
streamingDf.dropDuplicates("guid")

# With watermark using guid and eventTime columns
streamingDf \
  .withWatermark("eventTime", "10 seconds") \
  .dropDuplicates("guid", "eventTime")
val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")
Dataset<Row> streamingDf = spark.readStream(). ...;  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid");

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime");
streamingDf <- read.stream(...)

# Without watermark using guid column
streamingDf <- dropDuplicates(streamingDf, "guid")

# With watermark using guid and eventTime columns
streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")

专门针对流式处理,您可以在水印的时间范围内使用事件中的唯一标识符对数据流中的记录进行去重。例如,如果您将水印的延迟阈值设置为“1 小时”,则可以在 1 小时内发生的重复事件可以被正确地去重。(有关更多详细信息,请参阅dropDuplicatesWithinWatermark 的 API 文档。)

这可以用于处理事件时间列不能作为唯一标识符一部分的用例,主要是因为事件时间对于相同记录来说有所不同。(例如,非幂等写入器,其中发出事件时间发生在写入时)

鼓励用户将水印的延迟阈值设置为比重复事件之间的最大时间戳差异更长。

此功能需要在流式 DataFrame/Dataset 中设置带延迟阈值的 watermakr。

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
streamingDf \
  .withWatermark("eventTime", "10 hours") \
  .dropDuplicatesWithinWatermark("guid")
val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
Dataset<Row> streamingDf = spark.readStream(). ...;  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid");

处理多个水印的策略

流式查询可以有多个输入流,这些输入流被联合或连接在一起。每个输入流都可以具有不同的延迟数据阈值,这些数据需要被容忍以进行有状态操作。您使用每个输入流上的 withWatermarks("eventTime", delay) 指定这些阈值。例如,考虑一个在 inputStream1inputStream2 之间进行流-流连接的查询。

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

在执行查询时,结构化流式处理会分别跟踪每个输入流中看到的最大事件时间,根据相应的延迟计算水印,并选择一个用于有状态操作的单个全局水印。默认情况下,最小值被选为全局水印,因为它确保如果其中一个流落后于其他流(例如,其中一个流由于上游故障而停止接收数据),则不会意外地将任何数据丢弃为太晚。换句话说,全局水印将安全地以最慢流的速度移动,查询输出将相应地延迟。

但是,在某些情况下,您可能希望获得更快的结果,即使这意味着丢弃来自最慢流的数据。从 Spark 2.4 开始,您可以通过将 SQL 配置 spark.sql.streaming.multipleWatermarkPolicy 设置为 max(默认值为 min)来将多个水印策略设置为选择最大值作为全局水印。这使全局水印能够以最快流的速度移动。但是,作为副作用,来自较慢流的数据将被积极地丢弃。因此,请谨慎使用此配置。

任意有状态操作

许多用例需要比聚合更高级的状态操作。例如,在许多用例中,您必须从事件数据流中跟踪会话。为了进行这种会话化,您必须将任意类型的数据保存为状态,并在每次触发时使用数据流事件对状态执行任意操作。从 Spark 2.2 开始,可以使用 mapGroupsWithState 操作和更强大的 flatMapGroupsWithState 操作来完成此操作。这两种操作都允许您对分组的数据集应用用户定义的代码来更新用户定义的状态。有关更具体的详细信息,请查看 API 文档 (Scala/Java) 和示例 (Scala/Java)。

虽然 Spark 无法检查和强制执行,但状态函数应根据输出模式的语义进行实现。例如,在更新模式下,Spark 不希望状态函数发出比当前水印加上允许的延迟记录延迟更旧的行,而在追加模式下,状态函数可以发出这些行。

不支持的操作

有一些 DataFrame/Dataset 操作不支持流式 DataFrame/Dataset。其中一些如下所示。

此外,还有一些 Dataset 方法在流式数据集上不起作用。它们是将立即运行查询并返回结果的操作,这在流式数据集上没有意义。相反,这些功能可以通过显式启动流式查询来完成(请参阅下一节关于此内容)。

如果您尝试执行任何这些操作,您将看到一个类似于“操作 XYZ 不支持流式 DataFrame/Dataset”的 AnalysisException。虽然其中一些可能在 Spark 的未来版本中得到支持,但另一些在流式数据上高效实现起来从根本上来说很困难。例如,不支持对输入流进行排序,因为它需要跟踪流中接收到的所有数据。因此,这从根本上来说很难高效地执行。

状态存储

状态存储是一个版本化的键值存储,它提供读写操作。在结构化流中,我们使用状态存储提供程序来处理跨批次的无状态操作。有两个内置的状态存储提供程序实现。最终用户还可以通过扩展 StateStoreProvider 接口来实现自己的状态存储提供程序。

HDFS 状态存储提供程序

HDFS 后端状态存储提供程序是 [[StateStoreProvider]] 和 [[StateStore]] 的默认实现,其中所有数据都存储在第一阶段的内存映射中,然后由 HDFS 兼容的文件系统中的文件支持。对存储的所有更新都必须以事务方式进行,并且每组更新都会增加存储的版本。这些版本可用于在存储的正确版本上重新执行更新(通过 RDD 操作中的重试),并重新生成存储版本。

RocksDB 状态存储实现

从 Spark 3.2 开始,我们添加了一个新的内置状态存储实现,RocksDB 状态存储提供程序。

如果您在流式查询中使用状态操作(例如,流式聚合、流式 dropDuplicates、流流联接、mapGroupsWithState 或 flatMapGroupsWithState),并且您希望在状态中维护数百万个键,那么您可能会遇到与大型 JVM 垃圾收集 (GC) 暂停导致微批处理时间出现高变化相关的问题。这是因为,通过 HDFSBackedStateStore 的实现,状态数据保存在执行器的 JVM 内存中,大量状态对象会给 JVM 带来内存压力,导致高 GC 暂停。

在这种情况下,您可以选择使用基于 RocksDB 的更优化的状态管理解决方案。此解决方案不是将状态保存在 JVM 内存中,而是使用 RocksDB 在本机内存和本地磁盘中高效地管理状态。此外,对该状态的任何更改都会由结构化流自动保存到您提供的检查点位置,从而提供完整的容错保证(与默认状态管理相同)。

要启用新的内置状态存储实现,请将 spark.sql.streaming.stateStore.providerClass 设置为 org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

以下是关于状态存储提供程序的 RocksDB 实例的配置

配置名称 描述 默认值
spark.sql.streaming.stateStore.rocksdb.compactOnCommit 我们是否对提交操作执行 RocksDB 实例的范围压缩 False
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled 在 RocksDB StateStore 提交期间是否上传变更日志而不是快照 False
spark.sql.streaming.stateStore.rocksdb.blockSizeKB RocksDB BlockBasedTable(RocksDB 的默认 SST 文件格式)中每个块打包的用户数据的近似大小(以 KB 为单位)。 4
spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB 块缓存的大小容量(以 MB 为单位)。 8
spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs 在 RocksDB 实例的加载操作中获取锁的等待时间(以毫秒为单位)。 60000
spark.sql.streaming.stateStore.rocksdb.maxOpenFiles RocksDB 实例可以使用打开的文件数量。值为 -1 表示始终保持打开打开的文件。如果达到打开文件限制,RocksDB 将从打开文件缓存中逐出条目,并关闭这些文件描述符并从缓存中删除条目。 -1
spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad 我们是否在加载时重置 RocksDB 的所有计时器和直方图统计信息。 True
spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows 我们是否跟踪状态存储中的总行数。请参阅 性能方面注意事项 中的详细信息。 True
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB RocksDB 中 MemTable 的最大大小。值为 -1 表示将使用 RocksDB 内部默认值 -1
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber RocksDB 中 MemTable 的最大数量,包括活动和不可变的 MemTable。值为 -1 表示将使用 RocksDB 内部默认值 -1
spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage 单个节点上 RocksDB 状态存储实例的总内存使用量是否有限制。 false
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB 单个节点上 RocksDB 状态存储实例的总内存限制(以 MB 为单位)。 500
spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio 写缓冲区占用的总内存,占单个节点上使用 maxMemoryUsageMB 为所有 RocksDB 实例分配的内存的比例。 0.5
spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio 高优先级池中块占用的总内存,占单个节点上使用 maxMemoryUsageMB 为所有 RocksDB 实例分配的内存的比例。 0.1
RocksDB 状态存储内存管理

RocksDB 为不同的对象分配内存,例如 memtable、块缓存和过滤器/索引块。如果保持无限制,RocksDB 在多个实例中的内存使用量可能会无限增长,并可能导致 OOM(内存不足)问题。RocksDB 提供了一种方法,通过使用写缓冲区管理器功能来限制单个节点上运行的所有 DB 实例的内存使用量。如果您想在 Spark 结构化流部署中限制 RocksDB 内存使用量,则可以通过将 spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage 配置设置为 true 来启用此功能。您还可以通过将 spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB 值设置为静态数字或节点上可用物理内存的比例来确定 RocksDB 实例的最大允许内存。还可以通过将 spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMBspark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber 设置为所需的值来配置单个 RocksDB 实例的限制。默认情况下,将使用 RocksDB 内部默认值进行这些设置。

RocksDB 状态存储变更日志检查点

在较新的 Spark 版本中,引入了变更日志检查点机制,用于 RocksDB 状态存储。RocksDB State Store 的传统检查点机制是增量快照检查点,其中 RocksDB 实例的清单文件和新生成的 RocksDB SST 文件被上传到持久存储。变更日志检查点机制不是上传 RocksDB 实例的数据文件,而是上传自上次检查点以来对状态所做的更改以实现持久性。快照会定期在后台持久化,以实现可预测的故障恢复和变更日志修剪。变更日志检查点机制避免了捕获和上传 RocksDB 实例快照的成本,并显着减少了流式查询延迟。

变更日志检查点机制默认情况下处于禁用状态。您可以通过将 spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled 配置设置为 true 来启用 RocksDB State Store 变更日志检查点机制。变更日志检查点机制旨在与传统的检查点机制向后兼容。RocksDB 状态存储提供程序为在两种检查点机制之间进行双向转换提供了无缝支持。这使您能够利用变更日志检查点机制的性能优势,而不会丢弃旧的状态检查点。在支持变更日志检查点机制的 Spark 版本中,您可以通过在 Spark 会话中启用变更日志检查点机制,将流式查询从较旧的 Spark 版本迁移到变更日志检查点机制。反之,您可以在较新的 Spark 版本中安全地禁用变更日志检查点机制,然后任何已经使用变更日志检查点机制运行的查询都将切换回传统的检查点机制。您需要重新启动流式查询才能应用检查点机制的更改,但您不会在过程中观察到任何性能下降。

性能方面考虑
  1. 您可能希望禁用对总行数的跟踪,以期在 RocksDB 状态存储上获得更好的性能。

跟踪行数会在写操作中带来额外的查找 - 建议您尝试关闭配置以调整 RocksDB 状态存储,尤其是状态操作指标的值很大时 - numRowsUpdatednumRowsRemoved

您可以在重新启动查询期间更改配置,这使您能够更改“可观察性与性能”之间的权衡决策。如果禁用配置,则状态中的行数 (numTotalStateRows) 将报告为 0。

状态存储和任务位置

有状态操作将事件的状态存储在执行器的状态存储中。状态存储会占用内存和磁盘空间等资源来存储状态。因此,在不同的流式批次中,让状态存储提供者在同一个执行器中运行效率更高。更改状态存储提供者的位置需要额外的开销来加载检查点状态。从检查点加载状态的开销取决于外部存储和状态的大小,这往往会影响微批次运行的延迟。对于某些用例,例如处理非常大的状态数据,从检查点状态加载新的状态存储提供者可能非常耗时且效率低下。

结构化流查询中的有状态操作依赖于 Spark 的 RDD 的首选位置功能,以在同一个执行器上运行状态存储提供者。如果在下一批中,相应的状态存储提供者再次调度到该执行器,它可以重用先前状态并节省加载检查点状态的时间。

但是,通常首选位置不是硬性要求,Spark 仍然有可能将任务调度到除首选执行器以外的执行器。在这种情况下,Spark 将从新执行器上的检查点状态加载状态存储提供者。先前批次中运行的状态存储提供者不会立即卸载。Spark 运行一个维护任务,该任务检查并卸载执行器上处于非活动状态的状态存储提供者。

通过更改与任务调度相关的 Spark 配置,例如 spark.locality.wait,用户可以配置 Spark 等待多长时间才能启动数据本地任务。对于结构化流中的有状态操作,它可以用来让状态存储提供者在跨批次的同一个执行器上运行。

特别是对于内置的 HDFS 状态存储提供者,用户可以检查状态存储指标,例如 loadedMapCacheHitCountloadedMapCacheMissCount。理想情况下,最好将缓存未命中计数降到最低,这意味着 Spark 不会浪费太多时间加载检查点状态。用户可以增加 Spark 本地等待配置,以避免在跨批次的不同的执行器中加载状态存储提供者。

启动流式查询

一旦您定义了最终结果 DataFrame/Dataset,剩下的就是启动流式计算。为此,您必须使用通过 Dataset.writeStream() 返回的 DataStreamWriter (Scala/Java/Python 文档)。您必须在此接口中指定以下一项或多项。

输出模式

输出模式有几种类型。

不同类型的流式查询支持不同的输出模式。以下是兼容性矩阵。

查询类型 支持的输出模式 备注
带有聚合的查询 基于事件时间的带有水印的聚合 追加、更新、完整 追加模式使用水印来丢弃旧的聚合状态。但是,窗口化聚合的输出会延迟在 withWatermark() 中指定的延迟阈值,因为根据模式语义,只有在行最终确定后(即跨越水印后),才能将行添加到结果表中。有关更多详细信息,请参见 延迟数据 部分。

更新模式使用水印来丢弃旧的聚合状态。

完整模式不会丢弃旧的聚合状态,因为根据定义,这种模式会保留结果表中的所有数据。
其他聚合 完整、更新 由于没有定义水印(仅在其他类别中定义),因此不会丢弃旧的聚合状态。

不支持追加模式,因为聚合可以更新,从而违反了这种模式的语义。
带有 mapGroupsWithState 的查询 更新 带有 mapGroupsWithState 的查询中不允许聚合。
带有 flatMapGroupsWithState 的查询 追加操作模式 追加 flatMapGroupsWithState 之后允许聚合。
更新操作模式 更新 带有 flatMapGroupsWithState 的查询中不允许聚合。
带有 joins 的查询 追加 目前不支持更新和完整模式。有关支持的联接类型的更多详细信息,请参见 联接操作部分中的支持矩阵
其他查询 追加、更新 不支持完整模式,因为在结果表中保存所有未聚合的数据是不可行的。

输出接收器

内置输出接收器有几种类型。

writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()
writeStream
    .foreach(...)
    .start()
writeStream
    .format("console")
    .start()
writeStream
    .format("memory")
    .queryName("tableName")
    .start()

某些接收器不是容错的,因为它们不保证输出的持久性,并且仅用于调试目的。请参见前面关于 容错语义 的部分。以下是 Spark 中所有接收器的详细信息。

接收器 支持的输出模式 选项 容错 备注
文件接收器 追加 path: 输出目录的路径,必须指定。
retention: 输出文件的生存时间 (TTL)。提交时间早于 TTL 的输出文件最终将从元数据日志中排除。这意味着读取接收器输出目录的读取器查询可能不会处理它们。您可以以时间字符串格式提供值(例如“12h”、“7d”等)。默认情况下,它被禁用。

对于特定于文件格式的选项,请参见 DataFrameWriter 中的相关方法 (Scala/Java/Python/R)。例如,对于“parquet”格式选项,请参见 DataFrameWriter.parquet()
是(精确一次) 支持写入分区表。按时间分区可能很有用。
Kafka 接收器 追加、更新、完整 请参见 Kafka 集成指南 是(至少一次) 有关更多详细信息,请参见 Kafka 集成指南
Foreach 接收器 追加、更新、完整 是(至少一次) 有关更多详细信息,请参见 下一节
ForeachBatch 接收器 追加、更新、完整 取决于实现 有关更多详细信息,请参见 下一节
控制台接收器 追加、更新、完整 numRows: 每次触发打印的行数(默认值:20)
truncate: 如果输出过长是否截断(默认值:true)
内存接收器 追加、完整 否。但在完整模式下,重新启动的查询将重新创建完整的表。 表名是查询名。

请注意,您必须调用 start() 才能真正启动查询的执行。这将返回一个 StreamingQuery 对象,它是持续运行执行的句柄。您可以使用此对象来管理查询,我们将在下一小节中讨论。现在,让我们通过几个示例来了解所有这些。

# ========== DF with no aggregations ==========
noAggDF = deviceDataDf.select("device").where("signal > 10")

# Print new data to console
noAggDF \
    .writeStream \
    .format("console") \
    .start()

# Write new data to Parquet files
noAggDF \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .option("path", "path/to/destination/dir") \
    .start()

# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()

# Print updated aggregations to console
aggDF \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF \
    .writeStream \
    .queryName("aggregates") \
    .outputMode("complete") \
    .format("memory") \
    .start()

spark.sql("select * from aggregates").show()   # interactively query in-memory table
// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")

// Print new data to console
noAggDF
  .writeStream
  .format("console")
  .start()

// Write new data to Parquet files
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()

// Print updated aggregations to console
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Have all the aggregates in an in-memory table
aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()   // interactively query in-memory table
// ========== DF with no aggregations ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");

// Print new data to console
noAggDF
  .writeStream()
  .format("console")
  .start();

// Write new data to Parquet files
noAggDF
  .writeStream()
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start();

// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();

// Print updated aggregations to console
aggDF
  .writeStream()
  .outputMode("complete")
  .format("console")
  .start();

// Have all the aggregates in an in-memory table
aggDF
  .writeStream()
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start();

spark.sql("select * from aggregates").show();   // interactively query in-memory table
# ========== DF with no aggregations ==========
noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")

# Print new data to console
write.stream(noAggDF, "console")

# Write new data to Parquet files
write.stream(noAggDF,
             "parquet",
             path = "path/to/destination/dir",
             checkpointLocation = "path/to/checkpoint/dir")

# ========== DF with aggregation ==========
aggDF <- count(groupBy(df, "device"))

# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")

# Have all the aggregates in an in memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")

# Interactively query in-memory table
head(sql("select * from aggregates"))
使用 Foreach 和 ForeachBatch

通过 foreachforeachBatch 操作,您可以对流式查询的输出应用任意操作和写入逻辑。它们的使用场景略有不同 - foreach 允许对每一行进行自定义写入逻辑,而 foreachBatch 允许对每个微批次的输出进行任意操作和自定义逻辑。让我们更详细地了解它们的使用方法。

ForeachBatch

foreachBatch(...) 允许您指定一个函数,该函数将在流式查询的每个微批次的输出数据上执行。自 Spark 2.4 起,这在 Scala、Java 和 Python 中受支持。它接受两个参数:一个包含微批次输出数据的 DataFrame 或 Dataset,以及微批次的唯一 ID。

def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    pass

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // Transform and write batchDF
}.start()
streamingDatasetOfString.writeStream().foreachBatch(
  new VoidFunction2<Dataset<String>, Long>() {
    public void call(Dataset<String> dataset, Long batchId) {
      // Transform and write batchDF
    }
  }
).start();

R 尚未支持。

使用 foreachBatch,您可以执行以下操作。

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

注意

Foreach

如果 foreachBatch 不是一种选择(例如,相应的批处理数据写入器不存在,或者使用连续处理模式),那么您可以使用 foreach 表达您的自定义写入器逻辑。具体来说,您可以通过将数据写入逻辑划分为三个方法来表达:openprocessclose。从 Spark 2.4 开始,foreach 在 Scala、Java 和 Python 中可用。

在 Python 中,您可以通过两种方式调用 foreach:在函数中或在对象中。该函数提供了一种简单的方法来表达您的处理逻辑,但它不允许您在故障导致重新处理某些输入数据时对生成的数据进行重复数据删除。对于这种情况,您必须在对象中指定处理逻辑。

  • 首先,该函数将一行作为输入。
def process_row(row):
    # Write row to storage
    pass

query = streamingDF.writeStream.foreach(process_row).start()
  • 其次,该对象具有一个 process 方法和可选的 open 和 close 方法
class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection. This method is optional in Python.
        pass

    def process(self, row):
        # Write row to connection. This method is NOT optional in Python.
        pass

    def close(self, error):
        # Close the connection. This method in optional in Python.
        pass

query = streamingDF.writeStream.foreach(ForeachWriter()).start()

在 Scala 中,您必须扩展类 ForeachWriter (docs).

streamingDatasetOfString.writeStream.foreach(
  new ForeachWriter[String] {

    def open(partitionId: Long, version: Long): Boolean = {
      // Open connection
    }

    def process(record: String): Unit = {
      // Write string to connection
    }

    def close(errorOrNull: Throwable): Unit = {
      // Close the connection
    }
  }
).start()

在 Java 中,您必须扩展类 ForeachWriter (docs).

streamingDatasetOfString.writeStream().foreach(
  new ForeachWriter<String>() {

    @Override public boolean open(long partitionId, long version) {
      // Open connection
    }

    @Override public void process(String record) {
      // Write string to connection
    }

    @Override public void close(Throwable errorOrNull) {
      // Close the connection
    }
  }
).start();

R 尚未支持。

执行语义 当流式查询启动时,Spark 会以以下方式调用函数或对象的 method

流式表 API

从 Spark 3.1 开始,您还可以使用 DataStreamReader.table() 将表读取为流式 DataFrame,并使用 DataStreamWriter.toTable() 将流式 DataFrame 写入表

spark = ...  # spark session

# Create a streaming DataFrame
df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Write the streaming DataFrame to a table
df.writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .toTable("myTable")

# Check the table result
spark.read.table("myTable").show()

# Transform the source dataset and write to a new table
spark.readStream \
    .table("myTable") \
    .select("value") \
    .writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .format("parquet") \
    .toTable("newTable")

# Check the new table result
spark.read.table("newTable").show()
val spark: SparkSession = ...

// Create a streaming DataFrame
val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

// Write the streaming DataFrame to a table
df.writeStream
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .toTable("myTable")

// Check the table result
spark.read.table("myTable").show()

// Transform the source dataset and write to a new table
spark.readStream
  .table("myTable")
  .select("value")
  .writeStream
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .format("parquet")
  .toTable("newTable")

// Check the new table result
spark.read.table("newTable").show()
SparkSession spark = ...

// Create a streaming DataFrame
Dataset<Row> df = spark.readStream()
  .format("rate")
  .option("rowsPerSecond", 10)
  .load();

// Write the streaming DataFrame to a table
df.writeStream()
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .toTable("myTable");

// Check the table result
spark.read().table("myTable").show();

// Transform the source dataset and write to a new table
spark.readStream()
  .table("myTable")
  .select("value")
  .writeStream()
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .format("parquet")
  .toTable("newTable");

// Check the new table result
spark.read().table("newTable").show();

在 R 中不可用。

有关更多详细信息,请查看 DataStreamReader 的文档 (Scala/Java/Python 文档) 和 DataStreamWriter (Scala/Java/Python 文档)。

触发器

流式查询的触发器设置定义了流式数据处理的时间,以及查询将以固定批次间隔的微批次查询方式执行,还是以连续处理查询方式执行。以下是支持的不同类型的触发器。

触发器类型 描述
未指定(默认) 如果未显式指定触发器设置,则默认情况下,查询将以微批次模式执行,其中微批次将在前一个微批次完成处理后立即生成。
固定间隔微批次 查询将以微批次模式执行,其中微批次将在用户指定的间隔处启动。
  • 如果前一个微批次在间隔内完成,则引擎将等待直到间隔结束,然后启动下一个微批次。
  • 如果前一个微批次完成所需时间超过间隔(即,如果错过了间隔边界),则下一个微批次将在前一个微批次完成时立即启动(即,它不会等待下一个间隔边界)。
  • 如果没有新数据可用,则不会启动任何微批次。
一次性微批次(已弃用) 查询将仅执行一次微批次来处理所有可用数据,然后自行停止。这在您希望定期启动集群、处理自上次周期以来可用的所有内容,然后关闭集群的场景中很有用。在某些情况下,这可能会导致显着的成本节省。请注意,此触发器已弃用,鼓励用户迁移到立即可用微批次,因为它提供了更好的处理保证、更细粒度的批次规模以及对水印推进的更好渐进处理,包括无数据批次。
立即可用微批次 与一次性微批次触发器查询类似,查询将处理所有可用数据,然后自行停止。不同之处在于,它将根据源选项(例如,文件源的maxFilesPerTrigger)在(可能)多个微批次中处理数据,这将导致更好的查询可扩展性。
  • 此触发器提供了强大的处理保证:无论之前运行中剩下多少批次,它都确保在终止之前处理执行时可用的所有数据。所有未提交的批次将首先被处理。
  • 水印在每个批次中都会推进,并且如果最后一个批次推进水印,则在终止之前会执行无数据批次。这有助于保持更小且可预测的状态大小以及状态操作输出的更小延迟。
具有固定检查点间隔的连续
(实验性)
查询将在新的低延迟连续处理模式下执行。在下面的连续处理部分中了解更多信息。

以下是一些代码示例。

# Default trigger (runs micro-batch as soon as it can)
df.writeStream \
  .format("console") \
  .start()

# ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream \
  .format("console") \
  .trigger(processingTime='2 seconds') \
  .start()

# One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream \
  .format("console") \
  .trigger(once=True) \
  .start()

# Available-now trigger
df.writeStream \
  .format("console") \
  .trigger(availableNow=True) \
  .start()

# Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(continuous='1 second')
  .start()
import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start()

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()

// Available-now trigger
df.writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start();

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start();

// One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start();

// Available-now trigger
df.writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start();

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start();
# Default trigger (runs micro-batch as soon as it can)
write.stream(df, "console")

# ProcessingTime trigger with two-seconds micro-batch interval
write.stream(df, "console", trigger.processingTime = "2 seconds")

# One-time trigger
write.stream(df, "console", trigger.once = TRUE)

# Continuous trigger is not yet supported

管理流式查询

启动查询时创建的 StreamingQuery 对象可用于监控和管理查询。

query = df.writeStream.format("console").start()   # get the query object

query.id()          # get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId()       # get the unique id of this run of the query, which will be generated at every start/restart

query.name()        # get the name of the auto-generated or user-specified name

query.explain()   # print detailed explanations of the query

query.stop()      # stop the query

query.awaitTermination()   # block until query is terminated, with stop() or with error

query.exception()       # the exception if the query has been terminated with error

query.recentProgress  # a list of the most recent progress updates for this query

query.lastProgress    # the most recent progress update of this streaming query
val query = df.writeStream.format("console").start()   // get the query object

query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId       // get the unique id of this run of the query, which will be generated at every start/restart

query.name        // get the name of the auto-generated or user-specified name

query.explain()   // print detailed explanations of the query

query.stop()      // stop the query

query.awaitTermination()   // block until query is terminated, with stop() or with error

query.exception       // the exception if the query has been terminated with error

query.recentProgress  // an array of the most recent progress updates for this query

query.lastProgress    // the most recent progress update of this streaming query
StreamingQuery query = df.writeStream().format("console").start();   // get the query object

query.id();          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId();       // get the unique id of this run of the query, which will be generated at every start/restart

query.name();        // get the name of the auto-generated or user-specified name

query.explain();   // print detailed explanations of the query

query.stop();      // stop the query

query.awaitTermination();   // block until query is terminated, with stop() or with error

query.exception();       // the exception if the query has been terminated with error

query.recentProgress();  // an array of the most recent progress updates for this query

query.lastProgress();    // the most recent progress update of this streaming query
query <- write.stream(df, "console")  # get the query object

queryName(query)          # get the name of the auto-generated or user-specified name

explain(query)            # print detailed explanations of the query

stopQuery(query)          # stop the query

awaitTermination(query)   # block until query is terminated, with stop() or with error

lastProgress(query)       # the most recent progress update of this streaming query

您可以在单个 SparkSession 中启动任意数量的查询。它们将同时运行,共享集群资源。您可以使用 sparkSession.streams() 获取 StreamingQueryManager (Scala/Java/Python 文档),它可用于管理当前活动的查询。

spark = ...  # spark session

spark.streams.active  # get the list of currently active streaming queries

spark.streams.get(id)  # get a query object by its unique id

spark.streams.awaitAnyTermination()  # block until any one of them terminates
val spark: SparkSession = ...

spark.streams.active    // get the list of currently active streaming queries

spark.streams.get(id)   // get a query object by its unique id

spark.streams.awaitAnyTermination()   // block until any one of them terminates
SparkSession spark = ...

spark.streams().active();    // get the list of currently active streaming queries

spark.streams().get(id);   // get a query object by its unique id

spark.streams().awaitAnyTermination();   // block until any one of them terminates
Not available in R.

监控流式查询

有多种方法可以监控活动的流式查询。您可以使用 Spark 的 Dropwizard Metrics 支持将指标推送到外部系统,或者以编程方式访问它们。

交互式读取指标

您可以使用 streamingQuery.lastProgress()streamingQuery.status() 直接获取活动查询的当前状态和指标。 lastProgress()ScalaJava 中返回一个 StreamingQueryProgress 对象,在 Python 中返回一个具有相同字段的字典。它包含有关流的最后一个触发器中取得的进展的所有信息 - 处理了哪些数据、处理速率、延迟等。还有一个 streamingQuery.recentProgress,它返回最近几次进展的数组。

此外,streamingQuery.status()ScalaJava 中返回一个 StreamingQueryStatus 对象,在 Python 中返回一个具有相同字段的字典。它提供有关查询立即在做什么的信息 - 触发器是否处于活动状态、是否正在处理数据等。

以下是一些示例。

query = ...  # a StreamingQuery
print(query.lastProgress)

'''
Will print something like the following.

{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
'''

print(query.status)
'''
Will print something like the following.

{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
'''
val query: StreamingQuery = ...

println(query.lastProgress)

/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


println(query.status)

/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
StreamingQuery query = ...

System.out.println(query.lastProgress());
/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


System.out.println(query.status());
/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
query <- ...  # a StreamingQuery
lastProgress(query)

'''
Will print something like the following.

{
  "id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9",
  "runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16",
  "name" : null,
  "timestamp" : "2017-04-26T08:27:28.835Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 1
  },
  "stateOperators" : [ {
    "numRowsTotal" : 4,
    "numRowsUpdated" : 0
  } ],
  "sources" : [ {
    "description" : "TextSocketSource[host: localhost, port: 9999]",
    "startOffset" : 1,
    "endOffset" : 1,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531"
  }
}
'''

status(query)
'''
Will print something like the following.

{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
'''

使用异步 API 以编程方式报告指标

您还可以通过附加 StreamingQueryListener (Scala/Java/Python 文档) 来异步监控与 SparkSession 关联的所有查询。一旦您使用 sparkSession.streams.addListener() 附加了自定义的 StreamingQueryListener 对象,您将在查询启动和停止以及活动查询取得进展时收到回调。以下是一个示例,

spark = ...

class Listener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("Query started: " + queryStarted.id)

    def onQueryProgress(self, event):
        print("Query made progress: " + queryProgress.progress)

    def onQueryTerminated(self, event):
    	print("Query terminated: " + queryTerminated.id)


spark.streams.addListener(Listener())
val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})
SparkSession spark = ...

spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        System.out.println("Query started: " + queryStarted.id());
    }
    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        System.out.println("Query terminated: " + queryTerminated.id());
    }
    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        System.out.println("Query made progress: " + queryProgress.progress());
    }
});
Not available in R.

使用 Dropwizard 报告指标

Spark 支持使用 Dropwizard 库 报告指标。要启用结构化流式查询的指标报告,您必须在 SparkSession 中显式启用配置 spark.sql.streaming.metricsEnabled

spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
# or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf().set("spark.sql.streaming.metricsEnabled", "true");
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true");
sql("SET spark.sql.streaming.metricsEnabled=true")

在此配置启用后,在 SparkSession 中启动的所有查询都将通过 Dropwizard 向已配置的任何 接收器(例如 Ganglia、Graphite、JMX 等)报告指标。

使用检查点从故障中恢复

在发生故障或有意关闭的情况下,您可以恢复先前查询的先前进度和状态,并从中断的地方继续。这是使用检查点和预写日志完成的。您可以使用检查点位置配置查询,查询将所有进度信息(即每个触发器中处理的偏移量范围)和正在运行的聚合(例如,快速示例 中的词频统计)保存到检查点位置。此检查点位置必须是 HDFS 兼容文件系统中的路径,并且可以在 启动查询 时作为 DataStreamWriter 中的选项设置。

aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()
aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()
aggDF
  .writeStream()
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start();
write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")

流式查询更改后的恢复语义

从同一个检查点位置重新启动时,对流式查询所允许的更改存在限制。以下是一些不允许更改的类型,或者更改的效果不明确。对于所有这些

更改类型

异步进度跟踪

它是什么?

异步进度跟踪允许流式查询异步地并行于微批次中的实际数据处理来检查点进度,从而减少与维护偏移量日志和提交日志相关的延迟。

Async Progress Tracking

它是如何工作的?

结构化流式处理依赖于持久化和管理偏移量作为查询处理的进度指示器。偏移量管理操作会直接影响处理延迟,因为在这些操作完成之前不会发生任何数据处理。异步进度跟踪使流式查询能够检查点进度,而不会受到这些偏移量管理操作的影响。

如何使用它?

以下代码片段提供了一个使用此功能的示例

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()
val query = stream.writeStream
     .format("kafka")
	.option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
	.option("asyncProgressTrackingEnabled", "true")
     .start()

下表描述了此功能的配置及其关联的默认值。

选项 默认值 描述
asyncProgressTrackingEnabled true/false false 启用或禁用异步进度跟踪
asyncProgressTrackingCheckpointIntervalMs 毫秒 1000 我们提交偏移量和完成提交的间隔

限制

该功能的初始版本具有以下限制

关闭设置

关闭异步进度跟踪可能会导致抛出以下异常

java.lang.IllegalStateException: batch x doesn't exist

此外,以下错误消息可能会在驱动程序日志中打印

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

这是因为,当启用异步进度跟踪时,框架不会为每个批次检查点进度,就像在不使用异步进度跟踪的情况下那样。要解决此问题,只需重新启用“asyncProgressTrackingEnabled”并将“asyncProgressTrackingCheckpointIntervalMs”设置为 0,然后运行流式查询,直到至少处理了两个微批次。现在可以安全地禁用异步进度跟踪,重启查询应该正常进行。

连续处理

[实验性]

连续处理是 Spark 2.3 中引入的一种新的实验性流式执行模式,它能够以至少一次容错保证实现低(~1 毫秒)端到端延迟。将其与默认的微批处理引擎进行比较,该引擎可以实现完全一次保证,但最佳情况下只能实现 ~100 毫秒的延迟。对于某些类型的查询(如下所述),您可以在不修改应用程序逻辑(即不更改 DataFrame/Dataset 操作)的情况下选择在其中执行它们的模式。

要在连续处理模式下运行受支持的查询,您需要做的就是使用所需的检查点间隔作为参数指定一个连续触发器。例如,

spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load() \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .trigger(continuous="1 second") \     # only change in query
  .start()
import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start()
import org.apache.spark.sql.streaming.Trigger;

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start();

1 秒的检查点间隔意味着连续处理引擎将每秒记录一次查询的进度。生成的检查点采用与微批处理引擎兼容的格式,因此任何查询都可以使用任何触发器重新启动。例如,以微批处理模式启动的受支持查询可以在连续模式下重新启动,反之亦然。请注意,每次切换到连续模式时,您将获得至少一次容错保证。

受支持的查询

从 Spark 2.4 开始,只有以下类型的查询在连续处理模式下受支持。

有关它们的更多详细信息,请参见输入源输出接收器 部分。虽然控制台接收器适合测试,但端到端低延迟处理可以通过 Kafka 作为源和接收器来最好地观察,因为这允许引擎在输入数据在输入主题中可用后的几毫秒内处理数据并在输出主题中提供结果。

注意事项

其他信息

备注

进一步阅读

演讲

迁移指南

迁移指南现已存档 在此页面上