Spark Streaming + Kafka 集成指南 (Kafka broker 版本 0.10.0 或更高)

Kafka 0.10 的 Spark Streaming 集成提供了简单的并行性、Kafka 分区与 Spark 分区之间的 1:1 对应关系,以及对偏移量和元数据的访问。然而,由于新的集成使用了新的 Kafka 消费者 API 而不是简单的 API,因此在使用上存在显著差异。

链接

对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,请将您的流处理应用程序与以下 artifact 链接(更多信息请参阅主编程指南中的链接部分)。

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.13
version = 4.0.0

请勿手动添加对 org.apache.kafka artifact (例如 kafka-clients) 的依赖。 spark-streaming-kafka-0-10 artifact 已包含适当的传递依赖,不同版本之间可能会以难以诊断的方式不兼容。

创建直接流

请注意,导入的命名空间包含版本号,即 org.apache.spark.streaming.kafka010

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

流中的每个项都是一个 ConsumerRecord

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

有关可能的 kafkaParams,请参阅Kafka 消费者配置文档。如果您的 Spark 批处理持续时间大于默认的 Kafka 心跳会话超时(30 秒),请适当增加 heartbeat.interval.ms 和 session.timeout.ms。对于大于 5 分钟的批处理,这需要在 broker 上更改 group.max.session.timeout.ms。请注意,示例将 enable.auto.commit 设置为 false,有关讨论请参阅下面的存储偏移量

位置策略 (LocationStrategies)

新的 Kafka 消费者 API 会预取消息到缓冲区。因此,出于性能原因,Spark 集成在 executor 上保留缓存的消费者(而不是为每个批次重新创建它们),并且优先在具有适当消费者的主机位置调度分区,这一点很重要。

在大多数情况下,您应该如上所示使用 LocationStrategies.PreferConsistent。这会将分区均匀分布到可用的 executor 上。如果您的 executor 与 Kafka broker 在同一主机上,请使用 PreferBrokers,它会优先将分区调度到该分区的 Kafka leader 上。最后,如果您的分区负载存在明显倾斜,请使用 PreferFixed。这允许您指定分区到主机的显式映射(任何未指定的分区将使用一致的位置)。

消费者缓存的默认最大大小为 64。如果您预计要处理的 Kafka 分区数量超过 (64 * executor 数量),您可以通过 spark.streaming.kafka.consumer.cache.maxCapacity 更改此设置。

如果您想禁用 Kafka 消费者的缓存,可以将 spark.streaming.kafka.consumer.cache.enabled 设置为 false

缓存以 topicpartition 和 group.id 作为键,因此每次调用 createDirectStream 时请使用单独的 group.id

消费者策略 (ConsumerStrategies)

新的 Kafka 消费者 API 有多种指定 topic 的方式,其中一些需要大量的对象实例化后设置。 ConsumerStrategies 提供了一个抽象,允许 Spark 即使在从检查点重新启动后也能获得正确配置的消费者。

如上所示,ConsumerStrategies.Subscribe 允许您订阅固定的 topic 集合。 SubscribePattern 允许您使用正则表达式来指定感兴趣的 topic。请注意,与 0.8 集成不同,使用 SubscribeSubscribePattern 应该能在流运行时响应分区的添加。最后,Assign 允许您指定固定的分区集合。所有这三种策略都有重载的构造函数,允许您为特定分区指定起始偏移量。

如果您有上述选项无法满足的特定消费者设置需求,ConsumerStrategy 是一个您可以扩展的公共类。

创建 RDD

如果您的用例更适合批处理,您可以为定义的偏移量范围创建一个 RDD。

// Import dependencies and create kafka params as in Create Direct Stream above

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
// Import dependencies and create kafka params as in Create Direct Stream above

OffsetRange[] offsetRanges = {
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange.create("test", 0, 0, 100),
  OffsetRange.create("test", 1, 0, 100)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);

请注意,您不能使用 PreferBrokers,因为没有流,就没有驱动端消费者为您自动查找 broker 元数据。如有必要,请使用 PreferFixed 并结合您自己的元数据查找。

