结构化流编程指南

概述

TransformWithState 是自 Apache Spark 4.0 发布以来结构化流中新的任意有状态操作符。该操作符是 Scala 中旧的 mapGroupsWithState/flatMapGroupsWithState API 和 Python 中 applyInPandasWithState API 的下一代替代品,用于 Apache Spark 中的任意有状态处理。

该操作符支持一系列功能,例如面向对象的有状态处理器定义、复合类型、基于 TTL 的自动逐出、计时器等,可用于构建业务关键型操作用例。

语言支持

TransformWithState 在 Scala、Java 和 Python 中可用。请注意,在 Python 中,该操作符名称为 transformWithStateInPandas,类似于 Apache Spark 中与其他 Pandas 接口交互的其他操作符。

TransformWithState 查询的组成部分

一个 transformWithState 查询通常包含以下组成部分:

在以下章节中,我们将更详细地介绍上述组成部分。

定义有状态处理器

有状态处理器是用户定义逻辑的核心,用于操作输入事件。有状态处理器通过扩展 StatefulProcessor 类并实现一些方法来定义。

一个典型的有状态处理器处理以下构造:

有状态处理器使用面向对象的范式来定义有状态逻辑。有状态逻辑通过实现以下方法来定义:

上述方法将在操作符作为流查询的一部分执行时由 Spark 查询引擎调用。

另请注意,并非所有类型的操作都在每个方法中都受支持。例如,用户不能在 init 方法中注册计时器。同样,他们不能在 handleExpiredTimer 方法中操作输入行。引擎将检测到不支持/不兼容的操作,并在需要时使查询失败。

使用 StatefulProcessorHandle

上述方法中的许多操作都可以使用 StatefulProcessorHandle 对象来执行。StatefulProcessorHandle 对象提供了与底层状态存储交互的方法。可以通过在 StatefulProcessor 中调用 getHandle 方法来检索此对象。

使用状态变量

状态变量是类特定的成员,用于存储用户状态。它们需要在有状态处理器的 init 方法中声明一次并初始化。

初始化状态变量通常包括以下步骤:

状态变量的类型

状态变量可以是以下类型:

类似于流行编程语言中的集合,状态类型可用于建模针对底层存储层的各种操作进行优化的数据结构。例如,追加操作对 ListState 进行了优化,而点查找操作对 MapState 进行了优化。

提供状态编码器

状态编码器用于序列化和反序列化状态变量。在 Scala 中,如果存在隐式编码器,则可以跳过状态编码器。在 Java 和 Python 中,状态编码器需要显式提供。Spark SQL 编码器默认提供针对基本类型、case 类和 Java Bean 类的内置编码器。

在 Scala 中提供隐式编码器

在 Scala 中,可以为 case 类和基本类型提供隐式编码器。implicits 对象作为 StatefulProcessor 类的一部分提供。在 StatefulProcessor 定义中,用户只需导入隐式值,例如 import implicits._,然后就不需要显式传递编码器类型。

为状态变量提供 TTL

状态变量可以配置一个可选的 TTL(Time-To-Live,生存时间)值。TTL 值用于在指定持续时间后自动逐出状态变量。TTL 值可以作为 Duration 提供。

处理输入行

handleInputRows 方法用于处理属于分组键的输入行,并在需要时发出输出。Spark 查询引擎会为操作符接收到的每个分组键值调用此方法。如果多行属于同一个分组键,则提供的迭代器将包含所有这些行。

处理过期计时器

handleInputRowshandleExpiredTimer 方法中,有状态处理器可以注册计时器以便在稍后时间触发。当有状态处理器设置的计时器过期时,Spark 查询引擎会调用 handleExpiredTimer 方法。此方法对每个过期计时器调用一次。以下是支持的一些计时器属性:

处理初始状态

handleInitialState 方法用于可选地处理初始状态批处理数据帧。初始状态批处理数据帧用于预填充有状态处理器的状态。当初始状态批处理数据帧可用时,Spark 查询引擎会调用此方法。此方法在查询的生命周期中只被调用一次。它在有状态处理器处理任何输入行之前被调用。

综合应用

以下是一个实现停机检测器的 StatefulProcessor 示例。每次为给定键看到新值时,它都会更新 lastSeen 状态值,清除任何现有计时器,并重置一个用于将来的计时器。

当计时器过期时,应用程序会发出自该键上次观察到的事件以来的已用时间。然后它会设置一个新计时器,以便在 10 秒后发出更新。

