结构化流编程指南

使用 Datasets 和 DataFrames 的 API

自 Spark 2.0 以来,DataFrame 和 Dataset 既可以表示静态的、有界的数据,也可以表示流式的、无界的数据。与静态 Datasets/DataFrames 类似,你可以使用通用入口点 SparkSession (Python/Scala/Java/R 文档) 从流式源创建流式 DataFrames/Datasets,并对其应用与静态 DataFrames/Datasets 相同的操作。如果你不熟悉 Datasets/DataFrames,强烈建议你通过 DataFrame/Dataset 编程指南 熟悉它们。

创建流式 DataFrame 和流式 Dataset

流式 DataFrame 可以通过 SparkSession.readStream() 返回的 DataStreamReader 接口 (Python/Scala/Java 文档) 创建。在 R 中,使用 read.stream() 方法。与创建静态 DataFrame 的读取接口类似,你可以指定源的详细信息——数据格式、schema、选项等。

输入源

有一些内置源。

某些源不是容错的,因为它们不保证在故障后可以使用检查点偏移量重播数据。有关 容错语义 的详细信息,请参阅前面的章节。以下是 Spark 中所有源的详细信息。

选项 容错性 备注
文件源 path: 输入目录的路径,所有文件格式通用。
maxFilesPerTrigger: 每个触发器中要考虑的最大新文件数(默认值:无最大限制)
maxBytesPerTrigger: 每个触发器中要考虑的最大新文件总大小(默认值:无最大限制)。maxBytesPerTriggermaxFilesPerTrigger 不能同时设置,只能选择其中一个。请注意,流始终至少读取一个文件,以便能够取得进展,而不会被大于给定最大值的文件卡住。
latestFirst: 是否优先处理最新的文件,当有大量积压文件时很有用(默认值:false)
fileNameOnly: 是否仅根据文件名而不是完整路径检查新文件(默认值:false)。如果设置为 `true`,以下文件将被视为同一文件,因为它们的文件名“dataset.txt”相同
"file:///dataset.txt"
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"
maxFileAge: 在被忽略之前,此目录中文件可以存在的最长时限。对于第一个批次,所有文件都将被视为有效。如果 latestFirst 设置为 `true` 并且设置了 maxFilesPerTriggermaxBytesPerTrigger,则此参数将被忽略,因为那些有效且应被处理的旧文件可能会被忽略。最长时限是相对于最新文件的时间戳而不是当前系统时间戳指定的。(默认值:1 周)
maxCachedFiles: 要缓存以在后续批次中处理的最大文件数(默认值:10000)。如果缓存中有文件可用,将优先从缓存中读取,然后才从输入源列表。
discardCachedInputRatio: 缓存文件/字节与最大文件/字节的比例,当缓存的输入少于可读取量时,允许从输入源进行列表(默认值:0.2)。例如,如果一个批次只剩下 10 个缓存文件,但 maxFilesPerTrigger 设置为 100,则这 10 个缓存文件将被丢弃,并改为执行新的列表操作。类似地,如果一个批次剩下 10 MB 的缓存文件,但 maxBytesPerTrigger 设置为 100MB,则缓存文件将被丢弃。
cleanSource: 处理完成后清理已完成文件的选项。
可用选项为“archive”、“delete”、“off”。如果未提供此选项,默认值为“off”。
当提供“archive”时,还必须提供附加选项 sourceArchiveDir。 “sourceArchiveDir”的值不得与源模式在深度上匹配(从根目录开始的目录数),其中深度是两条路径中深度的最小值。这将确保归档文件永远不会作为新源文件包含在内。
例如,假设你提供 '/hello?/spark/*' 作为源模式,则不能使用 '/hello1/spark/archive/dir' 作为“sourceArchiveDir”的值,因为 '/hello?/spark/*' 和 '/hello1/spark/archive' 将会匹配。同样,'/hello1/spark' 也不能用作“sourceArchiveDir”的值,因为 '/hello?/spark' 和 '/hello1/spark' 将会匹配。而 '/archived/here' 则可以,因为它不匹配。
Spark 将根据源文件自身的路径移动文件。例如,如果源文件路径是 /a/b/dataset.txt 并且归档目录路径是 /archived/here,则文件将被移动到 /archived/here/a/b/dataset.txt
注意:归档(通过移动)或删除已完成的文件都会在每个微批次中引入开销(即使在单独的线程中发生也会导致速度变慢),因此在启用此选项之前,你需要了解文件系统中每项操作的成本。另一方面,启用此选项将降低列出源文件的成本,这可能是一项昂贵的操作。
已完成文件清理器中使用的线程数可以通过 spark.sql.streaming.fileSource.cleaner.numThreads 配置(默认值:1)。
注意 2:启用此选项时,源路径不应被多个源或查询使用。同样,你必须确保源路径不与文件流输出目录中的任何文件匹配。
注意 3:删除和移动操作都是尽力而为。未能删除或移动文件不会导致流式查询失败。在某些情况下,Spark 可能无法清理某些源文件——例如,应用程序没有正常关闭,太多文件排队等待清理。

对于特定文件格式的选项,请参阅 DataStreamReader (Python/Scala/Java/R) 中的相关方法。例如,对于“parquet”格式选项,请参阅 DataStreamReader.parquet()

此外,还有影响某些文件格式的会话配置。有关更多详细信息,请参阅 SQL 编程指南。例如,对于“parquet”,请参阅 Parquet 配置 部分。
支持 glob 路径,但不支持多个逗号分隔的路径/glob。
Socket 源 host: 要连接的主机,必须指定
port: 要连接的端口,必须指定
速率源 rowsPerSecond (例如 100,默认值:1):每秒应生成多少行。

rampUpTime (例如 5s,默认值:0s):在生成速度达到 rowsPerSecond 之前需要多长时间预热。使用比秒更精细的粒度将被截断为整数秒。

numPartitions (例如 10,默认值:Spark 的默认并行度):生成行的分区数。

源将尽力达到 rowsPerSecond,但查询可能受到资源限制,可以调整 numPartitions 以帮助达到所需的速度。
每微批次速率源 (格式: rate-micro-batch) rowsPerBatch (例如 100):每个微批次应生成多少行。

numPartitions (例如 10,默认值:Spark 的默认并行度):生成行的分区数。

startTimestamp (例如 1000,默认值:0):生成时间的起始值。

advanceMillisPerBatch (例如 1000,默认值:1000):每个微批次中生成时间推进的时间量。

Kafka 源 请参阅 Kafka 集成指南

以下是一些示例。

spark = SparkSession. ...

# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

socketDF.isStreaming()    # Returns True for DataFrames that have streaming sources

socketDF.printSchema()

# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")
val spark: SparkSession = ...

// Read text from socket
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.isStreaming    // Returns True for DataFrames that have streaming sources

socketDF.printSchema

// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
SparkSession spark = ...

// Read text from socket
Dataset<Row> socketDF = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

socketDF.isStreaming();    // Returns True for DataFrames that have streaming sources

socketDF.printSchema();

// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset<Row> csvDF = spark
  .readStream()
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")
sparkR.session(...)

# Read text from socket
socketDF <- read.stream("socket", host = hostname, port = port)

isStreaming(socketDF)    # Returns TRUE for SparkDataFrames that have streaming sources

printSchema(socketDF)

# Read all the csv files written atomically in a directory
schema <- structType(structField("name", "string"),
                     structField("age", "integer"))
csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";")

这些示例生成的是无类型的流式 DataFrames,这意味着 DataFrame 的 schema 在编译时不会检查,只在查询提交的运行时检查。一些操作,如 mapflatMap 等,需要在编译时知道类型。要执行这些操作,你可以使用与静态 DataFrame 相同的方法将这些无类型的流式 DataFrames 转换为有类型的流式 Datasets。有关更多详细信息,请参阅 SQL 编程指南。此外,本文档后面还会讨论有关支持的流式源的更多详细信息。

自 Spark 3.1 起,你还可以使用 DataStreamReader.table() 从表中创建流式 DataFrame。有关更多详细信息,请参阅 流式表 API

