结构化流式编程指南
- 概述
- 快速示例
- 编程模型
- 使用 Dataset 和 DataFrame 的 API
- 异步进度跟踪
- 连续处理
- 附加信息
- 迁移指南
概述
结构化流式处理是一个构建在 Spark SQL 引擎上的可扩展且容错的流处理引擎。您可以像表达静态数据上的批处理计算一样表达您的流式计算。 Spark SQL 引擎将负责以增量和连续的方式运行它,并在流数据不断到达时更新最终结果。您可以使用 Scala、Java、Python 或 R 中的 Dataset/DataFrame API 来表达流式聚合、事件时间窗口、流到批 Join 等。该计算在相同的优化 Spark SQL 引擎上执行。最后,系统通过检查点和预写日志确保端到端的一次性容错保证。简而言之,结构化流式处理提供快速、可扩展、容错、端到端的一次性流处理,而无需用户考虑流式处理。
在内部,默认情况下,结构化流式查询使用微批处理引擎进行处理,该引擎将数据流处理为一系列小型批处理作业,从而实现低至 100 毫秒的端到端延迟和一次性容错保证。但是,自 Spark 2.3 以来,我们引入了一种名为连续处理的新型低延迟处理模式,该模式可以实现低至 1 毫秒的端到端延迟,并具有至少一次的保证。无需更改查询中的 Dataset/DataFrame 操作,您就可以根据您的应用程序需求选择模式。
在本指南中,我们将引导您了解编程模型和 API。我们将主要使用默认的微批处理模型来解释这些概念,然后稍后讨论连续处理模型。首先,让我们从一个结构化流式查询的简单示例开始 - 流式字数统计。
快速示例
假设您想维护从监听 TCP 套接字的数据服务器接收到的文本数据的运行字数统计。让我们看看如何使用结构化流式处理来表达这一点。您可以在 Scala/Java/Python/R 中查看完整代码。如果您下载 Spark,您可以直接运行该示例。无论如何,让我们逐步了解该示例并了解其工作原理。首先,我们必须导入必要的类并创建一个本地 SparkSession,这是与 Spark 相关的所有功能的起点。
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import java.util.Arrays;
import java.util.Iterator;
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCount")
.getOrCreate();
sparkR.session(appName = "StructuredNetworkWordCount")
接下来,让我们创建一个流式 DataFrame,它表示从监听 localhost:9999 的服务器接收到的文本数据,并转换 DataFrame 以计算字数。
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
这个 lines
DataFrame 表示包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,并且流式文本数据中的每一行都成为该表中的一行。请注意,由于我们只是设置转换,尚未启动它,因此目前没有接收任何数据。接下来,我们使用了两个内置 SQL 函数 - split 和 explode,将每一行拆分为多个包含一个单词的行。此外,我们使用函数 alias
将新列命名为“word”。最后,我们通过按 Dataset 中的唯一值分组并计算它们来定义 wordCounts
DataFrame。请注意,这是一个流式 DataFrame,它表示流的运行字数统计。
// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
这个 lines
DataFrame 表示包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,并且流式文本数据中的每一行都成为该表中的一行。请注意,由于我们只是设置转换,尚未启动它,因此目前没有接收任何数据。接下来,我们使用 .as[String]
将 DataFrame 转换为 String 的 Dataset,以便我们可以应用 flatMap
操作将每一行拆分为多个单词。生成的 words
Dataset 包含所有单词。最后,我们通过按 Dataset 中的唯一值分组并计算它们来定义 wordCounts
DataFrame。请注意,这是一个流式 DataFrame,它表示流的运行字数统计。
// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
// Split the lines into words
Dataset<String> words = lines
.as(Encoders.STRING())
.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());
// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
这个 lines
DataFrame 表示包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,并且流式文本数据中的每一行都成为该表中的一行。请注意,由于我们只是设置转换,尚未启动它,因此目前没有接收任何数据。接下来,我们使用 .as(Encoders.STRING())
将 DataFrame 转换为 String 的 Dataset,以便我们可以应用 flatMap
操作将每一行拆分为多个单词。生成的 words
Dataset 包含所有单词。最后,我们通过按 Dataset 中的唯一值分组并计算它们来定义 wordCounts
DataFrame。请注意,这是一个流式 DataFrame,它表示流的运行字数统计。
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines <- read.stream("socket", host = "localhost", port = 9999)
# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
# Generate running word count
wordCounts <- count(group_by(words, "word"))
这个 lines
SparkDataFrame 表示包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,并且流式文本数据中的每一行都成为该表中的一行。请注意,由于我们只是设置转换,尚未启动它,因此目前没有接收任何数据。接下来,我们使用一个带有两个 SQL 函数的 SQL 表达式 - split 和 explode,将每一行拆分为多个包含一个单词的行。此外,我们将新列命名为“word”。最后,我们通过按 SparkDataFrame 中的唯一值分组并计算它们来定义 wordCounts
SparkDataFrame。请注意,这是一个流式 SparkDataFrame,它表示流的运行字数统计。
我们现在已经设置了流式数据的查询。剩下的就是实际开始接收数据并计算计数。为此,我们将其设置为将完整的计数集(由 outputMode("complete")
指定)打印到控制台,每次更新时都会打印。然后使用 start()
启动流式计算。
# Start running the query that prints the running counts to the console
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")
awaitTermination(query)
执行此代码后,流式计算将在后台启动。query
对象是指向该活动流式查询的句柄,我们已决定使用 awaitTermination()
等待查询终止,以防止进程在查询活动期间退出。
要实际执行此示例代码,您可以将代码编译到您自己的 Spark 应用程序中,或者只需在下载 Spark 后 运行该示例。我们展示后者。您首先需要使用以下命令运行 Netcat(一种在大多数类 Unix 系统中找到的小型实用程序)作为数据服务器:
$ nc -lk 9999
然后,在不同的终端中,您可以使用以下命令启动该示例:
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
然后,在运行 netcat 服务器的终端中输入的任何行都将被计数,并且每秒钟都会在屏幕上打印出来。它看起来像下面这样。
|
|
编程模型
结构化流处理的关键思想是将实时数据流视为一个不断附加数据的表。这导致了一种新的流处理模型,该模型与批量处理模型非常相似。您将把流式计算表达为对静态表的标准类批量查询,而 Spark 将其作为对无界输入表的增量查询来运行。让我们更详细地了解这个模型。
基本概念
将输入数据流视为“输入表”。流中到达的每个数据项都像附加到输入表的新行。
对输入的查询将生成“结果表”。每个触发间隔(例如,每 1 秒),新行会被附加到输入表,这最终会更新结果表。每当结果表更新时,我们都希望将更改后的结果行写入到外部接收器。
“输出”被定义为写入到外部存储的内容。输出可以用不同的模式定义
-
完整模式 - 整个更新后的结果表将写入外部存储。存储连接器决定如何处理整个表的写入。
-
追加模式 - 只有自上次触发后附加到结果表的新行才会被写入外部存储。这仅适用于结果表中的现有行预计不会更改的查询。
-
更新模式 - 只有自上次触发后在结果表中更新的行才会被写入外部存储(自 Spark 2.1.1 起可用)。请注意,这与完整模式的不同之处在于,此模式仅输出自上次触发后已更改的行。如果查询不包含聚合,则它将等效于追加模式。
请注意,每种模式都适用于某些类型的查询。这将在稍后详细讨论。
为了说明此模型的使用,让我们在 快速示例 的上下文中理解该模型。第一个 lines
DataFrame 是输入表,而最终的 wordCounts
DataFrame 是结果表。请注意,对流式 lines
DataFrame 生成 wordCounts
的查询与静态 DataFrame 的查询完全相同。但是,当此查询启动时,Spark 将持续检查来自套接字连接的新数据。如果有新数据,Spark 将运行一个“增量”查询,该查询将先前的运行计数与新数据相结合,以计算更新的计数,如下所示。
请注意,结构化流处理不会具体化整个表。它从流式数据源读取最新的可用数据,以增量方式处理它以更新结果,然后丢弃源数据。它仅保留更新结果所需的最小中间状态数据(例如,前面示例中的中间计数)。
此模型与许多其他流处理引擎显着不同。许多流式处理系统要求用户自己维护运行聚合,因此必须考虑容错和数据一致性(至少一次、至多一次或恰好一次)。在此模型中,当有新数据时,Spark 负责更新结果表,从而使用户免于考虑它。例如,让我们看看此模型如何处理基于事件时间的处理和延迟到达的数据。
处理事件时间和迟到数据
事件时间是嵌入在数据本身中的时间。对于许多应用程序,您可能希望基于此事件时间进行操作。例如,如果您想获取 IoT 设备每分钟生成的事件数,那么您可能想使用生成数据的时间(即数据中的事件时间),而不是 Spark 接收它们的时间。此事件时间在此模型中非常自然地表达 – 来自设备的每个事件都是表中的一行,而事件时间是行中的一个列值。这允许基于窗口的聚合(例如,每分钟的事件数)仅仅是事件时间列上的特殊类型的分组和聚合 – 每个时间窗口是一个组,并且每一行可以属于多个窗口/组。因此,这种基于事件时间窗口的聚合查询可以在静态数据集(例如,来自收集的设备事件日志)以及数据流上一致地定义,从而使用户的生活更加轻松。
此外,此模型自然地处理了基于其事件时间晚于预期到达的数据。由于 Spark 正在更新结果表,因此它可以完全控制在存在延迟数据时更新旧聚合,以及清理旧聚合以限制中间状态数据的大小。自 Spark 2.1 起,我们支持水印,允许用户指定延迟数据的阈值,并允许引擎相应地清理旧状态。这些将在窗口操作部分中稍后更详细地解释。
容错语义
提供端到端恰好一次的语义是结构化流处理设计背后的关键目标之一。为了实现这一点,我们设计了结构化流处理源、接收器和执行引擎,以可靠地跟踪处理的准确进度,以便它可以处理通过重新启动和/或重新处理进行的任何类型的故障。假定每个流式源都具有偏移量(类似于 Kafka 偏移量或 Kinesis 序列号)来跟踪流中的读取位置。引擎使用检查点和预写日志来记录在每个触发器中处理的数据的偏移量范围。流式接收器被设计为幂等的,用于处理重新处理。总之,通过使用可重放的源和幂等的接收器,结构化流处理可以确保在任何故障下都具有端到端恰好一次的语义。
使用 Dataset 和 DataFrame 的 API
自 Spark 2.0 以来,DataFrames 和 Datasets 可以表示静态、有界的数据,以及流式、无界的数据。与静态 Datasets/DataFrames 类似,您可以使用公共入口点 SparkSession
(Scala/Java/Python/R 文档)从流式源创建流式 DataFrames/Datasets,并将与静态 DataFrames/Datasets 相同的操作应用于它们。如果您不熟悉 Datasets/DataFrames,强烈建议您使用 DataFrame/Dataset 编程指南熟悉它们。
创建流式 DataFrame 和流式 Dataset
可以通过 DataStreamReader
接口(Scala/Java/Python 文档)创建流式 DataFrames,该接口由 SparkSession.readStream()
返回。在 R 中,使用 read.stream()
方法。与用于创建静态 DataFrame 的读取接口类似,您可以指定源的详细信息 – 数据格式、模式、选项等。
输入源
有一些内置的源。
- 文件源 - 将目录中写入的文件读取为数据流。文件将按照文件修改时间顺序进行处理。如果设置了
latestFirst
,则顺序将颠倒。支持的文件格式为 text、CSV、JSON、ORC、Parquet。有关最新列表和每种文件格式支持的选项,请参见 DataStreamReader 接口的文档。请注意,文件必须以原子方式放置在给定的目录中,在大多数文件系统中,这可以通过文件移动操作来实现。 -
Kafka 源 - 从 Kafka 读取数据。它与 Kafka broker 版本 0.10.0 或更高版本兼容。有关更多详细信息,请参见 Kafka 集成指南。
-
套接字源(用于测试) - 从套接字连接读取 UTF8 文本数据。监听服务器套接字位于驱动程序上。请注意,这仅应用于测试,因为它不提供端到端容错保证。
-
速率源(用于测试) - 以指定的每秒行数生成数据,每个输出行包含一个
timestamp
和一个value
。其中timestamp
是一个Timestamp
类型,包含消息分发的时间,value
是一个Long
类型,包含消息计数,从 0 开始作为第一行。此源用于测试和基准测试。 - 每个微批次的速率源(用于测试) - 按照指定的每微批次的行数生成数据,每行输出包含一个
timestamp
和一个value
。其中,timestamp
是一个Timestamp
类型,包含消息的发送时间,而value
是一个Long
类型,包含消息计数,从 0 作为第一行开始。与rate
数据源不同,此数据源为每个微批次提供一组一致的输入行,而与查询执行(触发器的配置、查询延迟等)无关。例如,批次 0 将生成 0~999,批次 1 将生成 1000~1999,依此类推。生成的时间也适用相同的规则。此源用于测试和基准测试。
某些源不是容错的,因为它们不保证在失败后可以使用检查点偏移量重播数据。请参阅前面关于 容错语义 的章节。以下是 Spark 中所有源的详细信息。
源 | 选项 | 容错 | 备注 |
---|---|---|---|
文件源 |
path :输入目录的路径,所有文件格式通用。maxFilesPerTrigger :每次触发器中要考虑的最大新文件数(默认值:无最大值)latestFirst :是否首先处理最新的新文件,当存在大量文件积压时很有用(默认值:false)fileNameOnly :是否仅根据文件名而不是完整路径检查新文件(默认值:false)。如果将其设置为 `true`,则以下文件将被视为同一文件,因为它们的文件名“dataset.txt”相同"file:///dataset.txt" "s3://a/dataset.txt" "s3n://a/b/dataset.txt" "s3a://a/b/c/dataset.txt" maxFileAge :在忽略之前,可以在此目录中找到的文件的最大存在时间。对于第一批,所有文件将被视为有效。如果将 latestFirst 设置为 `true` 并且设置了 maxFilesPerTrigger ,则将忽略此参数,因为可能忽略有效且应处理的旧文件。最大期限是相对于最新文件的时间戳,而不是当前系统的时间戳指定的。(默认值:1 周)cleanSource :用于清理处理完成后的文件的选项。可用选项为“archive”、“delete”、“off”。如果未提供该选项,则默认值为“off”。 如果提供“archive”,则还必须提供附加选项 sourceArchiveDir 。“sourceArchiveDir”的值在深度上(从根目录开始的目录数)不得与源模式匹配,其中深度是两个路径上的最小深度。这将确保存档文件永远不会包含为新的源文件。例如,假设你提供 '/hello?/spark/*' 作为源模式,则 '/hello1/spark/archive/dir' 不能用作 "sourceArchiveDir" 的值,因为 '/hello?/spark/*' 和 '/hello1/spark/archive' 将匹配。'/hello1/spark' 也不能用作 "sourceArchiveDir" 的值,因为 '/hello?/spark' 和 '/hello1/spark' 将匹配。'/archived/here' 将是可以的,因为它不匹配。 Spark 将按照源文件自己的路径移动源文件。例如,如果源文件的路径是 /a/b/dataset.txt ,而存档目录的路径是 /archived/here ,则该文件将被移动到 /archived/here/a/b/dataset.txt 。注意:通过移动或删除已完成的文件进行存档,都会在每个微批次中引入开销(减慢速度,即使它发生在单独的线程中),因此在启用此选项之前,你需要了解文件系统中每个操作的成本。另一方面,启用此选项将降低列出源文件的成本,这可能是一项昂贵的操作。 已完成的文件清理器中使用的线程数可以使用 spark.sql.streaming.fileSource.cleaner.numThreads 进行配置(默认值:1)。注意 2:启用此选项时,不应从多个源或查询中使用源路径。同样,你必须确保源路径与文件流接收器的输出目录中的任何文件都不匹配。 注意 3:删除和移动操作都是尽力而为。未能删除或移动文件不会使流式查询失败。在某些情况下,Spark 可能无法清理某些源文件 - 例如,应用程序没有正常关闭,或者有太多的文件排队等待清理。 有关特定于文件格式的选项,请参阅 DataStreamReader 中的相关方法(Scala/Java/Python/R)。例如,对于 "parquet" 格式选项,请参阅 DataStreamReader.parquet() 。此外,还有一些会影响某些文件格式的会话配置。有关更多详细信息,请参阅 SQL 编程指南。例如,对于 "parquet",请参阅 Parquet 配置 部分。 |
是 | 支持 glob 路径,但不支持多个逗号分隔的路径/glob。 |
Socket 源 |
host :要连接到的主机,必须指定port :要连接到的端口,必须指定 |
否 | |
速率源 |
rowsPerSecond (例如 100,默认值:1):每秒应生成多少行。rampUpTime (例如 5s,默认值:0s):在生成速度达到 rowsPerSecond 之前要预热多长时间。使用比秒更精细的粒度将被截断为整数秒。numPartitions (例如 10,默认值:Spark 的默认并行度):生成行的分区数。该源将尽最大努力达到 rowsPerSecond ,但查询可能会受到资源限制,并且可以调整 numPartitions 以帮助达到所需的速度。 |
是 | |
每个微批次的速率源(格式:rate-micro-batch) |
rowsPerBatch (例如 100):每个微批次应生成多少行。numPartitions (例如 10,默认值:Spark 的默认并行度):生成行的分区数。startTimestamp (例如 1000,默认值:0):生成时间的起始值。advanceMillisPerBatch (例如 1000,默认值:1000):每个微批次中生成的时间提前量。 |
是 | |
Kafka 源 | 请参阅 Kafka 集成指南。 | 是 | |
以下是一些示例。
spark = SparkSession. ...
# Read text from socket
socketDF = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
socketDF.isStreaming() # Returns True for DataFrames that have streaming sources
socketDF.printSchema()
# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
.readStream \
.option("sep", ";") \
.schema(userSchema) \
.csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
val spark: SparkSession = ...
// Read text from socket
val socketDF = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
socketDF.isStreaming // Returns True for DataFrames that have streaming sources
socketDF.printSchema
// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema) // Specify schema of the csv files
.csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
SparkSession spark = ...
// Read text from socket
Dataset<Row> socketDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
socketDF.isStreaming(); // Returns True for DataFrames that have streaming sources
socketDF.printSchema();
// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset<Row> csvDF = spark
.readStream()
.option("sep", ";")
.schema(userSchema) // Specify schema of the csv files
.csv("/path/to/directory"); // Equivalent to format("csv").load("/path/to/directory")
sparkR.session(...)
# Read text from socket
socketDF <- read.stream("socket", host = hostname, port = port)
isStreaming(socketDF) # Returns TRUE for SparkDataFrames that have streaming sources
printSchema(socketDF)
# Read all the csv files written atomically in a directory
schema <- structType(structField("name", "string"),
structField("age", "integer"))
csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";")
这些示例生成的是非类型化的流式 DataFrame,这意味着 DataFrame 的模式不会在编译时进行检查,仅在提交查询时在运行时进行检查。某些操作(如 map
、flatMap
等)需要在编译时知道类型。要执行这些操作,你可以使用与静态 DataFrame 相同的方法将这些非类型化的流式 DataFrame 转换为类型化的流式 Dataset。有关更多详细信息,请参阅 SQL 编程指南。此外,本文档稍后将讨论有关受支持的流式源的更多详细信息。
自 Spark 3.1 起,你还可以使用 DataStreamReader.table()
从表中创建流式 DataFrame。 有关更多详细信息,请参见 流式表 API。
流式 DataFrame/Dataset 的模式推断和分区
默认情况下,来自基于文件的源的结构化流需要你指定模式,而不是依赖 Spark 自动推断它。此限制确保将为流式查询使用一致的模式,即使在发生故障的情况下也是如此。 对于临时用例,你可以通过将 spark.sql.streaming.schemaInference
设置为 true
来重新启用模式推断。
当存在名为 /key=value/
的子目录时,会发生分区发现,并且列表将自动递归到这些目录中。 如果这些列出现在用户提供的模式中,则 Spark 将根据读取文件的路径填写它们。 构成分区方案的目录必须在查询开始时存在,并且必须保持静态。 例如,当 /data/year=2015/
存在时,添加 /data/year=2016/
是可以的,但更改分区列(即,通过创建目录 /data/date=2016-04-17/
)是无效的。
流式 DataFrame/Dataset 上的操作
你可以在流式 DataFrame/Dataset 上应用各种操作 - 从非类型化、类似 SQL 的操作(例如,select
、where
、groupBy
)到类型化的类似 RDD 的操作(例如,map
、filter
、flatMap
)。 有关更多详细信息,请参阅 SQL 编程指南。 让我们看一下你可以使用的一些示例操作。
基本操作 - 选择、投影、聚合
DataFrame/Dataset 上的大多数常见操作都支持流式处理。 此部分稍后将讨论一些不支持的操作。
df = ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
# Select the devices which have signal more than 10
df.select("device").where("signal > 10")
# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data
// Select the devices which have signal more than 10
df.select("device").where("signal > 10") // using untyped APIs
ds.filter(_.signal > 10).map(_.device) // using typed APIs
// Running count of the number of updates for each device type
df.groupBy("deviceType").count() // using untyped API
// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.javalang.typed;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
public class DeviceData {
private String device;
private String deviceType;
private Double signal;
private java.sql.Date time;
...
// Getter and setter methods for each field
}
Dataset<Row> df = ...; // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // streaming Dataset with IOT device data
// Select the devices which have signal more than 10
df.select("device").where("signal > 10"); // using untyped APIs
ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
.map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());
// Running count of the number of updates for each device type
df.groupBy("deviceType").count(); // using untyped API
// Running average signal for each device type
ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
.agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
df <- ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
# Select the devices which have signal more than 10
select(where(df, "signal > 10"), "device")
# Running count of the number of updates for each device type
count(groupBy(df, "deviceType"))
你还可以将流式 DataFrame/Dataset 注册为临时视图,然后在其上应用 SQL 命令。
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates") # returns another streaming DF
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates") // returns another streaming DF
df.createOrReplaceTempView("updates");
spark.sql("select count(*) from updates"); // returns another streaming DF
createOrReplaceTempView(df, "updates")
sql("select count(*) from updates")
请注意,你可以使用 df.isStreaming
识别 DataFrame/Dataset 是否具有流式数据。
df.isStreaming()
df.isStreaming
df.isStreaming()
isStreaming(df)
你可能需要检查查询的查询计划,因为 Spark 可能会在针对流式数据集解释 SQL 语句期间注入有状态操作。 一旦有状态操作被注入到查询计划中,你可能需要根据有状态操作的注意事项检查你的查询。 (例如,输出模式、水印、状态存储大小维护等)
事件时间上的窗口操作
使用结构化流式处理,在滑动事件时间窗口上的聚合非常简单,并且与分组聚合非常相似。 在分组聚合中,为用户指定的分组列中的每个唯一值维护聚合值(例如,计数)。 对于基于窗口的聚合,为行事件时间所属的每个窗口维护聚合值。 让我们用一个图示来理解这一点。
假设我们的 快速示例 被修改,并且流现在包含行以及生成行的时间。 我们不想运行字数统计,而是想在 10 分钟的窗口内统计字数,每 5 分钟更新一次。 也就是说,在 10 分钟窗口 12:00 - 12:10、12:05 - 12:15、12:10 - 12:20 等之间收到的单词的字数统计。 请注意,12:00 - 12:10 表示在 12:00 之后但在 12:10 之前到达的数据。 现在,考虑一个在 12:07 收到的单词。 此单词应将对应于两个窗口 12:00 - 12:10 和 12:05 - 12:15 的计数递增。 因此,计数将按分组键(即,单词)和窗口(可以从事件时间计算)进行索引。
结果表如下所示。
由于此窗口与分组类似,因此在代码中,你可以使用 groupBy()
和 window()
操作来表示窗口聚合。 你可以在 Scala/Java/Python 中查看以下示例的完整代码。
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
words.col("word")
).count();
words <- ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts <- count(
groupBy(
words,
window(words$timestamp, "10 minutes", "5 minutes"),
words$word))
处理迟到数据和水印
现在考虑如果其中一个事件延迟到达应用程序会发生什么。 例如,假设在 12:04(即事件时间)生成的单词,应用程序在 12:11 收到。 应用程序应使用 12:04 而不是 12:11 来更新 12:00 - 12:10
窗口的旧计数。 这在我们基于窗口的分组中自然发生——Structured Streaming 可以长时间维护部分聚合的中间状态,以便延迟数据可以正确更新旧窗口的聚合,如下所示。
但是,要运行此查询几天,系统必须限制其累积的内存中间状态量。 这意味着系统需要知道何时可以从内存状态中删除旧的聚合,因为应用程序将不再收到该聚合的延迟数据。 为了实现这一点,在 Spark 2.1 中,我们引入了水印 (watermarking),它使引擎能够自动跟踪数据中的当前事件时间,并尝试相应地清理旧状态。 您可以通过指定事件时间列和数据预期延迟的时间阈值来定义查询的水印。 对于在时间 T
结束的特定窗口,引擎将维护状态并允许延迟数据更新状态,直到 (引擎看到的最大事件时间 - 延迟阈值 > T)
。 换句话说,在阈值范围内的延迟数据将被聚合,但晚于阈值的数据将开始被丢弃(有关确切的保证,请参阅本节后面的语义保证)。 让我们用一个例子来理解这一点。 我们可以使用下面的 withWatermark()
轻松地在之前的示例中定义水印。
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word) \
.count()
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word")
.count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes"),
col("word"))
.count();
words <- ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
words <- withWatermark(words, "timestamp", "10 minutes")
windowedCounts <- count(
groupBy(
words,
window(words$timestamp, "10 minutes", "5 minutes"),
words$word))
在此示例中,我们正在定义查询水印的列的值为 “timestamp”,并且还定义了 “10 分钟” 作为允许的数据延迟的阈值。 如果此查询在更新输出模式下运行(稍后在输出模式部分中讨论),引擎将继续更新结果表中窗口的计数,直到窗口比水印旧,水印滞后于列 “timestamp” 中的当前事件时间 10 分钟。 这是一个说明。
如图所示,引擎跟踪的最大事件时间是蓝色虚线,并且在每次触发开始时设置为 (最大事件时间 - '10 分钟')
的水印是红线。 例如,当引擎观察到数据 (12:14, dog)
时,它将下一个触发器的水印设置为 12:04
。 此水印允许引擎维护额外的 10 分钟的中间状态,以允许对延迟数据进行计数。 例如,数据 (12:09, cat)
是无序且延迟的,它落在窗口 12:00 - 12:10
和 12:05 - 12:15
中。 由于它仍然领先于触发器中的水印 12:04
,因此引擎仍然维护中间计数作为状态,并正确更新相关窗口的计数。 但是,当水印更新为 12:11
时,窗口 (12:00 - 12:10)
的中间状态被清除,所有后续数据(例如 (12:04, donkey)
)都被认为是“太晚”因此被忽略。 请注意,在每次触发后,更新的计数(即紫色行)将作为触发器输出写入接收器 (sink),如更新模式所指定的那样。
某些接收器(例如文件)可能不支持更新模式所需的细粒度更新。 为了与它们一起工作,我们还支持追加模式,其中只有最终计数写入接收器。 如下图所示。
请注意,在非流式 Dataset 上使用 withWatermark
是无操作。 由于水印不应以任何方式影响任何批处理查询,因此我们将直接忽略它。
与之前的更新模式类似,引擎维护每个窗口的中间计数。 但是,部分计数不会更新到结果表,也不会写入接收器。 引擎等待“10 分钟”以计算延迟日期,然后删除水印 < 窗口的中间状态,并将最终计数附加到结果表/接收器。 例如,只有在水印更新为 12:11
后,窗口 12:00 - 12:10
的最终计数才会附加到结果表。
时间窗口的类型
Spark 支持三种类型的时间窗口:滚动(固定)、滑动和会话。
滚动窗口是一系列固定大小、非重叠和连续的时间间隔。 一个输入只能绑定到一个窗口。
滑动窗口类似于滚动窗口,它们都是“固定大小”的,但如果滑动的持续时间小于窗口的持续时间,则窗口可以重叠,在这种情况下,一个输入可以绑定到多个窗口。
滚动和滑动窗口使用 window
函数,该函数已在上述示例中描述。
与前两种类型相比,会话窗口具有不同的特征。 会话窗口具有动态的窗口长度,具体取决于输入。 会话窗口以输入开始,如果在间隔持续时间内收到后续输入,则会自行扩展。 对于静态间隔持续时间,如果在收到最新输入后间隔持续时间内没有收到任何输入,则会话窗口关闭。
会话窗口使用 session_window
函数。 该函数的使用与 window
函数类似。
events = ... # streaming DataFrame of schema { timestamp: Timestamp, userId: String }
# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
session_window(events.timestamp, "5 minutes"),
events.userId) \
.count()
import spark.implicits._
val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }
// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
.withWatermark("timestamp", "10 minutes")
.groupBy(
session_window($"timestamp", "5 minutes"),
$"userId")
.count()
Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }
// Group the data by session window and userId, and compute the count of each group
Dataset<Row> sessionizedCounts = events
.withWatermark("timestamp", "10 minutes")
.groupBy(
session_window(col("timestamp"), "5 minutes"),
col("userId"))
.count();
除了静态值之外,我们还可以提供一个表达式来根据输入行动态指定间隔持续时间。 请注意,间隔持续时间为负数或零的行将从聚合中过滤掉。
使用动态间隔持续时间时,会话窗口的关闭不再取决于最新的输入。 会话窗口的范围是所有事件范围的并集,这些范围由事件开始时间和查询执行期间评估的间隔持续时间决定。
from pyspark.sql import functions as sf
events = ... # streaming DataFrame of schema { timestamp: Timestamp, userId: String }
session_window = session_window(events.timestamp, \
sf.when(events.userId == "user1", "5 seconds") \
.when(events.userId == "user2", "20 seconds").otherwise("5 minutes"))
# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
session_window,
events.userId) \
.count()
import spark.implicits._
val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }
val sessionWindow = session_window($"timestamp", when($"userId" === "user1", "5 seconds")
.when($"userId" === "user2", "20 seconds")
.otherwise("5 minutes"))
// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
.withWatermark("timestamp", "10 minutes")
.groupBy(
Column(sessionWindow),
$"userId")
.count()
Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }
SessionWindow sessionWindow = session_window(col("timestamp"), when(col("userId").equalTo("user1"), "5 seconds")
.when(col("userId").equalTo("user2"), "20 seconds")
.otherwise("5 minutes"))
// Group the data by session window and userId, and compute the count of each group
Dataset<Row> sessionizedCounts = events
.withWatermark("timestamp", "10 minutes")
.groupBy(
new Column(sessionWindow),
col("userId"))
.count();
请注意,在流式查询中使用会话窗口时,存在一些限制,如下所示
- 不支持以“更新模式”作为输出模式。
- 除了分组键中的
session_window
之外,至少应有一列。
对于批处理查询,支持全局窗口(仅在分组键中具有 session_window
)。
默认情况下,Spark 不对会话窗口聚合执行部分聚合,因为它需要在分组之前在本地分区中进行额外的排序。 对于每个本地分区的同一组键中只有少量输入行的情况,它效果更好,但对于本地分区中存在大量具有相同组键的输入行的情况,尽管需要额外的排序,但执行部分聚合仍然可以显着提高性能。
您可以启用 spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition
以指示 Spark 执行部分聚合。
时间窗口的时间表示
在某些用例中,需要提取时间窗口的时间表示,以便将需要时间戳的操作应用于时间窗口数据。 一个例子是链式时间窗口聚合,用户想要针对时间窗口定义另一个时间窗口。 例如,有人想要将 5 分钟的时间窗口聚合为 1 小时的滚动时间窗口。
有两种方法可以实现这一点,如下所示
- 使用带有时间窗口列作为参数的
window_time
SQL 函数 - 使用带有时间窗口列作为参数的
window
SQL 函数
window_time
函数将生成一个时间戳,表示时间窗口的时间。 用户可以将结果传递给 window
函数(或任何需要时间戳的地方)的参数,以执行需要时间戳的时间窗口的操作。
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window(window_time($"window"), "1 hour"),
$"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
words.col("word")
).count();
// Group the windowed data by another window and word and compute the count of each group
Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
functions.window(functions.window_time("window"), "1 hour"),
windowedCounts.col("word")
).count();
window
函数不仅接受时间戳列,还接受时间窗口列。 这对于用户想要应用链式时间窗口聚合的情况特别有用。
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(windowedCounts.window, "1 hour"),
windowedCounts.word
).count()
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
words.col("word")
).count();
// Group the windowed data by another window and word and compute the count of each group
Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
functions.window("window", "1 hour"),
windowedCounts.col("word")
).count();
水印清理聚合状态的条件
重要的是要注意,必须满足以下条件,水印才能清理聚合查询中的状态(截至 Spark 2.1.1,将来可能会更改)。
-
输出模式必须是追加或更新。完整模式要求保留所有聚合数据,因此无法使用水印来删除中间状态。 有关每种输出模式的语义的详细说明,请参阅输出模式部分。
-
聚合必须具有事件时间列或事件时间列上的
window
。 -
withWatermark
必须在与聚合中使用的 timestamp 列相同的列上调用。 例如,df.withWatermark("time", "1 min").groupBy("time2").count()
在追加输出模式下无效,因为水印是在与聚合列不同的列上定义的。 -
必须在聚合之前调用
withWatermark
,才能使用水印详细信息。 例如,df.groupBy("time").count().withWatermark("time", "1 min")
在追加输出模式下无效。
带有水印的聚合的语义保证
-
“2 小时”的水印延迟(使用
withWatermark
设置)保证引擎永远不会删除任何延迟小于 2 小时的数据。 换句话说,保证聚合任何比当时处理的最新数据延迟少于 2 小时(以事件时间计)的数据。 -
但是,该保证仅在一个方向上严格。 不保证删除延迟超过 2 小时的数据;它可能会或可能不会被聚合。 数据越延迟,引擎处理它的可能性就越小。
Join 操作
Structured Streaming 支持将流式 Dataset/DataFrame 与静态 Dataset/DataFrame 以及另一个流式 Dataset/DataFrame 连接。 流式连接的结果会以增量方式生成,类似于上一节中流式聚合的结果。 在本节中,我们将探讨在上述情况下支持哪些类型的连接(即内部、外部、半连接等)。 请注意,在所有支持的连接类型中,与流式 Dataset/DataFrame 连接的结果与使用包含流中相同数据的静态 Dataset/DataFrame 连接的结果完全相同。
流-静态 Join
自 Spark 2.0 引入以来,Structured Streaming 一直支持流式和静态 DataFrame/Dataset 之间的连接(内部连接和某些类型的外部连接)。 这是一个简单的例子。
staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer") # left outer join with a static DF
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer") // left outer join with a static DF
Dataset<Row> staticDf = spark.read(). ...;
Dataset<Row> streamingDf = spark.readStream(). ...;
streamingDf.join(staticDf, "type"); // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer"); // left outer join with a static DF
staticDf <- read.df(...)
streamingDf <- read.stream(...)
joined <- merge(streamingDf, staticDf, sort = FALSE) # inner equi-join with a static DF
joined <- join(
streamingDf,
staticDf,
streamingDf$value == staticDf$value,
"left_outer") # left outer join with a static DF
请注意,流式-静态连接是无状态的,因此不需要状态管理。 但是,某些类型的流式-静态外部连接尚未受支持。 这些列在本连接部分的末尾。
流-流 Join
在 Spark 2.3 中,我们添加了对流式-流式连接的支持,也就是说,您可以连接两个流式 Datasets/DataFrames。 在两个数据流之间生成连接结果的挑战在于,在任何时间点,数据集的视图对于连接的双方都是不完整的,这使得在输入之间找到匹配项变得更加困难。 从一个输入流接收的任何行都可以与来自另一个输入流的任何未来的、尚未接收的行匹配。 因此,对于两个输入流,我们将过去的输入缓冲为流式状态,以便我们可以将每个未来的输入与过去的输入进行匹配,并相应地生成连接的结果。 此外,与流式聚合类似,我们会自动处理延迟的、乱序的数据,并且可以使用水印限制状态。 让我们讨论不同类型的支持的流式-流式连接以及如何使用它们。
带有可选水印的 Inner Join
支持任何类型的列以及任何类型的连接条件上的内部连接。但是,随着流的运行,流式状态的大小会无限增长,因为所有过去的输入都必须保存,以便任何新的输入都可以与过去的任何输入匹配。为避免无界状态,您必须定义额外的连接条件,以便无限旧的输入无法与未来的输入匹配,因此可以从状态中清除。换句话说,您需要在连接中执行以下额外步骤。
-
在两个输入上定义水印延迟,以便引擎知道输入的延迟程度(类似于流式聚合)
-
在两个输入之间定义事件时间约束,以便引擎可以确定何时不需要一个输入的旧行(即,不会满足时间约束)与另一个输入进行匹配。此约束可以通过以下两种方式之一定义。
-
时间范围连接条件(例如,
...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR
), -
基于事件时间窗口的连接(例如,
...JOIN ON leftTimeWindow = rightTimeWindow
)。
-
让我们通过一个例子来理解这一点。
假设我们想将广告展示流(广告何时展示)与用户点击广告的另一个流连接起来,以关联展示何时导致可盈利的点击。为了允许在此流-流连接中进行状态清理,您必须按如下方式指定水印延迟和时间约束。
-
水印延迟:假设展示和相应的点击在事件时间中最多可以延迟/乱序 2 小时和 3 小时。
-
事件时间范围条件:假设点击可以在相应展示后的 0 秒到 1 小时的时间范围内发生。
代码如下所示。
from pyspark.sql.functions import expr
impressions = spark.readStream. ...
clicks = spark.readStream. ...
# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
# Join with event-time constraints
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
""")
)
import org.apache.spark.sql.functions.expr
val impressions = spark.readStream. ...
val clicks = spark.readStream. ...
// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
// Join with event-time constraints
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
""")
)
import static org.apache.spark.sql.functions.expr
Dataset<Row> impressions = spark.readStream(). ...
Dataset<Row> clicks = spark.readStream(). ...
// Apply watermarks on event-time columns
Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
// Join with event-time constraints
impressionsWithWatermark.join(
clicksWithWatermark,
expr(
"clickAdId = impressionAdId AND " +
"clickTime >= impressionTime AND " +
"clickTime <= impressionTime + interval 1 hour ")
);
impressions <- read.stream(...)
clicks <- read.stream(...)
# Apply watermarks on event-time columns
impressionsWithWatermark <- withWatermark(impressions, "impressionTime", "2 hours")
clicksWithWatermark <- withWatermark(clicks, "clickTime", "3 hours")
# Join with event-time constraints
joined <- join(
impressionsWithWatermark,
clicksWithWatermark,
expr(
paste(
"clickAdId = impressionAdId AND",
"clickTime >= impressionTime AND",
"clickTime <= impressionTime + interval 1 hour"
)))
带有水印的流-流内部连接的语义保证
这类似于水印在聚合上提供的保证。“2 小时”的水印延迟保证引擎永远不会丢弃任何延迟小于 2 小时的数据。但是,延迟超过 2 小时的数据可能会也可能不会被处理。
带有水印的 Outer Join
虽然水印 + 事件时间约束对于内部连接是可选的,但对于外部连接,它们必须指定。这是因为为了在外部连接中生成 NULL 结果,引擎必须知道何时输入行将不会与未来中的任何内容匹配。因此,必须指定水印 + 事件时间约束才能生成正确的结果。因此,带有外部连接的查询看起来很像之前的广告盈利示例,除了会有一个额外的参数指定它是外部连接。
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
impressionsWithWatermark.join(
clicksWithWatermark,
expr(
"clickAdId = impressionAdId AND " +
"clickTime >= impressionTime AND " +
"clickTime <= impressionTime + interval 1 hour "),
"leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);
joined <- join(
impressionsWithWatermark,
clicksWithWatermark,
expr(
paste(
"clickAdId = impressionAdId AND",
"clickTime >= impressionTime AND",
"clickTime <= impressionTime + interval 1 hour"),
"left_outer" # can be "inner", "left_outer", "right_outer", "full_outer", "left_semi"
))
带有水印的流-流外部连接的语义保证
外部连接与内部连接在水印延迟以及数据是否会被丢弃方面具有相同的保证。
注意事项
关于如何生成外部结果,需要注意一些重要的特征。
-
外部 NULL 结果将以取决于指定的水印延迟和时间范围条件的延迟生成。 这是因为引擎必须等待这么长时间才能确保没有匹配项,并且将来不会有更多匹配项。
-
在微批处理引擎的当前实现中,水印在微批处理结束时前进,下一个微批处理使用更新后的水印来清理状态并输出外部结果。由于我们仅在新数据要处理时才触发微批处理,因此如果流中没有接收到新数据,则外部结果的生成可能会延迟。简而言之,如果被连接的两个输入流中的任何一个在一段时间内没有收到数据,则外部(左或右)输出可能会延迟。
带有水印的 Semi Join
半连接返回关系的左侧与右侧匹配的值。它也称为左半连接。与外部连接类似,必须为半连接指定水印 + 事件时间约束。这是为了驱逐左侧不匹配的输入行,引擎必须知道左侧的输入行何时不会在将来与右侧的任何内容匹配。
带有水印的流-流半连接的语义保证
半连接与内部连接在水印延迟以及数据是否会被丢弃方面具有相同的保证。
流式查询中 Join 的支持矩阵
左侧输入 | 右侧输入 | 连接类型 | |
---|---|---|---|
静态 | 静态 | 所有类型 | 支持,因为它不是在流式数据上,即使它可能存在于流式查询中 |
流 | 静态 | 内部 | 支持,无状态 |
左外部 | 支持,无状态 | ||
右外部 | 不支持 | ||
完全外部 | 不支持 | ||
左半 | 支持,无状态 | ||
静态 | 流 | 内部 | 支持,无状态 |
左外部 | 不支持 | ||
右外部 | 支持,无状态 | ||
完全外部 | 不支持 | ||
左半 | 不支持 | ||
流 | 流 | 内部 | 支持,可选择指定两侧的水印 + 用于状态清理的时间约束 |
左外部 | 有条件支持,必须指定右侧的水印 + 用于正确结果的时间约束,可选择指定左侧的水印以进行所有状态清理 | ||
右外部 | 有条件支持,必须指定左侧的水印 + 用于正确结果的时间约束,可选择指定右侧的水印以进行所有状态清理 | ||
完全外部 | 有条件支持,必须指定一侧的水印 + 用于正确结果的时间约束,可选择指定另一侧的水印以进行所有状态清理 | ||
左半 | 有条件支持,必须指定右侧的水印 + 用于正确结果的时间约束,可选择指定左侧的水印以进行所有状态清理 | ||
有关支持的连接的更多详细信息
-
连接可以级联,也就是说,你可以做
df1.join(df2, ...).join(df3, ...).join(df4, ....)
。 -
从 Spark 2.4 开始,你只能在查询处于 Append 输出模式时使用连接。其他输出模式尚不支持。
-
你不能在连接之前和之后使用 mapGroupsWithState 和 flatMapGroupsWithState。
在 append 输出模式下,您可以构建一个具有非 map 类操作的查询,例如在连接之前/之后进行聚合、去重、流-流连接。
例如,这是一个在两个流中进行时间窗口聚合,然后进行带有事件时间窗口的流-流连接的例子
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Dataset<Row> clicksWindow = clicksWithWatermark
.groupBy(functions.window(clicksWithWatermark.col("clickTime"), "1 hour"))
.count();
Dataset<Row> impressionsWindow = impressionsWithWatermark
.groupBy(functions.window(impressionsWithWatermark.col("impressionTime"), "1 hour"))
.count();
clicksWindow.join(impressionsWindow, "window", "inner");
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
这是另一个带有时间范围连接条件的流-流连接,然后是时间窗口聚合的例子
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Dataset<Row> joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr(
"clickAdId = impressionAdId AND " +
"clickTime >= impressionTime AND " +
"clickTime <= impressionTime + interval 1 hour "),
"leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);
joined
.groupBy(joined.col("clickAdId"), functions.window(joined.col("clickTime"), "1 hour"))
.count();
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
流式去重
您可以使用事件中的唯一标识符来对数据流中的记录进行去重。这与使用唯一标识符列对静态数据进行去重完全相同。查询将存储来自先前记录的必要数量的数据,以便它可以过滤重复的记录。与聚合类似,您可以使用带有或不带有水印的去重。
-
带有水印 - 如果重复记录到达的时间有一个上限,那么您可以定义一个事件时间列上的水印,并使用 guid 和事件时间列进行去重。查询将使用水印从过去记录中删除旧的状态数据,这些记录预计不再有任何重复项。这限制了查询必须维护的状态量。
-
没有水印 - 由于何时重复记录可能到达没有限制,因此查询将来自所有过去记录的数据存储为状态。
streamingDf = spark.readStream. ...
# Without watermark using guid column
streamingDf.dropDuplicates("guid")
# With watermark using guid and eventTime columns
streamingDf \
.withWatermark("eventTime", "10 seconds") \
.dropDuplicates("guid", "eventTime")
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// Without watermark using guid column
streamingDf.dropDuplicates("guid")
// With watermark using guid and eventTime columns
streamingDf
.withWatermark("eventTime", "10 seconds")
.dropDuplicates("guid", "eventTime")
Dataset<Row> streamingDf = spark.readStream(). ...; // columns: guid, eventTime, ...
// Without watermark using guid column
streamingDf.dropDuplicates("guid");
// With watermark using guid and eventTime columns
streamingDf
.withWatermark("eventTime", "10 seconds")
.dropDuplicates("guid", "eventTime");
streamingDf <- read.stream(...)
# Without watermark using guid column
streamingDf <- dropDuplicates(streamingDf, "guid")
# With watermark using guid and eventTime columns
streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
特别是对于流式处理,您可以使用事件中的唯一标识符,在水印的时间范围内对数据流中的记录进行去重。例如,如果您将水印的延迟阈值设置为“1 小时”,则在 1 小时内发生的重复事件可以被正确地去重。(有关更多详细信息,请参阅dropDuplicatesWithinWatermark的 API 文档。)
这可以用于处理事件时间列不能作为唯一标识符的一部分的用例,主要是由于相同记录的事件时间以某种方式不同的情况。(例如,非幂等写入器,其中发出事件时间发生在写入时)
建议用户将水印的延迟阈值设置得长于重复事件之间的最大时间戳差异。
此功能要求在流式 DataFrame/Dataset 中设置带有延迟阈值的水印。
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
streamingDf \
.withWatermark("eventTime", "10 hours") \
.dropDuplicatesWithinWatermark("guid")
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark("guid")
Dataset<Row> streamingDf = spark.readStream(). ...; // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark("guid");
处理多个水印的策略
一个流式查询可以有多个联合或连接在一起的输入流。每个输入流可以具有不同的延迟数据阈值,这些阈值需要为有状态操作容忍。您可以使用每个输入流上的withWatermarks("eventTime", delay)
来指定这些阈值。例如,考虑一个在inputStream1
和inputStream2
之间进行流-流连接的查询。
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
在执行查询时,Structured Streaming 会单独跟踪每个输入流中看到的最大事件时间,根据相应的延迟计算水印,并选择一个全局水印与它们一起使用于有状态操作。默认情况下,选择最小值作为全局水印,因为它确保如果其中一个流落后于其他流(例如,由于上游故障导致其中一个流停止接收数据),则不会意外地将任何数据作为过晚而丢弃。换句话说,全局水印将安全地以最慢的流的速度移动,并且查询输出将相应地延迟。
但是,在某些情况下,即使这意味着丢弃来自最慢流的数据,您可能也希望获得更快的结果。从 Spark 2.4 开始,您可以通过将 SQL 配置spark.sql.streaming.multipleWatermarkPolicy
设置为max
(默认值为min
)来设置多水印策略以选择最大值作为全局水印。这使得全局水印以最快的流的速度移动。但是,作为副作用,来自较慢流的数据将被积极地丢弃。因此,请谨慎使用此配置。
任意有状态操作
许多用例需要比聚合更高级的有状态操作。例如,在许多用例中,您必须跟踪来自事件数据流的会话。要进行此类会话化,您必须将任意类型的数据保存为状态,并在每次触发时使用数据流事件对状态执行任意操作。从 Spark 2.2 开始,可以使用操作mapGroupsWithState
和更强大的操作flatMapGroupsWithState
来完成。这两个操作都允许您将用户定义的代码应用于分组的 Datasets,以更新用户定义的状态。有关更具体的详细信息,请查看 API 文档(Scala/Java)和示例(Scala/Java)。
虽然 Spark 无法检查和强制执行,但状态函数应该根据输出模式的语义来实现。例如,在 Update 模式下,Spark 不希望状态函数发出比当前水印加上允许的延迟记录延迟更旧的行,而在 Append 模式下,状态函数可以发出这些行。
不支持的操作
有一些 DataFrame/Dataset 操作不支持流式 DataFrames/Datasets。其中一些如下。
-
Limit 和 take the first N rows 在流式 Datasets 上不受支持。
-
Distinct 操作在流式 Datasets 上不受支持。
-
排序操作仅在聚合之后且在 Complete 输出模式下才在流式 Datasets 上受支持。
-
流式 Datasets 上的少数类型的外部连接不受支持。有关更多详细信息,请参阅连接操作部分中的支持矩阵。
-
不支持在 Update 和 Complete 模式下对流式 Datasets 链接多个有状态操作。
- 此外,在 Append 模式下,不支持 mapGroupsWithState/flatMapGroupsWithState 操作后接其他有状态操作。
- 一个已知的解决方法是将你的流式查询拆分为多个查询,每个查询只有一个有状态操作,并确保每个查询的端到端精确一次性。对于最后一个查询,确保端到端精确一次性是可选的。
此外,一些 Dataset 方法不能在流式 Dataset 上使用。 它们是会立即运行查询并返回结果的 action,这在流式 Dataset 上没有意义。 相反,这些功能可以通过显式启动流式查询来完成(请参阅下一节)。
-
count()
- 无法从流式 Dataset 返回单个计数。 相反,请使用ds.groupBy().count()
,它返回一个包含运行计数的流式 Dataset。 -
foreach()
- 请改用ds.writeStream.foreach(...)
(请参阅下一节)。 -
show()
- 请改用控制台 sink(请参阅下一节)。
如果你尝试任何这些操作,你将看到一个 AnalysisException
,如 “operation XYZ is not supported with streaming DataFrames/Datasets”。 虽然其中一些操作可能会在 Spark 的未来版本中得到支持,但其他一些操作从根本上来说很难在流式数据上高效地实现。 例如,不支持对输入流进行排序,因为它需要跟踪流中接收到的所有数据。 因此,从根本上来说,很难有效地执行此操作。
状态存储
状态存储是一个版本化的键值存储,提供读取和写入操作。 在 Structured Streaming 中,我们使用状态存储提供程序来处理跨批次的有状态操作。 有两种内置的状态存储提供程序实现。 最终用户也可以通过扩展 StateStoreProvider 接口来实现他们自己的状态存储提供程序。
HDFS 状态存储提供程序
HDFS 后端状态存储提供程序是 [[StateStoreProvider]] 和 [[StateStore]] 的默认实现,其中所有数据首先存储在内存映射中,然后由 HDFS 兼容文件系统中的文件备份。 对存储的所有更新必须以事务方式成组完成,并且每组更新都会增加存储的版本。 这些版本可用于在正确的存储版本上重新执行更新(通过 RDD 操作中的重试),并重新生成存储版本。
RocksDB 状态存储实现
从 Spark 3.2 开始,我们添加了一个新的内置状态存储实现,RocksDB 状态存储提供程序。
如果你的流式查询中有状态操作(例如,流式聚合、流式 dropDuplicates、流-流 joins、mapGroupsWithState 或 flatMapGroupsWithState),并且你希望在状态中维护数百万个键,那么你可能会面临与大型 JVM 垃圾回收 (GC) 暂停相关的问题,从而导致微批处理时间出现很大的变化。 这是因为,通过 HDFSBackedStateStore 的实现,状态数据保存在 executors 的 JVM 内存中,并且大量状态对象给 JVM 造成内存压力,从而导致高 GC 暂停。
在这种情况下,你可以选择使用基于 RocksDB 的更优化的状态管理解决方案。 此解决方案不是将状态保存在 JVM 内存中,而是使用 RocksDB 有效地管理本机内存和本地磁盘中的状态。 此外,Structured Streaming 会自动将对此状态的任何更改保存到你提供的检查点位置,从而提供完全的容错保证(与默认状态管理相同)。
要启用新的内置状态存储实现,请将 spark.sql.streaming.stateStore.providerClass
设置为 org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
。
以下是关于状态存储提供程序的 RocksDB 实例的配置
配置名称 | 描述 | 默认值 |
---|---|---|
spark.sql.streaming.stateStore.rocksdb.compactOnCommit | 我们是否对提交操作的 RocksDB 实例执行范围压缩 | False |
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled | 在 RocksDB StateStore 提交期间是否上传变更日志而不是快照 | False |
spark.sql.streaming.stateStore.rocksdb.blockSizeKB | 每个块中打包的用户数据的大致大小(以 KB 为单位),用于 RocksDB BlockBasedTable,它是 RocksDB 的默认 SST 文件格式。 | 4 |
spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB | 块缓存的容量大小(以 MB 为单位)。 | 8 |
spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs | 在 RocksDB 实例的加载操作中,获取锁的等待时间(以毫秒为单位)。 | 60000 |
spark.sql.streaming.stateStore.rocksdb.maxOpenFiles | RocksDB 实例可以使用的打开文件数。 值 -1 表示始终保持打开状态的文件。 如果达到打开文件限制,RocksDB 将从打开文件缓存中逐出条目,关闭这些文件描述符,并从缓存中删除这些条目。 | -1 |
spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad | 我们是否在加载时重置 RocksDB 的所有 ticker 和 histogram 统计信息。 | True |
spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows | 我们是否跟踪状态存储中的总行数。 请参阅 性能方面注意事项 中的详细信息。 | True |
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB | RocksDB 中 MemTable 的最大大小。 值 -1 表示将使用 RocksDB 内部默认值 | -1 |
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber | RocksDB 中 MemTable 的最大数量,包括活动和不可变的。 值 -1 表示将使用 RocksDB 内部默认值 | -1 |
spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage | 单个节点上 RocksDB 状态存储实例的总内存使用量是否有限制。 | false |
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB | 单个节点上 RocksDB 状态存储实例的总内存限制(以 MB 为单位)。 | 500 |
spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio | 写缓冲区占用的总内存,作为使用 maxMemoryUsageMB 在单个节点上分配给所有 RocksDB 实例的内存的一部分。 | 0.5 |
spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio | 高优先级池中块占用的总内存,作为使用 maxMemoryUsageMB 在单个节点上分配给所有 RocksDB 实例的内存的一部分。 | 0.1 |
RocksDB 状态存储内存管理
RocksDB 为不同的对象分配内存,例如 memtable、块缓存和过滤器/索引块。 如果不加以限制,多个实例的 RocksDB 内存使用量可能会无限增长,并可能导致 OOM(内存不足)问题。 RocksDB 提供了一种通过使用写缓冲区管理器功能来限制运行在单个节点上的所有 DB 实例的内存使用量的方法。 如果你想限制 Spark Structured Streaming 部署中的 RocksDB 内存使用量,可以通过将 spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage
配置设置为 true
来启用此功能。 你还可以通过将 spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB
值设置为静态数字或节点上可用物理内存的一部分来确定 RocksDB 实例的最大允许内存。 还可以通过将 spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB
和 spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber
设置为所需值来配置单个 RocksDB 实例的限制。 默认情况下,这些设置使用 RocksDB 内部默认值。
RocksDB 状态存储变更日志检查点
在较新版本的 Spark 中,为 RocksDB 状态存储引入了变更日志检查点。 RocksDB State Store 的传统检查点机制是增量快照检查点,其中 RocksDB 实例的 manifest 文件和新生成的 RocksDB SST 文件被上传到持久存储。 变更日志检查点不是上传 RocksDB 实例的数据文件,而是上传自上次检查点以来对状态所做的更改以实现持久性。 为了实现可预测的故障恢复和变更日志修剪,定期在后台持久保存快照。 变更日志检查点避免了捕获和上传 RocksDB 实例快照的成本,并显著降低了流式查询延迟。
默认情况下,变更日志检查点已禁用。 你可以通过将 spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled
配置设置为 true
来启用 RocksDB State Store 变更日志检查点。 变更日志检查点旨在与传统检查点机制向后兼容。 RocksDB 状态存储提供程序为两种检查点机制之间的双向转换提供无缝支持。 这使你可以利用变更日志检查点的性能优势,而无需丢弃旧的状态检查点。 在支持变更日志检查点的 Spark 版本中,你可以通过在 spark 会话中启用变更日志检查点,将流式查询从旧版本的 Spark 迁移到变更日志检查点。 反之亦然,你可以在较新版本的 Spark 中安全地禁用变更日志检查点,然后任何已经运行变更日志检查点的查询将切换回传统检查点。 你需要重新启动流式查询才能应用检查点机制中的更改,但在此过程中不会观察到任何性能下降。
性能方面注意事项
- 你可能想要禁用对总行数的跟踪,以实现 RocksDB 状态存储的更好性能。
跟踪行数会在写入操作上带来额外的查找 - 我们建议你尝试在调整 RocksDB 状态存储时关闭该配置,特别是状态运算符的指标值很大时 - numRowsUpdated
、numRowsRemoved
。
你可以在重启查询期间更改配置,这使你可以更改“可观察性与性能”之间的权衡决策。 如果禁用该配置,则状态中的行数 (numTotalStateRows
) 将报告为 0。
状态存储和任务本地性
有状态操作会为执行器状态存储中的事件存储状态。状态存储占用内存和磁盘空间等资源来存储状态。因此,让状态存储提供者在不同的流式处理批次中在同一个执行器中运行会更有效率。更改状态存储提供者的位置需要加载检查点状态的额外开销。从检查点加载状态的开销取决于外部存储和状态的大小,这往往会损害微批处理的延迟。对于某些用例,例如处理非常大的状态数据,从检查点状态加载新的状态存储提供者可能会非常耗时且效率低下。
Structured Streaming 查询中的有状态操作依赖于 Spark 的 RDD 的首选位置特性,以便在同一个执行器上运行状态存储提供者。如果在下一个批次中,对应的状态存储提供者再次被调度到该执行器上,它可以重用之前的状态,并节省加载检查点状态的时间。
然而,通常首选位置不是硬性要求,Spark 仍然有可能将任务调度到首选执行器之外的执行器。在这种情况下,Spark 将从新的执行器上的检查点状态加载状态存储提供者。在前一个批次中运行的状态存储提供者不会立即卸载。Spark 运行一个维护任务,该任务检查并卸载执行器上不活跃的状态存储提供者。
通过更改与任务调度相关的 Spark 配置,例如 spark.locality.wait
,用户可以配置 Spark 等待启动数据本地任务的时间。对于 Structured Streaming 中的有状态操作,它可以用于让状态存储提供者在不同的批次中在同一个执行器上运行。
特别是对于内置的 HDFS 状态存储提供者,用户可以检查状态存储指标,例如 loadedMapCacheHitCount
和 loadedMapCacheMissCount
。理想情况下,最好是尽量减少缓存未命中计数,这意味着 Spark 不会在加载检查点状态上浪费太多时间。用户可以增加 Spark 本地性等待配置,以避免在不同的批次中在不同的执行器上加载状态存储提供者。
启动流式查询
一旦您定义了最终结果 DataFrame/Dataset,剩下的就是启动流式计算。为此,您必须使用通过 Dataset.writeStream()
返回的 DataStreamWriter
(Scala/Java/Python docs)。您将需要在此接口中指定以下一个或多个内容。
-
输出接收器的详细信息:数据格式、位置等。
-
输出模式:指定写入到输出接收器的内容。
-
查询名称:可选,指定查询的唯一名称以进行标识。
-
触发间隔:可选,指定触发间隔。如果未指定,系统将在上一次处理完成后立即检查是否有新数据可用。如果由于上一次处理尚未完成而错过了触发时间,则系统将立即触发处理。
-
检查点位置:对于可以保证端到端容错的某些输出接收器,指定系统将写入所有检查点信息的位置。这应该是在 HDFS 兼容的容错文件系统中的目录。检查点语义将在下一节中更详细地讨论。
输出模式
有几种类型的输出模式。
-
追加模式 (默认) - 这是默认模式,其中只有自上次触发器以来添加到结果表的新行才会被输出到接收器。这仅支持添加到结果表的行永远不会更改的那些查询。因此,此模式保证每行只会被输出一次(假设容错接收器)。例如,只有
select
、where
、map
、flatMap
、filter
、join
等的查询将支持追加模式。 -
完整模式 - 每次触发后,整个结果表将被输出到接收器。这支持聚合查询。
-
更新模式 - (自 Spark 2.1.1 可用) 只有自上次触发器以来结果表中已更新的行才会被输出到接收器。未来版本中会添加更多信息。
不同类型的流式查询支持不同的输出模式。这是一个兼容性矩阵。
查询类型 | 支持的输出模式 | 备注 | |
---|---|---|---|
带有聚合的查询 | 具有水印的事件时间聚合 | 追加、更新、完整 | 追加模式使用水印来删除旧的聚合状态。但是窗口聚合的输出会延迟在 withWatermark() 中指定的延迟阈值,因为按照模式语义,只有在行被最终确定后(即,在跨越水印之后),才能将行添加到结果表中。有关更多详细信息,请参阅 延迟数据部分。更新模式使用水印来删除旧的聚合状态。 完整模式不删除旧的聚合状态,因为根据定义,此模式会保留结果表中的所有数据。 |
其他聚合 | 完整、更新 | 由于未定义水印(仅在其他类别中定义),因此不会删除旧的聚合状态。 不支持追加模式,因为聚合可以更新,从而违反此模式的语义。 |
|
带有 mapGroupsWithState 的查询 |
更新 | 不允许在带有 mapGroupsWithState 的查询中进行聚合。 |
|
带有 flatMapGroupsWithState 的查询 |
追加操作模式 | 追加 | 允许在 flatMapGroupsWithState 之后进行聚合。 |
更新操作模式 | 更新 | 不允许在带有 flatMapGroupsWithState 的查询中进行聚合。 |
|
带有 joins 的查询 |
追加 | 尚不支持更新和完整模式。 有关支持的连接类型的更多详细信息,请参阅 连接操作部分中的支持矩阵。 | |
其他查询 | 追加、更新 | 不支持完整模式,因为将所有未聚合的数据保留在结果表中是不可行的。 | |
输出接收器
有几种类型的内置输出接收器。
- 文件接收器 - 将输出存储到目录。
writeStream
.format("parquet") // can be "orc", "json", "csv", etc.
.option("path", "path/to/destination/dir")
.start()
- Kafka 接收器 - 将输出存储到一个或多个 Kafka 主题。
writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "updates")
.start()
- Foreach 接收器 - 对输出中的记录运行任意计算。有关更多详细信息,请参见本节后面的内容。
writeStream
.foreach(...)
.start()
- 控制台接收器(用于调试) - 每次触发时,将输出打印到控制台/标准输出。支持追加和完整输出模式。这应该用于调试目的,适用于低数据量,因为每次触发后,整个输出都会被收集并存储在驱动程序的内存中。
writeStream
.format("console")
.start()
- 内存接收器(用于调试) - 输出以内存表的形式存储在内存中。支持追加和完整输出模式。这应该用于调试目的,适用于低数据量,因为整个输出都会被收集并存储在驱动程序的内存中。因此,请谨慎使用。
writeStream
.format("memory")
.queryName("tableName")
.start()
某些接收器不是容错的,因为它们不保证输出的持久性,仅用于调试目的。请参阅前面关于 容错语义的部分。以下是 Spark 中所有接收器的详细信息。
接收器 | 支持的输出模式 | 选项 | 容错 | 备注 |
---|---|---|---|---|
文件接收器 | 追加 |
path : 输出目录的路径,必须指定。retention : 输出文件的生存时间 (TTL)。 批处理提交早于 TTL 的输出文件最终将在元数据日志中排除。 这意味着读取接收器输出目录的读取器查询可能不会处理它们。 您可以提供时间字符串格式的值。 (例如 "12h"、"7d" 等) 默认情况下,此选项已禁用。有关文件格式特定的选项,请参阅 DataFrameWriter 中的相关方法 (Scala/Java/Python/R)。 例如,对于“parquet”格式选项,请参阅 DataFrameWriter.parquet() |
是(精确一次) | 支持写入分区表。按时间分区可能很有用。 |
Kafka 接收器 | 追加、更新、完整 | 请参阅 Kafka 集成指南 | 是(至少一次) | 更多详细信息请参阅 Kafka 集成指南 |
Foreach 接收器 | 追加、更新、完整 | 无 | 是(至少一次) | 更多详细信息请参阅 下一节 |
ForeachBatch 接收器 | 追加、更新、完整 | 无 | 取决于实现 | 更多详细信息请参阅 下一节 |
控制台接收器 | 追加、更新、完整 |
numRows : 每次触发时要打印的行数 (默认值: 20)truncate : 如果输出太长是否截断输出 (默认值: true) |
否 | |
内存接收器 | 追加、完整 | 无 | 否。但在完整模式下,重新启动的查询将重新创建完整的表。 | 表名是查询名称。 |
请注意,您必须调用 start()
才能实际开始执行查询。这将返回一个 StreamingQuery 对象,该对象是连续运行的执行的句柄。您可以使用此对象来管理查询,我们将在下一小节中讨论。现在,让我们通过几个示例来理解所有这些。
# ========== DF with no aggregations ==========
noAggDF = deviceDataDf.select("device").where("signal > 10")
# Print new data to console
noAggDF \
.writeStream \
.format("console") \
.start()
# Write new data to Parquet files
noAggDF \
.writeStream \
.format("parquet") \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.option("path", "path/to/destination/dir") \
.start()
# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()
# Print updated aggregations to console
aggDF \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF \
.writeStream \
.queryName("aggregates") \
.outputMode("complete") \
.format("memory") \
.start()
spark.sql("select * from aggregates").show() # interactively query in-memory table
// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")
// Print new data to console
noAggDF
.writeStream
.format("console")
.start()
// Write new data to Parquet files
noAggDF
.writeStream
.format("parquet")
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start()
// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()
// Print updated aggregations to console
aggDF
.writeStream
.outputMode("complete")
.format("console")
.start()
// Have all the aggregates in an in-memory table
aggDF
.writeStream
.queryName("aggregates") // this query name will be the table name
.outputMode("complete")
.format("memory")
.start()
spark.sql("select * from aggregates").show() // interactively query in-memory table
// ========== DF with no aggregations ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");
// Print new data to console
noAggDF
.writeStream()
.format("console")
.start();
// Write new data to Parquet files
noAggDF
.writeStream()
.format("parquet")
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start();
// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();
// Print updated aggregations to console
aggDF
.writeStream()
.outputMode("complete")
.format("console")
.start();
// Have all the aggregates in an in-memory table
aggDF
.writeStream()
.queryName("aggregates") // this query name will be the table name
.outputMode("complete")
.format("memory")
.start();
spark.sql("select * from aggregates").show(); // interactively query in-memory table
# ========== DF with no aggregations ==========
noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")
# Print new data to console
write.stream(noAggDF, "console")
# Write new data to Parquet files
write.stream(noAggDF,
"parquet",
path = "path/to/destination/dir",
checkpointLocation = "path/to/checkpoint/dir")
# ========== DF with aggregation ==========
aggDF <- count(groupBy(df, "device"))
# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")
# Have all the aggregates in an in memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")
# Interactively query in-memory table
head(sql("select * from aggregates"))
使用 Foreach 和 ForeachBatch
foreach
和 foreachBatch
操作允许您对流式查询的输出应用任意操作和写入逻辑。 它们具有略微不同的用例 - 虽然 foreach
允许对每一行进行自定义写入逻辑,但 foreachBatch
允许对每个微批处理的输出进行任意操作和自定义逻辑。 让我们更详细地了解它们的用法。
ForeachBatch
foreachBatch(...)
允许您指定一个函数,该函数在流式查询的每个微批处理的输出数据上执行。 从 Spark 2.4 开始,Scala、Java 和 Python 中都支持此功能。 它接受两个参数:一个 DataFrame 或 Dataset,其中包含微批处理的输出数据,以及微批处理的唯一 ID。
def foreach_batch_function(df, epoch_id):
# Transform and write batchDF
pass
streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// Transform and write batchDF
}.start()
streamingDatasetOfString.writeStream().foreachBatch(
new VoidFunction2<Dataset<String>, Long>() {
public void call(Dataset<String> dataset, Long batchId) {
// Transform and write batchDF
}
}
).start();
尚不支持 R。
使用 foreachBatch
,您可以执行以下操作。
- 重用现有的批处理数据源 - 对于许多存储系统,可能尚未提供流式接收器,但可能已经存在用于批处理查询的数据写入器。 使用
foreachBatch
,您可以对每个微批处理的输出使用批处理数据写入器。 - 写入到多个位置 - 如果您想将流式查询的输出写入到多个位置,那么您可以简单地多次写入输出 DataFrame/Dataset。 但是,每次写入尝试都可能导致重新计算输出数据(包括可能重新读取输入数据)。 为了避免重新计算,您应该缓存输出 DataFrame/Dataset,将其写入到多个位置,然后取消缓存。 这是一个大纲。
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
- 应用其他 DataFrame 操作 - 许多 DataFrame 和 Dataset 操作在流式 DataFrames 中不受支持,因为 Spark 不支持在这些情况下生成增量计划。 使用
foreachBatch
,您可以对每个微批处理输出应用其中一些操作。 但是,您必须自己推理执行该操作的端到端语义。
注意
- 默认情况下,
foreachBatch
仅提供至少一次写入保证。 但是,您可以使用提供给函数的 batchId 作为对输出进行重复数据删除并获得精确一次保证的方法。 foreachBatch
不能与连续处理模式一起使用,因为它从根本上依赖于流式查询的微批处理执行。 如果您以连续模式写入数据,请改用foreach
。
Foreach
如果 foreachBatch
不是一个选项(例如,不存在相应的批处理数据写入器,或者使用连续处理模式),那么您可以使用 foreach
来表达您的自定义写入器逻辑。 具体来说,您可以通过将数据写入逻辑划分为三个方法来表达:open
、process
和 close
。 从 Spark 2.4 开始,foreach
在 Scala、Java 和 Python 中可用。
在 Python 中,您可以通过两种方式调用 foreach:在函数中或在对象中。 函数提供了一种表达处理逻辑的简单方法,但是当故障导致某些输入数据的重新处理时,它不允许您对生成的数据进行重复数据删除。 对于这种情况,您必须在对象中指定处理逻辑。
- 首先,该函数将一行作为输入。
def process_row(row):
# Write row to storage
pass
query = streamingDF.writeStream.foreach(process_row).start()
- 其次,该对象具有一个 process 方法和可选的 open 和 close 方法
class ForeachWriter:
def open(self, partition_id, epoch_id):
# Open connection. This method is optional in Python.
pass
def process(self, row):
# Write row to connection. This method is NOT optional in Python.
pass
def close(self, error):
# Close the connection. This method in optional in Python.
pass
query = streamingDF.writeStream.foreach(ForeachWriter()).start()
在 Scala 中,您必须扩展类 ForeachWriter
(文档)。
streamingDatasetOfString.writeStream.foreach(
new ForeachWriter[String] {
def open(partitionId: Long, version: Long): Boolean = {
// Open connection
}
def process(record: String): Unit = {
// Write string to connection
}
def close(errorOrNull: Throwable): Unit = {
// Close the connection
}
}
).start()
在 Java 中,您必须扩展类 ForeachWriter
(文档)。
streamingDatasetOfString.writeStream().foreach(
new ForeachWriter<String>() {
@Override public boolean open(long partitionId, long version) {
// Open connection
}
@Override public void process(String record) {
// Write string to connection
}
@Override public void close(Throwable errorOrNull) {
// Close the connection
}
}
).start();
尚不支持 R。
执行语义 启动流式查询时,Spark 以下列方式调用该函数或对象的方法
-
此对象的单个副本负责查询中单个任务生成的所有数据。 换句话说,一个实例负责处理以分布式方式生成的数据的一个分区。
-
此对象必须是可序列化的,因为每个任务都将获得所提供对象的一个新的序列化-反序列化副本。 因此,强烈建议在调用 open() 方法之后完成任何用于写入数据的初始化(例如,打开连接或启动事务),这表示任务已准备好生成数据。
-
方法的生命周期如下
-
对于每个具有 partition_id 的分区
-
对于每个 epoch_id 的流式数据批次/轮次
-
调用方法 open(partitionId, epochId)。
-
如果 open(…) 返回 true,则对于分区和批次/轮次中的每一行,都会调用方法 process(row)。
-
调用方法 close(error),其中包含处理行时遇到的错误(如果有)。
-
-
-
-
如果存在 open() 方法并且返回成功(无论返回值如何),则调用 close() 方法(如果存在),除非 JVM 或 Python 进程在中间崩溃。
-
注意: Spark 不保证 (partitionId, epochId) 的相同输出,因此无法使用 (partitionId, epochId) 实现重复数据删除。 例如,由于某些原因,源提供了不同数量的分区,Spark 优化更改了分区数量等等。 有关更多详细信息,请参见 SPARK-28650。 如果您需要在输出上进行重复数据删除,请尝试
foreachBatch
。
流式表 API
从 Spark 3.1 开始,您还可以使用 DataStreamReader.table()
将表读取为流式 DataFrames,并使用 DataStreamWriter.toTable()
将流式 DataFrames 写入为表
spark = ... # spark session
# Create a streaming DataFrame
df = spark.readStream \
.format("rate") \
.option("rowsPerSecond", 10) \
.load()
# Write the streaming DataFrame to a table
df.writeStream \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.toTable("myTable")
# Check the table result
spark.read.table("myTable").show()
# Transform the source dataset and write to a new table
spark.readStream \
.table("myTable") \
.select("value") \
.writeStream \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.format("parquet") \
.toTable("newTable")
# Check the new table result
spark.read.table("newTable").show()
val spark: SparkSession = ...
// Create a streaming DataFrame
val df = spark.readStream
.format("rate")
.option("rowsPerSecond", 10)
.load()
// Write the streaming DataFrame to a table
df.writeStream
.option("checkpointLocation", "path/to/checkpoint/dir")
.toTable("myTable")
// Check the table result
spark.read.table("myTable").show()
// Transform the source dataset and write to a new table
spark.readStream
.table("myTable")
.select("value")
.writeStream
.option("checkpointLocation", "path/to/checkpoint/dir")
.format("parquet")
.toTable("newTable")
// Check the new table result
spark.read.table("newTable").show()
SparkSession spark = ...
// Create a streaming DataFrame
Dataset<Row> df = spark.readStream()
.format("rate")
.option("rowsPerSecond", 10)
.load();
// Write the streaming DataFrame to a table
df.writeStream()
.option("checkpointLocation", "path/to/checkpoint/dir")
.toTable("myTable");
// Check the table result
spark.read().table("myTable").show();
// Transform the source dataset and write to a new table
spark.readStream()
.table("myTable")
.select("value")
.writeStream()
.option("checkpointLocation", "path/to/checkpoint/dir")
.format("parquet")
.toTable("newTable");
// Check the new table result
spark.read().table("newTable").show();
在 R 中不可用。
有关更多详细信息,请查看 DataStreamReader(Scala/Java/Python 文档)和 DataStreamWriter(Scala/Java/Python 文档)的文档。
触发器
流式查询的触发器设置定义了流式数据处理的时序,无论该查询将作为具有固定批处理间隔的微批处理查询还是作为连续处理查询执行。 以下是支持的不同类型的触发器。
触发器类型 | 描述 |
---|---|
未指定(默认) | 如果未显式指定触发器设置,则默认情况下,查询将以微批处理模式执行,其中微批处理将在上一个微批处理完成处理后立即生成。 |
固定间隔微批处理 | 该查询将以微批处理模式执行,其中微批处理将以用户指定的时间间隔启动。
|
一次性微批处理(已弃用) | 查询将仅执行一个微批处理以处理所有可用数据,然后自行停止。 这在您想要定期启动集群、处理自上一周期以来可用的所有内容,然后关闭集群的场景中非常有用。 在某些情况下,这可能会带来显着的成本节省。 请注意,此触发器已弃用,建议用户迁移到立即可用微批处理,因为它提供了更好的处理保证、更精细的批处理规模,以及更好地逐步处理水印推进(包括无数据批处理)。 |
立即可用微批处理 | 与查询的一次性微批处理触发器类似,查询将处理所有可用数据,然后自行停止。 不同之处在于,它将根据源选项(例如,文件源的 maxFilesPerTrigger )在(可能)多个微批处理中处理数据,这将带来更好的查询可伸缩性。
|
具有固定检查点间隔的连续模式 (实验性) |
查询将在新的低延迟连续处理模式下执行。 在下面的连续处理部分中阅读有关此内容的更多信息。 |
以下是一些代码示例。
# Default trigger (runs micro-batch as soon as it can)
df.writeStream \
.format("console") \
.start()
# ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream \
.format("console") \
.trigger(processingTime='2 seconds') \
.start()
# One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream \
.format("console") \
.trigger(once=True) \
.start()
# Available-now trigger
df.writeStream \
.format("console") \
.trigger(availableNow=True) \
.start()
# Continuous trigger with one-second checkpointing interval
df.writeStream
.format("console")
.trigger(continuous='1 second')
.start()
import org.apache.spark.sql.streaming.Trigger
// Default trigger (runs micro-batch as soon as it can)
df.writeStream
.format("console")
.start()
// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
// One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream
.format("console")
.trigger(Trigger.Once())
.start()
// Available-now trigger
df.writeStream
.format("console")
.trigger(Trigger.AvailableNow())
.start()
// Continuous trigger with one-second checkpointing interval
df.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.start()
import org.apache.spark.sql.streaming.Trigger
// Default trigger (runs micro-batch as soon as it can)
df.writeStream
.format("console")
.start();
// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start();
// One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream
.format("console")
.trigger(Trigger.Once())
.start();
// Available-now trigger
df.writeStream
.format("console")
.trigger(Trigger.AvailableNow())
.start();
// Continuous trigger with one-second checkpointing interval
df.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.start();
# Default trigger (runs micro-batch as soon as it can)
write.stream(df, "console")
# ProcessingTime trigger with two-seconds micro-batch interval
write.stream(df, "console", trigger.processingTime = "2 seconds")
# One-time trigger
write.stream(df, "console", trigger.once = TRUE)
# Continuous trigger is not yet supported
管理流式查询
在启动查询时创建的 StreamingQuery
对象可用于监视和管理查询。
query = df.writeStream.format("console").start() # get the query object
query.id() # get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId() # get the unique id of this run of the query, which will be generated at every start/restart
query.name() # get the name of the auto-generated or user-specified name
query.explain() # print detailed explanations of the query
query.stop() # stop the query
query.awaitTermination() # block until query is terminated, with stop() or with error
query.exception() # the exception if the query has been terminated with error
query.recentProgress # a list of the most recent progress updates for this query
query.lastProgress # the most recent progress update of this streaming query
val query = df.writeStream.format("console").start() // get the query object
query.id // get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId // get the unique id of this run of the query, which will be generated at every start/restart
query.name // get the name of the auto-generated or user-specified name
query.explain() // print detailed explanations of the query
query.stop() // stop the query
query.awaitTermination() // block until query is terminated, with stop() or with error
query.exception // the exception if the query has been terminated with error
query.recentProgress // an array of the most recent progress updates for this query
query.lastProgress // the most recent progress update of this streaming query
StreamingQuery query = df.writeStream().format("console").start(); // get the query object
query.id(); // get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId(); // get the unique id of this run of the query, which will be generated at every start/restart
query.name(); // get the name of the auto-generated or user-specified name
query.explain(); // print detailed explanations of the query
query.stop(); // stop the query
query.awaitTermination(); // block until query is terminated, with stop() or with error
query.exception(); // the exception if the query has been terminated with error
query.recentProgress(); // an array of the most recent progress updates for this query
query.lastProgress(); // the most recent progress update of this streaming query
query <- write.stream(df, "console") # get the query object
queryName(query) # get the name of the auto-generated or user-specified name
explain(query) # print detailed explanations of the query
stopQuery(query) # stop the query
awaitTermination(query) # block until query is terminated, with stop() or with error
lastProgress(query) # the most recent progress update of this streaming query
您可以在单个 SparkSession 中启动任意数量的查询。 它们将并发运行,共享集群资源。 您可以使用 sparkSession.streams()
来获取 StreamingQueryManager
(Scala/Java/Python 文档),该管理器可用于管理当前活动的查询。
spark = ... # spark session
spark.streams.active # get the list of currently active streaming queries
spark.streams.get(id) # get a query object by its unique id
spark.streams.awaitAnyTermination() # block until any one of them terminates
val spark: SparkSession = ...
spark.streams.active // get the list of currently active streaming queries
spark.streams.get(id) // get a query object by its unique id
spark.streams.awaitAnyTermination() // block until any one of them terminates
SparkSession spark = ...
spark.streams().active(); // get the list of currently active streaming queries
spark.streams().get(id); // get a query object by its unique id
spark.streams().awaitAnyTermination(); // block until any one of them terminates
Not available in R.
监控流式查询
有多种方法可以监视活动的流式查询。 您可以使用 Spark 的 Dropwizard Metrics 支持将指标推送到外部系统,或者以编程方式访问它们。
交互式读取指标
您可以使用 streamingQuery.lastProgress()
和 streamingQuery.status()
直接获取活动查询的当前状态和指标。 lastProgress()
在 Scala 和 Java 中返回一个 StreamingQueryProgress
对象,并在 Python 中返回一个具有相同字段的字典。 它具有有关流的最后一个触发器中取得的进展的所有信息 - 处理了哪些数据,处理速率,延迟等等。 还有 streamingQuery.recentProgress
,它返回最近几个进展的数组。
此外,streamingQuery.status()
在 Scala 和 Java 中返回一个 StreamingQueryStatus
对象,并在 Python 中返回一个具有相同字段的字典。 它提供有关查询正在立即执行的操作的信息 - 触发器是否处于活动状态,数据是否正在处理等等。
以下是一些示例。
query = ... # a StreamingQuery
print(query.lastProgress)
'''
Will print something like the following.
{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
'''
print(query.status)
'''
Will print something like the following.
{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
'''
val query: StreamingQuery = ...
println(query.lastProgress)
/* Will print something like the following.
{
"id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
"runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
"name" : "MyQuery",
"timestamp" : "2016-12-14T18:45:24.873Z",
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0,
"durationMs" : {
"triggerExecution" : 3,
"getOffset" : 2
},
"eventTime" : {
"watermark" : "2016-12-14T18:45:24.873Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[topic-0]]",
"startOffset" : {
"topic-0" : {
"2" : 0,
"4" : 1,
"1" : 1,
"3" : 1,
"0" : 1
}
},
"endOffset" : {
"topic-0" : {
"2" : 0,
"4" : 115,
"1" : 134,
"3" : 21,
"0" : 534
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0
} ],
"sink" : {
"description" : "MemorySink"
}
}
*/
println(query.status)
/* Will print something like the following.
{
"message" : "Waiting for data to arrive",
"isDataAvailable" : false,
"isTriggerActive" : false
}
*/
StreamingQuery query = ...
System.out.println(query.lastProgress());
/* Will print something like the following.
{
"id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
"runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
"name" : "MyQuery",
"timestamp" : "2016-12-14T18:45:24.873Z",
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0,
"durationMs" : {
"triggerExecution" : 3,
"getOffset" : 2
},
"eventTime" : {
"watermark" : "2016-12-14T18:45:24.873Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[topic-0]]",
"startOffset" : {
"topic-0" : {
"2" : 0,
"4" : 1,
"1" : 1,
"3" : 1,
"0" : 1
}
},
"endOffset" : {
"topic-0" : {
"2" : 0,
"4" : 115,
"1" : 134,
"3" : 21,
"0" : 534
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0
} ],
"sink" : {
"description" : "MemorySink"
}
}
*/
System.out.println(query.status());
/* Will print something like the following.
{
"message" : "Waiting for data to arrive",
"isDataAvailable" : false,
"isTriggerActive" : false
}
*/
query <- ... # a StreamingQuery
lastProgress(query)
'''
Will print something like the following.
{
"id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9",
"runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16",
"name" : null,
"timestamp" : "2017-04-26T08:27:28.835Z",
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 0,
"triggerExecution" : 1
},
"stateOperators" : [ {
"numRowsTotal" : 4,
"numRowsUpdated" : 0
} ],
"sources" : [ {
"description" : "TextSocketSource[host: localhost, port: 9999]",
"startOffset" : 1,
"endOffset" : 1,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531"
}
}
'''
status(query)
'''
Will print something like the following.
{
"message" : "Waiting for data to arrive",
"isDataAvailable" : false,
"isTriggerActive" : false
}
'''
使用异步 API 以编程方式报告指标
您还可以通过附加 StreamingQueryListener
(Scala/Java/Python 文档) 异步监视与 SparkSession
关联的所有查询。 一旦您将自定义 StreamingQueryListener
对象与 sparkSession.streams.addListener()
关联,您将在查询启动和停止时以及活动查询取得进展时获得回调。 这是一个例子,
spark = ...
class Listener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: " + queryStarted.id)
def onQueryProgress(self, event):
print("Query made progress: " + queryProgress.progress)
def onQueryTerminated(self, event):
print("Query terminated: " + queryTerminated.id)
spark.streams.addListener(Listener())
val spark: SparkSession = ...
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("Query started: " + queryStarted.id)
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println("Query terminated: " + queryTerminated.id)
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("Query made progress: " + queryProgress.progress)
}
})
SparkSession spark = ...
spark.streams().addListener(new StreamingQueryListener() {
@Override
public void onQueryStarted(QueryStartedEvent queryStarted) {
System.out.println("Query started: " + queryStarted.id());
}
@Override
public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
System.out.println("Query terminated: " + queryTerminated.id());
}
@Override
public void onQueryProgress(QueryProgressEvent queryProgress) {
System.out.println("Query made progress: " + queryProgress.progress());
}
});
Not available in R.
使用 Dropwizard 报告指标
Spark 支持使用 Dropwizard Library 报告指标。 要使结构化流式查询的指标也一起报告,您必须在 SparkSession 中显式启用配置 spark.sql.streaming.metricsEnabled
。
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
# or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf().set("spark.sql.streaming.metricsEnabled", "true");
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true");
sql("SET spark.sql.streaming.metricsEnabled=true")
启用此配置后,在 SparkSession 中启动的所有查询都将通过 Dropwizard 将指标报告给已配置的任何接收器(例如,Ganglia,Graphite,JMX 等)。
通过检查点从故障中恢复
如果发生故障或有意关闭,您可以恢复先前查询的先前进度和状态,并从中断的地方继续。 这是通过检查点和预写日志完成的。 您可以使用检查点位置配置查询,并且查询会将所有进度信息(即,每个触发器中处理的偏移量范围)和正在运行的聚合(例如,快速示例中的字数统计)保存到检查点位置。 此检查点位置必须是 HDFS 兼容文件系统中的路径,并且可以在 启动查询时将其设置为 DataStreamWriter 中的一个选项。
aggDF \
.writeStream \
.outputMode("complete") \
.option("checkpointLocation", "path/to/HDFS/dir") \
.format("memory") \
.start()
aggDF
.writeStream
.outputMode("complete")
.option("checkpointLocation", "path/to/HDFS/dir")
.format("memory")
.start()
aggDF
.writeStream()
.outputMode("complete")
.option("checkpointLocation", "path/to/HDFS/dir")
.format("memory")
.start();
write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")
流式查询更改后的恢复语义
从同一检查点位置重新启动时,允许对流式查询进行的更改类型存在限制。 以下是一些不允许的更改类型,或者更改的效果未明确定义。 对于所有这些
-
术语允许表示您可以进行指定的更改,但是其效果的语义是否明确取决于查询和更改。
-
术语不允许表示您不应进行指定的更改,因为重新启动的查询很可能会因无法预测的错误而失败。
sdf
表示使用 sparkSession.readStream 生成的流式 DataFrame/Dataset。
更改类型
-
输入源的数量或类型(即不同的源)的更改:这是不允许的。
-
输入源参数的更改:是否允许以及更改的语义是否明确取决于源和查询。以下是一些示例。
-
允许添加/删除/修改速率限制:从
spark.readStream.format("kafka").option("subscribe", "topic")
到spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
-
通常不允许更改订阅的主题/文件,因为结果是不可预测的:从
spark.readStream.format("kafka").option("subscribe", "topic")
到spark.readStream.format("kafka").option("subscribe", "newTopic")
-
-
输出接收器类型的更改:允许在一些特定的接收器组合之间进行更改。这需要逐个案例进行验证。以下是一些示例。
-
允许从文件接收器更改为 Kafka 接收器。Kafka 将只看到新的数据。
-
不允许从 Kafka 接收器更改为文件接收器。
-
允许将 Kafka 接收器更改为 foreach,或反之亦然。
-
-
输出接收器参数的更改:是否允许以及更改的语义是否明确取决于接收器和查询。以下是一些示例。
-
不允许更改文件接收器的输出目录:从
sdf.writeStream.format("parquet").option("path", "/somePath")
到sdf.writeStream.format("parquet").option("path", "/anotherPath")
-
允许更改输出主题:从
sdf.writeStream.format("kafka").option("topic", "someTopic")
到sdf.writeStream.format("kafka").option("topic", "anotherTopic")
-
允许更改用户自定义的 foreach 接收器(即
ForeachWriter
代码),但更改的语义取决于代码。
-
-
投影/过滤/类似 Map 操作的更改:允许某些情况。例如
-
允许添加/删除过滤器:从
sdf.selectExpr("a")
到sdf.where(...).selectExpr("a").filter(...)
。 -
允许更改具有相同输出 schema 的投影:从
sdf.selectExpr("stringColumn AS json").writeStream
到sdf.selectExpr("anotherStringColumn AS json").writeStream
-
有条件地允许更改具有不同输出 schema 的投影:只有当输出接收器允许从
"a"
到"b"
的 schema 更改时,才允许从sdf.selectExpr("a").writeStream
到sdf.selectExpr("b").writeStream
的更改。
-
-
有状态操作的更改:流式查询中的某些操作需要维护状态数据,以便持续更新结果。Structured Streaming 自动将状态数据检查点到容错存储(例如,HDFS、AWS S3、Azure Blob storage),并在重启后恢复它。但是,这假设状态数据的 schema 在重启后保持不变。这意味着不允许在重启之间对流式查询的有状态操作进行任何更改(即添加、删除或 schema 修改)。以下是有状态操作的列表,为了确保状态恢复,这些操作的 schema 不应在重启之间更改
-
流式聚合:例如,
sdf.groupBy("a").agg(...)
。不允许更改分组键或聚合的数量或类型。 -
流式去重:例如,
sdf.dropDuplicates("a")
。不允许更改去重列的数量或类型。 -
流-流 Join:例如,
sdf1.join(sdf2, ...)
(即,两个输入都是通过sparkSession.readStream
生成的)。不允许更改 schema 或等值连接列。不允许更改 join 类型(outer 或 inner)。join 条件的其他更改是定义不明确的。 -
任意有状态操作:例如,
sdf.groupByKey(...).mapGroupsWithState(...)
或sdf.groupByKey(...).flatMapGroupsWithState(...)
。不允许更改用户自定义状态的 schema 和超时类型。允许更改用户自定义的状态映射函数内的任何内容,但更改的语义效果取决于用户自定义的逻辑。如果您确实想要支持状态 schema 更改,那么您可以使用支持 schema 迁移的编码/解码方案将复杂的状态数据结构显式地编码/解码为字节。例如,如果您将状态保存为 Avro 编码的字节,那么您可以自由地在查询重启之间更改 Avro 状态 schema,因为二进制状态将始终成功恢复。
-
异步进度跟踪
它是什么?
异步进度跟踪允许流式查询异步并行地检查进度,而不需要像微批处理中那样在实际数据处理中进行,从而减少与维护偏移日志和提交日志相关的延迟。
它是如何工作的?
Structured Streaming 依赖于持久化和管理偏移量作为查询处理的进度指标。偏移量管理操作直接影响处理延迟,因为在这些操作完成之前,无法进行任何数据处理。异步进度跟踪使流式查询能够在不受这些偏移量管理操作影响的情况下检查进度。
如何使用它?
下面的代码片段提供了一个如何使用此功能的示例
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
下表描述了此功能的配置以及与其相关的默认值。
选项 | 值 | 默认值 | 描述 |
---|---|---|---|
asyncProgressTrackingEnabled | true/false | false | 启用或禁用异步进度跟踪 |
asyncProgressTrackingCheckpointIntervalMs | 毫秒 | 1000 | 我们提交偏移量和完成提交的间隔 |
限制
此功能的初始版本具有以下限制
- 异步进度跟踪仅在使用 Kafka Sink 的无状态查询中受支持
- 使用此异步进度跟踪将不支持完全一次的端到端处理,因为在发生故障时,批处理的偏移范围可能会更改。尽管许多接收器(例如 Kafka 接收器)无论如何都不支持完全一次的写入。
关闭设置
关闭异步进度跟踪可能会导致抛出以下异常
java.lang.IllegalStateException: batch x doesn't exist
此外,以下错误消息可能会打印在驱动程序日志中
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
这是因为启用异步进度跟踪后,框架不会像不使用异步进度跟踪那样为每个批处理检查进度。要解决此问题,只需重新启用“asyncProgressTrackingEnabled”并将“asyncProgressTrackingCheckpointIntervalMs”设置为 0,然后运行流式查询,直到至少处理了两个微批处理。现在可以安全地禁用异步进度跟踪,并且重新启动查询应该可以正常进行。
连续处理
[实验性]
连续处理是 Spark 2.3 中引入的一种新的实验性流式执行模式,它支持低延迟(约 1 毫秒)的端到端处理,并具有至少一次的容错保证。与默认的微批处理引擎相比,后者可以实现完全一次的保证,但延迟最多只能达到约 100 毫秒。对于某些类型的查询(如下所述),您可以选择哪种模式来执行它们,而无需修改应用程序逻辑(即,无需更改 DataFrame/Dataset 操作)。
要在连续处理模式下运行受支持的查询,您只需要指定一个连续触发器,并将所需的检查点间隔作为参数。例如,
spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.trigger(continuous="1 second") \ # only change in query
.start()
import org.apache.spark.sql.streaming.Trigger
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.trigger(Trigger.Continuous("1 second")) // only change in query
.start()
import org.apache.spark.sql.streaming.Trigger;
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.trigger(Trigger.Continuous("1 second")) // only change in query
.start();
1 秒的检查点间隔意味着连续处理引擎将每秒记录查询的进度。生成的检查点与微批处理引擎兼容,因此任何查询都可以使用任何触发器重新启动。例如,以微批处理模式启动的受支持查询可以在连续模式下重新启动,反之亦然。请注意,每次切换到连续模式时,您都将获得至少一次的容错保证。
支持的查询
截至 Spark 2.4,只有以下类型的查询在连续处理模式下受支持。
- 操作:连续模式下仅支持类似 Map 的 Dataset/DataFrame 操作,即仅支持投影(
select
、map
、flatMap
、mapPartitions
等)和选择(where
、filter
等)。- 支持所有 SQL 函数,但聚合函数(因为尚未支持聚合)、
current_timestamp()
和current_date()
除外(使用时间进行确定性计算具有挑战性)。
- 支持所有 SQL 函数,但聚合函数(因为尚未支持聚合)、
- 源:
- Kafka 源:支持所有选项。
- Rate 源:适合测试。连续模式下仅支持的选项是
numPartitions
和rowsPerSecond
。
- 接收器:
- Kafka 接收器:支持所有选项。
- Memory 接收器:适合调试。
- Console 接收器:适合调试。支持所有选项。请注意,控制台将打印您在连续触发器中指定的每个检查点间隔。
有关更多详细信息,请参见输入源和输出接收器部分。虽然控制台接收器适合测试,但使用 Kafka 作为源和接收器可以最好地观察到端到端的低延迟处理,因为这允许引擎处理数据并在输入数据在输入主题中可用后的几毫秒内使结果在输出主题中可用。
注意事项
- 连续处理引擎启动多个长时间运行的任务,这些任务不断从源读取数据,对其进行处理并不断写入接收器。查询所需的任务数取决于查询可以并行从源读取多少个分区。因此,在启动连续处理查询之前,必须确保集群中有足够的内核来并行执行所有任务。例如,如果您从具有 10 个分区的 Kafka 主题中读取数据,则集群必须至少有 10 个内核才能使查询取得进展。
- 停止连续处理流可能会产生虚假的任務終止警告。可以安全地忽略这些警告。
- 目前,没有自动重试失败的任务。任何失败都会导致查询停止,并且需要从检查点手动重新启动。
附加信息
备注
- 在查询运行后,某些配置无法修改。要更改它们,请丢弃检查点并启动新查询。这些配置包括
spark.sql.shuffle.partitions
- 这是由于状态的物理分区:状态是通过将哈希函数应用于键来分区的,因此状态的分区数应保持不变。
- 如果你想为有状态操作运行更少的任务,
coalesce
有助于避免不必要的分区重组。- 在
coalesce
之后,除非发生另一次 shuffle,否则(减少后的)任务数量将保持不变。
- 在
spark.sql.streaming.stateStore.providerClass
: 为了正确读取查询的先前状态,状态存储提供者的类应该保持不变。spark.sql.streaming.multipleWatermarkPolicy
: 修改此项会导致查询包含多个水印时产生不一致的水印值,因此该策略应该保持不变。
进一步阅读
- 查看并运行 Scala/Java/Python/R 示例。
- 关于如何运行 Spark 示例的说明
- 在 Structured Streaming Kafka 集成指南中阅读有关与 Kafka 集成的更多信息
- 在 Spark SQL 编程指南中阅读有关使用 DataFrames/Datasets 的更多详细信息
- 第三方博客文章
演讲
- Spark Summit Europe 2017
- 使用 Apache Spark 中的 Structured Streaming 实现简单、可扩展、容错的流处理 - 第 1 部分幻灯片/视频, 第 2 部分幻灯片/视频
- 深入了解 Structured Streaming 中的有状态流处理 - 幻灯片/视频
- Spark Summit 2016
- Structured Streaming 深入探讨 - 幻灯片/视频
迁移指南
迁移指南现在已存档在此页面上。