Spark Streaming + Kinesis 集成

Amazon Kinesis 是一项完全托管的服务,用于大规模实时处理流数据。 Kinesis 接收器使用 Amazon 根据 Amazon 软件许可(ASL)提供的 Kinesis 客户端库(KCL)创建输入 DStream。 KCL 构建在 Apache 2.0 许可的 AWS Java SDK 之上,并通过工作程序、检查点和分片租约的概念提供负载平衡、容错和检查点。 在这里,我们将解释如何配置 Spark Streaming 以从 Kinesis 接收数据。

配置 Kinesis

可以根据以下 指南 在一个有效的 Kinesis 端点设置一个 Kinesis 流,每个端点具有 1 个或多个分片。

配置 Spark Streaming 应用程序

  1. 链接: 对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,请将您的流应用程序与以下构件链接(请参阅主编程指南中的 链接部分 了解更多信息)。

     groupId = org.apache.spark
     artifactId = spark-streaming-kinesis-asl_2.12
     version = 3.5.5
    

    对于 Python 应用程序,在部署应用程序时,您需要添加上面的库及其依赖项。 请参阅下面的部署小节。 请注意,通过链接到此库,您将在应用程序中包含 ASL 许可的代码。

  2. 编程: 在流应用程序代码中,导入 KinesisInputDStream 并按如下方式创建字节数组的输入 DStream

         from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
    
         kinesisStream = KinesisUtils.createStream(
             streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
             [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)
    

    请参阅 API 文档示例。 有关运行示例的说明,请参阅运行示例小节。

         import org.apache.spark.storage.StorageLevel
         import org.apache.spark.streaming.kinesis.KinesisInputDStream
         import org.apache.spark.streaming.{Seconds, StreamingContext}
         import org.apache.spark.streaming.kinesis.KinesisInitialPositions
    
         val kinesisStream = KinesisInputDStream.builder
             .streamingContext(streamingContext)
             .endpointUrl([endpoint URL])
             .regionName([region name])
             .streamName([streamName])
             .initialPosition([initial position])
             .checkpointAppName([Kinesis app name])
             .checkpointInterval([checkpoint interval])
             .metricsLevel([metricsLevel.DETAILED])
             .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
             .build()
    

    请参阅 API 文档示例。 有关如何运行示例的说明,请参阅运行示例小节。

         import org.apache.spark.storage.StorageLevel;
         import org.apache.spark.streaming.kinesis.KinesisInputDStream;
         import org.apache.spark.streaming.Seconds;
         import org.apache.spark.streaming.StreamingContext;
         import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
    
         KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
             .streamingContext(streamingContext)
             .endpointUrl([endpoint URL])
             .regionName([region name])
             .streamName([streamName])
             .initialPosition([initial position])
             .checkpointAppName([Kinesis app name])
             .checkpointInterval([checkpoint interval])
             .metricsLevel([metricsLevel.DETAILED])
             .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
             .build();
    

    请参阅 API 文档示例。 有关运行示例的说明,请参阅运行示例小节。

    您还可以提供以下设置。 目前仅在 Scala 和 Java 中受支持。

    • 一个“消息处理函数”,它接受 Kinesis Record 并返回一个通用对象 T,以防您想使用 Record 中包含的其他数据,例如分区键。
             import collection.JavaConverters._
             import org.apache.spark.storage.StorageLevel
             import org.apache.spark.streaming.kinesis.KinesisInputDStream
             import org.apache.spark.streaming.{Seconds, StreamingContext}
             import org.apache.spark.streaming.kinesis.KinesisInitialPositions
             import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
             import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
    
             val kinesisStream = KinesisInputDStream.builder
                 .streamingContext(streamingContext)
                 .endpointUrl([endpoint URL])
                 .regionName([region name])
                 .streamName([streamName])
                 .initialPosition([initial position])
                 .checkpointAppName([Kinesis app name])
                 .checkpointInterval([checkpoint interval])
                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
                 .metricsLevel(MetricsLevel.DETAILED)
                 .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
                 .buildWithMessageHandler([message handler])
    
             import org.apache.spark.storage.StorageLevel;
             import org.apache.spark.streaming.kinesis.KinesisInputDStream;
             import org.apache.spark.streaming.Seconds;
             import org.apache.spark.streaming.StreamingContext;
             import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
             import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
             import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
             import scala.collection.JavaConverters;
    
             KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
                 .streamingContext(streamingContext)
                 .endpointUrl([endpoint URL])
                 .regionName([region name])
                 .streamName([streamName])
                 .initialPosition([initial position])
                 .checkpointAppName([Kinesis app name])
                 .checkpointInterval([checkpoint interval])
                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
                 .metricsLevel(MetricsLevel.DETAILED)
                 .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet())
                 .buildWithMessageHandler([message handler]);
    
    • streamingContext:包含 Kinesis 使用的应用程序名称的 StreamingContext,用于将此 Kinesis 应用程序与 Kinesis 流相关联

    • [Kinesis 应用程序名称]:将用于在 DynamoDB 表中检查 Kinesis 序列号的应用程序名称。
      • 对于给定的帐户和区域,应用程序名称必须是唯一的。
      • 如果表存在但具有不正确的检查点信息(对于不同的流或过期的旧序列号),则可能会出现临时错误。
    • [Kinesis 流名称]:此流应用程序将从中提取数据的 Kinesis 流。

    • [端点 URL]:可以在此处找到有效的 Kinesis 端点 URL。

    • [区域名称]:可以在此处找到有效的 Kinesis 区域名称。

    • [检查点间隔]:Kinesis 客户端库在流中保存其位置的间隔(例如,Duration(2000) = 2 秒)。 对于初学者,请将其设置为与流应用程序的批处理间隔相同。

    • [初始位置]:可以是 KinesisInitialPositions.TrimHorizonKinesisInitialPositions.LatestKinesisInitialPositions.AtTimestamp(请参阅 Kinesis 检查点部分和 Amazon Kinesis API 文档 了解更多详细信息)。

    • [消息处理程序]:一个接受 Kinesis Record 并输出通用 T 的函数。

    在 API 的其他版本中,您也可以直接指定 AWS 访问密钥和密钥。

  3. 部署: 与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。 但是,Scala/Java 应用程序和 Python 应用程序的详细信息略有不同。

    对于 Scala 和 Java 应用程序,如果您使用 SBT 或 Maven 进行项目管理,请将 spark-streaming-kinesis-asl_2.12 及其依赖项打包到应用程序 JAR 中。 确保将 spark-core_2.12spark-streaming_2.12 标记为 provided 依赖项,因为它们已经存在于 Spark 安装中。 然后使用 spark-submit 启动您的应用程序(请参阅主编程指南中的部署部分)。

    对于缺少 SBT/Maven 项目管理的 Python 应用程序,可以使用 --packagesspark-streaming-kinesis-asl_2.12 及其依赖项直接添加到 spark-submit(请参阅应用程序提交指南)。 也就是说,

     ./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.5 ...
    

    或者,您也可以从 Maven 存储库下载 Maven 构件 spark-streaming-kinesis-asl-assembly 的 JAR,并使用 --jars 将其添加到 spark-submit

    Spark Streaming Kinesis Architecture

    运行时要记住的要点

    • Kinesis 数据处理按分区排序,并且每个消息至少发生一次。

    • 多个应用程序可以从同一个 Kinesis 流读取数据。 Kinesis 将在 DynamoDB 中维护特定于应用程序的分片和检查点信息。

    • 单个 Kinesis 流分片一次由一个输入 DStream 处理。

    • 单个 Kinesis 输入 DStream 可以通过创建多个 KinesisRecordProcessor 线程从 Kinesis 流的多个分片读取数据。

    • 在单独的进程/实例中运行的多个输入 DStream 可以从 Kinesis 流读取数据。

    • 您永远不需要比 Kinesis 流分片数量更多的 Kinesis 输入 DStream,因为每个输入 DStream 至少会创建一个 KinesisRecordProcessor 线程来处理单个分片。

    • 水平扩展是通过添加/删除 Kinesis 输入 DStream(在单个进程中或跨多个进程/实例)来实现的 - 直到每个先前点的 Kinesis 流分片的总数。

    • Kinesis 输入 DStream 将在所有 DStream 之间平衡负载 - 甚至跨进程/实例。

    • Kinesis 输入 DStream 将在由于负载变化而导致的分片事件(合并和拆分)期间平衡负载。

    • 作为最佳实践,建议您在可能的情况下通过过度配置来避免重新分片抖动。

    • 每个 Kinesis 输入 DStream 维护其自己的检查点信息。 有关更多详细信息,请参阅 Kinesis 检查点部分。

    • Kinesis 流分片的数量与输入 DStream 处理期间跨 Spark 集群创建的 RDD 分区/分片数量之间没有关联。 这些是 2 个独立的分区方案。

运行示例

要运行示例,

记录解聚合

当使用 Kinesis Producer Library (KPL) 生成数据时,可以聚合消息以节省成本。 Spark Streaming 将在消费期间自动解聚合记录。

Kinesis 检查点

Kinesis 重试配置