流式 DataFrames/Datasets 的 Schema 推断和分区

默认情况下,基于文件的结构化流要求你指定 schema,而不是依赖 Spark 自动推断。此限制确保即使在发生故障的情况下,流式查询也将使用一致的 schema。对于临时用例,你可以通过将 spark.sql.streaming.schemaInference 设置为 true 来重新启用 schema 推断。

当存在名为 /key=value/ 的子目录时,会发生分区发现,并且列表会自动递归进入这些目录。如果这些列出现在用户提供的 schema 中,Spark 将根据读取文件的路径填充它们。构成分区方案的目录在查询启动时必须存在,并且必须保持静态。例如,在 /data/year=2015/ 存在时添加 /data/year=2016/ 是可以的,但更改分区列(例如,通过创建目录 /data/date=2016-04-17/)是无效的。

流式 DataFrames/Datasets 操作

你可以在流式 DataFrames/Datasets 上应用各种操作——从无类型的 SQL 类似操作(例如 selectwheregroupBy),到有类型的 RDD 类似操作(例如 mapfilterflatMap)。有关更多详细信息,请参阅 SQL 编程指南。让我们看几个你可以使用的操作示例。

基本操作 - 选择、投影、聚合

DataFrame/Dataset 上的大多数常见操作都支持流式处理。本节稍后将讨论少数不支持的操作。

df = ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.javalang.typed;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;

public class DeviceData {
  private String device;
  private String deviceType;
  private Double signal;
  private java.sql.Date time;
  ...
  // Getter and setter methods for each field
}

Dataset<Row> df = ...;    // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10"); // using untyped APIs
ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
  .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());

// Running count of the number of updates for each device type
df.groupBy("deviceType").count(); // using untyped API

// Running average signal for each device type
ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
  .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
df <- ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
select(where(df, "signal > 10"), "device")

# Running count of the number of updates for each device type
count(groupBy(df, "deviceType"))

你还可以将流式 DataFrame/Dataset 注册为临时视图,然后对其应用 SQL 命令。

df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  # returns another streaming DF
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  // returns another streaming DF
df.createOrReplaceTempView("updates");
spark.sql("select count(*) from updates");  // returns another streaming DF
createOrReplaceTempView(df, "updates")
sql("select count(*) from updates")

请注意,你可以使用 df.isStreaming 来识别 DataFrame/Dataset 是否包含流式数据。

df.isStreaming()
df.isStreaming
df.isStreaming()
isStreaming(df)

你可能需要检查查询的执行计划,因为 Spark 在解释针对流式数据集的 SQL 语句时可能会注入有状态操作。一旦查询计划中注入了有状态操作,你可能需要根据有状态操作的考虑因素来检查你的查询。(例如,输出模式、watermark、状态存储大小维护等)

事件时间上的窗口操作

使用结构化流对滑动事件时间窗口进行聚合非常简单,并且与分组聚合非常相似。在分组聚合中,聚合值(例如计数)会为用户指定分组列中的每个唯一值维护。在基于窗口的聚合中,聚合值会为行事件时间落入的每个窗口维护。让我们通过一个示例来理解这一点。

想象一下我们的快速示例被修改,现在流中包含行以及生成行的时间。我们不是运行词频统计,而是希望在 10 分钟的窗口内统计词频,每 5 分钟更新一次。也就是说,统计在 10 分钟窗口(例如 12:00 - 12:10、12:05 - 12:15、12:10 - 12:20 等)之间接收到的单词的词频。请注意,12:00 - 12:10 表示在 12:00 之后但在 12:10 之前到达的数据。现在,考虑一个在 12:07 接收到的单词。这个单词应该增加对应于两个窗口(12:00 - 12:10 和 12:05 - 12:15)的计数。因此,计数将由分组键(即单词)和窗口(可以从事件时间计算)共同索引。

结果表将类似于以下内容。

Window Operations

由于这种窗口操作类似于分组,因此在代码中,你可以使用 groupBy()window() 操作来表达窗口聚合。你可以在 Python/Scala/Java 中查看以下示例的完整代码。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();
words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts <- count(
                    groupBy(
                      words,
                      window(words$timestamp, "10 minutes", "5 minutes"),
                      words$word))

处理迟到数据和水位线

现在考虑如果一个事件迟到应用程序会发生什么。例如,假设一个在 12:04(即事件时间)生成的单词可能在 12:11 被应用程序接收。应用程序应该使用 12:04 而不是 12:11 来更新窗口 12:00 - 12:10 的旧计数。这在我们的基于窗口的分组中自然发生——结构化流可以长时间维护部分聚合的中间状态,以便迟到数据可以正确更新旧窗口的聚合,如下所示。

Handling Late Data

然而,要长时间运行此查询,系统需要限制其累积的中间内存状态量。这意味着系统需要知道何时可以将旧的聚合从内存状态中丢弃,因为应用程序不再会收到该聚合的迟到数据。为了实现这一点,在 Spark 2.1 中,我们引入了水位线(watermarking),它允许引擎自动跟踪数据中的当前事件时间,并相应地尝试清理旧状态。你可以通过指定事件时间列以及数据在事件时间方面预期迟到程度的阈值来定义查询的水位线。对于一个在时间 T 结束的特定窗口,引擎将维护状态并允许迟到数据更新状态,直到 (引擎观察到的最大事件时间 - 迟到阈值 > T)。换句话说,在阈值内的迟到数据将被聚合,但晚于阈值的数据将开始被丢弃(有关确切的保证,请参阅本节后面)。让我们通过一个示例来理解这一点。我们可以使用 withWatermark() 轻松地在前面的示例上定义水位线,如下所示。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),
        col("word"))
    .count();
words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group

words <- withWatermark(words, "timestamp", "10 minutes")
windowedCounts <- count(
                    groupBy(
                      words,
                      window(words$timestamp, "10 minutes", "5 minutes"),
                      words$word))

在此示例中,我们正在列“timestamp”的值上定义查询的水位线,并且还将“10 分钟”定义为允许数据迟到的阈值。如果此查询以更新输出模式(稍后在输出模式一节中讨论)运行,引擎将继续更新结果表中的窗口计数,直到该窗口比水位线更旧,而水位线比列“timestamp”中的当前事件时间落后 10 分钟。以下是一个说明。

Watermarking in Update Mode

如插图所示,引擎跟踪的最大事件时间是蓝色虚线,每个触发器开始时设置为 (最大事件时间 - '10 分钟') 的水位线是红色线条。例如,当引擎观察到数据 (12:14, dog) 时,它将下一个触发器的水位线设置为 12:04。此水位线允许引擎额外维护 10 分钟的中间状态,以便可以计算迟到数据。例如,数据 (12:09, cat) 是乱序和迟到的,它落在窗口 12:00 - 12:1012:05 - 12:15 中。由于它仍然在触发器中领先于水位线 12:04,引擎仍然将中间计数作为状态维护,并正确更新相关窗口的计数。然而,当水位线更新到 12:11 时,窗口 (12:00 - 12:10) 的中间状态被清除,所有后续数据(例如 (12:04, donkey))都被认为是“太迟”,因此被忽略。请注意,在每次触发后,更新的计数(即紫色行)作为触发器输出写入到 sink,这由更新模式决定。

某些 sink(例如文件)可能不支持更新模式所需的细粒度更新。为了与它们配合使用,我们还支持追加模式(Append Mode),其中只有最终计数写入到 sink。这在下面进行了说明。

请注意,在非流式 Dataset 上使用 withWatermark 是一个空操作。由于水位线不应以任何方式影响任何批处理查询,我们将直接忽略它。

Watermarking in Append Mode

与之前的更新模式类似,引擎会为每个窗口维护中间计数。然而,部分计数不会更新到结果表,也不会写入到 sink。引擎会等待“10 分钟”以计算迟到数据,然后丢弃小于水位线的窗口的中间状态,并将最终计数追加到结果表/sink。例如,窗口 12:00 - 12:10 的最终计数仅在水位线更新到 12:11 后才会追加到结果表。

时间窗口的类型

