结构化流编程指南
快速示例
假设您想维护一个从监听 TCP 套接字的数据服务器接收的文本数据的运行词计数。让我们看看如何使用结构化流来表达这一点。您可以在 Python/Scala/Java/R 中查看完整代码。如果您下载 Spark,可以直接运行示例。无论如何,让我们逐步讲解这个示例,理解它是如何工作的。首先,我们需要导入必要的类并创建一个本地 SparkSession,它是所有与 Spark 相关功能的起点。
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import java.util.Arrays;
import java.util.Iterator;
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCount")
.getOrCreate();
sparkR.session(appName = "StructuredNetworkWordCount")
接下来,让我们创建一个流式 DataFrame,它表示从监听 localhost:9999 的服务器接收的文本数据,并转换该 DataFrame 以计算词计数。
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
这个 lines
DataFrame 表示一个包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,流式文本数据中的每一行都成为表中的一行。请注意,我们目前尚未接收任何数据,因为我们只是在设置转换,尚未启动它。接下来,我们使用了两个内置 SQL 函数——split 和 explode,将每一行拆分成多个行,每行一个单词。此外,我们使用函数 alias
将新列命名为“word”。最后,我们通过对 Dataset 中的唯一值进行分组并计数来定义 wordCounts
DataFrame。请注意,这是一个流式 DataFrame,它表示流的运行词计数。
// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
这个 lines
DataFrame 表示一个包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,流式文本数据中的每一行都成为表中的一行。请注意,我们目前尚未接收任何数据,因为我们只是在设置转换,尚未启动它。接下来,我们使用 .as[String]
将 DataFrame 转换为 String 类型的 Dataset,以便我们可以应用 flatMap
操作将每一行拆分成多个单词。结果的 words
Dataset 包含所有单词。最后,我们通过对 Dataset 中的唯一值进行分组并计数来定义 wordCounts
DataFrame。请注意,这是一个流式 DataFrame,它表示流的运行词计数。
// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
// Split the lines into words
Dataset<String> words = lines
.as(Encoders.STRING())
.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());
// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
这个 lines
DataFrame 表示一个包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,流式文本数据中的每一行都成为表中的一行。请注意,我们目前尚未接收任何数据,因为我们只是在设置转换,尚未启动它。接下来,我们使用 .as(Encoders.STRING())
将 DataFrame 转换为 String 类型的 Dataset,以便我们可以应用 flatMap
操作将每一行拆分成多个单词。结果的 words
Dataset 包含所有单词。最后,我们通过对 Dataset 中的唯一值进行分组并计数来定义 wordCounts
DataFrame。请注意,这是一个流式 DataFrame,它表示流的运行词计数。
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines <- read.stream("socket", host = "localhost", port = 9999)
# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
# Generate running word count
wordCounts <- count(group_by(words, "word"))
这个 lines
SparkDataFrame 表示一个包含流式文本数据的无界表。该表包含一个名为“value”的字符串列,流式文本数据中的每一行都成为表中的一行。请注意,我们目前尚未接收任何数据,因为我们只是在设置转换,尚未启动它。接下来,我们有一个包含两个 SQL 函数——split 和 explode 的 SQL 表达式,用于将每一行拆分成多个行,每行一个单词。此外,我们将新列命名为“word”。最后,我们通过对 SparkDataFrame 中的唯一值进行分组并计数来定义 wordCounts
SparkDataFrame。请注意,这是一个流式 SparkDataFrame,它表示流的运行词计数。
我们现在已经设置了流数据的查询。剩下要做的就是实际开始接收数据并计算计数。为此,我们将其设置为每次更新时将完整的计数集(由 outputMode("complete")
指定)打印到控制台。然后使用 start()
启动流计算。
# Start running the query that prints the running counts to the console
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")
awaitTermination(query)
这段代码执行后,流计算将在后台启动。query
对象是该活动流查询的句柄,我们决定使用 awaitTermination()
等待查询终止,以防止在查询活动时进程退出。
要实际执行此示例代码,您可以在自己的Spark 应用程序中编译代码,或者在下载 Spark 后直接运行示例。我们展示后者。您首先需要使用以下命令运行 Netcat(大多数类 Unix 系统中都有的一个小型实用程序)作为数据服务器:
$ nc -lk 9999
然后,在另一个终端中,您可以使用以下命令启动示例:
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
然后,在运行 netcat 服务器的终端中输入的任何行都将每秒被计数并打印在屏幕上。它看起来会像这样。
|
|
编程模型
结构化流的关键思想是将实时数据流视为一个不断追加的表。这导致了一种新的流处理模型,它与批处理模型非常相似。您将把流计算表达为像在静态表上那样的标准批处理式查询,而 Spark 将其作为对无界输入表的增量查询来运行。让我们更详细地了解这个模型。
基本概念
将输入数据流视为“输入表”。流上到达的每个数据项都像是被追加到输入表中的新行。
对输入的查询将生成“结果表”。每个触发间隔(例如,每 1 秒),新行都会追加到输入表,最终更新结果表。每当结果表更新时,我们都希望将更改的结果行写入外部接收器。
“输出”定义为写入外部存储的内容。输出可以以不同的模式定义
-
完全模式 (Complete Mode) - 整个更新后的结果表将被写入外部存储。由存储连接器决定如何处理整个表的写入。
-
追加模式 (Append Mode) - 只有自上次触发以来追加到结果表中的新行才会被写入外部存储。这仅适用于结果表中现有行预期不会更改的查询。
-
更新模式 (Update Mode) - 只有自上次触发以来在结果表中更新的行才会被写入外部存储(自 Spark 2.1.1 起可用)。请注意,这与完全模式不同,因为此模式仅输出自上次触发以来已更改的行。如果查询不包含聚合,它将等同于追加模式。
请注意,每种模式都适用于特定类型的查询。这将在稍后详细讨论。
为了说明此模型的用法,让我们结合上述快速示例来理解该模型。第一个 lines
DataFrame 是输入表,最终的 wordCounts
DataFrame 是结果表。请注意,在流式 lines
DataFrame 上生成 wordCounts
的查询与在静态 DataFrame 上的查询完全相同。然而,当此查询启动时,Spark 将持续检查套接字连接中的新数据。如果有新数据,Spark 将运行一个“增量”查询,将之前的运行计数与新数据结合起来计算更新后的计数,如下所示。
请注意,结构化流不会具体化整个表。它从流式数据源读取最新的可用数据,增量处理以更新结果,然后丢弃源数据。它只保留更新结果所需的最小中间状态数据(例如,前面示例中的中间计数)。
此模型与许多其他流处理引擎显著不同。许多流系统要求用户自行维护运行中的聚合,因此必须考虑容错和数据一致性(至少一次、至多一次或精确一次)。在此模型中,当有新数据时,Spark 负责更新结果表,从而减轻了用户考虑这些问题的负担。例如,让我们看看此模型如何处理基于事件时间的处理和延迟到达的数据。
处理事件时间和延迟数据
事件时间是嵌入在数据本身中的时间。对于许多应用程序,您可能希望在此事件时间上进行操作。例如,如果您想获取物联网设备每分钟生成的事件数量,那么您可能希望使用数据生成的时间(即数据中的事件时间),而不是 Spark 接收它们的时间。这种事件时间在此模型中表达起来非常自然——设备发出的每个事件都是表中的一行,事件时间是行中的一个列值。这使得基于窗口的聚合(例如,每分钟的事件数量)成为事件时间列上的一种特殊类型的分组和聚合——每个时间窗口都是一个组,并且每行可以属于多个窗口/组。因此,此类基于事件时间窗口的聚合查询可以一致地定义在静态数据集(例如,从收集的设备事件日志中)和数据流上,从而大大简化了用户的工作。
此外,此模型自然地处理根据事件时间晚于预期到达的数据。由于 Spark 正在更新结果表,它完全控制在有延迟数据时更新旧聚合,以及清理旧聚合以限制中间状态数据的大小。自 Spark 2.1 起,我们支持水印功能,允许用户指定延迟数据的阈值,并允许引擎相应地清理旧状态。这些将在窗口操作部分中更详细地解释。
容错语义
实现端到端精确一次语义是结构化流设计背后的关键目标之一。为了实现这一目标,我们设计了结构化流的源、接收器和执行引擎,以可靠地跟踪处理的精确进度,从而可以通过重新启动和/或重新处理来处理任何类型的故障。每个流源都被假定具有偏移量(类似于 Kafka 偏移量或 Kinesis 序列号)以跟踪流中的读取位置。引擎使用检查点和预写日志来记录每个触发器中正在处理的数据的偏移量范围。流接收器被设计为幂等的,以处理重新处理。总而言之,通过使用可重放的源和幂等的接收器,结构化流可以在任何故障下确保端到端精确一次语义。