结构化流式处理 + Kafka 集成指南(Kafka broker 版本 0.10.0 或更高版本)

用于 Kafka 0.10 的结构化流式处理集成,用于从 Kafka 读取数据和将数据写入 Kafka。

链接

对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,请使用以下工件链接您的应用程序

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.5.1

请注意,要使用标头功能,您的 Kafka 客户端版本应为 0.11.0.0 或更高版本。

对于 Python 应用程序,您需要在部署应用程序时添加上述库及其依赖项。请参阅下面的部署小节。

为了在 spark-shell 上进行实验,您还需要在调用 spark-shell 时添加上述库及其依赖项。另请参阅下面的部署小节。

从 Kafka 读取数据

为流式查询创建 Kafka 源

# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to 1 topic, with headers
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("includeHeaders", "true") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")

# Subscribe to multiple topics
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to 1 topic, with headers
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .as[(String, String, Array[(String, Array[Byte])])]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to 1 topic
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to 1 topic, with headers
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers");

// Subscribe to multiple topics
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

为批处理查询创建 Kafka 源

如果您的用例更适合批处理,则可以为定义的偏移量范围创建数据集/数据帧。

# Subscribe to 1 topic defaults to the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics, specifying explicit Kafka offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern, at the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to 1 topic defaults to the earliest and latest offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to multiple topics, specifying explicit Kafka offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
  .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern, at the earliest and latest offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

源中的每一行都具有以下架构

类型
二进制
二进制
主题 字符串
分区 整数
偏移量 长整型
时间戳 时间戳
时间戳类型 整数
标头(可选) 数组

必须为批处理和流式查询的 Kafka 源设置以下选项。

选项含义
分配 json 字符串 {"topicA":[0,1],"topicB":[2,4]} 要使用的特定主题分区。对于 Kafka 源,只能指定“assign”、“subscribe”或“subscribePattern”选项之一。
订阅 以逗号分隔的主题列表 要订阅的主题列表。对于 Kafka 源,只能指定“assign”、“subscribe”或“subscribePattern”选项之一。
订阅模式 Java 正则表达式字符串 用于订阅主题的模式。对于 Kafka 源,只能指定“assign”、“subscribe”或“subscribePattern”选项之一。
kafka.bootstrap.servers 以逗号分隔的 host:port 列表 Kafka“bootstrap.servers”配置。

以下配置是可选的