Spark 支持三种时间窗口类型:翻转(固定)、滑动和会话。

The types of time windows

翻转窗口是一系列固定大小、不重叠且连续的时间间隔。一个输入只能绑定到一个窗口。

滑动窗口与翻转窗口类似,都是“固定大小”的,但如果滑动持续时间小于窗口持续时间,窗口可以重叠,在这种情况下,一个输入可以绑定到多个窗口。

翻转窗口和滑动窗口使用 window 函数,该函数已在上述示例中描述。

会话窗口与前两种类型相比具有不同的特性。会话窗口的长度是动态的,取决于输入。会话窗口从一个输入开始,如果后续输入在间隔持续时间内接收到,它会自行扩展。对于静态间隔持续时间,会话窗口在接收到最新输入后,如果在间隔持续时间内没有接收到任何输入,则关闭。

会话窗口使用 session_window 函数。该函数的使用方式与 window 函数类似。

events = ...  # streaming DataFrame of schema { timestamp: Timestamp, userId: String }

# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window(events.timestamp, "5 minutes"),
        events.userId) \
    .count()
import spark.implicits._

val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        session_window($"timestamp", "5 minutes"),
        $"userId")
    .count()
Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

// Group the data by session window and userId, and compute the count of each group
Dataset<Row> sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        session_window(col("timestamp"), "5 minutes"),
        col("userId"))
    .count();

除了静态值,我们还可以提供一个表达式来根据输入行动态指定间隔持续时间。请注意,具有负或零间隔持续时间的行将从聚合中过滤掉。

有了动态间隔持续时间,会话窗口的关闭不再依赖于最新输入。会话窗口的范围是所有事件范围的并集,这些事件范围由查询执行期间的事件开始时间和评估的间隔持续时间决定。

from pyspark.sql import functions as sf

events = ...  # streaming DataFrame of schema { timestamp: Timestamp, userId: String }

session_window = session_window(events.timestamp, \
    sf.when(events.userId == "user1", "5 seconds") \
    .when(events.userId == "user2", "20 seconds").otherwise("5 minutes"))

# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window,
        events.userId) \
    .count()
import spark.implicits._

val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

val sessionWindow = session_window($"timestamp", when($"userId" === "user1", "5 seconds")
  .when($"userId" === "user2", "20 seconds")
  .otherwise("5 minutes"))

// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        Column(sessionWindow),
        $"userId")
    .count()
Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

SessionWindow sessionWindow = session_window(col("timestamp"), when(col("userId").equalTo("user1"), "5 seconds")
  .when(col("userId").equalTo("user2"), "20 seconds")
  .otherwise("5 minutes"))

// Group the data by session window and userId, and compute the count of each group
Dataset<Row> sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        new Column(sessionWindow),
        col("userId"))
    .count();

请注意,在流式查询中使用会话窗口时存在一些限制,如下所示

对于批处理查询,支持全局窗口(分组键中只有 session_window)。

默认情况下,Spark 不对会话窗口聚合执行部分聚合,因为它在分组之前需要在本地分区中进行额外的排序。这对于每个本地分区中同一分组键的输入行数较少的情况效果更好,但对于本地分区中同一分组键的输入行数较多的情况,即使有额外的排序,执行部分聚合仍然可以显著提高性能。

你可以启用 spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition 来指示 Spark 执行部分聚合。

时间窗口的时间表示

在某些用例中,需要提取时间窗口的时间表示,以便将需要时间戳的操作应用于时间窗口数据。一个例子是链式时间窗口聚合,用户希望针对时间窗口定义另一个时间窗口。例如,有人希望将 5 分钟的时间窗口聚合为 1 小时的翻转时间窗口。

有两种方法可以实现这一点,如下所示

  1. 使用 window_time SQL 函数,并以时间窗口列作为参数
  2. 使用 window SQL 函数,并以时间窗口列作为参数

window_time 函数将生成一个表示时间窗口的时间戳。用户可以将结果传递给 window 函数的参数(或任何需要时间戳的地方),以对需要时间戳的时间窗口执行操作。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window(window_time($"window"), "1 hour"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();

// Group the windowed data by another window and word and compute the count of each group
Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
  functions.window(functions.window_time("window"), "1 hour"),
  windowedCounts.col("word")
).count();

window 函数不仅接受时间戳列,还接受时间窗口列。这对于用户希望应用链式时间窗口聚合的场景特别有用。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(windowedCounts.window, "1 hour"),
    windowedCounts.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();

// Group the windowed data by another window and word and compute the count of each group
Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
  functions.window("window", "1 hour"),
  windowedCounts.col("word")
).count();
水位线清理聚合状态的条件

需要注意的是,为了使水位线清理聚合查询中的状态,必须满足以下条件(截至 Spark 2.1.1,将来可能会更改)

带水位线的聚合的语义保证

Join 操作

结构化流支持将流式 Dataset/DataFrame 与静态 Dataset/DataFrame 以及另一个流式 Dataset/DataFrame 进行连接(join)。流式连接的结果是增量生成的,类似于上一节中流式聚合的结果。在本节中,我们将探讨上述情况下支持的连接类型(即内连接、外连接、半连接等)。请注意,在所有支持的连接类型中,与流式 Dataset/DataFrame 的连接结果将与包含流中相同数据的静态 Dataset/DataFrame 的连接结果完全相同。

流-静态连接

自 Spark 2.0 引入以来,结构化流已支持流式 DataFrame/Dataset 与静态 DataFrame/Dataset 之间的连接(内连接和某些类型的外连接)。以下是一个简单的示例。

staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  # left outer join with a static DF
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  // left outer join with a static DF
Dataset<Row> staticDf = spark.read(). ...;
Dataset<Row> streamingDf = spark.readStream(). ...;
streamingDf.join(staticDf, "type");         // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer");  // left outer join with a static DF
staticDf <- read.df(...)
streamingDf <- read.stream(...)
joined <- merge(streamingDf, staticDf, sort = FALSE)  # inner equi-join with a static DF
joined <- join(
            streamingDf,
            staticDf,
            streamingDf$value == staticDf$value,
            "left_outer")  # left outer join with a static DF

请注意,流-静态连接不是有状态的,因此不需要状态管理。但是,少数类型的流-静态外连接尚不支持。这些在本连接部分末尾列出。

流-流连接

在 Spark 2.3 中,我们增加了对流-流连接的支持,也就是说,你可以连接两个流式 Datasets/DataFrames。在两个数据流之间生成连接结果的挑战在于,在任何时间点,连接两侧的数据集视图都是不完整的,这使得查找输入之间的匹配变得更加困难。从一个输入流接收到的任何行都可以与另一个输入流中任何未来尚未接收到的行进行匹配。因此,对于两个输入流,我们将过去的输入作为流状态进行缓冲,以便我们可以将每个未来的输入与过去的输入进行匹配,并相应地生成连接结果。此外,与流式聚合类似,我们自动处理迟到、乱序的数据,并可以使用水位线限制状态。让我们讨论支持的不同类型的流-流连接以及如何使用它们。

带可选水位线的内连接

支持在任何类型的列上进行内连接以及任何类型的连接条件。然而,随着流的运行,流状态的大小将无限增长,因为所有过去的输入都必须保存,以便任何新的输入都可以与过去的任何输入进行匹配。为了避免无界状态,你必须定义额外的连接条件,使得无限旧的输入无法与未来的输入匹配,从而可以从状态中清除。换句话说,你必须在连接中执行以下附加步骤。

  1. 在两个输入上定义水位线延迟,以便引擎知道输入可以有多迟(类似于流式聚合)

  2. 在两个输入之间定义事件时间约束,以便引擎可以确定何时不再需要一个输入的旧行(即不满足时间约束)来与另一个输入进行匹配。此约束可以通过以下两种方式之一定义。

    1. 时间范围连接条件(例如 ...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),

    2. 基于事件时间窗口的连接(例如 ...JOIN ON leftTimeWindow = rightTimeWindow)。

让我们通过一个例子来理解这一点。

