Spark Streaming + Kafka 集成指南 (Kafka broker 版本 0.10.0 或更高)
Kafka 0.10 的 Spark Streaming 集成提供了简单的并行性、Kafka 分区与 Spark 分区之间的 1:1 对应关系,以及对偏移量和元数据的访问。然而,由于新的集成使用了新的 Kafka 消费者 API 而不是简单的 API,因此在使用上存在显著差异。
链接
对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,请将您的流处理应用程序与以下 artifact 链接(更多信息请参阅主编程指南中的链接部分)。
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.13
version = 4.0.0
请勿手动添加对 org.apache.kafka
artifact (例如 kafka-clients
) 的依赖。 spark-streaming-kafka-0-10
artifact 已包含适当的传递依赖,不同版本之间可能会以难以诊断的方式不兼容。
创建直接流
请注意,导入的命名空间包含版本号,即 org.apache.spark.streaming.kafka010
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
流中的每个项都是一个 ConsumerRecord
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
有关可能的 kafkaParams,请参阅Kafka 消费者配置文档。如果您的 Spark 批处理持续时间大于默认的 Kafka 心跳会话超时(30 秒),请适当增加 heartbeat.interval.ms 和 session.timeout.ms。对于大于 5 分钟的批处理,这需要在 broker 上更改 group.max.session.timeout.ms。请注意,示例将 enable.auto.commit 设置为 false,有关讨论请参阅下面的存储偏移量。
位置策略 (LocationStrategies)
新的 Kafka 消费者 API 会预取消息到缓冲区。因此,出于性能原因,Spark 集成在 executor 上保留缓存的消费者(而不是为每个批次重新创建它们),并且优先在具有适当消费者的主机位置调度分区,这一点很重要。
在大多数情况下,您应该如上所示使用 LocationStrategies.PreferConsistent
。这会将分区均匀分布到可用的 executor 上。如果您的 executor 与 Kafka broker 在同一主机上,请使用 PreferBrokers
,它会优先将分区调度到该分区的 Kafka leader 上。最后,如果您的分区负载存在明显倾斜,请使用 PreferFixed
。这允许您指定分区到主机的显式映射(任何未指定的分区将使用一致的位置)。
消费者缓存的默认最大大小为 64。如果您预计要处理的 Kafka 分区数量超过 (64 * executor 数量),您可以通过 spark.streaming.kafka.consumer.cache.maxCapacity
更改此设置。
如果您想禁用 Kafka 消费者的缓存,可以将 spark.streaming.kafka.consumer.cache.enabled
设置为 false
。
缓存以 topicpartition 和 group.id 作为键,因此每次调用 createDirectStream
时请使用单独的 group.id
。
消费者策略 (ConsumerStrategies)
新的 Kafka 消费者 API 有多种指定 topic 的方式,其中一些需要大量的对象实例化后设置。 ConsumerStrategies
提供了一个抽象,允许 Spark 即使在从检查点重新启动后也能获得正确配置的消费者。
如上所示,ConsumerStrategies.Subscribe
允许您订阅固定的 topic 集合。 SubscribePattern
允许您使用正则表达式来指定感兴趣的 topic。请注意,与 0.8 集成不同,使用 Subscribe
或 SubscribePattern
应该能在流运行时响应分区的添加。最后,Assign
允许您指定固定的分区集合。所有这三种策略都有重载的构造函数,允许您为特定分区指定起始偏移量。
如果您有上述选项无法满足的特定消费者设置需求,ConsumerStrategy
是一个您可以扩展的公共类。
创建 RDD
如果您的用例更适合批处理,您可以为定义的偏移量范围创建一个 RDD。
// Import dependencies and create kafka params as in Create Direct Stream above
val offsetRanges = Array(
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange("test", 0, 0, 100),
OffsetRange("test", 1, 0, 100)
)
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
// Import dependencies and create kafka params as in Create Direct Stream above
OffsetRange[] offsetRanges = {
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange.create("test", 0, 0, 100),
OffsetRange.create("test", 1, 0, 100)
};
JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
sparkContext,
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent()
);
请注意,您不能使用 PreferBrokers
,因为没有流,就没有驱动端消费者为您自动查找 broker 元数据。如有必要,请使用 PreferFixed
并结合您自己的元数据查找。
获取偏移量
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
}
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(consumerRecords -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
});
});
请注意,类型转换为 HasOffsetRanges
仅在对 createDirectStream
结果调用的第一个方法中成功,而不是在方法链的后面。请注意,RDD 分区和 Kafka 分区之间的一对一映射在任何涉及 shuffle 或重新分区的操作(例如 reduceByKey() 或 window())之后将不再保留。
存储偏移量
失败时 Kafka 的传递语义取决于偏移量的存储方式和时间。Spark 输出操作是至少一次的。因此,如果您希望获得等同于精确一次的语义,您必须在幂等输出之后存储偏移量,或者在与输出原子事务中存储偏移量。通过此集成,您有 3 种存储偏移量的方法,按可靠性(和代码复杂度)递增的顺序排列。
检查点
如果您启用 Spark 检查点,偏移量将存储在检查点中。这很容易启用,但也有缺点。您的输出操作必须是幂等的,因为您会得到重复的输出;事务不是一个选项。此外,如果您的应用程序代码已更改,您将无法从检查点恢复。对于计划升级,您可以通过新代码与旧代码同时运行来缓解此问题(因为输出无论如何都需要是幂等的,它们不应该冲突)。但对于需要代码更改的非计划故障,除非您有其他方法来识别已知的良好起始偏移量,否则您将丢失数据。
Kafka 自身
Kafka 有一个偏移量提交 API,它将偏移量存储在特殊的 Kafka topic 中。默认情况下,新的消费者会定期自动提交偏移量。这几乎肯定不是您想要的,因为消费者成功拉取的消息可能尚未导致 Spark 输出操作,从而导致语义不明确。这就是上述流示例将 “enable.auto.commit” 设置为 false 的原因。但是,在您知道输出已存储后,您可以使用 commitAsync
API 将偏移量提交到 Kafka。与检查点相比,其优点是无论您的应用程序代码如何更改,Kafka 都是一个持久存储。然而,Kafka 不支持事务,因此您的输出仍必须是幂等的。
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
与 HasOffsetRanges 类似,转换为 CanCommitOffsets 仅在对 createDirectStream 的结果调用时成功,而不是在转换之后。 commitAsync 调用是线程安全的,但如果您想要有意义的语义,它必须在输出之后发生。
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// some time later, after outputs have completed
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
您自己的数据存储
对于支持事务的数据存储,在与结果相同的事务中保存偏移量可以使两者保持同步,即使在故障情况下也是如此。如果您仔细检测重复或跳过的偏移量范围,回滚事务可以防止重复或丢失的消息影响结果。这提供了等同于精确一次的语义。即使对于聚合产生的输出(通常难以幂等化),也可以使用这种策略。
// The details depend on your data store, but the general idea looks like this
// begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val results = yourCalculation(rdd)
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
}
// The details depend on your data store, but the general idea looks like this
// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
Object results = yourCalculation(rdd);
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
});
SSL / TLS
新的 Kafka 消费者支持 SSL。要启用它,请在传递给 createDirectStream
/ createRDD
之前适当地设置 kafkaParams。请注意,这仅适用于 Spark 和 Kafka broker 之间的通信;您仍需单独负责保护 Spark 节点间通信的安全。
val kafkaParams = Map[String, Object](
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
"security.protocol" -> "SSL",
"ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
"ssl.truststore.password" -> "test1234",
"ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
"ssl.keystore.password" -> "test1234",
"ssl.key.password" -> "test1234"
)
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");
部署
与任何 Spark 应用程序一样,使用 spark-submit
启动您的应用程序。
对于 Scala 和 Java 应用程序,如果您使用 SBT 或 Maven 进行项目管理,请将 spark-streaming-kafka-0-10_2.13
及其依赖项打包到应用程序 JAR 中。确保 spark-core_2.13
和 spark-streaming_2.13
被标记为 provided
依赖项,因为它们已存在于 Spark 安装中。然后使用 spark-submit
启动您的应用程序(请参阅主编程指南中的部署部分)。
安全
请参阅结构化流安全。
其他注意事项
- Kafka 原生 sink 不可用,因此委托令牌仅在消费者端使用。