Spark Streaming 与 Kafka 集成指南(Kafka broker 版本 0.10.0 或更高版本)

适用于 Kafka 0.10 的 Spark Streaming 集成提供了简单的并行性、Kafka 分区和 Spark 分区之间的 1:1 对应关系,以及对偏移量和元数据的访问。但是,由于较新的集成使用 新的 Kafka 消费者 API 而不是简单的 API,因此在使用方面存在显著差异。

链接

对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,请将您的流应用程序与以下工件链接(有关更多信息,请参阅主编程指南中的链接部分)。

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 3.5.1

不要手动添加对 org.apache.kafka 工件(例如 kafka-clients)的依赖项。spark-streaming-kafka-0-10 工件已经具有适当的传递依赖项,并且不同版本可能会以难以诊断的方式不兼容。

创建直接流

请注意,导入的命名空间包含版本号,即 org.apache.spark.streaming.kafka010

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

流中的每个项目都是一个 ConsumerRecord

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

有关可能的 kafkaParams,请参阅 Kafka 消费者配置文档。如果您的 Spark 批处理持续时间大于默认的 Kafka 心跳会话超时(30 秒),请相应增加 heartbeat.interval.ms 和 session.timeout.ms。对于大于 5 分钟的批处理,这将需要更改代理上的 group.max.session.timeout.ms。请注意,该示例将 enable.auto.commit 设置为 false,有关讨论,请参阅下面的 存储偏移量

位置策略

新的 Kafka 消费者 API 会将消息预取到缓冲区中。因此,出于性能原因,Spark 集成将缓存的消费者保留在执行器上(而不是为每个批处理重新创建它们),并倾向于在具有相应消费者的主机位置上调度分区,这一点非常重要。

在大多数情况下,您应该使用如上所示的 LocationStrategies.PreferConsistent。这将在可用执行器之间均匀分配分区。如果您的执行器与 Kafka 代理位于同一主机上,请使用 PreferBrokers,它将优先在该分区的 Kafka leader 上调度分区。最后,如果您的分区负载存在显著偏差,请使用 PreferFixed。这允许您指定分区到主机的显式映射(任何未指定的分区将使用一致的位置)。

消费者缓存的默认最大大小为 64。如果您预计要处理超过(64 * 执行器数量)个 Kafka 分区,则可以通过 spark.streaming.kafka.consumer.cache.maxCapacity 更改此设置。

如果要禁用 Kafka 消费者的缓存,可以将 spark.streaming.kafka.consumer.cache.enabled 设置为 false

缓存是通过 topicpartition 和 group.id 进行键控的,因此每次调用 createDirectStream 时都要使用单独的 group.id

消费者策略

新的 Kafka 消费者 API 有多种不同的方式来指定主题,其中一些方式需要在对象实例化后进行大量设置。ConsumerStrategies 提供了一种抽象,允许 Spark 即使在从检查点重新启动后也能获得正确配置的消费者。

如上所示,ConsumerStrategies.Subscribe 允许您订阅固定的主题集合。SubscribePattern 允许您使用正则表达式来指定感兴趣的主题。请注意,与 0.8 集成不同,使用 SubscribeSubscribePattern 应该响应在运行流期间添加分区。最后,Assign 允许您指定固定的分区集合。所有三种策略都有重载的构造函数,允许您指定特定分区的起始偏移量。

如果您有上述选项无法满足的特定消费者设置需求,ConsumerStrategy 是一个您可以扩展的公共类。

创建 RDD

如果您的用例更适合批处理,则可以为定义的偏移量范围创建一个 RDD。

// Import dependencies and create kafka params as in Create Direct Stream above

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
// Import dependencies and create kafka params as in Create Direct Stream above

OffsetRange[] offsetRanges = {
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange.create("test", 0, 0, 100),
  OffsetRange.create("test", 1, 0, 100)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);

请注意,您不能使用 PreferBrokers,因为如果没有流,则驱动程序端没有消费者可以自动为您查找代理元数据。如果需要,请使用 PreferFixed 和您自己的元数据查找。

