Spark Streaming + Kinesis 集成

Amazon Kinesis 是一个用于大规模实时处理流数据的全托管服务。Kinesis 接收器使用 Amazon 在 Amazon 软件许可证 (ASL) 下提供的 Kinesis 客户端库 (KCL) 创建输入 DStream。KCL 构建于基于 Apache 2.0 许可证的 AWS Java SDK 之上,并通过 Workers、检查点和分片租约等概念提供负载均衡、容错性、检查点功能。在此,我们解释如何配置 Spark Streaming 以从 Kinesis 接收数据。

配置 Kinesis

Kinesis 流可以按照以下指南,在其中一个有效的 Kinesis 端点处设置,并包含 1 个或更多分片。

配置 Spark Streaming 应用程序

  1. 链接:对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,请将您的流应用程序链接到以下工件(有关更多信息,请参阅主编程指南中的链接部分)。

     groupId = org.apache.spark
     artifactId = spark-streaming-kinesis-asl_2.13
     version = 4.0.0
    

    对于 Python 应用程序,您在部署应用程序时必须添加上述库及其依赖项。请参阅下面的“部署”小节。请注意,通过链接到此库,您的应用程序将包含 ASL 许可的代码。

  2. 编程:在流应用程序代码中,导入 KinesisInputDStream 并按如下方式创建字节数组的输入 DStream

     from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
    
     kinesisStream = KinesisUtils.createStream(
         streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
         [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED],
         StorageLevel.MEMORY_AND_DISK_2)
    

    请参阅 API 文档示例。有关运行示例的说明,请参阅运行示例小节。

     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.kinesis.KinesisInputDStream
     import org.apache.spark.streaming.{Seconds, StreamingContext}
     import org.apache.spark.streaming.kinesis.KinesisInitialPositions
    
     val kinesisStream = KinesisInputDStream.builder
         .streamingContext(streamingContext)
         .endpointUrl([endpoint URL])
         .regionName([region name])
         .streamName([streamName])
         .initialPosition([initial position])
         .checkpointAppName([Kinesis app name])
         .checkpointInterval([checkpoint interval])
         .metricsLevel([metricsLevel.DETAILED])
         .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
         .build()
    

    请参阅 API 文档示例。有关如何运行示例的说明,请参阅运行示例小节。

     import org.apache.spark.storage.StorageLevel;
     import org.apache.spark.streaming.kinesis.KinesisInputDStream;
     import org.apache.spark.streaming.Seconds;
     import org.apache.spark.streaming.StreamingContext;
     import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
    
     KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
         .streamingContext(streamingContext)
         .endpointUrl([endpoint URL])
         .regionName([region name])
         .streamName([streamName])
         .initialPosition([initial position])
         .checkpointAppName([Kinesis app name])
         .checkpointInterval([checkpoint interval])
         .metricsLevel([metricsLevel.DETAILED])
         .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
         .build();
    

    请参阅 API 文档示例。有关运行示例的说明,请参阅运行示例小节。

    您还可以提供以下设置。目前,这仅在 Scala 和 Java 中受支持。

    • 一个“消息处理函数”,它接受 Kinesis Record 并返回通用对象 T,以防您想使用 Record 中包含的其他数据,例如分区键。
     import collection.JavaConverters._
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.kinesis.KinesisInputDStream
     import org.apache.spark.streaming.{Seconds, StreamingContext}
     import org.apache.spark.streaming.kinesis.KinesisInitialPositions
     import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
    
     val kinesisStream = KinesisInputDStream.builder
         .streamingContext(streamingContext)
         .endpointUrl([endpoint URL])
         .regionName([region name])
         .streamName([streamName])
         .initialPosition([initial position])
         .checkpointAppName([Kinesis app name])
         .checkpointInterval([checkpoint interval])
         .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
         .metricsLevel(MetricsLevel.DETAILED)
         .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
         .buildWithMessageHandler([message handler])
    
     import org.apache.spark.storage.StorageLevel;
     import org.apache.spark.streaming.kinesis.KinesisInputDStream;
     import org.apache.spark.streaming.Seconds;
     import org.apache.spark.streaming.StreamingContext;
     import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
     import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
     import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
     import scala.collection.JavaConverters;
    
     KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
         .streamingContext(streamingContext)
         .endpointUrl([endpoint URL])
         .regionName([region name])
         .streamName([streamName])
         .initialPosition([initial position])
         .checkpointAppName([Kinesis app name])
         .checkpointInterval([checkpoint interval])
         .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
         .metricsLevel(MetricsLevel.DETAILED)
         .metricsEnabledDimensions(
             JavaConverters.asScalaSetConverter(
                 KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS
             )
             .asScala().toSet()
         )
         .buildWithMessageHandler([message handler]);
    
    • streamingContext: 包含 Kinesis 用来将此 Kinesis 应用程序与 Kinesis 流关联起来的应用程序名称的 StreamingContext

    • [Kinesis app name]: 将用于在 DynamoDB 表中对 Kinesis 序列号进行检查点操作的应用程序名称。
      • 应用程序名称对于给定的账户和区域必须是唯一的。
      • 如果表存在但检查点信息不正确(例如,指向不同的流,或包含旧的过期序列号),则可能会出现临时错误。
    • [Kinesis stream name]: 此流应用程序将从中拉取数据的 Kinesis 流。

    • [endpoint URL]: 有效的 Kinesis 端点 URL 可以在此处找到。

    • [region name]: 有效的 Kinesis 区域名称可以在此处找到。

    • [checkpoint interval]: Kinesis 客户端库在流中保存其位置的间隔(例如,Duration(2000) = 2 秒)。首先,将其设置为与流应用程序的批处理间隔相同。

    • [initial position]: 可以是 KinesisInitialPositions.TrimHorizonKinesisInitialPositions.LatestKinesisInitialPositions.AtTimestamp(有关详细信息,请参阅Kinesis 检查点部分和Amazon Kinesis API 文档)。

    • [message handler]: 一个接受 Kinesis Record 并输出通用 T 的函数。

    在其他 API 版本中,您还可以直接指定 AWS 访问密钥和秘密密钥。

  3. 部署:与任何 Spark 应用程序一样,使用 spark-submit 启动您的应用程序。但是,Scala/Java 应用程序和 Python 应用程序的细节略有不同。

    对于 Scala 和 Java 应用程序,如果您使用 SBT 或 Maven 进行项目管理,请将 spark-streaming-kinesis-asl_2.13 及其依赖项打包到应用程序 JAR 中。确保将 spark-core_2.13spark-streaming_2.13 标记为 provided 依赖项,因为它们已存在于 Spark 安装中。然后使用 spark-submit 启动您的应用程序(请参阅主编程指南中的部署部分)。

    对于缺乏 SBT/Maven 项目管理的 Python 应用程序,可以使用 --packagesspark-streaming-kinesis-asl_2.13 及其依赖项直接添加到 spark-submit 中(请参阅应用程序提交指南)。也就是说,

     ./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.13:4.0.0 ...
    

    此外,您还可以从 Maven 仓库下载 Maven 工件 spark-streaming-kinesis-asl-assembly 的 JAR,并使用 --jars 将其添加到 spark-submit 中。

    Spark Streaming Kinesis Architecture

    运行时要记住的要点

    • Kinesis 数据处理按分区有序,并且每条消息至少处理一次。

    • 多个应用程序可以从同一个 Kinesis 流中读取数据。Kinesis 将在 DynamoDB 中维护应用程序特有的分片和检查点信息。

    • 一个 Kinesis 流分片一次由一个输入 DStream 处理。

    • 单个 Kinesis 输入 DStream 可以通过创建多个 KinesisRecordProcessor 线程来从 Kinesis 流的多个分片读取数据。

    • 在不同的进程/实例中运行的多个输入 DStream 可以从 Kinesis 流中读取数据。

    • 您所需的 Kinesis 输入 DStream 数量永远不会超过 Kinesis 流分片的数量,因为每个输入 DStream 将创建至少一个处理单个分片的 KinesisRecordProcessor 线程。

    • 通过添加/移除 Kinesis 输入 DStream(在单个进程内或跨多个进程/实例)来实现横向扩展 - 最多达到前述 Kinesis 流分片的总数。

    • Kinesis 输入 DStream 将在所有 DStream 之间平衡负载 - 即使跨进程/实例。

    • Kinesis 输入 DStream 将在重新分片事件(由于负载变化导致的合并和拆分)期间平衡负载。

    • 作为最佳实践,建议您在可能的情况下通过超额预置来避免重新分片抖动。

    • 每个 Kinesis 输入 DStream 维护其自己的检查点信息。有关详细信息,请参阅 Kinesis 检查点部分。

    • Kinesis 流分片的数量与在输入 DStream 处理期间在 Spark 集群中创建的 RDD 分区/分片的数量之间没有关联。这两种是独立的划分方案。

运行示例

要运行此示例,

记录解聚合

当使用 Kinesis 生产者库 (KPL) 生成数据时,消息可能会被聚合以节省成本。Spark Streaming 在消费过程中将自动解聚合记录。

Kinesis 检查点

Kinesis 重试配置