假设我们想将广告展示(广告展示时间)流与用户点击广告流连接起来,以关联展示何时产生了可变现的点击。为了允许在此流-流连接中进行状态清理,你必须指定水位线延迟和时间约束,如下所示。

  1. 水位线延迟:例如,展示和相应的点击在事件时间上最多可以迟到/乱序 2 小时和 3 小时。

  2. 事件时间范围条件:例如,点击可以在相应展示后的 0 秒到 1 小时的时间范围内发生。

代码将如下所示。

from pyspark.sql.functions import expr

impressions = spark.readStream. ...
clicks = spark.readStream. ...

# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

# Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
import org.apache.spark.sql.functions.expr

val impressions = spark.readStream. ...
val clicks = spark.readStream. ...

// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
import static org.apache.spark.sql.functions.expr

Dataset<Row> impressions = spark.readStream(). ...
Dataset<Row> clicks = spark.readStream(). ...

// Apply watermarks on event-time columns
Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour ")
);
impressions <- read.stream(...)
clicks <- read.stream(...)

# Apply watermarks on event-time columns
impressionsWithWatermark <- withWatermark(impressions, "impressionTime", "2 hours")
clicksWithWatermark <- withWatermark(clicks, "clickTime", "3 hours")

# Join with event-time constraints
joined <- join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    paste(
      "clickAdId = impressionAdId AND",
      "clickTime >= impressionTime AND",
      "clickTime <= impressionTime + interval 1 hour"
)))
带水位线的流-流内连接的语义保证

这类似于聚合上的水位线提供的保证。水位线延迟为“2 小时”保证引擎永远不会丢弃任何延迟小于 2 小时的数据。但延迟超过 2 小时的数据可能被处理,也可能不被处理。

带水位线的外连接

虽然水位线 + 事件时间约束对于内连接是可选的,但对于外连接则必须指定。这是因为为了在外连接中生成 NULL 结果,引擎必须知道输入行何时在将来不会与任何内容匹配。因此,必须指定水位线 + 事件时间约束以生成正确的结果。因此,一个带有外连接的查询将看起来很像前面提到的广告变现示例,只是会有一个额外的参数指定它是一个外连接。

impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
 )
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour "),
  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);
joined <- join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    paste(
      "clickAdId = impressionAdId AND",
      "clickTime >= impressionTime AND",
      "clickTime <= impressionTime + interval 1 hour"),
  "left_outer"                 # can be "inner", "left_outer", "right_outer", "full_outer", "left_semi"
))
带水位线的流-流外连接的语义保证

外连接在水位线延迟以及数据是否会被丢弃方面与内连接具有相同的保证。

注意事项

关于外部结果的生成方式,有几个重要的特性需要注意。

带水位线的半连接

半连接返回关系左侧与右侧匹配的值。它也称为左半连接。与外连接类似,半连接必须指定水位线 + 事件时间约束。这是为了清除左侧不匹配的输入行,引擎必须知道左侧的输入行何时在将来不会与右侧的任何内容匹配。

带水位线的流-流半连接的语义保证

半连接在水位线延迟以及数据是否会被丢弃方面与内连接具有相同的保证。

流式查询中连接的支持矩阵
左输入 右输入 连接类型
静态 静态 所有类型 支持,因为它不作用于流式数据,即使它可能存在于流式查询中
静态 内连接 支持,无状态
左外连接 支持,无状态
右外连接 不支持
全外连接 不支持
左半连接 支持,无状态
静态 内连接 支持,无状态
左外连接 不支持
右外连接 支持,无状态
全外连接 不支持
左半连接 不支持
内连接 支持,可选地在两侧指定水位线 + 时间约束以进行状态清理
左外连接 有条件支持,必须在右侧指定水位线 + 时间约束以获得正确结果,可选地在左侧指定水位线以进行所有状态清理
右外连接 有条件支持,必须在左侧指定水位线 + 时间约束以获得正确结果,可选地在右侧指定水位线以进行所有状态清理
全外连接 有条件支持,必须在一侧指定水位线 + 时间约束以获得正确结果,可选地在另一侧指定水位线以进行所有状态清理
左半连接 有条件支持,必须在右侧指定水位线 + 时间约束以获得正确结果,可选地在左侧指定水位线以进行所有状态清理

支持的连接的附加详细信息

在追加输出模式下,你可以构建一个查询,其中包含非 map 类似操作,例如在连接之前/之后进行聚合、去重、流-流连接。

例如,这是一个在两个流中进行时间窗口聚合,然后进行带有事件时间窗口的流-流连接的示例

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")
val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")
Dataset<Row> clicksWindow = clicksWithWatermark
  .groupBy(functions.window(clicksWithWatermark.col("clickTime"), "1 hour"))
  .count();

Dataset<Row> impressionsWindow = impressionsWithWatermark
  .groupBy(functions.window(impressionsWithWatermark.col("impressionTime"), "1 hour"))
  .count();

clicksWindow.join(impressionsWindow, "window", "inner");

这是另一个带有时间范围连接条件,然后进行时间窗口聚合的流-流连接示例

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()
val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()
Dataset<Row> joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour "),
  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);

joined
  .groupBy(joined.col("clickAdId"), functions.window(joined.col("clickTime"), "1 hour"))
  .count();

流式去重

你可以使用事件中的唯一标识符对数据流中的记录进行去重。这与使用唯一标识符列对静态数据进行去重完全相同。查询将存储来自先前记录的必要数据量,以便可以过滤重复记录。与聚合类似,你可以使用带或不带水位线的去重。

streamingDf = spark.readStream. ...

# Without watermark using guid column
streamingDf.dropDuplicates(["guid"])

# With watermark using guid and eventTime columns
streamingDf \
  .withWatermark("eventTime", "10 seconds") \
  .dropDuplicates(["guid", "eventTime"])
val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")
Dataset<Row> streamingDf = spark.readStream(). ...;  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid");

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime");
streamingDf <- read.stream(...)

# Without watermark using guid column
streamingDf <- dropDuplicates(streamingDf, "guid")

# With watermark using guid and eventTime columns
streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")

特别是对于流式处理,你可以在水位线的时间范围内,使用事件中的唯一标识符对数据流中的记录进行去重。例如,如果你将水位线的延迟阈值设置为“1 小时”,则在 1 小时内发生的重复事件可以正确去重。(更多详细信息,请参阅 dropDuplicatesWithinWatermark 的 API 文档。)

这可用于处理事件时间列不能作为唯一标识符一部分的用例,主要是由于相同记录的事件时间可能以某种方式不同。(例如,非幂等写入器在写入时发出事件时间)

建议用户将水位线的延迟阈值设置得比重复事件之间的最大时间戳差异更长。

此功能要求在流式 DataFrame/Dataset 中设置带有延迟阈值的水位线。

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
streamingDf \
  .withWatermark("eventTime", "10 hours") \
  .dropDuplicatesWithinWatermark(["guid"])
val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
Dataset<Row> streamingDf = spark.readStream(). ...;  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid");

处理多个水位线的策略

一个流式查询可以有多个输入流,这些流被合并或连接在一起。每个输入流都可以有不同的迟到数据阈值,这些阈值需要为有状态操作所容忍。你可以在每个输入流上使用 withWatermarks("eventTime", delay) 来指定这些阈值。例如,考虑一个在 inputStream1inputStream2 之间进行流-流连接的查询。

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

在执行查询时,结构化流会单独跟踪每个输入流中观察到的最大事件时间,根据相应的延迟计算水位线,并选择一个全局水位线用于有状态操作。默认情况下,选择最小值作为全局水位线,因为它确保如果其中一个流落后于其他流(例如,其中一个流由于上游故障停止接收数据),则不会意外地丢弃任何数据。换句话说,全局水位线将安全地以最慢流的速度移动,并且查询输出将相应地延迟。

然而,在某些情况下,你可能希望更快地获得结果,即使这意味着丢弃最慢流中的数据。自 Spark 2.4 起,你可以通过将 SQL 配置 spark.sql.streaming.multipleWatermarkPolicy 设置为 max(默认值为 min)来设置多水位线策略,选择最大值作为全局水位线。这使得全局水位线以最快流的速度移动。然而,副作用是来自较慢流的数据将被激进地丢弃。因此,请谨慎使用此配置。