class DownTimeDetector(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define schema for the state value (timestamp)
        state_schema = StructType([StructField("value", TimestampType(), True)])
        self.handle = handle
        # Initialize state to store the last seen timestamp for each key
        self.last_seen = handle.getValueState("last_seen", state_schema)

    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
        latest_from_existing = self.last_seen.get()
        # Calculate downtime duration
        downtime_duration = timerValues.getCurrentProcessingTimeInMs() - int(latest_from_existing.timestamp() * 1000)
        # Register a new timer for 10 seconds in the future
        self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)
        # Yield a DataFrame with the key and downtime duration
        yield pd.DataFrame(
            {
                "id": key,
                "timeValues": str(downtime_duration),
            }
        )

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Find the row with the maximum timestamp
        max_row = max((tuple(pdf.iloc[0]) for pdf in rows), key=lambda row: row[1])

        # Get the latest timestamp from existing state or use epoch start if not exists
        if self.last_seen.exists():
            latest_from_existing = self.last_seen.get()
        else:
            latest_from_existing = datetime.fromtimestamp(0)

        # If new data is more recent than existing state
        if latest_from_existing < max_row[1]:
            # Delete all existing timers
            for timer in self.handle.listTimers():
                self.handle.deleteTimer(timer)
            # Update the last seen timestamp
            self.last_seen.update((max_row[1],))

        # Register a new timer for 5 seconds in the future
        self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 5000)

        # Yield an empty DataFrame
        yield pd.DataFrame()

    def close(self) -> None:
        # No cleanup needed
        pass
// The (String, Timestamp) schema represents an (id, time). We want to do downtime
// detection on every single unique sensor, where each sensor has a sensor ID.
class DowntimeDetector(duration: Duration) extends
  StatefulProcessor[String, (String, Timestamp), (String, Duration)] {

  @transient private var _lastSeen: ValueState[Timestamp] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    _lastSeen = getHandle.getValueState[Timestamp]("lastSeen", Encoders.TIMESTAMP, TTLConfig.NONE)
  }

  // The logic here is as follows: find the largest timestamp seen so far. Set a timer for
  // the duration later.
  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Timestamp)],
      timerValues: TimerValues): Iterator[(String, Duration)] = {
    val latestRecordFromNewRows = inputRows.maxBy(_._2.getTime)

    val latestTimestampFromExistingRows = if (_lastSeen.exists()) {
      _lastSeen.get()
    } else {
      new Timestamp(0)
    }

    val latestTimestampFromNewRows = latestRecordFromNewRows._2

    if (latestTimestampFromNewRows.after(latestTimestampFromExistingRows)) {
      // Cancel the one existing timer, since we have a new latest timestamp.
      // We call "listTimers()" just because we don't know ahead of time what
      // the timestamp of the existing timer is.
      getHandle.listTimers().foreach(timer => getHandle.deleteTimer(timer))

      _lastSeen.update(latestTimestampFromNewRows)
      // Use timerValues to schedule a timer using processing time.
      getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + duration.toMillis)
    } else {
      // No new latest timestamp, so no need to update state or set a timer.
    }

    Iterator.empty
  }

  override def handleExpiredTimer(
    key: String,
    timerValues: TimerValues,
    expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, Duration)] = {
      val latestTimestamp = _lastSeen.get()
      val downtimeDuration = new Duration(
        timerValues.getCurrentProcessingTimeInMs() - latestTimestamp.getTime)

      // Register another timer that will fire in 10 seconds.
      // Timers can be registered anywhere but init()
      getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)

      Iterator((key, downtimeDuration))
  }
}

在流查询中使用 StatefulProcessor

现在我们已经定义了 StatefulProcessor,我们可以在流查询中使用它。以下代码片段展示了如何在 Python 和 Scala 的流查询中使用 StatefulProcessor

q = (df.groupBy("key")
  .transformWithStateInPandas(
    statefulProcessor=DownTimeDetector(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="None",
  )
  .writeStream...
  
val query = df.groupBy("key")
  .transformWithState(
    statefulProcessor = new DownTimeDetector(),
    outputMode = OutputMode.Update,
    timeMode = TimeMode.None)
  .writeStream...

状态模式演进

TransformWithState 还允许对管理的状态执行模式演进。这里有两部分:

请注意,模式演进仅在值侧受支持。键侧状态模式演进不受支持。

跨状态变量的演进

该操作符允许在同一流查询的不同运行中添加和删除状态变量。为了删除变量,我们还需要通知引擎,以便清除底层状态。用户可以通过在 StatefulProcessor 的 init 方法中为给定状态变量调用 deleteIfExists 方法来实现此目的。

状态变量内部的演进

该操作符还允许特定状态变量的状态模式进行演进。例如,如果您使用 case 类将状态存储在 ValueState 变量中,那么您可以通过添加/删除/加宽字段来演进此 case 类。我们仅在底层编码格式设置为 Avro 时才支持此类模式演进。为了启用此功能,请将以下 Spark 配置设置为 spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")

Avro 规则中支持以下演进操作:

不支持以下演进操作:

与状态数据源集成

TransformWithState 是一个有状态操作符,允许用户在批次之间维护任意状态。为了读取此状态,用户需要在状态数据源读取器查询中提供一些附加选项。此操作符允许在同一查询中使用多个状态变量。然而,由于它们可能是不同的复合类型和编码格式,因此需要在批处理查询中一次读取一个变量。为了实现这一点,用户需要为他们感兴趣读取的状态变量指定 stateVarName

通过将选项 readRegisteredTimers 设置为 true,可以读取计时器。这将返回所有分组键上注册的计时器。

我们还允许以 2 种格式读取复合类型变量:

根据您的内存需求,您可以选择最适合您用例的格式。有关源选项的更多信息可以在此处找到。