选项默认查询类型含义
startingTimestamp 时间戳字符串,例如“1000” 无(下一个首选项是 startingOffsetsByTimestamp 流式和批处理 查询启动时的时间戳起点,一个字符串,指定要订阅的主题中所有分区的起始时间戳。请参阅下面有关时间戳偏移量选项的详细信息。如果 Kafka 没有返回匹配的偏移量,则行为将遵循选项 startingOffsetsByTimestampStrategy 的值

注意 1:startingTimestamp 优先于 startingOffsetsByTimestampstartingOffsets

注意 2:对于流式查询,这仅适用于启动新查询时,并且恢复将始终从查询停止的地方开始。查询期间新发现的分区将从最早的偏移量开始。

startingOffsetsByTimestamp json 字符串 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ 无(下一个首选项是 startingOffsets 流式和批处理 查询启动时的时间戳起点,一个 json 字符串,为每个主题分区指定一个起始时间戳。请参阅下面有关时间戳偏移量选项的详细信息。如果 Kafka 没有返回匹配的偏移量,则行为将遵循选项 startingOffsetsByTimestampStrategy 的值

注意 1:startingOffsetsByTimestamp 优先于 startingOffsets

注意 2:对于流式查询,这仅适用于启动新查询时,并且恢复将始终从查询停止的地方开始。查询期间新发现的分区将从最早的偏移量开始。

startingOffsets “最早”、“最新”(仅限流式处理)或 json 字符串 """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ 流式处理为“最新”,批处理为“最早” 流式和批处理 查询启动时的起点,可以是“earliest”(从最早的偏移量开始)、“latest”(仅从最新的偏移量开始),也可以是一个 json 字符串,为每个主题分区指定一个起始偏移量。在 json 中,-2 作为偏移量可以用来表示最早的偏移量,-1 表示最新的偏移量。注意:对于批处理查询,不允许使用 latest(隐式或在 json 中使用 -1)。对于流式查询,这仅适用于启动新查询时,并且恢复将始终从查询停止的地方开始。查询期间新发现的分区将从最早的偏移量开始。
endingTimestamp 时间戳字符串,例如“1000” 无(下一个首选项是 endingOffsetsByTimestamp 批处理查询 批处理查询结束时的终点,一个 json 字符串,指定要订阅的主题中所有分区的结束时间戳。请参阅下面有关时间戳偏移量选项的详细信息。如果 Kafka 没有返回匹配的偏移量,则偏移量将设置为最新。

注意:endingTimestamp 优先于 endingOffsetsByTimestampendingOffsets

endingOffsetsByTimestamp json 字符串 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ 无(下一个首选项是 endingOffsets 批处理查询 批处理查询结束时的终点,一个 json 字符串,为每个主题分区指定一个结束时间戳。请参阅下面有关时间戳偏移量选项的详细信息。如果 Kafka 没有返回匹配的偏移量,则偏移量将设置为最新。

注意:endingOffsetsByTimestamp 优先于 endingOffsets

endingOffsets 最新或 json 字符串 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} 最新 批处理查询 批处理查询结束时的终点,可以是“latest”(仅指最新的偏移量),也可以是一个 json 字符串,为每个主题分区指定一个结束偏移量。在 json 中,-1 作为偏移量可以用来表示最新的偏移量,不允许使用 -2(最早的偏移量)作为偏移量。
failOnDataLoss 真或假 流式和批处理 当可能丢失数据时(例如,主题被删除或偏移量超出范围),是否使查询失败。这可能是一个误报。当它不能按预期工作时,您可以禁用它。
kafkaConsumer.pollTimeoutMs 长整型 120000 流式和批处理 从执行程序中的 Kafka 轮询数据的超时时间(以毫秒为单位)。未定义时,它将回退到 spark.network.timeout
fetchOffset.numRetries 整数 3 流式和批处理 放弃获取 Kafka 偏移量之前重试的次数。
fetchOffset.retryIntervalMs 长整型 10 流式和批处理 重试获取 Kafka 偏移量之前等待的毫秒数
maxOffsetsPerTrigger 长整型 流式查询 每个触发间隔处理的最大偏移量数的速率限制。指定的偏移量总数将按比例分配给不同卷的主题分区。
minOffsetsPerTrigger 长整型 流式查询 每个触发间隔要处理的最小偏移量数。指定的偏移量总数将按比例分配给不同卷的主题分区。请注意,如果超过了 maxTriggerDelay,即使可用偏移量的数量没有达到 minOffsetsPerTrigger,也会触发触发器。
maxTriggerDelay 带单位的时间 15 分钟 流式查询 在两个触发器之间可以延迟触发器的最长时间,前提是从源中获得了一些数据。此选项仅在设置了 minOffsetsPerTrigger 时适用。
minPartitions 整数 流式和批处理 要从 Kafka 读取的所需最小分区数。默认情况下,Spark 将主题分区与从 Kafka 使用的 Spark 分区进行 1 对 1 映射。如果将此选项设置为大于主题分区的值,Spark 会将大型 Kafka 分区拆分为较小的部分。请注意,此配置类似于 hint:Spark 任务的数量将大约minPartitions。它可能会更少或更多,具体取决于舍入错误或没有收到任何新数据的 Kafka 分区。
groupIdPrefix 字符串 spark-kafka-source 流式和批处理 由结构化流式查询生成的消费者组标识符 (group.id) 的前缀。如果设置了“kafka.group.id”,则将忽略此选项。
kafka.group.id 字符串 流式和批处理 从 Kafka 读取数据时在 Kafka 消费者中使用的 Kafka 组 ID。请谨慎使用此选项。默认情况下,每个查询都会生成一个唯一的组 ID 来读取数据。这确保了每个 Kafka 源都有自己的消费者组,不会受到任何其他消费者的干扰,因此可以读取其订阅主题的所有分区。在某些情况下(例如,基于 Kafka 组的授权),您可能希望使用特定的授权组 ID 来读取数据。您可以选择设置组 ID。但是,请务必谨慎操作,因为它可能会导致意外行为。同时运行的查询(批处理和流式处理)或具有相同组 ID 的源可能会相互干扰,导致每个查询只读取部分数据。当查询快速连续地启动/重新启动时,也可能会发生这种情况。为了最大程度地减少此类问题,请将 Kafka 消费者会话超时(通过设置选项“kafka.session.timeout.ms”)设置为非常小。设置此选项后,将忽略选项“groupIdPrefix”。
includeHeaders 布尔值 流式和批处理 是否在行中包含 Kafka 标头。
startingOffsetsByTimestampStrategy “错误”或“最新” “错误” 流式和批处理 当按时间戳指定的起始偏移量(全局或每个分区)与 Kafka 返回的偏移量不匹配时,将使用该策略。以下是策略名称和相应的描述

“error”:查询失败,最终用户必须处理需要手动步骤的解决方法。

“latest”:为这些分区分配最新的偏移量,以便 Spark 可以在后续的微批处理中从这些分区读取更新的记录。

时间戳偏移量选项的详细信息

为每个分区返回的偏移量是其时间戳大于或等于相应分区中给定时间戳的最早偏移量。如果 Kafka 没有返回匹配的偏移量,则行为因选项而异 - 请查看每个选项的描述。

Spark 只是将时间戳信息传递给 KafkaConsumer.offsetsForTimes,而不会解释或推理该值。有关 KafkaConsumer.offsetsForTimes 的更多详细信息,请参阅javadoc。此外,此处 timestamp 的含义可能会根据 Kafka 配置 (log.message.timestamp.type) 而有所不同:有关更多详细信息,请参阅Kafka 文档

时间戳偏移量选项需要 Kafka 0.10.1.0 或更高版本。

偏移量获取

在 Spark 3.0 及更早版本中,Spark 使用 KafkaConsumer 进行偏移量获取,这可能会导致驱动程序无限期等待。在 Spark 3.1 中,添加了一个新的配置选项 spark.sql.streaming.kafka.useDeprecatedOffsetFetching(默认值:false),它允许 Spark 使用 AdminClient 使用新的偏移量获取机制。(将其设置为 true 以使用 KafkaConsumer 进行旧的偏移量获取。)

当使用新机制时,以下内容适用。

首先,新方法支持 Kafka 代理 0.11.0.0+

在 Spark 3.0 及更低版本中,安全的 Kafka 处理需要从驱动程序角度考虑以下 ACL:

从 Spark 3.1 开始,可以使用 AdminClient 而不是 KafkaConsumer 获取偏移量,因此从驱动程序角度需要以下 ACL:

由于驱动程序中的 AdminClient 未连接到消费者组,因此基于 group.id 的授权将不再起作用(执行程序从未进行过基于组的授权)。值得一提的是,执行程序端的行为方式与以前完全相同(组前缀和覆盖有效)。

消费者缓存

初始化 Kafka 消费者非常耗时,尤其是在处理时间是关键因素的流式场景中。因此,Spark 通过利用 Apache Commons Pool 在执行程序上池化 Kafka 消费者。

缓存键由以下信息构成:

以下属性可用于配置消费者池:

属性名称默认值含义自版本
spark.kafka.consumer.cache.capacity 64 缓存的最大消费者数量。请注意,这是一个软限制。 3.0.0
spark.kafka.consumer.cache.timeout 5m(5 分钟) 消费者在池中闲置的最短时间,然后才能被逐出器逐出。 3.0.0
spark.kafka.consumer.cache.evictorThreadRunInterval 1m(1 分钟) 消费者池的空闲逐出器线程运行之间的时间间隔。如果为非正数,则不会运行空闲逐出器线程。 3.0.0
spark.kafka.consumer.cache.jmx.enable 为使用此配置实例创建的池启用或禁用 JMX。可以通过 JMX 实例获取池的统计信息。JMX 名称的前缀设置为“kafka010-cached-simple-kafka-consumer-pool”。 3.0.0

池的大小受 spark.kafka.consumer.cache.capacity 限制,但它作为“软限制”工作,不会阻塞 Spark 任务。

空闲逐出线程会定期删除超过给定超时时间未使用的消费者。如果在借用时达到此阈值,它会尝试删除当前未使用的最少使用的条目。

如果无法删除,则池将继续增长。在最坏的情况下,池将增长到可以在执行程序中运行的最大并发任务数(即任务槽数)。

如果任务因任何原因失败,则新任务将使用新创建的 Kafka 消费者执行,以确保安全。同时,我们会使池中具有相同缓存键的所有消费者失效,以删除在失败执行中使用的消费者。其他任务正在使用的消费者不会被关闭,但当它们返回池中时也会失效。

除了消费者之外,Spark 还单独池化从 Kafka 获取的记录,以使 Kafka 消费者在 Spark 看来是无状态的,并最大限度地提高池化的效率。它使用与 Kafka 消费者池相同的缓存键。请注意,由于特性的不同,它没有利用 Apache Commons Pool。

以下属性可用于配置获取的数据池:

属性名称默认值含义自版本
spark.kafka.consumer.fetchedData.cache.timeout 5m(5 分钟) 获取的数据在池中闲置的最短时间,然后才能被逐出器逐出。 3.0.0
spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval 1m(1 分钟) 获取的数据池的空闲逐出器线程运行之间的时间间隔。如果为非正数,则不会运行空闲逐出器线程。 3.0.0

将数据写入 Kafka

在这里,我们描述了对将流式查询和批处理查询写入 Apache Kafka 的支持。请注意,Apache Kafka 仅支持至少一次写入语义。因此,在将流式查询或批处理查询写入 Kafka 时,某些记录可能会重复;例如,如果 Kafka 需要重试代理未确认的消息,即使该代理已接收并写入消息记录,也可能会发生这种情况。由于这些 Kafka 写入语义,结构化流式传输无法防止此类重复发生。但是,如果写入查询成功,则可以假定查询输出至少写入了一次。读取写入数据时删除重复项的一种可能解决方案是引入一个主键(唯一),该主键可用于在读取时执行重复数据删除。

要写入 Kafka 的数据框应在架构中包含以下列:

类型
key(可选) 字符串或二进制
value(必填) 字符串或二进制
标头(可选) 数组
topic(*可选) 字符串
partition(可选) 整数

* 如果未指定“topic”配置选项,则需要 topic 列。

value 列是唯一必需的选项。如果未指定 key 列,则会自动添加一个值为 null 的 key 列(请参阅 Kafka 语义,了解如何处理值为 null 的键值)。如果存在 topic 列,则在将给定行写入 Kafka 时,其值将用作主题,除非设置了“topic”配置选项,即“topic”配置选项会覆盖 topic 列。如果未指定“partition”列(或其值为 null),则分区由 Kafka 生产者计算。可以通过设置 kafka.partitioner.class 选项在 Spark 中指定 Kafka 分区器。如果不存在,则将使用 Kafka 默认分区器。

必须为批处理和流式查询的 Kafka sink 设置以下选项。

选项含义
kafka.bootstrap.servers 以逗号分隔的 host:port 列表 Kafka“bootstrap.servers”配置。

以下配置是可选的

选项默认查询类型含义
主题 字符串 流式和批处理 设置将在 Kafka 中写入所有行的主题。此选项会覆盖数据中可能存在的任何主题列。
includeHeaders 布尔值 流式和批处理 是否在行中包含 Kafka 标头。

为流式查询创建 Kafka Sink

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start();

将批处理查询的输出写入 Kafka

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save();

生产者缓存

鉴于 Kafka 生产者实例被设计为线程安全的,因此 Spark 会初始化一个 Kafka 生产者实例,并在具有相同缓存键的任务之间共同使用。

缓存键由以下信息构成:

这包括授权配置,当使用委托令牌时,Spark 会自动包含该配置。即使我们考虑了授权,您也可以预期在相同的 Kafka 生产者配置中将使用相同的 Kafka 生产者实例。当委托令牌续订时,它将使用不同的 Kafka 生产者;旧委托令牌的 Kafka 生产者实例将根据缓存策略被逐出。

以下属性可用于配置生产者池:

属性名称默认值含义自版本
spark.kafka.producer.cache.timeout 10m(10 分钟) 生产者在池中闲置的最短时间,然后才能被逐出器逐出。 2.2.1
spark.kafka.producer.cache.evictorThreadRunInterval 1m(1 分钟) 生产者池的空闲逐出器线程运行之间的时间间隔。如果为非正数,则不会运行空闲逐出器线程。 3.0.0

空闲逐出线程会定期删除超过给定超时时间未使用的生产者。请注意,生产者是共享的并并发使用,因此最后使用时间戳由生产者实例返回且引用计数为 0 的时刻确定。

Kafka 特定配置

可以通过 DataStreamReader.option 使用 kafka. 前缀设置 Kafka 自身的配置,例如,stream.option("kafka.bootstrap.servers", "host:port")。有关可能的 kafka 参数,请参阅Kafka 消费者配置文档(用于读取数据的参数)和Kafka 生产者配置文档(用于写入数据的参数)。

请注意,以下 Kafka 参数无法设置,Kafka 源或 sink 将抛出异常:

部署

与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。spark-sql-kafka-0-10_2.12 及其依赖项可以使用 --packages 直接添加到 spark-submit 中,例如:

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 ...

为了在 spark-shell 上进行实验,您还可以使用 --packages 直接添加 spark-sql-kafka-0-10_2.12 及其依赖项,

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 ...

有关使用外部依赖项提交应用程序的更多详细信息,请参阅应用程序提交指南

安全

Kafka 0.9.0.0 引入了几个增强集群安全性的功能。有关这些可能性的详细说明,请参阅Kafka 安全文档

值得注意的是,安全性是可选的,默认情况下是关闭的。

Spark 支持以下身份验证 Kafka 集群的方法

委托令牌

通过这种方式,可以通过 Spark 参数配置应用程序,并且可能不需要 JAAS 登录配置(Spark 可以使用 Kafka 的动态 JAAS 配置功能)。有关委托令牌的更多信息,请参阅Kafka 委托令牌文档

该过程由 Spark 的 Kafka 委托令牌提供程序启动。当设置了 spark.kafka.clusters.${cluster}.auth.bootstrap.servers 时,Spark 会按优先顺序考虑以下登录选项

可以通过将 spark.security.credentials.kafka.enabled 设置为 false(默认值:true)来关闭 Kafka 委托令牌提供程序。

可以将 Spark 配置为使用以下身份验证协议来获取令牌(必须与 Kafka broker 配置匹配)

成功获取委托令牌后,Spark 会将其分发到各个节点并相应地对其进行续订。委托令牌使用 SCRAM 登录模块进行身份验证,因此必须配置适当的 spark.kafka.clusters.${cluster}.sasl.token.mechanism(默认值:SCRAM-SHA-512)。此外,此参数必须与 Kafka broker 配置匹配。

当执行器上可用委托令牌时,Spark 会按优先顺序考虑以下登录选项

当以上均不适用时,则假定为不安全的连接。

配置

可以从多个集群获取委托令牌,${cluster} 是一个任意的唯一标识符,有助于对不同的配置进行分组。

属性名称默认值含义自版本
spark.kafka.clusters.${cluster}.auth.bootstrap.servers 用于建立与 Kafka 集群的初始连接的逗号分隔的主机/端口对列表。有关更多详细信息,请参阅 Kafka 文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex .* 用于匹配应用程序中源和接收器的 bootstrap.servers 配置的正则表达式。如果服务器地址与此正则表达式匹配,则在连接时将使用从相应引导服务器获取的委托令牌。如果多个集群匹配该地址,则会抛出异常,并且查询将不会启动。Kafka 的安全和非安全侦听器绑定到不同的端口。当两者都使用时,安全侦听器端口必须是正则表达式的一部分。 3.0.0
spark.kafka.clusters.${cluster}.security.protocol SASL_SSL 用于与代理通信的协议。有关更多详细信息,请参阅 Kafka 文档。默认情况下,协议应用于所有源和接收器,其中 bootstrap.servers 配置匹配(有关更多详细信息,请参阅 spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex),并且可以通过在源或接收器上设置 kafka.security.protocol 来覆盖。 3.0.0
spark.kafka.clusters.${cluster}.sasl.kerberos.service.name kafka Kafka 运行时使用的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。有关更多详细信息,请参阅 Kafka 文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.truststore.type 信任库文件的格式。有关更多详细信息,请参阅 Kafka 文档。仅用于获取委托令牌。 3.2.0
spark.kafka.clusters.${cluster}.ssl.truststore.location 信任库文件的位置。有关更多详细信息,请参阅 Kafka 文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.truststore.password 信任库文件的存储密码。这是可选的,并且仅在配置了 spark.kafka.clusters.${cluster}.ssl.truststore.location 时才需要。有关更多详细信息,请参阅 Kafka 文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.keystore.type 密钥库文件的格式。这对于客户端是可选的。有关更多详细信息,请参阅 Kafka 文档。仅用于获取委托令牌。 3.2.0
spark.kafka.clusters.${cluster}.ssl.keystore.location 密钥库文件的位置。这对于客户端是可选的,并且可以用于客户端的双向身份验证。有关更多详细信息,请参阅 Kafka 文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.keystore.password 密钥库文件的存储密码。这是可选的,并且仅在配置了 spark.kafka.clusters.${cluster}.ssl.keystore.location 时才需要。有关更多详细信息,请参阅 Kafka 文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.key.password 密钥库文件中私钥的密码。这对于客户端是可选的。有关更多详细信息,请参阅 Kafka 文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.sasl.token.mechanism SCRAM-SHA-512 用于使用委托令牌进行客户端连接的 SASL 机制。由于 SCRAM 登录模块用于身份验证,因此必须在此处设置兼容的机制。有关更多详细信息,请参阅 Kafka 文档(sasl.mechanism)。仅用于使用委托令牌对 Kafka broker 进行身份验证。 3.0.0

Kafka 特定配置

可以使用 kafka. 前缀设置 Kafka 自身的配置,例如,--conf spark.kafka.clusters.${cluster}.kafka.retries=1。有关可能的 Kafka 参数,请参阅Kafka adminclient 配置文档

注意事项

JAAS 登录配置

必须在 Spark 尝试访问 Kafka 集群的所有节点上放置 JAAS 登录配置。这提供了使用更高的维护成本应用任何自定义身份验证逻辑的可能性。这可以通过几种方式完成。一种可能性是提供额外的 JVM 参数,例如:

./bin/spark-submit \
    --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
    --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
    ...