Spark Streaming + Kinesis 集成
Amazon Kinesis 是一个用于大规模实时处理流数据的全托管服务。Kinesis 接收器使用 Amazon 在 Amazon 软件许可证 (ASL) 下提供的 Kinesis 客户端库 (KCL) 创建输入 DStream。KCL 构建于基于 Apache 2.0 许可证的 AWS Java SDK 之上,并通过 Workers、检查点和分片租约等概念提供负载均衡、容错性、检查点功能。在此,我们解释如何配置 Spark Streaming 以从 Kinesis 接收数据。
配置 Kinesis
Kinesis 流可以按照以下指南,在其中一个有效的 Kinesis 端点处设置,并包含 1 个或更多分片。
配置 Spark Streaming 应用程序
-
链接:对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,请将您的流应用程序链接到以下工件(有关更多信息,请参阅主编程指南中的链接部分)。
groupId = org.apache.spark artifactId = spark-streaming-kinesis-asl_2.13 version = 4.0.0
对于 Python 应用程序,您在部署应用程序时必须添加上述库及其依赖项。请参阅下面的“部署”小节。请注意,通过链接到此库,您的应用程序将包含 ASL 许可的代码。
-
编程:在流应用程序代码中,导入
KinesisInputDStream
并按如下方式创建字节数组的输入 DStreamfrom pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream kinesisStream = KinesisUtils.createStream( streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)
请参阅 API 文档和示例。有关运行示例的说明,请参阅运行示例小节。
- CloudWatch 指标级别和维度。有关详细信息,请参阅有关监控 KCL 的 AWS 文档。默认值为
MetricsLevel.DETAILED
。
import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build()
import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build();
您还可以提供以下设置。目前,这仅在 Scala 和 Java 中受支持。
- 一个“消息处理函数”,它接受 Kinesis
Record
并返回通用对象T
,以防您想使用Record
中包含的其他数据,例如分区键。
import collection.JavaConverters._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .buildWithMessageHandler([message handler])
import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import scala.collection.JavaConverters; KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions( JavaConverters.asScalaSetConverter( KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS ) .asScala().toSet() ) .buildWithMessageHandler([message handler]);
-
streamingContext
: 包含 Kinesis 用来将此 Kinesis 应用程序与 Kinesis 流关联起来的应用程序名称的 StreamingContext [Kinesis app name]
: 将用于在 DynamoDB 表中对 Kinesis 序列号进行检查点操作的应用程序名称。- 应用程序名称对于给定的账户和区域必须是唯一的。
- 如果表存在但检查点信息不正确(例如,指向不同的流,或包含旧的过期序列号),则可能会出现临时错误。
-
[Kinesis stream name]
: 此流应用程序将从中拉取数据的 Kinesis 流。 -
[endpoint URL]
: 有效的 Kinesis 端点 URL 可以在此处找到。 -
[region name]
: 有效的 Kinesis 区域名称可以在此处找到。 -
[checkpoint interval]
: Kinesis 客户端库在流中保存其位置的间隔(例如,Duration(2000) = 2 秒)。首先,将其设置为与流应用程序的批处理间隔相同。 -
[initial position]
: 可以是KinesisInitialPositions.TrimHorizon
或KinesisInitialPositions.Latest
或KinesisInitialPositions.AtTimestamp
(有关详细信息,请参阅Kinesis 检查点
部分和Amazon Kinesis API 文档
)。 [message handler]
: 一个接受 KinesisRecord
并输出通用T
的函数。
在其他 API 版本中,您还可以直接指定 AWS 访问密钥和秘密密钥。
- CloudWatch 指标级别和维度。有关详细信息,请参阅有关监控 KCL 的 AWS 文档。默认值为
-
部署:与任何 Spark 应用程序一样,使用
spark-submit
启动您的应用程序。但是,Scala/Java 应用程序和 Python 应用程序的细节略有不同。对于 Scala 和 Java 应用程序,如果您使用 SBT 或 Maven 进行项目管理,请将
spark-streaming-kinesis-asl_2.13
及其依赖项打包到应用程序 JAR 中。确保将spark-core_2.13
和spark-streaming_2.13
标记为provided
依赖项,因为它们已存在于 Spark 安装中。然后使用spark-submit
启动您的应用程序(请参阅主编程指南中的部署部分)。对于缺乏 SBT/Maven 项目管理的 Python 应用程序,可以使用
--packages
将spark-streaming-kinesis-asl_2.13
及其依赖项直接添加到spark-submit
中(请参阅应用程序提交指南)。也就是说,./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.13:4.0.0 ...
此外,您还可以从 Maven 仓库下载 Maven 工件
spark-streaming-kinesis-asl-assembly
的 JAR,并使用--jars
将其添加到spark-submit
中。运行时要记住的要点
-
Kinesis 数据处理按分区有序,并且每条消息至少处理一次。
-
多个应用程序可以从同一个 Kinesis 流中读取数据。Kinesis 将在 DynamoDB 中维护应用程序特有的分片和检查点信息。
-
一个 Kinesis 流分片一次由一个输入 DStream 处理。
-
单个 Kinesis 输入 DStream 可以通过创建多个 KinesisRecordProcessor 线程来从 Kinesis 流的多个分片读取数据。
-
在不同的进程/实例中运行的多个输入 DStream 可以从 Kinesis 流中读取数据。
-
您所需的 Kinesis 输入 DStream 数量永远不会超过 Kinesis 流分片的数量,因为每个输入 DStream 将创建至少一个处理单个分片的 KinesisRecordProcessor 线程。
-
通过添加/移除 Kinesis 输入 DStream(在单个进程内或跨多个进程/实例)来实现横向扩展 - 最多达到前述 Kinesis 流分片的总数。
-
Kinesis 输入 DStream 将在所有 DStream 之间平衡负载 - 即使跨进程/实例。
-
Kinesis 输入 DStream 将在重新分片事件(由于负载变化导致的合并和拆分)期间平衡负载。
-
作为最佳实践,建议您在可能的情况下通过超额预置来避免重新分片抖动。
-
每个 Kinesis 输入 DStream 维护其自己的检查点信息。有关详细信息,请参阅 Kinesis 检查点部分。
-
Kinesis 流分片的数量与在输入 DStream 处理期间在 Spark 集群中创建的 RDD 分区/分片的数量之间没有关联。这两种是独立的划分方案。
-
运行示例
要运行此示例,
-
从下载站点下载 Spark 二进制文件。
-
在 AWS 中设置 Kinesis 流(参见前面部分)。记下 Kinesis 流的名称以及与创建流的区域对应的端点 URL。
-
使用您的 AWS 凭据设置环境变量
AWS_ACCESS_KEY_ID
和AWS_SECRET_ACCESS_KEY
。 -
在 Spark 根目录中,运行示例如下
./bin/spark-submit --jars 'connector/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \ connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \ [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.13:4.0.0 streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.13:4.0.0 streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
这将等待从 Kinesis 流接收数据。
-
要在 Kinesis 流中生成随机字符串数据,请在另一个终端中运行相关的 Kinesis 数据生产者。
./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
这将每秒推送 1000 行、每行 10 个随机数字的数据到 Kinesis 流中。这些数据随后应由运行中的示例接收和处理。
记录解聚合
当使用 Kinesis 生产者库 (KPL) 生成数据时,消息可能会被聚合以节省成本。Spark Streaming 在消费过程中将自动解聚合记录。
Kinesis 检查点
-
每个 Kinesis 输入 DStream 定期将流的当前位置存储在后端 DynamoDB 表中。这使得系统能够从故障中恢复,并在 DStream 停止的位置继续处理。
-
过于频繁地进行检查点操作将对 AWS 检查点存储层造成过载,并可能导致 AWS 节流。提供的示例使用随机退避重试策略处理此节流。
-
当输入 DStream 启动时,如果不存在 Kinesis 检查点信息,它将从可用的最旧记录 (
KinesisInitialPositions.TrimHorizon
) 开始,或者从最新位置 (KinesisInitialPositions.Latest
) 开始,或者(除了 Python)从提供的 UTC 时间戳 (KinesisInitialPositions.AtTimestamp(Date timestamp)
) 指示的位置开始。这是可配置的。KinesisInitialPositions.Latest
可能导致记录丢失,如果在没有输入 DStream 运行(且没有存储检查点信息)时数据被添加到流中。KinesisInitialPositions.TrimHorizon
可能导致记录的重复处理,其中影响取决于检查点频率和处理幂等性。
Kinesis 重试配置
spark.streaming.kinesis.retry.waitTime
: Kinesis 重试之间的等待时间,以持续时间字符串表示。从 Amazon Kinesis 读取数据时,当消费速度超过每秒 5 次事务或超出每秒 2 MiB 的最大读取速率时,用户可能会遇到ProvisionedThroughputExceededException
异常。可以调整此配置以在获取失败时增加两次获取之间的休眠时间,从而减少这些异常。默认值为 “100ms”。spark.streaming.kinesis.retry.maxAttempts
: Kinesis 获取的最大重试次数。此配置也可用于解决上述提到的场景中的 KinesisProvisionedThroughputExceededException
异常。可以增加此值以进行更多次的 Kinesis 读取重试。默认值为 3。