任意有状态操作

许多用例需要比聚合更高级的有状态操作。例如,在许多用例中,你必须从事件数据流中跟踪会话。为了进行这种会话化,你必须将任意类型的数据保存为状态,并在每次触发时使用数据流事件对状态执行任意操作。

自 Spark 2.2 起,可以使用遗留的 mapGroupsWithStateflatMapGroupsWithState 运算符来完成此操作。这两个运算符都允许你在分组的 Datasets 上应用用户定义的代码来更新用户定义的状态。有关更具体的详细信息,请查看 API 文档 (Scala/Java) 和示例 (Scala/Java)。

自 Spark 4.0 发布以来,鼓励用户使用新的 transformWithState 运算符来构建其复杂的有状态应用程序。有关更多详细信息,请参阅此处的深入文档。

尽管 Spark 无法检查和强制执行,但状态函数应根据输出模式的语义实现。例如,在更新模式下,Spark 不期望状态函数会发出早于当前水位线加上允许的迟到记录延迟的行,而在追加模式下,状态函数可以发出这些行。

不支持的操作

有少数 DataFrame/Dataset 操作不支持流式 DataFrames/Datasets。其中一些如下所示。

此外,有些 Dataset 方法不适用于流式 Datasets。它们是会立即运行查询并返回结果的操作,这在流式 Dataset 上没有意义。相反,这些功能可以通过显式启动流式查询来完成(请参阅下一节相关内容)。

如果你尝试任何这些操作,你将看到一个 AnalysisException,例如“操作 XYZ 不支持流式 DataFrames/Datasets”。虽然其中一些可能在 Spark 的未来版本中得到支持,但还有一些从根本上很难在流式数据上高效实现。例如,不支持对输入流进行排序,因为它需要跟踪流中接收到的所有数据。因此,这从根本上很难高效执行。

状态存储

状态存储是一个版本化的键值存储,提供读写操作。在结构化流中,我们使用状态存储提供者来处理跨批次的有状态操作。有两种内置的状态存储提供者实现。最终用户也可以通过扩展 StateStoreProvider 接口来实现自己的状态存储提供者。

HDFS 状态存储提供者

HDFS 后端状态存储提供者是 [[StateStoreProvider]] 和 [[StateStore]] 的默认实现,其中所有数据在第一阶段存储在内存映射中,然后由 HDFS 兼容文件系统中的文件支持。对存储的所有更新都必须以事务方式批量完成,并且每组更新都会增加存储的版本。这些版本可用于在存储的正确版本上重新执行更新(通过 RDD 操作中的重试),并重新生成存储版本。

RocksDB 状态存储实现

截至 Spark 3.2,我们新增了一种内置的状态存储实现:RocksDB 状态存储提供者。

如果你的流式查询中有有状态操作(例如,流式聚合、流式去重、流-流连接、mapGroupsWithState 或 flatMapGroupsWithState),并且你想在状态中维护数百万个键,那么你可能会面临与大型 JVM 垃圾回收(GC)暂停相关的问题,导致微批处理时间出现较大波动。这是因为,根据 HDFSBackedStateStore 的实现,状态数据维护在执行器的 JVM 内存中,大量状态对象给 JVM 带来内存压力,从而导致高 GC 暂停。

在这种情况下,你可以选择使用基于 RocksDB 的更优化的状态管理解决方案。该解决方案不将状态保留在 JVM 内存中,而是使用 RocksDB 在本机内存和本地磁盘中高效管理状态。此外,对此状态的任何更改都会由结构化流自动保存到你提供的检查点位置,从而提供完整的容错保证(与默认状态管理相同)。

要启用新的内置状态存储实现,请将 spark.sql.streaming.stateStore.providerClass 设置为 org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

以下是关于状态存储提供者的 RocksDB 实例的配置

配置名称 描述 默认值
spark.sql.streaming.stateStore.rocksdb.compactOnCommit 是否对 RocksDB 实例进行范围压缩以进行提交操作 False
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled 在 RocksDB StateStore 提交期间是上传变更日志还是快照 False
spark.sql.streaming.stateStore.rocksdb.blockSizeKB RocksDB BlockBasedTable(RocksDB 的默认 SST 文件格式)每个块中打包的用户数据的近似大小(KB)。 4
spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB 块缓存的大小容量(MB)。 8
spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs RocksDB 实例在加载操作中获取锁的等待时间(毫秒)。 60000
spark.sql.streaming.stateStore.rocksdb.maxOpenFiles RocksDB 实例可使用的打开文件数。值为 -1 表示文件一旦打开将始终保持打开状态。如果达到打开文件限制,RocksDB 将从打开文件缓存中逐出条目并关闭这些文件描述符,然后从缓存中删除条目。 -1
spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad 加载时是否重置 RocksDB 的所有 ticker 和 histogram 统计信息。 True
spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows 我们是否跟踪状态存储中的总行数。请参阅性能方面考虑中的详细信息。 True
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB RocksDB 中 MemTable 的最大大小。值为 -1 表示将使用 RocksDB 内部默认值 -1
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber RocksDB 中 MemTable 的最大数量,包括活动和不可变。值为 -1 表示将使用 RocksDB 内部默认值 -1
spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage 单个节点上 RocksDB 状态存储实例的总内存使用量是否受限。 false
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB 单个节点上 RocksDB 状态存储实例的总内存限制(MB)。 500
spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio 写缓冲区占用的总内存,作为使用 maxMemoryUsageMB 在单个节点上所有 RocksDB 实例分配内存的比例。 0.5
spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio 高优先级池中块占用的总内存,作为使用 maxMemoryUsageMB 在单个节点上所有 RocksDB 实例分配内存的比例。 0.1
spark.sql.streaming.stateStore.rocksdb.allowFAllocate 允许 rocksdb 运行时使用 fallocate 预分配日志等的磁盘空间… 对于具有许多较小状态存储的应用程序,禁用此功能可以权衡磁盘空间以换取写入性能。 true
spark.sql.streaming.stateStore.rocksdb.compression RocksDB 中使用的压缩类型。该字符串通过 RocksDB Java API getCompressionType() 转换为 RocksDB 压缩类型。 lz4
RocksDB 状态存储内存管理

RocksDB 为不同的对象(如 memtables、块缓存和过滤器/索引块)分配内存。如果不受限制,RocksDB 在多个实例上的内存使用量可能会无限增长,并可能导致 OOM(内存不足)问题。RocksDB 提供了一种通过写入缓冲区管理器功能来限制单个节点上所有 DB 实例的内存使用量的方法。如果你想在 Spark 结构化流部署中限制 RocksDB 内存使用,可以通过将 spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage 配置设置为 true 来启用此功能。你还可以通过将 spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB 值设置为静态数字或节点上可用物理内存的一部分来确定 RocksDB 实例允许的最大内存。单个 RocksDB 实例的限制也可以通过将 spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMBspark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber 设置为所需值来配置。默认情况下,这些设置使用 RocksDB 内部默认值。

请注意,boundedMemoryUsage 配置将对 RocksDB 的总内存使用量启用一个软限制。因此,如果所有分配给更高级别读取器的块都在使用中,RocksDB 使用的总内存可能会暂时超过此值。目前无法启用严格限制,因为它会导致查询失败,并且我们不支持跨附加节点的状态重新平衡。

RocksDB 状态存储变更日志检查点

在较新版本的 Spark 中,为 RocksDB 状态存储引入了变更日志检查点。RocksDB 状态存储的传统检查点机制是增量快照检查点,其中 RocksDB 实例的清单文件和新生成的 RocksDB SST 文件上传到持久存储。变更日志检查点不是上传 RocksDB 实例的数据文件,而是上传自上次检查点以来对状态所做的更改以实现持久性。快照会定期在后台持久化,以便进行可预测的故障恢复和变更日志修剪。变更日志检查点避免了捕获和上传 RocksDB 实例快照的成本,并显著降低了流式查询延迟。