获取偏移量

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  rdd.foreachPartition(consumerRecords -> {
    OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
    System.out.println(
      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
  });
});

请注意,只有在对 createDirectStream 的结果调用的第一个方法中执行类型转换为 HasOffsetRanges 才会成功,而不是在方法链的后面。请注意,在任何进行混洗或重新分区的方法(例如 reduceByKey() 或 window())之后,RDD 分区和 Kafka 分区之间的一对一映射将不再保留。

存储偏移量

在发生故障时,Kafka 的传递语义取决于偏移量的存储方式和时间。Spark 输出操作是至少一次。因此,如果您希望获得与恰好一次语义等效的结果,则必须在幂等输出后存储偏移量,或者在原子事务中与输出一起存储偏移量。通过此集成,您有 3 种选择,按可靠性(和代码复杂性)递增的顺序,用于存储偏移量。

检查点

如果您启用 Spark 检查点,则偏移量将存储在检查点中。这很容易启用,但也有一些缺点。您的输出操作必须是幂等的,因为您将获得重复的输出;事务不是一种选择。此外,如果您的应用程序代码已更改,则无法从检查点恢复。对于计划的升级,您可以通过同时运行新代码和旧代码来缓解这种情况(因为输出需要是幂等的,所以它们不应该发生冲突)。但是,对于需要更改代码的计划外故障,除非您有其他方法来识别已知的良好起始偏移量,否则您将丢失数据。

Kafka 本身

Kafka 有一个偏移量提交 API,用于将偏移量存储在特殊的 Kafka 主题中。默认情况下,新的消费者会定期自动提交偏移量。这几乎肯定不是您想要的,因为消费者成功轮询的消息可能尚未导致 Spark 输出操作,从而导致语义未定义。这就是为什么上面的流示例将“enable.auto.commit”设置为 false 的原因。但是,您可以在知道输出已存储后,使用 commitAsync API 将偏移量提交到 Kafka。与检查点相比,这样做的好处是,无论您的应用程序代码如何更改,Kafka 都是一个持久存储。但是,Kafka 不是事务性的,因此您的输出必须仍然是幂等的。

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

与 HasOffsetRanges 一样,只有在对 createDirectStream 的结果调用时,对 CanCommitOffsets 的强制转换才会成功,而不是在转换之后。commitAsync 调用是线程安全的,但如果希望获得有意义的语义,则必须在输出之后发生。

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // some time later, after outputs have completed
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});

您自己的数据存储

对于支持事务的数据存储,在与结果相同的 事务中保存偏移量可以使两者保持同步,即使在出现故障的情况下也是如此。如果您谨慎地检测重复或跳过的偏移量范围,回滚事务可以防止重复或丢失的消息影响结果。这提供了与恰好一次语义等效的结果。即使对于聚合产生的输出(通常难以使其幂等),也可以使用这种策略。

// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}
// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
  fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  
  Object results = yourCalculation(rdd);

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
});

SSL / TLS

新的 Kafka 消费者 支持 SSL。要启用它,请在传递给 createDirectStream / createRDD 之前适当地设置 kafkaParams。请注意,这仅适用于 Spark 和 Kafka 代理之间的通信;您仍然负责单独保护 Spark 节点间通信的安全。

val kafkaParams = Map[String, Object](
  // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
  "ssl.truststore.password" -> "test1234",
  "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
  "ssl.keystore.password" -> "test1234",
  "ssl.key.password" -> "test1234"
)
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");

部署

与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。

对于 Scala 和 Java 应用程序,如果您正在使用 SBT 或 Maven 进行项目管理,则将 spark-streaming-kafka-0-10_2.12 及其依赖项打包到应用程序 JAR 中。确保将 spark-core_2.12spark-streaming_2.12 标记为 provided 依赖项,因为它们已经存在于 Spark 安装中。然后使用 spark-submit 启动您的应用程序(请参阅主编程指南中的部署部分)。

安全

请参阅 结构化流安全

其他注意事项