结构化流 + Kafka 集成指南 (Kafka 代理版本 0.10.0 或更高)

Kafka 0.10 的结构化流集成,用于从 Kafka 读取数据和向 Kafka 写入数据。

链接

对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,请将您的应用程序链接到以下构件

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.13
version = 4.0.0

请注意,要使用标头功能,您的 Kafka 客户端版本应为 0.11.0.0 或更高版本。

对于 Python 应用程序,您需要在部署应用程序时添加上述库及其依赖项。请参阅下面的部署小节。

要在spark-shell上进行实验,您在调用spark-shell时也需要添加上述库及其依赖项。此外,请参阅下面的部署小节。

从 Kafka 读取数据

为流式查询创建 Kafka 源

# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to 1 topic, with headers
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("includeHeaders", "true") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")

# Subscribe to multiple topics
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to 1 topic, with headers
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .as[(String, String, Array[(String, Array[Byte])])]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to 1 topic
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to 1 topic, with headers
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers");

// Subscribe to multiple topics
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

为批处理查询创建 Kafka 源

如果您的用例更适合批处理,您可以为定义的偏移量范围创建 Dataset/DataFrame。

# Subscribe to 1 topic defaults to the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics, specifying explicit Kafka offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern, at the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to 1 topic defaults to the earliest and latest offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to multiple topics, specifying explicit Kafka offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
  .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern, at the earliest and latest offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

源中的每一行具有以下 schema

类型
key binary
value binary
topic string
partition int
offset long
timestamp timestamp
timestampType int
headers (可选) array

对于 Kafka 源,批处理和流式查询都必须设置以下选项。

选项value含义
assign json string {"topicA":[0,1],"topicB":[2,4]} 要消费的特定 TopicPartitions。对于 Kafka 源,只能指定“assign”、“subscribe”或“subscribePattern”选项之一。
subscribe 逗号分隔的 topics 列表 要订阅的 topic 列表。对于 Kafka 源,只能指定“assign”、“subscribe”或“subscribePattern”选项之一。
subscribePattern Java 正则表达式字符串 用于订阅 topic(s) 的模式。对于 Kafka 源,只能指定“assign”、“subscribe”或“subscribePattern”选项之一。
kafka.bootstrap.servers 逗号分隔的 host:port 列表 Kafka “bootstrap.servers” 配置。

以下配置是可选的

