Spark Streaming + Kafka 集成指南 (Kafka broker 版本 0.10.0 或更高版本)
适用于 Kafka 0.10 的 Spark Streaming 集成提供了简单的并行性、Kafka 分区和 Spark 分区之间的 1:1 对应关系,以及对偏移量和元数据的访问。但是,由于较新的集成使用新的 Kafka consumer API而不是简单的 API,因此在使用上存在显着差异。
链接
对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,请使用以下工件链接您的流式应用程序(有关更多信息,请参阅主编程指南中的链接部分)。
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 3.5.5
不要手动添加对 org.apache.kafka
工件(例如 kafka-clients
)的依赖项。 spark-streaming-kafka-0-10
工件已经具有适当的传递依赖项,并且不同的版本可能以难以诊断的方式不兼容。
创建 Direct Stream
请注意,导入的命名空间包含版本,org.apache.spark.streaming.kafka010
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
流中的每个项目都是一个 ConsumerRecord
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
有关可能的 kafkaParams,请参阅Kafka consumer config docs。如果您的 Spark 批处理持续时间大于默认的 Kafka 心跳会话超时时间(30 秒),请适当增加 heartbeat.interval.ms 和 session.timeout.ms。对于大于 5 分钟的批处理,这将需要在 broker 上更改 group.max.session.timeout.ms。请注意,该示例将 enable.auto.commit 设置为 false,有关讨论,请参见下面的存储偏移量。
LocationStrategies
新的 Kafka consumer API 会将消息预取到缓冲区中。 因此,出于性能原因,Spark 集成将缓存的 consumer 保留在 executors 上(而不是为每个批次重新创建它们),并优先在具有相应 consumer 的主机位置上调度分区非常重要。
在大多数情况下,您应该使用如上所示的 LocationStrategies.PreferConsistent
。 这会将分区均匀地分布在可用的 executors 上。 如果您的 executors 与 Kafka broker 位于同一主机上,请使用 PreferBrokers
,它将优先在 Kafka leader 上调度该分区的分区。 最后,如果您的分区中的负载存在显着偏差,请使用 PreferFixed
。 这允许您指定分区到主机的显式映射(任何未指定的分区将使用一致的位置)。
consumer 的缓存的默认最大大小为 64。如果您希望处理超过 (64 * executors 的数量) 个 Kafka 分区,则可以通过 spark.streaming.kafka.consumer.cache.maxCapacity
更改此设置。
如果您想禁用 Kafka consumer 的缓存,可以将 spark.streaming.kafka.consumer.cache.enabled
设置为 false
。
缓存由 topicpartition 和 group.id 键控,因此对于每次调用 createDirectStream
,请使用单独的 group.id
。
ConsumerStrategies
新的 Kafka consumer API 有多种指定 topic 的方式,其中一些需要大量的对象实例化后设置。ConsumerStrategies
提供了一种抽象,允许 Spark 即使从检查点重新启动后也能获得正确配置的 consumer。
如上所示,ConsumerStrategies.Subscribe
允许您订阅固定的 topic 集合。SubscribePattern
允许您使用正则表达式来指定感兴趣的 topic。请注意,与 0.8 集成不同,使用 Subscribe
或 SubscribePattern
应该响应在运行流期间添加分区。最后,Assign
允许您指定固定的分区集合。所有三种策略都有重载的构造函数,允许您指定特定分区的起始偏移量。
如果上述选项无法满足您的特定 consumer 设置需求,则 ConsumerStrategy
是您可以扩展的公共类。
创建 RDD
如果您的用例更适合批处理,则可以为定义的偏移量范围创建一个 RDD。
// Import dependencies and create kafka params as in Create Direct Stream above
val offsetRanges = Array(
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange("test", 0, 0, 100),
OffsetRange("test", 1, 0, 100)
)
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
// Import dependencies and create kafka params as in Create Direct Stream above
OffsetRange[] offsetRanges = {
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange.create("test", 0, 0, 100),
OffsetRange.create("test", 1, 0, 100)
};
JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
sparkContext,
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent()
);
请注意,您不能使用 PreferBrokers
,因为如果没有流,则没有驱动程序端 consumer 自动为您查找 broker 元数据。如有必要,将 PreferFixed
与您自己的元数据查找一起使用。
获取偏移量
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
}
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(consumerRecords -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
});
});
请注意,只有在对 createDirectStream
的结果调用的第一个方法中完成到 HasOffsetRanges
的类型转换,而不是在方法链的后面,才能成功。请注意,在任何 shuffle 或重新分区的操作(例如 reduceByKey() 或 window())之后,RDD 分区和 Kafka 分区之间的一对一映射不再保留。
存储偏移量
发生故障时 Kafka 传递语义取决于偏移量的存储方式和时间。Spark 输出操作是至少一次。因此,如果您想要等效的恰好一次语义,则必须在幂等输出之后存储偏移量,或者在与输出一起的原子事务中存储偏移量。通过此集成,您可以选择 3 个选项,按可靠性(和代码复杂性)的顺序排列,以存储偏移量。
检查点
如果启用 Spark 检查点,则偏移量将存储在检查点中。 这很容易启用,但存在缺点。 您的输出操作必须是幂等的,因为您将获得重复的输出;事务不是一个选项。 此外,如果您的应用程序代码已更改,则无法从检查点恢复。 对于计划内的升级,您可以通过同时运行新代码和旧代码来缓解这种情况(因为输出无论如何都需要是幂等的,所以它们不应该冲突)。 但是对于需要代码更改的计划外故障,除非您有另一种方法来识别已知的良好起始偏移量,否则您将丢失数据。
Kafka 本身
Kafka 具有一个偏移量提交 API,该 API 将偏移量存储在一个特殊的 Kafka topic 中。 默认情况下,新的 consumer 将定期自动提交偏移量。 这几乎肯定不是你想要的,因为 consumer 成功轮询的消息可能尚未导致 Spark 输出操作,从而导致未定义的语义。 这就是为什么上面的流示例将“enable.auto.commit”设置为 false。 但是,您可以在知道您的输出已存储后,使用 commitAsync
API 将偏移量提交到 Kafka。 与检查点相比,好处是 Kafka 是一个持久存储,无论应用程序代码如何更改。 但是,Kafka 不是事务性的,因此您的输出仍然必须是幂等的。
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
与 HasOffsetRanges 一样,只有在对 createDirectStream 的结果调用(而不是在转换之后)才能成功转换为 CanCommitOffsets。 如果您想要有意义的语义,commitAsync 调用是线程安全的,但必须发生在输出之后。
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// some time later, after outputs have completed
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
您自己的数据存储
对于支持事务的数据存储,将偏移量与结果保存在同一事务中可以使两者保持同步,即使在出现故障的情况下也是如此。 如果您小心检测重复或跳过的偏移量范围,回滚事务可防止重复或丢失的消息影响结果。 这给出了相当于恰好一次语义。 即使对于通常难以实现幂等的聚合产生的输出,也可以使用此策略。
// The details depend on your data store, but the general idea looks like this
// begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val results = yourCalculation(rdd)
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
}
// The details depend on your data store, but the general idea looks like this
// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
Object results = yourCalculation(rdd);
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
});
SSL / TLS
新的 Kafka consumer 支持 SSL。 要启用它,请在传递给 createDirectStream
/ createRDD
之前,适当地设置 kafkaParams。 请注意,这仅适用于 Spark 和 Kafka broker 之间的通信; 您仍然需要单独地保护Spark 节点间通信。
val kafkaParams = Map[String, Object](
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
"security.protocol" -> "SSL",
"ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
"ssl.truststore.password" -> "test1234",
"ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
"ssl.keystore.password" -> "test1234",
"ssl.key.password" -> "test1234"
)
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");
部署
与任何 Spark 应用程序一样,使用 spark-submit
来启动您的应用程序。
对于 Scala 和 Java 应用程序,如果您使用 SBT 或 Maven 进行项目管理,则将 spark-streaming-kafka-0-10_2.12
及其依赖项打包到应用程序 JAR 中。 确保 spark-core_2.12
和 spark-streaming_2.12
被标记为 provided
依赖项,因为这些依赖项已经存在于 Spark 安装中。 然后使用 spark-submit
来启动您的应用程序(请参阅主编程指南中的部署部分)。
安全
请参阅结构化流处理安全。
其他注意事项
- Kafka 本地 sink 不可用,因此委托 token 仅在 consumer 端使用。