结构化流处理 + Kafka 集成指南(Kafka broker 版本 0.10.0 或更高版本)
用于 Kafka 0.10 的结构化流处理集成,用于从 Kafka 读取数据和向 Kafka 写入数据。
链接
对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,请使用以下工件链接您的应用程序
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.5.5
请注意,要使用 headers 功能,您的 Kafka 客户端版本应为 0.11.0.0 或更高版本。
对于 Python 应用程序,在部署应用程序时需要添加上述库及其依赖项。请参阅下面的部署小节。
对于在 spark-shell
上进行实验,在调用 spark-shell
时,您也需要添加上述库及其依赖项。 另外,请参阅下面的部署小节。
从 Kafka 读取数据
为流式查询创建 Kafka Source
# Subscribe to 1 topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to 1 topic, with headers
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.option("includeHeaders", "true") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
# Subscribe to multiple topics
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to 1 topic, with headers
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("includeHeaders", "true")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
.as[(String, String, Array[(String, Array[Byte])])]
// Subscribe to multiple topics
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to 1 topic
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to 1 topic, with headers
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("includeHeaders", "true")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers");
// Subscribe to multiple topics
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to a pattern
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
为批处理查询创建 Kafka Source
如果您有一个更适合批处理的用例,您可以为定义的偏移量范围创建一个 Dataset/DataFrame。
# Subscribe to 1 topic defaults to the earliest and latest offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to multiple topics, specifying explicit Kafka offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern, at the earliest and latest offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to 1 topic defaults to the earliest and latest offsets
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to multiple topics, specifying explicit Kafka offsets
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
.option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to a pattern, at the earliest and latest offsets
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
源中的每一行都有以下架构
列 | 类型 |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | timestamp |
timestampType | int |
headers (可选) | array |
对于批处理和流式查询,必须为 Kafka source 设置以下选项。
选项 | value | 含义 |
---|---|---|
assign | json 字符串 {"topicA":[0,1],"topicB":[2,4]} | 要消费的特定 TopicPartitions。 对于 Kafka source,只能指定 "assign"、"subscribe" 或 "subscribePattern" 选项之一。 |
subscribe | 以逗号分隔的主题列表 | 要订阅的主题列表。 对于 Kafka source,只能指定 "assign"、"subscribe" 或 "subscribePattern" 选项之一。 |
subscribePattern | Java 正则表达式字符串 | 用于订阅主题的模式。 对于 Kafka source,只能指定 "assign"、"subscribe" 或 "subscribePattern" 选项之一。 |
kafka.bootstrap.servers | 以逗号分隔的 host:port 列表 | Kafka "bootstrap.servers" 配置。 |
以下配置是可选的
选项 | value | 默认值 | 查询类型 | 含义 |
---|---|---|---|---|
startingTimestamp | 时间戳字符串,例如 "1000" | none (下一个优先级是 startingOffsetsByTimestamp ) |
流式和批处理 | 查询启动时的时间戳起始点,一个字符串,指定所有订阅主题中的分区的时间戳起始值。 请参考下面的时间戳偏移量选项的详细信息。 如果 Kafka 没有返回匹配的偏移量,行为将遵循 startingOffsetsByTimestampStrategy 选项的值。
注意 1: 注意 2:对于流式查询,这仅适用于启动新查询时,恢复将始终从查询停止的地方继续。 查询期间新发现的分区将从最早的时间开始。 |
startingOffsetsByTimestamp | json 字符串 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ | none (下一个优先级是 startingOffsets ) |
流式和批处理 | 查询启动时的时间戳起始点,一个 json 字符串,指定每个 TopicPartition 的时间戳起始值。 请参考下面的时间戳偏移量选项的详细信息。 如果 Kafka 没有返回匹配的偏移量,行为将遵循 startingOffsetsByTimestampStrategy 选项的值。
注意 1: 注意 2:对于流式查询,这仅适用于启动新查询时,恢复将始终从查询停止的地方继续。 查询期间新发现的分区将从最早的时间开始。 |
startingOffsets | "earliest"、"latest" (仅流式处理) 或 json 字符串 """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ | 流式处理为 "latest",批处理为 "earliest" | 流式和批处理 | 查询启动时的起始点,可以是 "earliest" (从最早的偏移量开始)、"latest" (仅从最新的偏移量开始) 或指定每个 TopicPartition 起始偏移量的 json 字符串。 在 json 中,-2 作为偏移量可以指最早的偏移量,-1 指最新的偏移量。 注意:对于批处理查询,不允许使用最新 (无论是隐式还是在 json 中使用 -1)。 对于流式查询,这仅适用于启动新查询时,恢复将始终从查询停止的地方继续。 查询期间新发现的分区将从最早的时间开始。 |
endingTimestamp | 时间戳字符串,例如 "1000" | none (下一个优先级是 endingOffsetsByTimestamp ) |
批处理查询 | 批处理查询结束时的结束点,一个 json 字符串,指定所有订阅主题中的分区的时间戳结束值。 请参考下面的时间戳偏移量选项的详细信息。 如果 Kafka 没有返回匹配的偏移量,则偏移量将被设置为 latest。 注意:
|
endingOffsetsByTimestamp | json 字符串 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ | none (下一个优先级是 endingOffsets ) |
批处理查询 | 批处理查询结束时的结束点,一个 json 字符串,指定每个 TopicPartition 的时间戳结束值。 请参考下面的时间戳偏移量选项的详细信息。 如果 Kafka 没有返回匹配的偏移量,则偏移量将被设置为 latest。 注意: |
endingOffsets | latest 或 json 字符串 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} | latest | 批处理查询 | 批处理查询结束时的结束点,可以是 "latest" (仅指最新的偏移量) 或指定每个 TopicPartition 结束偏移量的 json 字符串。 在 json 中,-1 作为偏移量可以指最新的偏移量,并且不允许使用 -2 (earliest) 作为偏移量。 |
failOnDataLoss | true 或 false | true | 流式和批处理 | 当可能丢失数据时 (例如,删除主题或偏移量超出范围),是否使查询失败。 这可能是一个虚惊。 您可以在它不按预期工作时禁用它。 |
kafkaConsumer.pollTimeoutMs | long | 120000 | 流式和批处理 | 从 executors 中的 Kafka 轮询数据的超时时间 (以毫秒为单位)。 如果未定义,则回退到 spark.network.timeout 。 |
fetchOffset.numRetries | int | 3 | 流式和批处理 | 放弃获取 Kafka 偏移量之前重试的次数。 |
fetchOffset.retryIntervalMs | long | 10 | 流式和批处理 | 重试获取 Kafka 偏移量之前等待的毫秒数 |
maxOffsetsPerTrigger | long | 无 | 流式查询 | 每个触发间隔处理的最大偏移量的速率限制。 指定的总偏移量数将按比例分配到不同卷的 topicPartitions 中。 |
minOffsetsPerTrigger | long | 无 | 流式查询 | 每个触发间隔要处理的最小偏移量数。 指定的总偏移量数将按比例分配到不同卷的 topicPartitions 中。 请注意,如果超过 maxTriggerDelay,即使可用偏移量的数量未达到 minOffsetsPerTrigger,也会触发一个触发器。 |
maxTriggerDelay | 带有单位的时间 | 15m | 流式查询 | 在源中有一些数据可用的情况下,两个触发器之间可以延迟的最大时间量。 仅当设置了 minOffsetsPerTrigger 时,此选项才适用。 |
minPartitions | int | 无 | 流式和批处理 | 希望从 Kafka 读取的最小分区数。 默认情况下,Spark 具有 topicPartitions 到从 Kafka 消费的 Spark 分区的 1-1 映射。 如果您将此选项设置为大于您的 topicPartitions 的值,Spark 会将大型 Kafka 分区分成较小的片段。 请注意,此配置类似于 hint :Spark 任务的数量将大约为 minPartitions 。 它可以少于或多于,具体取决于舍入误差或未收到任何新数据的 Kafka 分区。 |
groupIdPrefix | string | spark-kafka-source | 流式和批处理 | 由结构化流处理查询生成的消费者组标识符 (group.id ) 的前缀。 如果设置了 "kafka.group.id",则将忽略此选项。 |
kafka.group.id | string | 无 | 流式和批处理 | 从 Kafka 读取时在 Kafka 消费者中使用的 Kafka 组 ID。 使用此选项时请务必小心。 默认情况下,每个查询都会生成一个唯一的组 ID 来读取数据。 这可确保每个 Kafka source 都有自己的消费者组,不会受到任何其他消费者的干扰,因此可以读取其订阅主题的所有分区。 在某些情况下 (例如,基于 Kafka 组的授权),您可能希望使用特定的授权组 ID 来读取数据。 您可以选择设置组 ID。 但是,这样做时要格外小心,因为它可能会导致意外的行为。 并发运行的查询 (无论是批处理还是流式处理) 或具有相同组 ID 的源可能会相互干扰,导致每个查询仅读取部分数据。 当快速连续启动/重新启动查询时,也可能发生这种情况。 为了最大限度地减少此类问题,请将 Kafka 消费者会话超时 (通过设置选项 "kafka.session.timeout.ms") 设置为非常小。 设置此选项后,将忽略选项 "groupIdPrefix"。 |
includeHeaders | boolean | false | 流式和批处理 | 是否在行中包含 Kafka headers。 |
startingOffsetsByTimestampStrategy | "error" 或 "latest" | "error" | 流式和批处理 | 当指定的时间戳起始偏移量 (无论是全局的还是每个分区的) 与 Kafka 返回的偏移量不匹配时,将使用该策略。 这是策略名称和相应的描述
"error":查询失败,最终用户必须处理需要手动步骤的变通方案。 "latest":为这些分区分配最新的偏移量,以便 Spark 可以在后续的微批处理中读取来自这些分区的最新记录。 |
关于时间戳偏移量选项的详细信息
每个分区返回的偏移量是时间戳大于或等于相应分区中给定时间戳的最早偏移量。 如果 Kafka 没有返回匹配的偏移量,则不同选项的行为会有所不同 - 请查看每个选项的描述。
Spark 只是将时间戳信息传递给 KafkaConsumer.offsetsForTimes
,并且不解释或推理该值。 有关 KafkaConsumer.offsetsForTimes
的更多详细信息,请参阅 javadoc 获取详细信息。 此外,timestamp
在此处的含义可能因 Kafka 配置 (log.message.timestamp.type
) 而异:有关更多详细信息,请参阅 Kafka 文档。
时间戳偏移量选项需要 Kafka 0.10.1.0 或更高版本。
偏移量获取
在 Spark 3.0 及之前的版本中,Spark 使用 KafkaConsumer
来获取偏移量,这可能会导致驱动程序中的无限等待。 在 Spark 3.1 中,添加了一个新的配置选项 spark.sql.streaming.kafka.useDeprecatedOffsetFetching
(默认值:false
),允许 Spark 使用 AdminClient
使用新的偏移量获取机制。(将其设置为 true
以使用带有 KafkaConsumer
的旧偏移量获取。)
使用新机制时,以下适用。
首先,新方法支持 Kafka brokers 0.11.0.0+
。
在 Spark 3.0 及更低版本中,安全的 Kafka 处理需要来自驱动程序视角的以下 ACL:
- 主题资源描述操作
- 主题资源读取操作
- 组资源读取操作
自 Spark 3.1 以来,可以使用 AdminClient
而不是 KafkaConsumer
获取偏移量,因此需要来自驱动程序视角的以下 ACL:
- 主题资源描述操作
由于驱动程序中的 AdminClient
未连接到消费者组,因此基于 group.id
的授权将不再起作用(执行器从未进行基于组的授权)。 值得一提的是,执行器端的行为与以前完全相同(组前缀和覆盖有效)。
消费者缓存
初始化 Kafka 消费者非常耗时,尤其是在处理时间是关键因素的流式传输场景中。 因此,Spark 利用 Apache Commons Pool 在执行器上池化 Kafka 消费者。
缓存键由以下信息组成
- 主题名称
- 主题分区
- 组 ID
以下属性可用于配置消费者池
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.kafka.consumer.cache.capacity | 64 | 缓存的最大消费者数量。 请注意,这是一个软限制。 | 3.0.0 |
spark.kafka.consumer.cache.timeout | 5m(5 分钟) | 在消费者被驱逐器回收之前,消费者可以在池中空闲的最短时间。 | 3.0.0 |
spark.kafka.consumer.cache.evictorThreadRunInterval | 1m(1 分钟) | 消费者池的空闲驱逐器线程运行的时间间隔。 当为非正数时,将不会运行空闲驱逐器线程。 | 3.0.0 |
spark.kafka.consumer.cache.jmx.enable | false | 为此配置实例创建的池启用或禁用 JMX。 池的统计信息可通过 JMX 实例获得。 JMX 名称的前缀设置为“kafka010-cached-simple-kafka-consumer-pool”。 | 3.0.0 |
池的大小受 spark.kafka.consumer.cache.capacity
限制,但它作为“软限制”工作,以不阻塞 Spark 任务。
空闲驱逐线程定期删除未使用的消费者超过给定的超时时间。 如果在借用时达到此阈值,它会尝试删除当前未使用的最少使用的条目。
如果无法删除,则池将继续增长。 在最坏的情况下,池将增长到可以在执行器中运行的最大并发任务数(即任务槽数)。
如果任务由于任何原因失败,则出于安全原因,将使用新创建的 Kafka 消费者执行新任务。 同时,我们使池中具有相同缓存键的所有消费者失效,以删除在失败执行中使用的消费者。 任何其他任务正在使用的消费者将不会被关闭,但在返回到池时也将被失效。
除了消费者之外,Spark 还会单独池化从 Kafka 获取的记录,以使 Kafka 消费者在 Spark 的视角中无状态,并最大限度地提高池化的效率。 它利用与 Kafka 消费者池相同的缓存键。 请注意,由于特性的差异,它不利用 Apache Commons Pool。
以下属性可用于配置提取的数据池
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.kafka.consumer.fetchedData.cache.timeout | 5m(5 分钟) | 提取的数据在被驱逐器回收之前,可以在池中空闲的最短时间。 | 3.0.0 |
spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval | 1m(1 分钟) | 提取的数据池的空闲驱逐器线程运行的时间间隔。 当为非正数时,将不会运行空闲驱逐器线程。 | 3.0.0 |
将数据写入 Kafka
在这里,我们描述了对将流式查询和批量查询写入 Apache Kafka 的支持。 请注意,Apache Kafka 仅支持至少一次的写入语义。 因此,当将流式查询或批量查询写入 Kafka 时,某些记录可能会重复; 例如,如果 Kafka 需要重试 Broker 未确认的消息,即使该 Broker 已经接收并写入了消息记录,也可能发生这种情况。 由于这些 Kafka 写入语义,Structured Streaming 无法阻止此类重复的发生。 但是,如果写入查询成功,则可以假定查询输出至少写入一次。 删除读取写入数据时的重复项的一种可能的解决方案是引入一个主键(唯一键),该键可用于在读取时执行去重。
写入 Kafka 的 Dataframe 应在 schema 中包含以下列
列 | 类型 |
---|---|
key (可选) | string 或 binary |
value (必需) | string 或 binary |
headers (可选) | array |
topic (*可选) | string |
partition (可选) | int |
* 如果未指定“topic”配置选项,则需要 topic 列。
value 列是唯一必需的选项。 如果未指定 key 列,则会自动添加一个 null
值的 key 列(请参阅 Kafka 语义,了解如何处理 null
值的 key 值)。 如果存在 topic 列,则当将给定行写入 Kafka 时,其值用作主题,除非设置了“topic”配置选项,即“topic”配置选项会覆盖 topic 列。 如果未指定“partition”列(或其值为 null
),则分区由 Kafka 生产者计算。 可以在 Spark 中通过设置 kafka.partitioner.class
选项来指定 Kafka 分区器。 如果不存在,将使用 Kafka 默认分区器。
对于批量查询和流式查询,必须为 Kafka sink 设置以下选项。
选项 | value | 含义 |
---|---|---|
kafka.bootstrap.servers | 以逗号分隔的 host:port 列表 | Kafka "bootstrap.servers" 配置。 |
以下配置是可选的
选项 | value | 默认值 | 查询类型 | 含义 |
---|---|---|---|---|
topic | string | 无 | 流式和批处理 | 设置所有行将写入 Kafka 的主题。 此选项将覆盖数据中可能存在的任何 topic 列。 |
includeHeaders | boolean | false | 流式和批处理 | 是否在行中包含 Kafka headers。 |
为流式查询创建 Kafka Sink
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()
# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start();
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start();
将批量查询的输出写入 Kafka
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.save()
# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save();
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save();
生产者缓存
鉴于 Kafka 生产者实例被设计为线程安全的,Spark 初始化一个 Kafka 生产者实例,并跨任务重复使用相同的缓存键。
缓存键由以下信息组成
- Kafka 生产者配置
这包括授权配置,当使用委托令牌时,Spark 将自动包含该配置。 即使我们考虑了授权,您也可以期望相同的 Kafka 生产者配置将使用相同的 Kafka 生产者实例。 当委托令牌续订时,它将使用不同的 Kafka 生产者;旧委托令牌的 Kafka 生产者实例将根据缓存策略被逐出。
以下属性可用于配置生产者池
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.kafka.producer.cache.timeout | 10m(10 分钟) | 在生产者被驱逐器回收之前,生产者可以在池中空闲的最短时间。 | 2.2.1 |
spark.kafka.producer.cache.evictorThreadRunInterval | 1m(1 分钟) | 生产者池的空闲驱逐器线程运行的时间间隔。 当为非正数时,将不会运行空闲驱逐器线程。 | 3.0.0 |
空闲驱逐线程定期删除未使用的生产者超过给定的超时时间。 请注意,生产者是共享的并且并发使用,因此上次使用的时间戳由生产者实例返回且引用计数为 0 的时刻决定。
Kafka 特定配置
可以通过 DataStreamReader.option
设置 Kafka 自己的配置,并带有 kafka.
前缀,例如,stream.option("kafka.bootstrap.servers", "host:port")
。 对于可能的 kafka 参数,请参阅 Kafka 消费者配置文档,了解与读取数据相关的参数,以及 Kafka 生产者配置文档,了解与写入数据相关的参数。
请注意,以下 Kafka 参数无法设置,Kafka 源或 sink 将抛出异常
- group.id:Kafka 源将自动为每个查询创建一个唯一的组 ID。 用户可以通过可选的源选项
groupIdPrefix
设置自动生成的 group.id 的前缀,默认值为“spark-kafka-source”。 您还可以设置“kafka.group.id”以强制 Spark 使用特殊的组 ID,但是,请阅读此选项的警告并谨慎使用。 - auto.offset.reset:设置源选项
startingOffsets
以指定从哪里开始。 Structured Streaming 管理内部消耗的偏移量,而不是依赖 kafka Consumer 来完成。 这将确保在动态订阅新的主题/分区时不会丢失任何数据。 请注意,startingOffsets
仅在新流式查询启动时适用,并且恢复将始终从查询停止的位置开始。 请注意,当流式应用程序消耗的偏移量不再存在于 Kafka 中时(例如,主题被删除、偏移量超出范围或偏移量在保留期后被删除),偏移量将不会被重置,并且流式应用程序将看到数据丢失。 在极端情况下,例如流式应用程序的吞吐量无法赶上 Kafka 的保留速度,当批处理的偏移量范围完全不在 Kafka 中时,批处理的输入行可能会逐渐减少到零。 启用failOnDataLoss
选项可以要求 Structured Streaming 在此类情况下使查询失败。 - key.deserializer: Key 总是使用 ByteArrayDeserializer 反序列化为字节数组。使用 DataFrame 操作显式反序列化 key。
- value.deserializer: Value 总是使用 ByteArrayDeserializer 反序列化为字节数组。使用 DataFrame 操作显式反序列化 value。
- key.serializer: Key 总是使用 ByteArraySerializer 或 StringSerializer 序列化。使用 DataFrame 操作显式将 key 序列化为字符串或字节数组。
- value.serializer: Value 总是使用 ByteArraySerializer 或 StringSerializer 序列化。使用 DataFrame 操作显式将 value 序列化为字符串或字节数组。
- enable.auto.commit: Kafka source 不提交任何 offset。
- interceptor.classes: Kafka source 始终将 key 和 value 读取为字节数组。使用 ConsumerInterceptor 不安全,因为它可能会破坏查询。
部署
与任何 Spark 应用程序一样,使用 spark-submit
启动应用程序。可以使用 --packages
将 spark-sql-kafka-0-10_2.12
及其依赖项直接添加到 spark-submit
中,例如:
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5 ...
要在 spark-shell
上进行试验,也可以使用 --packages
直接添加 spark-sql-kafka-0-10_2.12
及其依赖项:
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5 ...
有关使用外部依赖项提交应用程序的更多详细信息,请参见应用程序提交指南。
安全性
Kafka 0.9.0.0 引入了多个功能,增强了集群的安全性。有关这些可能性的详细说明,请参见 Kafka 安全文档。
值得注意的是,安全性是可选的,默认情况下处于关闭状态。
Spark 支持以下方式对 Kafka 集群进行身份验证:
- Delegation token (在 Kafka broker 1.1.0 中引入)
- JAAS 登录配置
Delegation token
这样,可以通过 Spark 参数配置应用程序,并且可能不需要 JAAS 登录配置(Spark 可以使用 Kafka 的动态 JAAS 配置功能)。有关 delegation token 的更多信息,请参见 Kafka delegation token 文档。
该过程由 Spark 的 Kafka delegation token 提供程序启动。当设置 spark.kafka.clusters.${cluster}.auth.bootstrap.servers
时,Spark 按首选项顺序考虑以下登录选项:
- JAAS 登录配置,请参见下面的示例。
-
Keytab 文件,例如:
./bin/spark-submit \ --keytab <KEYTAB_FILE> \ --principal <PRINCIPAL> \ --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=<KAFKA_SERVERS> \ ...
-
Kerberos 凭据缓存,例如:
./bin/spark-submit \ --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=<KAFKA_SERVERS> \ ...
可以通过将 spark.security.credentials.kafka.enabled
设置为 false
(默认值:true
)来关闭 Kafka delegation token 提供程序。
可以配置 Spark 使用以下身份验证协议来获取 token(它必须与 Kafka broker 配置匹配):
- SASL SSL (默认)
- SSL
- SASL PLAINTEXT (用于测试)
成功获取 delegation token 后,Spark 会将其分发到各个节点并进行相应的续订。Delegation token 使用 SCRAM
登录模块进行身份验证,因此必须配置相应的 spark.kafka.clusters.${cluster}.sasl.token.mechanism
(默认值:SCRAM-SHA-512
)。 此外,此参数必须与 Kafka broker 配置匹配。
当 executor 上提供 delegation token 时,Spark 按首选项顺序考虑以下登录选项:
- JAAS 登录配置,请参见下面的示例。
- Delegation token,有关更多详细信息,请参见
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex
参数。
如果以上任何一项都不适用,则假定为不安全连接。
配置
可以从多个集群获取 delegation token,并且 ${cluster}
是任意唯一的标识符,有助于对不同的配置进行分组。
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.kafka.clusters.${cluster}.auth.bootstrap.servers |
无 | 用于建立与 Kafka 集群的初始连接的逗号分隔的主机/端口对列表。有关更多详细信息,请参见 Kafka 文档。仅用于获取 delegation token。 | 3.0.0 |
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex |
.* | 用于匹配应用程序中 source 和 sink 的 bootstrap.servers 配置的正则表达式。如果服务器地址与此正则表达式匹配,则在连接时将使用从各自的 bootstrap servers 获取的 delegation token。如果多个集群匹配该地址,则将抛出异常,并且不会启动查询。Kafka 的安全和不安全监听器绑定到不同的端口。如果同时使用两者,则安全监听器端口必须是正则表达式的一部分。 |
3.0.0 |
spark.kafka.clusters.${cluster}.security.protocol |
SASL_SSL | 用于与 broker 通信的协议。有关更多详细信息,请参见 Kafka 文档。 协议默认应用于所有 source 和 sink,其中 bootstrap.servers 配置匹配(有关更多详细信息,请参见 spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex ),并且可以通过在 source 或 sink 上设置 kafka.security.protocol 来覆盖它。 |
3.0.0 |
spark.kafka.clusters.${cluster}.sasl.kerberos.service.name |
kafka | Kafka 作为其运行的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。有关更多详细信息,请参见 Kafka 文档。仅用于获取 delegation token。 | 3.0.0 |
spark.kafka.clusters.${cluster}.ssl.truststore.type |
无 | 信任存储文件的文件格式。有关更多详细信息,请参见 Kafka 文档。仅用于获取 delegation token。 | 3.2.0 |
spark.kafka.clusters.${cluster}.ssl.truststore.location |
无 | 信任存储文件的位置。有关更多详细信息,请参见 Kafka 文档。仅用于获取 delegation token。 | 3.0.0 |
spark.kafka.clusters.${cluster}.ssl.truststore.password |
无 | 信任存储文件的存储密码。这是可选的,只有在配置了 spark.kafka.clusters.${cluster}.ssl.truststore.location 时才需要。有关更多详细信息,请参见 Kafka 文档。仅用于获取 delegation token。 |
3.0.0 |
spark.kafka.clusters.${cluster}.ssl.keystore.type |
无 | 密钥存储文件的文件格式。 对于客户端,这是可选的。 有关更多详细信息,请参见 Kafka 文档。仅用于获取 delegation token。 | 3.2.0 |
spark.kafka.clusters.${cluster}.ssl.keystore.location |
无 | 密钥存储文件的位置。对于客户端,这是可选的,可用于客户端的双向身份验证。有关更多详细信息,请参见 Kafka 文档。仅用于获取 delegation token。 | 3.0.0 |
spark.kafka.clusters.${cluster}.ssl.keystore.password |
无 | 密钥存储文件的存储密码。这是可选的,只有在配置了 spark.kafka.clusters.${cluster}.ssl.keystore.location 时才需要。有关更多详细信息,请参见 Kafka 文档。仅用于获取 delegation token。 |
3.0.0 |
spark.kafka.clusters.${cluster}.ssl.key.password |
无 | 密钥存储文件中私钥的密码。对于客户端,这是可选的。有关更多详细信息,请参见 Kafka 文档。仅用于获取 delegation token。 | 3.0.0 |
spark.kafka.clusters.${cluster}.sasl.token.mechanism |
SCRAM-SHA-512 | SASL 机制用于与 delegation token 的客户端连接。由于 SCRAM 登录模块用于身份验证,因此必须在此处设置兼容的机制。有关更多详细信息,请参见 Kafka 文档 (sasl.mechanism )。仅用于通过 delegation token 对 Kafka broker 进行身份验证。 |
3.0.0 |
Kafka 特定配置
可以使用 kafka.
前缀设置 Kafka 自己的配置,例如,--conf spark.kafka.clusters.${cluster}.kafka.retries=1
。 有关可能的 Kafka 参数,请参见 Kafka adminclient 配置文档。
注意事项
- 尚不支持为代理用户获取 delegation token (KAFKA-6945)。
JAAS 登录配置
JAAS 登录配置必须放置在 Spark 尝试访问 Kafka 集群的所有节点上。 这提供了应用任何自定义身份验证逻辑的可能性,但维护成本更高。 这可以通过几种方式完成。 一种可能性是提供额外的 JVM 参数,例如:
./bin/spark-submit \
--driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
--conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
...