Spark Streaming + Kinesis 集成
Amazon Kinesis 是一项完全托管的服务,用于大规模实时处理流数据。 Kinesis 接收器使用 Amazon 根据 Amazon 软件许可(ASL)提供的 Kinesis 客户端库(KCL)创建输入 DStream。 KCL 构建在 Apache 2.0 许可的 AWS Java SDK 之上,并通过工作程序、检查点和分片租约的概念提供负载平衡、容错和检查点。 在这里,我们将解释如何配置 Spark Streaming 以从 Kinesis 接收数据。
配置 Kinesis
可以根据以下 指南 在一个有效的 Kinesis 端点设置一个 Kinesis 流,每个端点具有 1 个或多个分片。
配置 Spark Streaming 应用程序
-
链接: 对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,请将您的流应用程序与以下构件链接(请参阅主编程指南中的 链接部分 了解更多信息)。
groupId = org.apache.spark artifactId = spark-streaming-kinesis-asl_2.12 version = 3.5.5
对于 Python 应用程序,在部署应用程序时,您需要添加上面的库及其依赖项。 请参阅下面的部署小节。 请注意,通过链接到此库,您将在应用程序中包含 ASL 许可的代码。
-
编程: 在流应用程序代码中,导入
KinesisInputDStream
并按如下方式创建字节数组的输入 DStreamfrom pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream kinesisStream = KinesisUtils.createStream( streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)
请参阅 API 文档 和 示例。 有关运行示例的说明,请参阅运行示例小节。
- CloudWatch 指标级别和维度。 有关详细信息,请参阅 有关监控 KCL 的 AWS 文档。 默认为 MetricsLevel.DETAILED
import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build()
import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build();
您还可以提供以下设置。 目前仅在 Scala 和 Java 中受支持。
- 一个“消息处理函数”,它接受 Kinesis
Record
并返回一个通用对象T
,以防您想使用Record
中包含的其他数据,例如分区键。
import collection.JavaConverters._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .buildWithMessageHandler([message handler])
import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import scala.collection.JavaConverters; KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet()) .buildWithMessageHandler([message handler]);
-
streamingContext
:包含 Kinesis 使用的应用程序名称的 StreamingContext,用于将此 Kinesis 应用程序与 Kinesis 流相关联 [Kinesis 应用程序名称]
:将用于在 DynamoDB 表中检查 Kinesis 序列号的应用程序名称。- 对于给定的帐户和区域,应用程序名称必须是唯一的。
- 如果表存在但具有不正确的检查点信息(对于不同的流或过期的旧序列号),则可能会出现临时错误。
-
[Kinesis 流名称]
:此流应用程序将从中提取数据的 Kinesis 流。 -
[端点 URL]
:可以在此处找到有效的 Kinesis 端点 URL。 -
[区域名称]
:可以在此处找到有效的 Kinesis 区域名称。 -
[检查点间隔]
:Kinesis 客户端库在流中保存其位置的间隔(例如,Duration(2000) = 2 秒)。 对于初学者,请将其设置为与流应用程序的批处理间隔相同。 -
[初始位置]
:可以是KinesisInitialPositions.TrimHorizon
或KinesisInitialPositions.Latest
或KinesisInitialPositions.AtTimestamp
(请参阅Kinesis 检查点
部分和Amazon Kinesis API 文档
了解更多详细信息)。 [消息处理程序]
:一个接受 KinesisRecord
并输出通用T
的函数。
在 API 的其他版本中,您也可以直接指定 AWS 访问密钥和密钥。
-
部署: 与任何 Spark 应用程序一样,
spark-submit
用于启动您的应用程序。 但是,Scala/Java 应用程序和 Python 应用程序的详细信息略有不同。对于 Scala 和 Java 应用程序,如果您使用 SBT 或 Maven 进行项目管理,请将
spark-streaming-kinesis-asl_2.12
及其依赖项打包到应用程序 JAR 中。 确保将spark-core_2.12
和spark-streaming_2.12
标记为provided
依赖项,因为它们已经存在于 Spark 安装中。 然后使用spark-submit
启动您的应用程序(请参阅主编程指南中的部署部分)。对于缺少 SBT/Maven 项目管理的 Python 应用程序,可以使用
--packages
将spark-streaming-kinesis-asl_2.12
及其依赖项直接添加到spark-submit
(请参阅应用程序提交指南)。 也就是说,./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.5 ...
或者,您也可以从 Maven 存储库下载 Maven 构件
spark-streaming-kinesis-asl-assembly
的 JAR,并使用--jars
将其添加到spark-submit
。运行时要记住的要点
-
Kinesis 数据处理按分区排序,并且每个消息至少发生一次。
-
多个应用程序可以从同一个 Kinesis 流读取数据。 Kinesis 将在 DynamoDB 中维护特定于应用程序的分片和检查点信息。
-
单个 Kinesis 流分片一次由一个输入 DStream 处理。
-
单个 Kinesis 输入 DStream 可以通过创建多个 KinesisRecordProcessor 线程从 Kinesis 流的多个分片读取数据。
-
在单独的进程/实例中运行的多个输入 DStream 可以从 Kinesis 流读取数据。
-
您永远不需要比 Kinesis 流分片数量更多的 Kinesis 输入 DStream,因为每个输入 DStream 至少会创建一个 KinesisRecordProcessor 线程来处理单个分片。
-
水平扩展是通过添加/删除 Kinesis 输入 DStream(在单个进程中或跨多个进程/实例)来实现的 - 直到每个先前点的 Kinesis 流分片的总数。
-
Kinesis 输入 DStream 将在所有 DStream 之间平衡负载 - 甚至跨进程/实例。
-
Kinesis 输入 DStream 将在由于负载变化而导致的分片事件(合并和拆分)期间平衡负载。
-
作为最佳实践,建议您在可能的情况下通过过度配置来避免重新分片抖动。
-
每个 Kinesis 输入 DStream 维护其自己的检查点信息。 有关更多详细信息,请参阅 Kinesis 检查点部分。
-
Kinesis 流分片的数量与输入 DStream 处理期间跨 Spark 集群创建的 RDD 分区/分片数量之间没有关联。 这些是 2 个独立的分区方案。
-
运行示例
要运行示例,
-
从下载站点下载 Spark 二进制文件。
-
在 AWS 中设置 Kinesis 流(请参阅前面的部分)。 记下 Kinesis 流的名称以及与创建流的区域相对应的端点 URL。
-
使用您的 AWS 凭证设置环境变量
AWS_ACCESS_KEY_ID
和AWS_SECRET_ACCESS_KEY
。 -
在 Spark 根目录中,按如下方式运行示例
./bin/spark-submit --jars 'connector/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \ connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \ [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.5 streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.5 streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
这将等待从 Kinesis 流接收数据。
-
要生成随机字符串数据以放入 Kinesis 流中,请在另一个终端中运行关联的 Kinesis 数据生成器。
./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
这会将每秒 1000 行、每行 10 个随机数字推送到 Kinesis 流。 然后,这些数据应由正在运行的示例接收和处理。
记录解聚合
当使用 Kinesis Producer Library (KPL) 生成数据时,可以聚合消息以节省成本。 Spark Streaming 将在消费期间自动解聚合记录。
Kinesis 检查点
-
每个 Kinesis 输入 DStream 都会定期将流的当前位置存储在后备 DynamoDB 表中。 这允许系统从故障中恢复并继续处理 DStream 中止的位置。
-
过于频繁地进行检查点会导致 AWS 检查点存储层上的负载过大,并可能导致 AWS 限制。 提供的示例使用随机退避重试策略来处理此限制。
-
如果输入 DStream 启动时不存在 Kinesis 检查点信息,它将从最早的可用记录 (
KinesisInitialPositions.TrimHorizon
) 或从最新的提示 (KinesisInitialPositions.Latest
) 或(除了 Python)从提供的 UTC 时间戳表示的位置 (KinesisInitialPositions.AtTimestamp(Date timestamp)
) 开始。 这是可配置的。- 如果数据在没有输入 DStream 运行(并且没有存储检查点信息)时添加到流,则
KinesisInitialPositions.Latest
可能会导致错过记录。 KinesisInitialPositions.TrimHorizon
可能会导致重复处理记录,其中影响取决于检查点频率和处理幂等性。
- 如果数据在没有输入 DStream 运行(并且没有存储检查点信息)时添加到流,则
Kinesis 重试配置
spark.streaming.kinesis.retry.waitTime
: Kinesis 重试之间的等待时间,以持续时间字符串表示。从 Amazon Kinesis 读取数据时,用户可能会遇到ProvisionedThroughputExceededException
异常,原因是消费速度超过每秒 5 个事务或超过 2 MiB/秒的最大读取速率。可以调整此配置以增加获取失败时休眠的时间,从而减少这些异常。默认值为“100ms”。spark.streaming.kinesis.retry.maxAttempts
: Kinesis 获取的最大重试次数。此配置也可用于解决上述场景中的 KinesisProvisionedThroughputExceededException
异常。可以增加它以增加 Kinesis 读取的重试次数。默认值为 3。