Spark 流式处理 + Kinesis 集成

Amazon Kinesis 是一种完全托管的服务,用于大规模实时处理流式数据。Kinesis 接收器使用 Amazon 在 Amazon 软件许可 (ASL) 下提供的 Kinesis 客户端库 (KCL) 创建输入 DStream。KCL 基于 Apache 2.0 许可的 AWS Java SDK,并通过 Worker、Checkpoint 和 Shard Lease 的概念提供负载均衡、容错和检查点功能。这里我们将解释如何配置 Spark 流式处理以接收来自 Kinesis 的数据。

配置 Kinesis

Kinesis 流可以在一个有效的 Kinesis 端点上设置,每个端点可以根据以下 指南 设置 1 个或多个分片。

配置 Spark 流式处理应用程序

  1. 链接:对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,将您的流式处理应用程序链接到以下工件(有关更多信息,请参阅主编程指南中的 链接部分)。

     groupId = org.apache.spark
     artifactId = spark-streaming-kinesis-asl_2.12
     version = 3.5.1
    

    对于 Python 应用程序,您需要在部署应用程序时添加上述库及其依赖项。请参阅下面的“部署”小节。请注意,通过链接到此库,您将在应用程序中包含 ASL 许可的代码。

  2. 编程:在流式处理应用程序代码中,导入 KinesisInputDStream 并创建如下所示的字节数组输入 DStream

         from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
    
         kinesisStream = KinesisUtils.createStream(
             streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
             [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)
    

    请参阅 API 文档示例。有关运行示例的说明,请参阅 运行示例 小节。

         import org.apache.spark.storage.StorageLevel
         import org.apache.spark.streaming.kinesis.KinesisInputDStream
         import org.apache.spark.streaming.{Seconds, StreamingContext}
         import org.apache.spark.streaming.kinesis.KinesisInitialPositions
    
         val kinesisStream = KinesisInputDStream.builder
             .streamingContext(streamingContext)
             .endpointUrl([endpoint URL])
             .regionName([region name])
             .streamName([streamName])
             .initialPosition([initial position])
             .checkpointAppName([Kinesis app name])
             .checkpointInterval([checkpoint interval])
             .metricsLevel([metricsLevel.DETAILED])
             .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
             .build()
    

    请参阅 API 文档示例。有关如何运行示例的说明,请参阅 运行示例 小节。

         import org.apache.spark.storage.StorageLevel;
         import org.apache.spark.streaming.kinesis.KinesisInputDStream;
         import org.apache.spark.streaming.Seconds;
         import org.apache.spark.streaming.StreamingContext;
         import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
    
         KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
             .streamingContext(streamingContext)
             .endpointUrl([endpoint URL])
             .regionName([region name])
             .streamName([streamName])
             .initialPosition([initial position])
             .checkpointAppName([Kinesis app name])
             .checkpointInterval([checkpoint interval])
             .metricsLevel([metricsLevel.DETAILED])
             .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
             .build();
    

    请参阅 API 文档示例。有关运行示例的说明,请参阅 运行示例 小节。

    您也可以提供以下设置。目前仅在 Scala 和 Java 中支持此功能。

    • 一个“消息处理函数”,它接受 Kinesis Record 并返回一个泛型对象 T,如果您想使用 Record 中包含的其他数据(如分区键)。
             import collection.JavaConverters._
             import org.apache.spark.storage.StorageLevel
             import org.apache.spark.streaming.kinesis.KinesisInputDStream
             import org.apache.spark.streaming.{Seconds, StreamingContext}
             import org.apache.spark.streaming.kinesis.KinesisInitialPositions
             import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
             import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
    
             val kinesisStream = KinesisInputDStream.builder
                 .streamingContext(streamingContext)
                 .endpointUrl([endpoint URL])
                 .regionName([region name])
                 .streamName([streamName])
                 .initialPosition([initial position])
                 .checkpointAppName([Kinesis app name])
                 .checkpointInterval([checkpoint interval])
                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
                 .metricsLevel(MetricsLevel.DETAILED)
                 .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
                 .buildWithMessageHandler([message handler])
    
             import org.apache.spark.storage.StorageLevel;
             import org.apache.spark.streaming.kinesis.KinesisInputDStream;
             import org.apache.spark.streaming.Seconds;
             import org.apache.spark.streaming.StreamingContext;
             import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
             import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
             import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
             import scala.collection.JavaConverters;
    
             KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
                 .streamingContext(streamingContext)
                 .endpointUrl([endpoint URL])
                 .regionName([region name])
                 .streamName([streamName])
                 .initialPosition([initial position])
                 .checkpointAppName([Kinesis app name])
                 .checkpointInterval([checkpoint interval])
                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
                 .metricsLevel(MetricsLevel.DETAILED)
                 .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet())
                 .buildWithMessageHandler([message handler]);
    
    • streamingContext:包含 Kinesis 用于将此 Kinesis 应用程序绑定到 Kinesis 流的应用程序名称的 StreamingContext

    • [Kinesis 应用程序名称]:将在 DynamoDB 表中用于检查点 Kinesis 序列号的应用程序名称。
      • 应用程序名称对于给定的帐户和区域必须是唯一的。
      • 如果表存在但具有不正确的检查点信息(针对不同的流或旧的已过期序列号),则可能会出现临时错误。
    • [Kinesis 流名称]:此流式处理应用程序将从中提取数据的 Kinesis 流。

    • [端点 URL]:可以在 此处 找到有效的 Kinesis 端点 URL。

    • [区域名称]:可以在 此处 找到有效的 Kinesis 区域名称。

    • [检查点间隔]:Kinesis 客户端库在流中保存其位置的间隔(例如,Duration(2000) = 2 秒)。对于初学者,将其设置为与流式处理应用程序的批处理间隔相同。

    • [初始位置]:可以是 KinesisInitialPositions.TrimHorizonKinesisInitialPositions.LatestKinesisInitialPositions.AtTimestamp(有关更多详细信息,请参阅 Kinesis 检查点部分Amazon Kinesis API 文档)。

    • [消息处理程序]:一个函数,它接受 Kinesis Record 并输出泛型 T

    在 API 的其他版本中,您也可以直接指定 AWS 访问密钥和密钥。

  3. 部署:与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。但是,Scala/Java 应用程序和 Python 应用程序的细节略有不同。

    对于 Scala 和 Java 应用程序,如果您使用 SBT 或 Maven 进行项目管理,则将 spark-streaming-kinesis-asl_2.12 及其依赖项打包到应用程序 JAR 中。确保 spark-core_2.12spark-streaming_2.12 被标记为 provided 依赖项,因为这些依赖项已存在于 Spark 安装中。然后使用 spark-submit 启动您的应用程序(请参阅主编程指南中的 部署部分)。

    对于缺少 SBT/Maven 项目管理的 Python 应用程序,可以使用 --packagesspark-streaming-kinesis-asl_2.12 及其依赖项直接添加到 spark-submit 中(请参阅 应用程序提交指南)。也就是说,

     ./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.1 ...
    

    或者,您也可以从 Maven 存储库 下载 Maven 工件 spark-streaming-kinesis-asl-assembly 的 JAR,并使用 --jars 将其添加到 spark-submit 中。

    Spark Streaming Kinesis Architecture

    运行时需要注意的几点

    • Kinesis 数据处理按分区排序,并且每个消息至少处理一次。

    • 多个应用程序可以从同一个 Kinesis 流中读取数据。Kinesis 将在 DynamoDB 中维护特定于应用程序的分片和检查点信息。

    • 一次只有一个输入 DStream 处理单个 Kinesis 流分片。

    • 单个 Kinesis 输入 DStream 可以通过创建多个 KinesisRecordProcessor 线程从 Kinesis 流的多个分片中读取数据。

    • 在不同进程/实例中运行的多个输入 DStream 可以从 Kinesis 流中读取数据。

    • 您不需要比 Kinesis 流分片数量更多的 Kinesis 输入 DStream,因为每个输入 DStream 都会创建一个至少处理单个分片的 KinesisRecordProcessor 线程。

    • 水平扩展是通过添加/删除 Kinesis 输入 DStream(在单个进程中或跨多个进程/实例)来实现的,直到达到前一点中 Kinesis 流分片的总数。

    • Kinesis 输入 DStream 将在所有 DStream 之间平衡负载,即使跨进程/实例也是如此。

    • Kinesis 输入 DStream 将在由于负载变化而导致的重新分片事件(合并和拆分)期间平衡负载。

    • 作为最佳实践,建议您尽可能通过过度配置来避免重新分片抖动。

    • 每个 Kinesis 输入 DStream 都维护自己的检查点信息。有关更多详细信息,请参阅 Kinesis 检查点部分。

    • Kinesis 流分片数量与输入 DStream 处理期间在 Spark 集群中创建的 RDD 分区/分片数量之间没有关联。这是两种独立的分区方案。

运行示例

要运行示例,

记录反聚合

当使用 Kinesis 生产者库 (KPL) 生成数据时,消息可能会被聚合以节省成本。Spark Streaming 会在消费时自动解聚合记录。

Kinesis 检查点

Kinesis 重试配置