Spark Streaming 与 Kafka 集成指南(Kafka broker 版本 0.10.0 或更高版本)
适用于 Kafka 0.10 的 Spark Streaming 集成提供了简单的并行性、Kafka 分区和 Spark 分区之间的 1:1 对应关系,以及对偏移量和元数据的访问。但是,由于较新的集成使用 新的 Kafka 消费者 API 而不是简单的 API,因此在使用方面存在显著差异。
链接
对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,请将您的流应用程序与以下工件链接(有关更多信息,请参阅主编程指南中的链接部分)。
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 3.5.1
不要手动添加对 org.apache.kafka
工件(例如 kafka-clients
)的依赖项。spark-streaming-kafka-0-10
工件已经具有适当的传递依赖项,并且不同版本可能会以难以诊断的方式不兼容。
创建直接流
请注意,导入的命名空间包含版本号,即 org.apache.spark.streaming.kafka010
流中的每个项目都是一个 ConsumerRecord
有关可能的 kafkaParams,请参阅 Kafka 消费者配置文档。如果您的 Spark 批处理持续时间大于默认的 Kafka 心跳会话超时(30 秒),请相应增加 heartbeat.interval.ms 和 session.timeout.ms。对于大于 5 分钟的批处理,这将需要更改代理上的 group.max.session.timeout.ms。请注意,该示例将 enable.auto.commit 设置为 false,有关讨论,请参阅下面的 存储偏移量。
位置策略
新的 Kafka 消费者 API 会将消息预取到缓冲区中。因此,出于性能原因,Spark 集成将缓存的消费者保留在执行器上(而不是为每个批处理重新创建它们),并倾向于在具有相应消费者的主机位置上调度分区,这一点非常重要。
在大多数情况下,您应该使用如上所示的 LocationStrategies.PreferConsistent
。这将在可用执行器之间均匀分配分区。如果您的执行器与 Kafka 代理位于同一主机上,请使用 PreferBrokers
,它将优先在该分区的 Kafka leader 上调度分区。最后,如果您的分区负载存在显著偏差,请使用 PreferFixed
。这允许您指定分区到主机的显式映射(任何未指定的分区将使用一致的位置)。
消费者缓存的默认最大大小为 64。如果您预计要处理超过(64 * 执行器数量)个 Kafka 分区,则可以通过 spark.streaming.kafka.consumer.cache.maxCapacity
更改此设置。
如果要禁用 Kafka 消费者的缓存,可以将 spark.streaming.kafka.consumer.cache.enabled
设置为 false
。
缓存是通过 topicpartition 和 group.id 进行键控的,因此每次调用 createDirectStream
时都要使用单独的 group.id
。
消费者策略
新的 Kafka 消费者 API 有多种不同的方式来指定主题,其中一些方式需要在对象实例化后进行大量设置。ConsumerStrategies
提供了一种抽象,允许 Spark 即使在从检查点重新启动后也能获得正确配置的消费者。
如上所示,ConsumerStrategies.Subscribe
允许您订阅固定的主题集合。SubscribePattern
允许您使用正则表达式来指定感兴趣的主题。请注意,与 0.8 集成不同,使用 Subscribe
或 SubscribePattern
应该响应在运行流期间添加分区。最后,Assign
允许您指定固定的分区集合。所有三种策略都有重载的构造函数,允许您指定特定分区的起始偏移量。
如果您有上述选项无法满足的特定消费者设置需求,ConsumerStrategy
是一个您可以扩展的公共类。
创建 RDD
如果您的用例更适合批处理,则可以为定义的偏移量范围创建一个 RDD。
请注意,您不能使用 PreferBrokers
,因为如果没有流,则驱动程序端没有消费者可以自动为您查找代理元数据。如果需要,请使用 PreferFixed
和您自己的元数据查找。
获取偏移量
请注意,只有在对 createDirectStream
的结果调用的第一个方法中执行类型转换为 HasOffsetRanges
才会成功,而不是在方法链的后面。请注意,在任何进行混洗或重新分区的方法(例如 reduceByKey() 或 window())之后,RDD 分区和 Kafka 分区之间的一对一映射将不再保留。
存储偏移量
在发生故障时,Kafka 的传递语义取决于偏移量的存储方式和时间。Spark 输出操作是至少一次。因此,如果您希望获得与恰好一次语义等效的结果,则必须在幂等输出后存储偏移量,或者在原子事务中与输出一起存储偏移量。通过此集成,您有 3 种选择,按可靠性(和代码复杂性)递增的顺序,用于存储偏移量。
检查点
如果您启用 Spark 检查点,则偏移量将存储在检查点中。这很容易启用,但也有一些缺点。您的输出操作必须是幂等的,因为您将获得重复的输出;事务不是一种选择。此外,如果您的应用程序代码已更改,则无法从检查点恢复。对于计划的升级,您可以通过同时运行新代码和旧代码来缓解这种情况(因为输出需要是幂等的,所以它们不应该发生冲突)。但是,对于需要更改代码的计划外故障,除非您有其他方法来识别已知的良好起始偏移量,否则您将丢失数据。
Kafka 本身
Kafka 有一个偏移量提交 API,用于将偏移量存储在特殊的 Kafka 主题中。默认情况下,新的消费者会定期自动提交偏移量。这几乎肯定不是您想要的,因为消费者成功轮询的消息可能尚未导致 Spark 输出操作,从而导致语义未定义。这就是为什么上面的流示例将“enable.auto.commit”设置为 false 的原因。但是,您可以在知道输出已存储后,使用 commitAsync
API 将偏移量提交到 Kafka。与检查点相比,这样做的好处是,无论您的应用程序代码如何更改,Kafka 都是一个持久存储。但是,Kafka 不是事务性的,因此您的输出必须仍然是幂等的。
与 HasOffsetRanges 一样,只有在对 createDirectStream 的结果调用时,对 CanCommitOffsets 的强制转换才会成功,而不是在转换之后。commitAsync 调用是线程安全的,但如果希望获得有意义的语义,则必须在输出之后发生。
您自己的数据存储
对于支持事务的数据存储,在与结果相同的 事务中保存偏移量可以使两者保持同步,即使在出现故障的情况下也是如此。如果您谨慎地检测重复或跳过的偏移量范围,回滚事务可以防止重复或丢失的消息影响结果。这提供了与恰好一次语义等效的结果。即使对于聚合产生的输出(通常难以使其幂等),也可以使用这种策略。
SSL / TLS
新的 Kafka 消费者 支持 SSL。要启用它,请在传递给 createDirectStream
/ createRDD
之前适当地设置 kafkaParams。请注意,这仅适用于 Spark 和 Kafka 代理之间的通信;您仍然负责单独保护 Spark 节点间通信的安全。
部署
与任何 Spark 应用程序一样,spark-submit
用于启动您的应用程序。
对于 Scala 和 Java 应用程序,如果您正在使用 SBT 或 Maven 进行项目管理,则将 spark-streaming-kafka-0-10_2.12
及其依赖项打包到应用程序 JAR 中。确保将 spark-core_2.12
和 spark-streaming_2.12
标记为 provided
依赖项,因为它们已经存在于 Spark 安装中。然后使用 spark-submit
启动您的应用程序(请参阅主编程指南中的部署部分)。
安全
请参阅 结构化流安全。
其他注意事项
- Kafka 原生接收器不可用,因此委托令牌仅在消费者端使用。