状态数据源集成指南
结构化流中的状态数据源指南 (实验性)
概览
状态数据源提供了从检查点操纵状态的功能。
截至 Spark 4.0,状态数据源提供了批量查询的读取功能。包括写入在内的其他功能将在未来的路线图中。
注意:此数据源目前标记为实验性——源选项和行为(输出)可能会有所更改。
从检查点读取状态键值对
状态数据源通过运行单独的批量查询,可以从检查点中的状态存储读取键值对。用户可以利用此功能来满足下面描述的两个主要用例:
- 构建同时检查输出和状态的测试。从输出中推断状态的键值并非易事,而拥有状态的可视性将极大地有助于测试。
- 调查有状态流查询的事件。如果用户观察到不正确的输出并希望追踪其产生原因,则需要具备状态的可视性。
用户可以读取状态存储的一个实例,该实例在大多数情况下与单个有状态操作符匹配。这意味着用户可以期望读取单个有状态操作符状态中的所有键值对。
请注意,可能存在例外情况,例如流-流连接(stream-stream join),它在内部利用了多个状态存储实例。该数据源将内部表示抽象化,并提供了一种用户友好的方法来读取状态。有关流-流连接的更多详细信息,请参阅相关部分。
以批量查询方式读取状态存储(所有默认值)
df = spark \
.read \
.format("statestore") \
.load("<checkpointLocation>")
val df = spark
.read
.format("statestore")
.load("<checkpointLocation>")
Dataset<Row> df = spark
.read()
.format("statestore")
.load("<checkpointLocation>");
源中的每一行都具有以下模式
列 | 类型 | 备注 |
---|---|---|
键 | 结构体 (取决于状态键的类型) | |
值 | 结构体 (取决于状态值的类型) | |
分区ID | 整型 |
键和值的嵌套列很大程度上取决于有状态操作符的输入模式以及操作符的类型。建议用户首先通过 df.schema() / df.printSchema() 查询模式,以了解输出类型。
源必须设置以下选项。
选项 | 值 | 含义 |
---|---|---|
路径 | 字符串 | 指定检查点位置的根目录。您可以通过 option("path", `path`) 或 load(`path`) 指定路径。 |
以下配置是可选的
选项 | 值 | 默认值 | 含义 |
---|---|---|---|
批次ID | 数值 | 最新提交的批次 | 表示要读取的目标批次。当用户想要执行时间旅行(time-travel)时使用此选项。该批次应已提交但尚未清除。 |
操作符ID | 数值 | 0 | 表示要读取的目标操作符。当查询使用多个有状态操作符时使用此选项。 |
存储名称 | 字符串 | DEFAULT | 表示要读取的目标状态存储名称。当有状态操作符使用多个状态存储实例时使用此选项。除流-流连接外,此选项不是必需的。 |
连接侧 | 字符串("left" 或 "right") | (无) | 表示要读取的目标侧。当用户想要从流-流连接读取状态时使用此选项。 |
快照起始批次ID | 数值 | 如果指定,则强制读取此批次ID处的快照,然后重放变更日志直到 'batchId' 或其默认值。请注意,快照批次ID从0开始,等于快照版本ID减1。此选项必须与 'snapshotPartitionId' 一起使用。 | |
快照分区ID | 数值 | 如果指定,将只读取此特定分区。请注意,分区ID从0开始。此选项必须与 'snapshotStartBatchId' 一起使用。 | |
读取变更流 | 布尔值 | false | 如果设置为 true,将读取微批次(microbatch)的状态变更。输出表模式也将有所不同。详细信息可在 "读取微批次的状态变更" 部分找到。此选项必须与 'changeStartBatchId' 选项一起指定。'batchId'、'joinSide'、'snapshotStartBatchId' 和 'snapshotPartitionId' 选项不能与此选项一起使用。 |
变更起始批次ID | 数值 | 表示在读取变更流模式下要读取的第一个批次。此选项要求 'readChangeFeed' 设置为 true。 | |
变更结束批次ID | 数值 | 最新提交的批次ID | 表示在读取变更流模式下要读取的最后一个批次。此选项要求 'readChangeFeed' 设置为 true。 |
状态变量名称 | 字符串 | 要作为此批量查询一部分读取的状态变量名称。如果使用了 transformWithState 操作符,此为必填选项。请注意,目前此选项仅适用于 transformWithState 操作符。 | |
读取已注册计时器 | 布尔值 | false | 如果为 true,用户可以读取 transformWithState 操作符中使用的已注册计时器。请注意,目前此选项仅适用于 transformWithState 操作符。此选项与上面描述的 stateVarName 选项互斥,一次只能使用其中一个。 |
展平集合类型 | 布尔值 | true | 如果为 true,状态变量的集合类型(如列表状态、映射状态等)将被展平。如果为 false,值将以 Spark SQL 中的 Array 或 Map 类型提供。请注意,目前此选项仅适用于 transformWithState 操作符。 |
读取流-流连接的状态
结构化流通过在内部利用多个状态存储实例来实现流-流连接功能。这些实例逻辑上构成缓冲区,用于存储左侧和右侧的输入行。
鉴于对用户来说更易于理解,该数据源提供了“joinSide”选项,用于读取连接特定侧的缓冲输入。为了支持直接读取内部状态存储实例的功能,我们还允许指定“storeName”选项,但限制是“storeName”和“joinSide”不能同时指定。
读取 transformWithState 的状态
TransformWithState 是一种有状态操作符,允许用户在批次之间维护任意状态。为了读取此状态,用户需要在状态数据源读取器查询中提供一些额外选项。此操作符允许在同一查询中使用多个状态变量。但是,由于它们可能具有不同的复合类型和编码格式,因此需要在批量查询中一次读取一个变量。为了实现这一点,用户需要为他们感兴趣读取的状态变量指定 stateVarName
。
通过将选项 readRegisteredTimers
设置为 true,可以读取计时器。这将返回所有跨分组键的已注册计时器。
我们还允许以 2 种格式读取复合类型变量
- 展平:这是默认格式,复合类型被展平为单独的列。
- 非展平:复合类型作为 Spark SQL 中的 Array 或 Map 类型的一个单列返回。
根据您的内存需求,您可以选择最适合您用例的格式。
读取微批次的状态变更
如果我们想了解微批次中状态存储的变更,而不是特定微批次中的整个状态存储,则应使用“readChangeFeed”选项。例如,这是从批次2到最新提交批次读取状态变更的代码。
df = spark \
.read \
.format("statestore") \
.option("readChangeFeed", true) \
.option("changeStartBatchId", 2) \
.load("<checkpointLocation>")
val df = spark
.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
Dataset<Row> df = spark
.read()
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>");
输出模式也将与常规输出不同。
列 | 类型 | 备注 |
---|---|---|
批次ID | 长整型 | |
变更类型 | 字符串 | 有两种可能的值:“update”和“delete”。“update”表示插入不存在的键值对或用新值更新现有键。“delete”记录的“value”字段将为 null。 |
键 | 结构体 (取决于状态键的类型) | |
值 | 结构体 (取决于状态值的类型) | |
分区ID | 整型 |
状态元数据源
在通过状态数据源查询现有检查点中的状态之前,用户可能希望了解检查点的信息,特别是关于状态操作符的信息。这包括检查点中可用的操作符和状态存储实例、可用的批次ID范围等。
结构化流提供了一个名为“状态元数据源”的数据源,用于从检查点提供与状态相关的元数据信息。
注意:元数据是在流式查询以 Spark 4.0+ 运行时构建的。使用较低 Spark 版本运行的现有检查点不包含元数据,并且无法通过此元数据源进行查询/使用。在查询之前,需要运行指向 Spark 4.0+ 中现有检查点的流式查询来构建元数据。用户可以选择性地提供 batchId 以获取特定时间点的操作符元数据。
为批量查询创建状态元数据存储
df = spark \
.read \
.format("state-metadata") \
.load("<checkpointLocation>")
val df = spark
.read
.format("state-metadata")
.load("<checkpointLocation>")
Dataset<Row> df = spark
.read()
.format("state-metadata")
.load("<checkpointLocation>");
源必须设置以下选项
选项 | 值 | 含义 |
---|---|---|
路径 | 字符串 | 指定检查点位置的根目录。您可以通过 option("path", `path`) 或 load(`path`) 指定路径。 |
以下配置是可选的
选项 | 值 | 默认值 | 含义 |
---|---|---|---|
批次ID | 数值 | 如果可用,则为最新提交的批次,否则为 0 | 可选的 batchId,用于在该批次获取操作符元数据。 |
源中的每一行都具有以下模式
列 | 类型 | 备注 |
---|---|---|
操作符ID | 整型 | |
操作符名称 | 字符串 | |
状态存储名称 | 整型 | |
分区数量 | 整型 | |
最小批次ID | 整型 | 可用于查询状态的最小批次ID。如果正在运行进行检查点的流式查询,该值可能无效,因为清理操作会运行。 |
最大批次ID | 整型 | 可用于查询状态的最大批次ID。如果正在运行进行检查点的流式查询,该值可能无效,因为查询将提交更多批次。 |
操作符属性 | 字符串 | 操作符使用的属性列表,以 JSON 编码。此处生成的输出取决于操作符。 |
_numColsPrefixKey | 整型 | 元数据列(除非通过 SELECT 指定,否则隐藏) |
此数据源的主要用例之一是,如果查询包含多个有状态操作符(例如,流-流连接后跟去重),则识别要查询的 operatorId。'operatorName' 列帮助用户为给定操作符识别 operatorId。
此外,如果用户想查询有状态操作符(例如流-流连接)的内部状态存储实例,'stateStoreName' 列将有助于确定目标。