变更日志检查点默认是禁用的。你可以通过将 spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled 配置设置为 true 来启用 RocksDB 状态存储变更日志检查点。变更日志检查点设计为与传统检查点机制向后兼容。RocksDB 状态存储提供者为两种检查点机制之间的双向转换提供了无缝支持。这允许你利用变更日志检查点的性能优势,而无需丢弃旧的状态检查点。在支持变更日志检查点的 Spark 版本中,你可以通过在 Spark 会话中启用变更日志检查点,将流式查询从旧版本的 Spark 迁移到变更日志检查点。反之亦然,你可以在较新版本的 Spark 中安全地禁用变更日志检查点,然后任何已经使用变更日志检查点运行的查询都将切换回传统检查点。你需要重新启动流式查询以应用检查点机制的更改,但在此过程中你不会观察到任何性能下降。

性能方面考虑
  1. 你可能希望禁用对总行数的跟踪,以在 RocksDB 状态存储上获得更好的性能。

跟踪行数会增加写入操作的额外查找——建议你在调优 RocksDB 状态存储时尝试关闭此配置,特别是当状态运算符的指标值很大时——numRowsUpdatednumRowsRemoved

你可以在重新启动查询时更改配置,这使你能够改变“可观察性与性能”之间的权衡决策。如果此配置被禁用,状态中的行数(numTotalStateRows)将报告为 0。

状态存储和任务本地性

有状态操作将事件状态存储在执行器的状态存储中。状态存储占用内存和磁盘空间等资源来存储状态。因此,在不同的流批次中,让状态存储提供者在同一个执行器中运行效率更高。更改状态存储提供者的位置需要额外的开销来加载检查点状态。从检查点加载状态的开销取决于外部存储和状态大小,这往往会损害微批处理运行的延迟。对于某些用例,例如处理非常大的状态数据,从检查点状态加载新的状态存储提供者可能非常耗时且效率低下。

结构化流查询中的有状态操作依赖于 Spark RDD 的首选位置功能,以便在同一个执行器上运行状态存储提供者。如果在下一个批次中,相应的状态存储提供者再次在此执行器上调度,则它可以重用之前的状态,并节省加载检查点状态的时间。

然而,通常首选位置并非硬性要求,Spark 仍然可能将任务调度到非首选的执行器上。在这种情况下,Spark 将在新执行器上从检查点状态加载状态存储提供者。在上一批次中运行的状态存储提供者不会立即卸载。Spark 会运行一个维护任务,检查并卸载执行器上不活跃的状态存储提供者。

通过更改与任务调度相关的 Spark 配置,例如 spark.locality.wait,用户可以配置 Spark 等待多长时间来启动一个数据本地任务。对于结构化流中的有状态操作,它可用于让状态存储提供者在跨批次的相同执行器上运行。

特别是对于内置的 HDFS 状态存储提供者,用户可以检查状态存储指标,例如 loadedMapCacheHitCountloadedMapCacheMissCount。理想情况下,最好将缓存未命中计数降至最低,这意味着 Spark 不会在加载检查点状态上浪费太多时间。用户可以增加 Spark 本地性等待配置,以避免在不同执行器上跨批次加载状态存储提供者。

状态数据源(实验性)

Apache Spark 提供了一个与流状态相关的数据源,它提供了在检查点中操作状态存储的能力。用户可以使用状态数据源运行批处理查询,以获取现有流式查询的状态可见性。

截至 Spark 4.0,该数据源仅支持读取功能。有关更多详细信息,请参阅状态数据源集成指南

注意:此数据源目前标记为实验性——源选项和行为(输出)可能会有所更改。

启动流式查询

定义好最终结果 DataFrame/Dataset 后,剩下的就是启动流式计算。为此,你必须使用通过 Dataset.writeStream() 返回的 DataStreamWriter (Python/Scala/Java 文档)。你需要在此接口中指定以下一个或多个内容。

输出模式

有几种输出模式。

不同类型的流式查询支持不同的输出模式。下面是兼容性矩阵。

查询类型 支持的输出模式 备注
带有聚合的查询 带水印的事件时间聚合 追加、更新、完成 追加模式使用水印来删除旧的聚合状态。但是,窗口聚合的输出会延迟 withWatermark() 中指定的滞后阈值,因为根据模式语义,行一旦最终确定(即水印被跨越后),只能添加到结果表中一次。有关更多详细信息,请参阅滞后数据部分。

更新模式使用水印来删除旧的聚合状态。

完成模式不删除旧的聚合状态,因为根据定义,此模式保留结果表中的所有数据。
其他聚合 完成、更新 由于未定义水印(仅在其他类别中定义),旧的聚合状态不会被删除。

不支持追加模式,因为聚合可能会更新,从而违反此模式的语义。
带有 mapGroupsWithState 的查询 更新 在带有 mapGroupsWithState 的查询中不允许进行聚合。
带有 flatMapGroupsWithState 的查询 追加操作模式 追加 flatMapGroupsWithState 之后允许进行聚合。
更新操作模式 更新 在带有 flatMapGroupsWithState 的查询中不允许进行聚合。
带有 joins 的查询 追加 更新和完成模式暂不支持。有关支持的连接类型,请参阅连接操作部分中的支持矩阵
其他查询 追加、更新 不支持完成模式,因为将所有未聚合的数据保留在结果表中是不可行的。

输出槽

有几种内置的输出槽。

writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()
writeStream
    .foreach(...)
    .start()
writeStream
    .format("console")
    .start()
writeStream
    .format("memory")
    .queryName("tableName")
    .start()

有些槽不是容错的,因为它们不保证输出的持久性,并且仅用于调试目的。请参阅前面关于容错语义的部分。以下是 Spark 中所有槽的详细信息。

支持的输出模式 选项 容错性 备注
文件槽 追加 path: 输出目录的路径,必须指定。
retention: 输出文件的存活时间 (TTL)。提交批次早于 TTL 的输出文件最终将被排除在元数据日志之外。这意味着读取槽输出目录的读取器查询可能不会处理它们。您可以将值提供为时间的字符串格式(例如“12h”、“7d”等)。默认情况下,此功能是禁用的。

有关文件格式特定的选项,请参阅 DataFrameWriter 中的相关方法(Python/Scala/Java/R)。例如,对于“parquet”格式选项,请参阅 DataFrameWriter.parquet()
是(精确一次) 支持写入分区表。按时间分区可能很有用。
Kafka 槽 追加、更新、完成 请参阅Kafka 集成指南 是(至少一次) 更多详细信息请参阅Kafka 集成指南
Foreach 槽 追加、更新、完成 是(至少一次) 更多详细信息请参阅下一节
ForeachBatch 槽 追加、更新、完成 取决于实现 更多详细信息请参阅下一节
控制台槽 追加、更新、完成 numRows: 每次触发时打印的行数(默认值:20)
truncate: 如果输出过长是否截断(默认值:true)
内存槽 追加、完成 否。但在完成模式下,重新启动的查询将重新创建完整表。 表名即查询名称。

请注意,您必须调用 start() 才能实际开始查询的执行。这会返回一个 StreamingQuery 对象,它是持续运行执行的句柄。您可以使用此对象管理查询,我们将在下一小节中讨论。现在,让我们通过一些示例来理解所有这些。

# ========== DF with no aggregations ==========
noAggDF = deviceDataDf.select("device").where("signal > 10")

# Print new data to console
noAggDF \
    .writeStream \
    .format("console") \
    .start()

# Write new data to Parquet files
noAggDF \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .option("path", "path/to/destination/dir") \
    .start()

# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()

# Print updated aggregations to console
aggDF \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF \
    .writeStream \
    .queryName("aggregates") \
    .outputMode("complete") \
    .format("memory") \
    .start()

spark.sql("select * from aggregates").show()   # interactively query in-memory table
// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")

// Print new data to console
noAggDF
  .writeStream
  .format("console")
  .start()

// Write new data to Parquet files
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()

