Spark 流式处理 + Kinesis 集成
Amazon Kinesis 是一种完全托管的服务,用于大规模实时处理流式数据。Kinesis 接收器使用 Amazon 在 Amazon 软件许可 (ASL) 下提供的 Kinesis 客户端库 (KCL) 创建输入 DStream。KCL 基于 Apache 2.0 许可的 AWS Java SDK,并通过 Worker、Checkpoint 和 Shard Lease 的概念提供负载均衡、容错和检查点功能。这里我们将解释如何配置 Spark 流式处理以接收来自 Kinesis 的数据。
配置 Kinesis
Kinesis 流可以在一个有效的 Kinesis 端点上设置,每个端点可以根据以下 指南 设置 1 个或多个分片。
配置 Spark 流式处理应用程序
-
链接:对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,将您的流式处理应用程序链接到以下工件(有关更多信息,请参阅主编程指南中的 链接部分)。
groupId = org.apache.spark artifactId = spark-streaming-kinesis-asl_2.12 version = 3.5.1
对于 Python 应用程序,您需要在部署应用程序时添加上述库及其依赖项。请参阅下面的“部署”小节。请注意,通过链接到此库,您将在应用程序中包含 ASL 许可的代码。
-
编程:在流式处理应用程序代码中,导入
KinesisInputDStream
并创建如下所示的字节数组输入 DStreamfrom pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream kinesisStream = KinesisUtils.createStream( streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)
请参阅 API 文档 和 示例。有关运行示例的说明,请参阅 运行示例 小节。
- CloudWatch 指标级别和维度。有关详细信息,请参阅 有关监控 KCL 的 AWS 文档。默认值为 MetricsLevel.DETAILED
import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build()
import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build();
您也可以提供以下设置。目前仅在 Scala 和 Java 中支持此功能。
- 一个“消息处理函数”,它接受 Kinesis
Record
并返回一个泛型对象T
,如果您想使用Record
中包含的其他数据(如分区键)。
import collection.JavaConverters._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .buildWithMessageHandler([message handler])
import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import scala.collection.JavaConverters; KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet()) .buildWithMessageHandler([message handler]);
-
streamingContext
:包含 Kinesis 用于将此 Kinesis 应用程序绑定到 Kinesis 流的应用程序名称的 StreamingContext [Kinesis 应用程序名称]
:将在 DynamoDB 表中用于检查点 Kinesis 序列号的应用程序名称。- 应用程序名称对于给定的帐户和区域必须是唯一的。
- 如果表存在但具有不正确的检查点信息(针对不同的流或旧的已过期序列号),则可能会出现临时错误。
-
[Kinesis 流名称]
:此流式处理应用程序将从中提取数据的 Kinesis 流。 -
[端点 URL]
:可以在 此处 找到有效的 Kinesis 端点 URL。 -
[区域名称]
:可以在 此处 找到有效的 Kinesis 区域名称。 -
[检查点间隔]
:Kinesis 客户端库在流中保存其位置的间隔(例如,Duration(2000) = 2 秒)。对于初学者,将其设置为与流式处理应用程序的批处理间隔相同。 -
[初始位置]
:可以是KinesisInitialPositions.TrimHorizon
或KinesisInitialPositions.Latest
或KinesisInitialPositions.AtTimestamp
(有关更多详细信息,请参阅Kinesis 检查点
部分 和Amazon Kinesis API 文档
)。 [消息处理程序]
:一个函数,它接受 KinesisRecord
并输出泛型T
。
在 API 的其他版本中,您也可以直接指定 AWS 访问密钥和密钥。
-
部署:与任何 Spark 应用程序一样,
spark-submit
用于启动您的应用程序。但是,Scala/Java 应用程序和 Python 应用程序的细节略有不同。对于 Scala 和 Java 应用程序,如果您使用 SBT 或 Maven 进行项目管理,则将
spark-streaming-kinesis-asl_2.12
及其依赖项打包到应用程序 JAR 中。确保spark-core_2.12
和spark-streaming_2.12
被标记为provided
依赖项,因为这些依赖项已存在于 Spark 安装中。然后使用spark-submit
启动您的应用程序(请参阅主编程指南中的 部署部分)。对于缺少 SBT/Maven 项目管理的 Python 应用程序,可以使用
--packages
将spark-streaming-kinesis-asl_2.12
及其依赖项直接添加到spark-submit
中(请参阅 应用程序提交指南)。也就是说,./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.1 ...
或者,您也可以从 Maven 存储库 下载 Maven 工件
spark-streaming-kinesis-asl-assembly
的 JAR,并使用--jars
将其添加到spark-submit
中。运行时需要注意的几点
-
Kinesis 数据处理按分区排序,并且每个消息至少处理一次。
-
多个应用程序可以从同一个 Kinesis 流中读取数据。Kinesis 将在 DynamoDB 中维护特定于应用程序的分片和检查点信息。
-
一次只有一个输入 DStream 处理单个 Kinesis 流分片。
-
单个 Kinesis 输入 DStream 可以通过创建多个 KinesisRecordProcessor 线程从 Kinesis 流的多个分片中读取数据。
-
在不同进程/实例中运行的多个输入 DStream 可以从 Kinesis 流中读取数据。
-
您不需要比 Kinesis 流分片数量更多的 Kinesis 输入 DStream,因为每个输入 DStream 都会创建一个至少处理单个分片的 KinesisRecordProcessor 线程。
-
水平扩展是通过添加/删除 Kinesis 输入 DStream(在单个进程中或跨多个进程/实例)来实现的,直到达到前一点中 Kinesis 流分片的总数。
-
Kinesis 输入 DStream 将在所有 DStream 之间平衡负载,即使跨进程/实例也是如此。
-
Kinesis 输入 DStream 将在由于负载变化而导致的重新分片事件(合并和拆分)期间平衡负载。
-
作为最佳实践,建议您尽可能通过过度配置来避免重新分片抖动。
-
每个 Kinesis 输入 DStream 都维护自己的检查点信息。有关更多详细信息,请参阅 Kinesis 检查点部分。
-
Kinesis 流分片数量与输入 DStream 处理期间在 Spark 集群中创建的 RDD 分区/分片数量之间没有关联。这是两种独立的分区方案。
-
运行示例
要运行示例,
-
从 下载站点 下载 Spark 二进制文件。
-
在 AWS 中设置 Kinesis 流(请参阅前面的部分)。请注意 Kinesis 流的名称和创建流的区域对应的端点 URL。
-
使用您的 AWS 凭据设置环境变量
AWS_ACCESS_KEY_ID
和AWS_SECRET_ACCESS_KEY
。 -
在 Spark 根目录中,以以下方式运行示例
./bin/spark-submit --jars 'connector/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \ connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \ [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.1 streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.1 streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
这将等待从 Kinesis 流接收数据。
-
要在另一个终端中生成要放入 Kinesis 流的随机字符串数据,请运行关联的 Kinesis 数据生产者。
./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
这将每秒将 1000 行每行 10 个随机数推送到 Kinesis 流。然后,运行的示例应该接收并处理这些数据。
记录反聚合
当使用 Kinesis 生产者库 (KPL) 生成数据时,消息可能会被聚合以节省成本。Spark Streaming 会在消费时自动解聚合记录。
Kinesis 检查点
-
每个 Kinesis 输入 DStream 定期将流的当前位置存储在后端 DynamoDB 表中。这使系统能够从故障中恢复并继续从 DStream 停止的地方进行处理。
-
过于频繁地进行检查点会导致 AWS 检查点存储层负载过重,并可能导致 AWS 节流。提供的示例使用随机回退重试策略处理此节流。
-
如果在输入 DStream 启动时不存在 Kinesis 检查点信息,它将从可用的最旧记录 (
KinesisInitialPositions.TrimHorizon
) 开始,或从最新的提示 (KinesisInitialPositions.Latest
) 开始,或(除了 Python)从提供的 UTC 时间戳 (KinesisInitialPositions.AtTimestamp(Date timestamp)
) 所表示的位置开始。这是可配置的。KinesisInitialPositions.Latest
可能会导致错过记录,如果在没有输入 DStream 运行(并且没有存储检查点信息)的情况下将数据添加到流中。KinesisInitialPositions.TrimHorizon
可能会导致重复处理记录,其影响取决于检查点频率和处理幂等性。
Kinesis 重试配置
spark.streaming.kinesis.retry.waitTime
: Kinesis 重试之间的等待时间,以持续时间字符串表示。从 Amazon Kinesis 读取时,用户可能会遇到ProvisionedThroughputExceededException
,当消费速度快于每秒 5 次事务或超过每秒 2 MiB 的最大读取速率时。可以调整此配置以在获取失败时增加获取之间的休眠时间,以减少这些异常。默认值为“100ms”。spark.streaming.kinesis.retry.maxAttempts
: Kinesis 获取的最大重试次数。此配置也可以用于解决上述场景中 KinesisProvisionedThroughputExceededException
。可以增加它以对 Kinesis 读取进行更多重试。默认值为 3。