获取偏移量

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  rdd.foreachPartition(consumerRecords -> {
    OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
    System.out.println(
      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
  });
});

请注意,类型转换为 HasOffsetRanges 仅在对 createDirectStream 结果调用的第一个方法中成功,而不是在方法链的后面。请注意,RDD 分区和 Kafka 分区之间的一对一映射在任何涉及 shuffle 或重新分区的操作(例如 reduceByKey() 或 window())之后将不再保留。

存储偏移量

失败时 Kafka 的传递语义取决于偏移量的存储方式和时间。Spark 输出操作是至少一次的。因此,如果您希望获得等同于精确一次的语义,您必须在幂等输出之后存储偏移量,或者在与输出原子事务中存储偏移量。通过此集成,您有 3 种存储偏移量的方法,按可靠性(和代码复杂度)递增的顺序排列。

检查点

如果您启用 Spark 检查点,偏移量将存储在检查点中。这很容易启用,但也有缺点。您的输出操作必须是幂等的,因为您会得到重复的输出;事务不是一个选项。此外,如果您的应用程序代码已更改,您将无法从检查点恢复。对于计划升级,您可以通过新代码与旧代码同时运行来缓解此问题(因为输出无论如何都需要是幂等的,它们不应该冲突)。但对于需要代码更改的非计划故障,除非您有其他方法来识别已知的良好起始偏移量,否则您将丢失数据。

Kafka 自身

Kafka 有一个偏移量提交 API,它将偏移量存储在特殊的 Kafka topic 中。默认情况下,新的消费者会定期自动提交偏移量。这几乎肯定不是您想要的,因为消费者成功拉取的消息可能尚未导致 Spark 输出操作,从而导致语义不明确。这就是上述流示例将 “enable.auto.commit” 设置为 false 的原因。但是,在您知道输出已存储后,您可以使用 commitAsync API 将偏移量提交到 Kafka。与检查点相比,其优点是无论您的应用程序代码如何更改,Kafka 都是一个持久存储。然而,Kafka 不支持事务,因此您的输出仍必须是幂等的。

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

与 HasOffsetRanges 类似,转换为 CanCommitOffsets 仅在对 createDirectStream 的结果调用时成功,而不是在转换之后。 commitAsync 调用是线程安全的,但如果您想要有意义的语义,它必须在输出之后发生。

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // some time later, after outputs have completed
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});

您自己的数据存储

对于支持事务的数据存储,在与结果相同的事务中保存偏移量可以使两者保持同步,即使在故障情况下也是如此。如果您仔细检测重复或跳过的偏移量范围,回滚事务可以防止重复或丢失的消息影响结果。这提供了等同于精确一次的语义。即使对于聚合产生的输出(通常难以幂等化),也可以使用这种策略。

// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}
// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
  fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  
  Object results = yourCalculation(rdd);

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
});

SSL / TLS

新的 Kafka 消费者支持 SSL。要启用它,请在传递给 createDirectStream / createRDD 之前适当地设置 kafkaParams。请注意,这仅适用于 Spark 和 Kafka broker 之间的通信;您仍需单独负责保护 Spark 节点间通信的安全。

val kafkaParams = Map[String, Object](
  // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
  "ssl.truststore.password" -> "test1234",
  "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
  "ssl.keystore.password" -> "test1234",
  "ssl.key.password" -> "test1234"
)
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");

部署

与任何 Spark 应用程序一样,使用 spark-submit 启动您的应用程序。

对于 Scala 和 Java 应用程序,如果您使用 SBT 或 Maven 进行项目管理,请将 spark-streaming-kafka-0-10_2.13 及其依赖项打包到应用程序 JAR 中。确保 spark-core_2.13spark-streaming_2.13 被标记为 provided 依赖项,因为它们已存在于 Spark 安装中。然后使用 spark-submit 启动您的应用程序(请参阅主编程指南中的部署部分)。

安全

请参阅结构化流安全

其他注意事项