// Print updated aggregations to console
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Have all the aggregates in an in-memory table
aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()   // interactively query in-memory table
// ========== DF with no aggregations ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");

// Print new data to console
noAggDF
  .writeStream()
  .format("console")
  .start();

// Write new data to Parquet files
noAggDF
  .writeStream()
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start();

// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();

// Print updated aggregations to console
aggDF
  .writeStream()
  .outputMode("complete")
  .format("console")
  .start();

// Have all the aggregates in an in-memory table
aggDF
  .writeStream()
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start();

spark.sql("select * from aggregates").show();   // interactively query in-memory table
# ========== DF with no aggregations ==========
noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")

# Print new data to console
write.stream(noAggDF, "console")

# Write new data to Parquet files
write.stream(noAggDF,
             "parquet",
             path = "path/to/destination/dir",
             checkpointLocation = "path/to/checkpoint/dir")

# ========== DF with aggregation ==========
aggDF <- count(groupBy(df, "device"))

# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")

# Have all the aggregates in an in memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")

# Interactively query in-memory table
head(sql("select * from aggregates"))
使用 Foreach 和 ForeachBatch

foreachforeachBatch 操作允许您对流式查询的输出应用任意操作和写入逻辑。它们的使用场景略有不同——foreach 允许对每行应用自定义写入逻辑,而 foreachBatch 允许对每个微批次的输出应用任意操作和自定义逻辑。让我们更详细地了解它们的用法。

ForeachBatch

foreachBatch(...) 允许您指定一个函数,该函数在流式查询的每个微批次的输出数据上执行。从 Spark 2.4 开始,Scala、Java 和 Python 都支持此功能。它接受两个参数:一个包含微批次输出数据的 DataFrame 或 Dataset,以及微批次的唯一 ID。

def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    pass

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // Transform and write batchDF
}.start()
streamingDatasetOfString.writeStream().foreachBatch(
  new VoidFunction2<Dataset<String>, Long>() {
    public void call(Dataset<String> dataset, Long batchId) {
      // Transform and write batchDF
    }
  }
).start();

R 尚不支持。

使用 foreachBatch,您可以执行以下操作。

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

注意

Foreach

如果 foreachBatch 不是一个选项(例如,不存在相应的批处理数据写入器,或处于连续处理模式),那么您可以使用 foreach 来表达您的自定义写入逻辑。具体来说,您可以通过将其分为三个方法来表达数据写入逻辑:openprocessclose。从 Spark 2.4 开始,foreach 在 Scala、Java 和 Python 中均可用。

在 Python 中,您可以通过两种方式调用 foreach:在函数中或在对象中。函数提供了一种表达处理逻辑的简单方法,但当故障导致某些输入数据重新处理时,它不允许您对生成的数据进行去重。对于这种情况,您必须在对象中指定处理逻辑。

  • 首先,函数将一行作为输入。
def process_row(row):
    # Write row to storage
    pass

query = streamingDF.writeStream.foreach(process_row).start()
  • 其次,对象具有一个 process 方法以及可选的 open 和 close 方法。
class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection. This method is optional in Python.
        pass

    def process(self, row):
        # Write row to connection. This method is NOT optional in Python.
        pass

    def close(self, error):
        # Close the connection. This method in optional in Python.
        pass

query = streamingDF.writeStream.foreach(ForeachWriter()).start()

在 Scala 中,您必须扩展类 ForeachWriter (文档)。

streamingDatasetOfString.writeStream.foreach(
  new ForeachWriter[String] {

    def open(partitionId: Long, version: Long): Boolean = {
      // Open connection
    }

    def process(record: String): Unit = {
      // Write string to connection
    }

    def close(errorOrNull: Throwable): Unit = {
      // Close the connection
    }
  }
).start()

在 Java 中,您必须扩展类 ForeachWriter (文档)。

streamingDatasetOfString.writeStream().foreach(
  new ForeachWriter<String>() {

    @Override public boolean open(long partitionId, long version) {
      // Open connection
    }

    @Override public void process(String record) {
      // Write string to connection
    }

    @Override public void close(Throwable errorOrNull) {
      // Close the connection
    }
  }
).start();

R 尚不支持。

执行语义 当流式查询启动时,Spark 以以下方式调用函数或对象的方法:

流表 API

从 Spark 3.1 开始,您还可以使用 DataStreamReader.table() 将表读取为流式 DataFrame,并使用 DataStreamWriter.toTable() 将流式 DataFrame 写入为表。

spark = ...  # spark session

# Create a streaming DataFrame
df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Write the streaming DataFrame to a table
df.writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .toTable("myTable")

# Check the table result
spark.read.table("myTable").show()

# Transform the source dataset and write to a new table
spark.readStream \
    .table("myTable") \
    .select("value") \
    .writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .format("parquet") \
    .toTable("newTable")

# Check the new table result
spark.read.table("newTable").show()
val spark: SparkSession = ...

// Create a streaming DataFrame
val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

// Write the streaming DataFrame to a table
df.writeStream
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .toTable("myTable")

// Check the table result
spark.read.table("myTable").show()

// Transform the source dataset and write to a new table
spark.readStream
  .table("myTable")
  .select("value")
  .writeStream
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .format("parquet")
  .toTable("newTable")

// Check the new table result
spark.read.table("newTable").show()
SparkSession spark = ...

// Create a streaming DataFrame
Dataset<Row> df = spark.readStream()
  .format("rate")
  .option("rowsPerSecond", 10)
  .load();

// Write the streaming DataFrame to a table
df.writeStream()
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .toTable("myTable");

// Check the table result
spark.read().table("myTable").show();

// Transform the source dataset and write to a new table
spark.readStream()
  .table("myTable")
  .select("value")
  .writeStream()
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .format("parquet")
  .toTable("newTable");

// Check the new table result
spark.read().table("newTable").show();

R 中不可用。

有关更多详细信息,请查阅 DataStreamReader(Python/Scala/Java 文档)和 DataStreamWriter(Python/Scala/Java 文档)的文档。

触发器

流式查询的触发器设置定义了流式数据处理的时机,无论是作为具有固定批处理间隔的微批次查询执行,还是作为连续处理查询执行。以下是支持的不同类型的触发器。

触发器类型 描述
未指定(默认) 如果未明确指定触发器设置,则默认情况下,查询将在微批次模式下执行,一旦前一个微批次完成处理,就会生成微批次。
固定间隔微批次 查询将以微批次模式执行,微批次将按用户指定的间隔启动。
  • 如果前一个微批次在间隔内完成,则引擎将等待间隔结束才会启动下一个微批次。
  • 如果前一个微批次完成所需时间超过间隔(即错过了间隔边界),则下一个微批次将在前一个完成时立即开始(即,它不会等待下一个间隔边界)。
  • 如果没有新数据可用,则不会启动微批次。
一次性微批次(已弃用) 查询将执行仅一个微批次来处理所有可用数据,然后自行停止。这在您希望定期启动集群、处理自上一周期以来所有可用数据,然后关闭集群的场景中很有用。在某些情况下,这可能会显著节省成本。请注意,此触发器已弃用,建议用户迁移到即时可用微批次,因为它提供了更好的处理保证、细粒度的批次扩展以及更好的水印推进渐进处理(包括无数据批次)。
即时可用微批次 与查询的一次性微批次触发器类似,查询将处理所有可用数据,然后自行停止。不同之处在于,它将根据源选项(例如,文件源的 maxFilesPerTriggermaxBytesPerTrigger)以(可能)多个微批次处理数据,这将带来更好的查询可伸缩性。
  • 此触发器提供了强大的处理保证:无论上次运行中剩余多少批次,它都确保在终止之前处理执行时所有可用数据。所有未提交的批次将首先被处理。
  • 水印会随每个批次推进,如果最后一个批次推进了水印,则无数据批次会在终止前执行。这有助于保持更小、可预测的状态大小,并减少有状态运算符输出的延迟。
