结构化流编程指南

异步进度跟踪

什么是它?

异步进度跟踪允许流式查询在微批次内异步地、并行地对实际数据处理进行检查点进度,从而减少与维护偏移日志和提交日志相关的延迟。

Async Progress Tracking

它是如何工作的?

结构化流依赖于持久化和管理偏移量作为查询处理的进度指示器。偏移量管理操作直接影响处理延迟,因为在这些操作完成之前,无法进行数据处理。异步进度跟踪使流式查询能够进行检查点进度,而不受这些偏移量管理操作的影响。

如何使用?

下面的代码片段提供了一个如何使用此功能的示例

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()
val query = stream.writeStream
     .format("kafka")
     .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
     .option("asyncProgressTrackingEnabled", "true")
     .start()

下表描述了此功能的配置及其相关默认值。

选项 默认值 描述
asyncProgressTrackingEnabled true/false false 启用或禁用异步进度跟踪
asyncProgressTrackingCheckpointIntervalMs 毫秒 1000 我们提交偏移量和完成提交的间隔

限制

此功能的初始版本有以下限制

关闭此设置

关闭异步进度跟踪可能会导致抛出以下异常

java.lang.IllegalStateException: batch x doesn't exist

驱动程序日志中也可能打印以下错误消息

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

这是由于以下事实造成的:当启用异步进度跟踪时,框架不会像不使用异步进度跟踪时那样为每个批次设置检查点。要解决此问题,只需重新启用“asyncProgressTrackingEnabled”并将“asyncProgressTrackingCheckpointIntervalMs”设置为 0,然后运行流式查询,直到至少处理了两个微批次。现在可以安全地禁用异步进度跟踪,并且重新启动查询应该会正常进行。

持续处理

[实验性]

持续处理是 Spark 2.3 中引入的一种新的实验性流式执行模式,它能够实现低 (~1 毫秒) 端到端延迟,并提供至少一次的容错保证。将其与默认的微批次处理引擎进行比较,后者可以实现精确一次的保证,但最佳延迟约为 100 毫秒。对于某些类型的查询(如下所述),您可以选择以哪种模式执行它们,而无需修改应用程序逻辑(即无需更改 DataFrame/Dataset 操作)。

要在持续处理模式下运行支持的查询,您只需指定一个带有所需检查点间隔作为参数的持续触发器。例如,

spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load() \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .trigger(continuous="1 second") \     # only change in query
  .start()
import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start()
import org.apache.spark.sql.streaming.Trigger;

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start();

1 秒的检查点间隔意味着持续处理引擎将每秒记录查询的进度。生成的检查点采用与微批次引擎兼容的格式,因此任何查询都可以通过任何触发器重新启动。例如,以微批次模式启动的支持查询可以在持续模式下重新启动,反之亦然。请注意,无论何时切换到持续模式,您都将获得至少一次的容错保证。

支持的查询

截至 Spark 2.4,持续处理模式仅支持以下类型的查询。

有关它们的更多详细信息,请参阅输入源输出接收器部分。虽然 Console 接收器适用于测试,但使用 Kafka 作为源和接收器可以最好地观察端到端低延迟处理,因为这使得引擎能够在输入数据在输入主题中可用后的几毫秒内处理数据并将结果提供给输出主题。

注意事项