Spark Streaming + Kinesis 集成
Amazon Kinesis 是一个用于大规模实时处理流数据的全托管服务。Kinesis 接收器使用 Amazon 在 Amazon 软件许可证 (ASL) 下提供的 Kinesis 客户端库 (KCL) 创建输入 DStream。KCL 构建于基于 Apache 2.0 许可证的 AWS Java SDK 之上,并通过 Workers、检查点和分片租约等概念提供负载均衡、容错性、检查点功能。在此,我们解释如何配置 Spark Streaming 以从 Kinesis 接收数据。
配置 Kinesis
Kinesis 流可以按照以下指南,在其中一个有效的 Kinesis 端点处设置,并包含 1 个或更多分片。
配置 Spark Streaming 应用程序
-
链接:对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,请将您的流应用程序链接到以下工件(有关更多信息,请参阅主编程指南中的链接部分)。
groupId = org.apache.spark artifactId = spark-streaming-kinesis-asl_2.13 version = 4.0.0对于 Python 应用程序,您在部署应用程序时必须添加上述库及其依赖项。请参阅下面的“部署”小节。请注意,通过链接到此库,您的应用程序将包含 ASL 许可的代码。
-
编程:在流应用程序代码中,导入
KinesisInputDStream并按如下方式创建字节数组的输入 DStreamfrom pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream kinesisStream = KinesisUtils.createStream( streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)请参阅 API 文档和示例。有关运行示例的说明,请参阅运行示例小节。
- CloudWatch 指标级别和维度。有关详细信息,请参阅有关监控 KCL 的 AWS 文档。默认值为
MetricsLevel.DETAILED。
import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build()import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build();您还可以提供以下设置。目前,这仅在 Scala 和 Java 中受支持。
- 一个“消息处理函数”,它接受 Kinesis
Record并返回通用对象T,以防您想使用Record中包含的其他数据,例如分区键。
import collection.JavaConverters._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .buildWithMessageHandler([message handler])import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import scala.collection.JavaConverters; KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions( JavaConverters.asScalaSetConverter( KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS ) .asScala().toSet() ) .buildWithMessageHandler([message handler]);-
streamingContext: 包含 Kinesis 用来将此 Kinesis 应用程序与 Kinesis 流关联起来的应用程序名称的 StreamingContext [Kinesis app name]: 将用于在 DynamoDB 表中对 Kinesis 序列号进行检查点操作的应用程序名称。- 应用程序名称对于给定的账户和区域必须是唯一的。
- 如果表存在但检查点信息不正确(例如,指向不同的流,或包含旧的过期序列号),则可能会出现临时错误。
-
[Kinesis stream name]: 此流应用程序将从中拉取数据的 Kinesis 流。 -
[endpoint URL]: 有效的 Kinesis 端点 URL 可以在此处找到。 -
[region name]: 有效的 Kinesis 区域名称可以在此处找到。 -
[checkpoint interval]: Kinesis 客户端库在流中保存其位置的间隔(例如,Duration(2000) = 2 秒)。首先,将其设置为与流应用程序的批处理间隔相同。 -
[initial position]: 可以是KinesisInitialPositions.TrimHorizon或KinesisInitialPositions.Latest或KinesisInitialPositions.AtTimestamp(有关详细信息,请参阅Kinesis 检查点部分和Amazon Kinesis API 文档)。 [message handler]: 一个接受 KinesisRecord并输出通用T的函数。
在其他 API 版本中,您还可以直接指定 AWS 访问密钥和秘密密钥。
- CloudWatch 指标级别和维度。有关详细信息,请参阅有关监控 KCL 的 AWS 文档。默认值为
-
部署:与任何 Spark 应用程序一样,使用
spark-submit启动您的应用程序。但是,Scala/Java 应用程序和 Python 应用程序的细节略有不同。对于 Scala 和 Java 应用程序,如果您使用 SBT 或 Maven 进行项目管理,请将
spark-streaming-kinesis-asl_2.13及其依赖项打包到应用程序 JAR 中。确保将spark-core_2.13和spark-streaming_2.13标记为provided依赖项,因为它们已存在于 Spark 安装中。然后使用spark-submit启动您的应用程序(请参阅主编程指南中的部署部分)。对于缺乏 SBT/Maven 项目管理的 Python 应用程序,可以使用
--packages将spark-streaming-kinesis-asl_2.13及其依赖项直接添加到spark-submit中(请参阅应用程序提交指南)。也就是说,./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.13:4.0.0 ...此外,您还可以从 Maven 仓库下载 Maven 工件
spark-streaming-kinesis-asl-assembly的 JAR,并使用--jars将其添加到spark-submit中。
运行时要记住的要点
-
Kinesis 数据处理按分区有序,并且每条消息至少处理一次。
-
多个应用程序可以从同一个 Kinesis 流中读取数据。Kinesis 将在 DynamoDB 中维护应用程序特有的分片和检查点信息。
-
一个 Kinesis 流分片一次由一个输入 DStream 处理。
-
单个 Kinesis 输入 DStream 可以通过创建多个 KinesisRecordProcessor 线程来从 Kinesis 流的多个分片读取数据。
-
在不同的进程/实例中运行的多个输入 DStream 可以从 Kinesis 流中读取数据。
-
您所需的 Kinesis 输入 DStream 数量永远不会超过 Kinesis 流分片的数量,因为每个输入 DStream 将创建至少一个处理单个分片的 KinesisRecordProcessor 线程。
-
通过添加/移除 Kinesis 输入 DStream(在单个进程内或跨多个进程/实例)来实现横向扩展 - 最多达到前述 Kinesis 流分片的总数。
-
Kinesis 输入 DStream 将在所有 DStream 之间平衡负载 - 即使跨进程/实例。
-
Kinesis 输入 DStream 将在重新分片事件(由于负载变化导致的合并和拆分)期间平衡负载。
-
作为最佳实践,建议您在可能的情况下通过超额预置来避免重新分片抖动。
-
每个 Kinesis 输入 DStream 维护其自己的检查点信息。有关详细信息,请参阅 Kinesis 检查点部分。
-
Kinesis 流分片的数量与在输入 DStream 处理期间在 Spark 集群中创建的 RDD 分区/分片的数量之间没有关联。这两种是独立的划分方案。
-
运行示例
要运行此示例,
-
从下载站点下载 Spark 二进制文件。
-
在 AWS 中设置 Kinesis 流(参见前面部分)。记下 Kinesis 流的名称以及与创建流的区域对应的端点 URL。
-
使用您的 AWS 凭据设置环境变量
AWS_ACCESS_KEY_ID和AWS_SECRET_ACCESS_KEY。 -
在 Spark 根目录中,运行示例如下
./bin/spark-submit --jars 'connector/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \ connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \ [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.13:4.0.0 streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.13:4.0.0 streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]这将等待从 Kinesis 流接收数据。
-
要在 Kinesis 流中生成随机字符串数据,请在另一个终端中运行相关的 Kinesis 数据生产者。
./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10这将每秒推送 1000 行、每行 10 个随机数字的数据到 Kinesis 流中。这些数据随后应由运行中的示例接收和处理。
记录解聚合
当使用 Kinesis 生产者库 (KPL) 生成数据时,消息可能会被聚合以节省成本。Spark Streaming 在消费过程中将自动解聚合记录。
Kinesis 检查点
-
每个 Kinesis 输入 DStream 定期将流的当前位置存储在后端 DynamoDB 表中。这使得系统能够从故障中恢复,并在 DStream 停止的位置继续处理。
-
过于频繁地进行检查点操作将对 AWS 检查点存储层造成过载,并可能导致 AWS 节流。提供的示例使用随机退避重试策略处理此节流。
-
当输入 DStream 启动时,如果不存在 Kinesis 检查点信息,它将从可用的最旧记录 (
KinesisInitialPositions.TrimHorizon) 开始,或者从最新位置 (KinesisInitialPositions.Latest) 开始,或者(除了 Python)从提供的 UTC 时间戳 (KinesisInitialPositions.AtTimestamp(Date timestamp)) 指示的位置开始。这是可配置的。KinesisInitialPositions.Latest可能导致记录丢失,如果在没有输入 DStream 运行(且没有存储检查点信息)时数据被添加到流中。KinesisInitialPositions.TrimHorizon可能导致记录的重复处理,其中影响取决于检查点频率和处理幂等性。
Kinesis 重试配置
spark.streaming.kinesis.retry.waitTime: Kinesis 重试之间的等待时间,以持续时间字符串表示。从 Amazon Kinesis 读取数据时,当消费速度超过每秒 5 次事务或超出每秒 2 MiB 的最大读取速率时,用户可能会遇到ProvisionedThroughputExceededException异常。可以调整此配置以在获取失败时增加两次获取之间的休眠时间,从而减少这些异常。默认值为 “100ms”。spark.streaming.kinesis.retry.maxAttempts: Kinesis 获取的最大重试次数。此配置也可用于解决上述提到的场景中的 KinesisProvisionedThroughputExceededException异常。可以增加此值以进行更多次的 Kinesis 读取重试。默认值为 3。