注意:当存在任何不支持 Trigger.AvailableNow 的源时,此触发器将被停用。Spark 将回退到执行一次性微批次。请检查上述差异以了解回退的风险。
连续处理,带固定检查点间隔
(实验性)
查询将以新的低延迟、连续处理模式执行。有关更多信息,请参阅下面的连续处理部分

以下是一些代码示例。

# Default trigger (runs micro-batch as soon as it can)
df.writeStream \
  .format("console") \
  .start()

# ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream \
  .format("console") \
  .trigger(processingTime='2 seconds') \
  .start()

# One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream \
  .format("console") \
  .trigger(once=True) \
  .start()

# Available-now trigger
df.writeStream \
  .format("console") \
  .trigger(availableNow=True) \
  .start()

# Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(continuous='1 second')
  .start()
import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start()

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()

// Available-now trigger
df.writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start();

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start();

// One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start();

// Available-now trigger
df.writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start();

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start();
# Default trigger (runs micro-batch as soon as it can)
write.stream(df, "console")

# ProcessingTime trigger with two-seconds micro-batch interval
write.stream(df, "console", trigger.processingTime = "2 seconds")

# One-time trigger
write.stream(df, "console", trigger.once = TRUE)

# Continuous trigger is not yet supported

管理流式查询

查询启动时创建的 StreamingQuery 对象可用于监视和管理查询。

query = df.writeStream.format("console").start()   # get the query object

query.id()          # get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId()       # get the unique id of this run of the query, which will be generated at every start/restart

query.name()        # get the name of the auto-generated or user-specified name

query.explain()   # print detailed explanations of the query

query.stop()      # stop the query

query.awaitTermination()   # block until query is terminated, with stop() or with error

query.exception()       # the exception if the query has been terminated with error

query.recentProgress  # a list of the most recent progress updates for this query

query.lastProgress    # the most recent progress update of this streaming query
val query = df.writeStream.format("console").start()   // get the query object

query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId       // get the unique id of this run of the query, which will be generated at every start/restart

query.name        // get the name of the auto-generated or user-specified name

query.explain()   // print detailed explanations of the query

query.stop()      // stop the query

query.awaitTermination()   // block until query is terminated, with stop() or with error

query.exception       // the exception if the query has been terminated with error

query.recentProgress  // an array of the most recent progress updates for this query

query.lastProgress    // the most recent progress update of this streaming query
StreamingQuery query = df.writeStream().format("console").start();   // get the query object

query.id();          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId();       // get the unique id of this run of the query, which will be generated at every start/restart

query.name();        // get the name of the auto-generated or user-specified name

query.explain();   // print detailed explanations of the query

query.stop();      // stop the query

query.awaitTermination();   // block until query is terminated, with stop() or with error

query.exception();       // the exception if the query has been terminated with error

query.recentProgress();  // an array of the most recent progress updates for this query

query.lastProgress();    // the most recent progress update of this streaming query
query <- write.stream(df, "console")  # get the query object

queryName(query)          # get the name of the auto-generated or user-specified name

explain(query)            # print detailed explanations of the query

stopQuery(query)          # stop the query

awaitTermination(query)   # block until query is terminated, with stop() or with error

lastProgress(query)       # the most recent progress update of this streaming query

您可以在单个 SparkSession 中启动任意数量的查询。它们都将并发运行并共享集群资源。您可以使用 sparkSession.streams() 获取 StreamingQueryManagerPython/Scala/Java 文档),它可用于管理当前活动的查询。

spark = ...  # spark session

spark.streams.active  # get the list of currently active streaming queries

spark.streams.get(id)  # get a query object by its unique id

spark.streams.awaitAnyTermination()  # block until any one of them terminates
val spark: SparkSession = ...

spark.streams.active    // get the list of currently active streaming queries

spark.streams.get(id)   // get a query object by its unique id

spark.streams.awaitAnyTermination()   // block until any one of them terminates
SparkSession spark = ...

spark.streams().active();    // get the list of currently active streaming queries

spark.streams().get(id);   // get a query object by its unique id

spark.streams().awaitAnyTermination();   // block until any one of them terminates
Not available in R.

监控流式查询

有多种方法可以监视活动的流式查询。您可以使用 Spark 的 Dropwizard 指标支持将指标推送到外部系统,或以编程方式访问它们。

交互式读取指标

您可以使用 streamingQuery.lastProgress()streamingQuery.status() 直接获取活动查询的当前状态和指标。lastProgress()ScalaJava 中返回一个 StreamingQueryProgress 对象,在 Python 中返回一个包含相同字段的字典。它包含流的上次触发中所有进度信息——处理了哪些数据、处理速率、延迟等。还有 streamingQuery.recentProgress,它返回最近几次进度的数组。

此外,streamingQuery.status()ScalaJava 中返回一个 StreamingQueryStatus 对象,在 Python 中返回一个包含相同字段的字典。它提供了查询正在立即执行的操作信息——触发器是否处于活动状态,数据是否正在处理等。

以下是一些示例。

query = ...  # a StreamingQuery
print(query.lastProgress)

'''
Will print something like the following.

{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
'''

print(query.status)
'''
Will print something like the following.

{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
'''
val query: StreamingQuery = ...

println(query.lastProgress)

/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


println(query.status)

/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
StreamingQuery query = ...

System.out.println(query.lastProgress());
/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


System.out.println(query.status());
/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
query <- ...  # a StreamingQuery
lastProgress(query)

'''
Will print something like the following.

{
  "id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9",
  "runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16",
  "name" : null,
  "timestamp" : "2017-04-26T08:27:28.835Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 1
  },
  "stateOperators" : [ {
    "numRowsTotal" : 4,
    "numRowsUpdated" : 0
  } ],
  "sources" : [ {
    "description" : "TextSocketSource[host: localhost, port: 9999]",
    "startOffset" : 1,
    "endOffset" : 1,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531"
  }
}
'''

status(query)
'''
Will print something like the following.

{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
'''

使用异步 API 以编程方式报告指标

您还可以通过附加 StreamingQueryListenerPython/Scala/Java 文档)来异步监视与 SparkSession 关联的所有查询。一旦您使用 sparkSession.streams.addListener() 附加您的自定义 StreamingQueryListener 对象,当查询启动和停止以及活动查询取得进展时,您将收到回调。这是一个示例,

spark = ...

class Listener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("Query started: " + queryStarted.id)

    def onQueryProgress(self, event):
        print("Query made progress: " + queryProgress.progress)

    def onQueryTerminated(self, event):
        print("Query terminated: " + queryTerminated.id)


spark.streams.addListener(Listener())
val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})
SparkSession spark = ...

spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        System.out.println("Query started: " + queryStarted.id());
    }
    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        System.out.println("Query terminated: " + queryTerminated.id());
    }
    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        System.out.println("Query made progress: " + queryProgress.progress());
    }
});
Not available in R.

使用 Dropwizard 报告指标

Spark 支持使用Dropwizard 库报告指标。要启用结构化流式查询的指标报告,您必须在 SparkSession 中明确启用配置 spark.sql.streaming.metricsEnabled

spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
# or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf().set("spark.sql.streaming.metricsEnabled", "true");
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true");
sql("SET spark.sql.streaming.metricsEnabled=true")

在此配置启用后,在 SparkSession 中启动的所有查询都将通过 Dropwizard 向已配置的任何(例如 Ganglia、Graphite、JMX 等)报告指标。

通过检查点从故障中恢复

在发生故障或有意关闭的情况下,您可以恢复先前查询的进度和状态,并从上次中断的地方继续。这是通过检查点和预写日志完成的。您可以为查询配置一个检查点位置,查询将所有进度信息(即每个触发器中处理的偏移量范围)和运行中的聚合(例如快速示例中的单词计数)保存到检查点位置。此检查点位置必须是 HDFS 兼容文件系统中的路径,并且可以在启动查询时在 DataStreamWriter 中设置为一个选项。

aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()
aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()
aggDF
  .writeStream()
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start();
write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")

流式查询更改后的恢复语义

从相同检查点位置重新启动时,对流式查询中允许的更改类型存在限制。以下是几种不允许更改或更改效果不明确的类型。对于所有这些:

更改类型