选项value默认查询类型含义
startingTimestamp 时间戳字符串,例如 "1000" 无(下一个优先级是 startingOffsetsByTimestamp 流式和批处理 查询启动时的时间戳起始点,一个字符串,指定订阅主题中所有分区的起始时间戳。请参阅下面的时间戳 offset 选项的详细信息。如果 Kafka 没有返回匹配的 offset,则行为将遵循选项 startingOffsetsByTimestampStrategy 的值。

注1:startingTimestamp 优先于 startingOffsetsByTimestampstartingOffsets

注2:对于流式查询,这仅适用于启动新查询时,并且恢复将始终从查询中断处继续。查询期间新发现的分区将从最早处开始。

startingOffsetsByTimestamp json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ 无(下一个优先级是 startingOffsets 流式和批处理 查询启动时的时间戳起始点,一个 json 字符串,为每个 TopicPartition 指定一个起始时间戳。请参阅下面的时间戳 offset 选项的详细信息。如果 Kafka 没有返回匹配的 offset,则行为将遵循选项 startingOffsetsByTimestampStrategy 的值。

注1:startingOffsetsByTimestamp 优先于 startingOffsets

注2:对于流式查询,这仅适用于启动新查询时,并且恢复将始终从查询中断处继续。查询期间新发现的分区将从最早处开始。

startingOffsets "earliest"(最早),"latest"(最新,仅限流式),或 json 字符串 """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ 流式为“latest”,批处理为“earliest” 流式和批处理 查询启动时的起始点,可以是“earliest”(从最早的 offset 开始),“latest”(从最新的 offset 开始),或者一个 json 字符串,为每个 TopicPartition 指定一个起始 offset。在 json 中,-2 可用于表示最早的 offset,-1 表示最新的 offset。注意:对于批处理查询,不允许使用 latest(无论是隐式还是通过在 json 中使用 -1)。对于流式查询,这仅适用于启动新查询时,并且恢复将始终从查询中断处继续。查询期间新发现的分区将从最早处开始。
endingTimestamp 时间戳字符串,例如 "1000" 无(下一个优先级是 endingOffsetsByTimestamp 批处理查询 批处理查询结束时的终点,一个 json 字符串,指定订阅主题中所有分区的结束时间戳。请参阅下面的时间戳 offset 选项的详细信息。如果 Kafka 没有返回匹配的 offset,则 offset 将设置为最新。

注:endingTimestamp 优先于 endingOffsetsByTimestampendingOffsets

endingOffsetsByTimestamp json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ 无(下一个优先级是 endingOffsets 批处理查询 批处理查询结束时的终点,一个 json 字符串,为每个 TopicPartition 指定一个结束时间戳。请参阅下面的时间戳 offset 选项的详细信息。如果 Kafka 没有返回匹配的 offset,则 offset 将设置为最新。

注:endingOffsetsByTimestamp 优先于 endingOffsets

endingOffsets latest 或 json 字符串 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} latest 批处理查询 批处理查询结束时的终点,可以是“latest”(指最新),或者一个 json 字符串,为每个 TopicPartition 指定一个结束 offset。在 json 中,-1 可用于表示最新的 offset,-2(最早的)作为 offset 是不允许的。
failOnDataLoss true 或 false true 流式和批处理 当数据可能丢失时(例如,topic 被删除,或 offset 超出范围)是否使查询失败。这可能是一个误报。当它不按预期工作时,您可以禁用它。
kafkaConsumer.pollTimeoutMs long 120000 流式和批处理 在 executor 中从 Kafka 拉取数据时的超时时间(毫秒)。如果未定义,则回退到 spark.network.timeout
fetchOffset.numRetries int 3 流式和批处理 在放弃获取 Kafka offsets 之前重试的次数。
fetchOffset.retryIntervalMs long 10 流式和批处理 重试获取 Kafka offsets 前等待的毫秒数
maxOffsetsPerTrigger long 流式查询 每个触发间隔处理的最大 offset 数量的速率限制。指定的总 offset 数量将按比例分配给不同数据量的 topicPartitions。
minOffsetsPerTrigger long 流式查询 每个触发间隔要处理的最小 offset 数量。指定的总 offset 数量将按比例分配给不同数据量的 topicPartitions。注意,如果超过 maxTriggerDelay,即使可用 offset 数量未达到 minOffsetsPerTrigger,也会触发一次触发。
maxTriggerDelay 带单位的时间 15m 流式查询 在源中有数据可用的情况下,两次触发之间触发器可以延迟的最大时间量。此选项仅在设置了 minOffsetsPerTrigger 时适用。
minPartitions int 流式和批处理 从 Kafka 读取所需的最少分区数。默认情况下,Spark 将 topicPartitions 与从 Kafka 消费的 Spark 分区进行 1 对 1 映射。如果您将此选项设置为大于您的 topicPartitions 的值,Spark 将把大型 Kafka 分区拆分为更小的块。请注意,此配置类似于一个 hint(提示):Spark 任务的数量将大约minPartitions。它可能会更少或更多,具体取决于舍入误差或未收到任何新数据的 Kafka 分区。
maxRecordsPerPartition long 流式和批处理 限制分区中存在的最大记录数。默认情况下,Spark 将 topicPartitions 与从 Kafka 消费的 Spark 分区进行 1 对 1 映射。如果您设置此选项,Spark 将把 Kafka 分区拆分为更小的块,以便每个分区最多包含 maxRecordsPerPartition 条记录。当 minPartitionsmaxRecordsPerPartition 都设置时,分区数将大约(recordsPerPartition / maxRecordsPerPartition)minPartitions 中的最大值。在这种情况下,Spark 将根据 maxRecordsPerPartition 拆分分区,如果最终分区计数小于 minPartitions,它将再次根据 minPartitions 拆分分区。
groupIdPrefix string spark-kafka-source 流式和批处理 结构化流查询生成的消费者组标识符 (group.id) 的前缀。如果设置了 "kafka.group.id",此选项将被忽略。
kafka.group.id string 流式和批处理 从 Kafka 读取时在 Kafka 消费者中使用的 Kafka 组 ID。请谨慎使用此选项。默认情况下,每个查询都会生成一个唯一的组 ID 用于读取数据。这确保了每个 Kafka 源都有自己的消费者组,不会受到任何其他消费者的干扰,因此可以读取其订阅主题的所有分区。在某些情况下(例如,基于 Kafka 组的授权),您可能希望使用特定的授权组 ID 来读取数据。您可以选择性地设置组 ID。但是,请务必谨慎操作,因为它可能导致意外行为。同时运行的查询(包括批处理和流式)或具有相同组 ID 的源可能会相互干扰,导致每个查询只能读取部分数据。当查询快速启动/重启时也可能发生这种情况。为了尽量减少此类问题,请将 Kafka 消费者会话超时(通过设置选项 "kafka.session.timeout.ms")设置得非常小。设置此选项后,将忽略选项 "groupIdPrefix"。
includeHeaders boolean false 流式和批处理 是否在行中包含 Kafka 标头。
startingOffsetsByTimestampStrategy "error" 或 "latest" "error" 流式和批处理 当按时间戳指定的起始 offset(无论是全局还是按分区)与 Kafka 返回的 offset 不匹配时,将使用此策略。以下是策略名称和相应的描述

“error”:使查询失败,最终用户必须处理需要手动步骤的变通方法。

“latest”:为这些分区分配最新 offset,以便 Spark 可以在后续的微批次中从这些分区读取更新的记录。

时间戳 offset 选项的详细信息

每个分区返回的 offset 是时间戳大于或等于相应分区中给定时间戳的最早 offset。如果 Kafka 未返回匹配的 offset,则不同选项的行为会有所不同——请查看每个选项的描述。

Spark 简单地将时间戳信息传递给 KafkaConsumer.offsetsForTimes,并且不解释或推断该值。有关 KafkaConsumer.offsetsForTimes 的更多详细信息,请参阅 javadoc。此外,此处 timestamp 的含义可能根据 Kafka 配置 (log.message.timestamp.type) 而有所不同:请参阅 Kafka 文档了解更多详细信息。

时间戳 offset 选项需要 Kafka 0.10.1.0 或更高版本。

获取 Offset

在 Spark 3.0 及更早版本中,Spark 使用 KafkaConsumer 获取 offset,这可能导致驱动程序无限等待。在 Spark 3.1 中,新增了一个配置选项 spark.sql.streaming.kafka.useDeprecatedOffsetFetching(默认:false),它允许 Spark 使用基于 AdminClient 的新 offset 获取机制。(将其设置为 true 以使用旧的 KafkaConsumer 获取 offset 机制。)

当使用新机制时,适用以下情况。

首先,新方法支持 Kafka broker 0.11.0.0+

在 Spark 3.0 及更早版本中,安全的 Kafka 处理需要从驱动程序角度提供以下 ACL

自 Spark 3.1 起,可以使用 AdminClient 而不是 KafkaConsumer 获取 offset,为此需要从驱动程序角度提供以下 ACL

由于驱动程序中的 AdminClient 不连接到消费者组,基于 group.id 的授权将不再起作用(executor 从未进行过基于组的授权)。值得一提的是,executor 端的行为与之前完全相同(组前缀和覆盖仍然有效)。

消费者缓存

初始化 Kafka 消费者非常耗时,特别是在处理时间是关键因素的流式场景中。因此,Spark 通过利用 Apache Commons Pool 在 executor 上池化 Kafka 消费者。

缓存键由以下信息构建

以下属性可用于配置消费者池

属性名默认值含义始于版本
spark.kafka.consumer.cache.capacity 64 缓存的最大消费者数量。请注意,这是一个软限制。 3.0.0
spark.kafka.consumer.cache.timeout 5m (5 分钟) 消费者在池中闲置的最短时间,超过此时间后才有资格被回收器驱逐。 3.0.0
spark.kafka.consumer.cache.evictorThreadRunInterval 1m (1 分钟) 消费者池的空闲回收线程运行之间的时间间隔。当为非正值时,将不会运行空闲回收线程。 3.0.0
spark.kafka.consumer.cache.jmx.enable false 为此配置实例创建的池启用或禁用 JMX。池的统计信息可通过 JMX 实例获得。JMX 名称的前缀设置为“kafka010-cached-simple-kafka-consumer-pool”。 3.0.0

池的大小受 spark.kafka.consumer.cache.capacity 限制,但它作为“软限制”工作,以避免阻塞 Spark 任务。

空闲回收线程会定期移除超过给定超时时间未使用的消费者。如果在借用时达到此阈值,它会尝试移除当前未使用的最少使用条目。

如果无法移除,则池将继续增长。在最坏的情况下,池将增长到 executor 中可以运行的最大并发任务数(即,任务槽数)。

如果任务因任何原因失败,出于安全原因,新任务将使用新创建的 Kafka 消费者执行。同时,我们使池中所有具有相同缓存键的消费者失效,以移除在失败执行中使用的消费者。任何其他任务正在使用的消费者不会被关闭,但当它们返回池中时也将失效。

除了消费者之外,Spark 还单独池化从 Kafka 获取的记录,以便从 Spark 的角度来看 Kafka 消费者是无状态的,并最大限度地提高池化效率。它利用与 Kafka 消费者池相同的缓存键。请注意,由于特性差异,它不利用 Apache Commons Pool。

以下属性可用于配置获取的数据池

属性名默认值含义始于版本
spark.kafka.consumer.fetchedData.cache.timeout 5m (5 分钟) 获取的数据在池中闲置的最短时间,超过此时间后才有资格被回收器驱逐。 3.0.0
spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval 1m (1 分钟) 获取的数据池的空闲回收线程运行之间的时间间隔。当为非正值时,将不会运行空闲回收线程。 3.0.0

将数据写入 Kafka

在此,我们描述了将流式查询和批处理查询写入 Apache Kafka 的支持。请注意,Apache Kafka 仅支持至少一次写入语义。因此,当将流式查询或批处理查询写入 Kafka 时,某些记录可能会重复;例如,如果 Kafka 需要重试一个未被 Broker 确认的消息,即使该 Broker 已经接收并写入了该消息记录,也可能发生这种情况。由于 Kafka 的这些写入语义,结构化流无法阻止此类重复的发生。但是,如果查询写入成功,则可以假定查询输出至少被写入了一次。一种可能的解决方案是在读取写入的数据时消除重复项,即引入一个主(唯一)键,该键可用于在读取时执行去重操作。

要写入 Kafka 的 DataFrame 在 schema 中应具有以下列

类型
key (可选) string 或 binary
value (必需) string 或 binary
headers (可选) array
topic (*可选) string
partition (可选) int

* 如果未指定“topic”配置选项,则 topic 列是必需的。

value 列是唯一必需的选项。如果未指定 key 列,则会自动添加一个值为 null 的 key 列(有关如何处理值为 null 的 key 值的 Kafka 语义,请参阅相关文档)。如果存在 topic 列,则在将给定行写入 Kafka 时,其值将用作 topic,除非设置了“topic”配置选项,即“topic”配置选项会覆盖 topic 列。如果未指定“partition”列(或其值为 null),则分区将由 Kafka 生产者计算。可以通过设置 kafka.partitioner.class 选项在 Spark 中指定 Kafka 分区器。如果未指定,将使用 Kafka 默认分区器。

对于 Kafka sink,批处理和流式查询都必须设置以下选项。

选项value含义
kafka.bootstrap.servers 逗号分隔的 host:port 列表 Kafka “bootstrap.servers” 配置。

以下配置是可选的

选项value默认查询类型含义
topic string 流式和批处理 设置所有行将被写入 Kafka 的 topic。此选项会覆盖数据中可能存在的任何 topic 列。
includeHeaders boolean false 流式和批处理 是否在行中包含 Kafka 标头。

为流式查询创建 Kafka Sink

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start();

将批处理查询的输出写入 Kafka

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save();

生产者缓存

鉴于 Kafka 生产者实例被设计为线程安全的,Spark 初始化一个 Kafka 生产者实例并在任务之间共享使用相同的缓存键。

缓存键由以下信息构建

这包括授权配置,当使用委托令牌时,Spark 会自动包含该配置。即使我们考虑授权,您也可以预期在相同的 Kafka 生产者配置中将使用相同的 Kafka 生产者实例。当委托令牌续订时,它将使用不同的 Kafka 生产者;旧委托令牌的 Kafka 生产者实例将根据缓存策略被逐出。

以下属性可用于配置生产者池

属性名默认值含义始于版本
spark.kafka.producer.cache.timeout 10m (10 分钟) 生产者在池中闲置的最短时间,超过此时间后才有资格被回收器驱逐。 2.2.1
spark.kafka.producer.cache.evictorThreadRunInterval 1m (1 分钟) 生产者池的空闲回收线程运行之间的时间间隔。当为非正值时,将不会运行空闲回收线程。 3.0.0

空闲回收线程会定期移除超过给定超时时间未使用的生产者。请注意,生产者是共享并并发使用的,因此最后使用的时间戳由生产者实例返回且引用计数为 0 的那一刻决定。

Kafka 特有配置

Kafka 自己的配置可以通过 DataStreamReader.optionkafka. 前缀来设置,例如 stream.option("kafka.bootstrap.servers", "host:port")。有关可能的 Kafka 参数,请参阅 Kafka 消费者配置文档(与读取数据相关的参数)和 Kafka 生产者配置文档(与写入数据相关的参数)。

请注意,不能设置以下 Kafka 参数,否则 Kafka 源或 sink 将抛出异常

部署

与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。spark-sql-kafka-0-10_2.13 及其依赖项可以直接使用 --packages 添加到 spark-submit,例如:

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0 ...

要在 spark-shell 上进行实验,您也可以直接使用 --packages 添加 spark-sql-kafka-0-10_2.13 及其依赖项,

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0 ...

有关提交具有外部依赖项的应用程序的更多详细信息,请参阅应用程序提交指南

安全性

Kafka 0.9.0.0 引入了多项功能,提高了集群的安全性。有关这些可能性的详细描述,请参阅 Kafka 安全文档

值得注意的是,安全性是可选的,并且默认情况下是关闭的。

Spark 支持以下几种针对 Kafka 集群进行身份验证的方式

委托令牌

通过这种方式,应用程序可以通过 Spark 参数进行配置,并且可能不需要 JAAS 登录配置(Spark 可以使用 Kafka 的动态 JAAS 配置功能)。有关委托令牌的更多信息,请参阅 Kafka 委托令牌文档

该过程由 Spark 的 Kafka 委托令牌提供者启动。当设置了 spark.kafka.clusters.${cluster}.auth.bootstrap.servers 时,Spark 会按以下优先顺序考虑登录选项:

可以通过将 spark.security.credentials.kafka.enabled 设置为 false 来关闭 Kafka 委托令牌提供程序(默认:true)。

Spark 可以配置为使用以下身份验证协议来获取令牌(它必须与 Kafka broker 配置匹配)

成功获取委托令牌后,Spark 会将其分发到各个节点并相应地续订。委托令牌使用 SCRAM 登录模块进行身份验证,因此必须配置相应的 spark.kafka.clusters.${cluster}.sasl.token.mechanism(默认:SCRAM-SHA-512)。此外,此参数必须与 Kafka broker 配置匹配。

当 executor 上有委托令牌可用时,Spark 会按以下优先顺序考虑登录选项

如果以上都不适用,则假定为不安全连接。

配置

可以从多个集群获取委托令牌,${cluster} 是一个任意的唯一标识符,用于帮助分组不同的配置。

属性名默认值含义始于版本
spark.kafka.clusters.${cluster}.auth.bootstrap.servers 用于建立与 Kafka 集群初始连接的逗号分隔的 host/port 对列表。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex .* 用于匹配应用程序中源和 sink 的 bootstrap.servers 配置的正则表达式。如果服务器地址与此正则表达式匹配,则连接时将使用从相应 bootstrap 服务器获取的委托令牌。如果多个集群匹配该地址,将抛出异常,并且查询不会启动。Kafka 的安全和不安全监听器绑定到不同的端口。当两者都使用时,安全监听器端口必须是正则表达式的一部分。 3.0.0
spark.kafka.clusters.${cluster}.security.protocol SASL_SSL 用于与 broker 通信的协议。更多详细信息请参阅 Kafka 文档。协议默认应用于所有 bootstrap.servers 配置匹配的源和 sink(更多详细信息请参阅 spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex),并且可以通过在源或 sink 上设置 kafka.security.protocol 来覆盖。 3.0.0
spark.kafka.clusters.${cluster}.sasl.kerberos.service.name kafka Kafka 运行时的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.truststore.type 信任存储文件的文件格式。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 3.2.0
spark.kafka.clusters.${cluster}.ssl.truststore.location 信任存储文件的位置。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.truststore.password 信任存储文件的存储密码。此项可选,仅当配置了 spark.kafka.clusters.${cluster}.ssl.truststore.location 时才需要。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.keystore.type 密钥库文件的文件格式。对于客户端,此项可选。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 3.2.0
spark.kafka.clusters.${cluster}.ssl.keystore.location 密钥库文件的位置。对于客户端,此项可选,可用于客户端的双向认证。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.keystore.password 密钥库文件的存储密码。此项可选,仅当配置了 spark.kafka.clusters.${cluster}.ssl.keystore.location 时才需要。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.key.password 密钥库文件中私钥的密码。对于客户端,此项可选。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.sasl.token.mechanism SCRAM-SHA-512 用于使用委托令牌的客户端连接的 SASL 机制。由于 SCRAM 登录模块用于身份验证,因此此处必须设置兼容的机制。更多详细信息请参阅 Kafka 文档 (sasl.mechanism)。仅用于使用委托令牌对 Kafka broker 进行身份验证。 3.0.0

Kafka 特有配置

Kafka 自己的配置可以使用 kafka. 前缀设置,例如 --conf spark.kafka.clusters.${cluster}.kafka.retries=1。有关可能的 Kafka 参数,请参阅 Kafka adminclient 配置文档

注意事项

JAAS 登录配置

JAAS 登录配置必须放置在 Spark 尝试访问 Kafka 集群的所有节点上。这提供了应用任何自定义身份验证逻辑的可能性,但维护成本更高。这可以通过多种方式完成。一种可能性是提供额外的 JVM 参数,例如:

./bin/spark-submit \
    --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
    --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
    ...