结构化流 + Kafka 集成指南 (Kafka 代理版本 0.10.0 或更高)
Kafka 0.10 的结构化流集成,用于从 Kafka 读取数据和向 Kafka 写入数据。
链接
对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,请将您的应用程序链接到以下构件
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.13
version = 4.0.0
请注意,要使用标头功能,您的 Kafka 客户端版本应为 0.11.0.0 或更高版本。
对于 Python 应用程序,您需要在部署应用程序时添加上述库及其依赖项。请参阅下面的部署小节。
要在spark-shell
上进行实验,您在调用spark-shell
时也需要添加上述库及其依赖项。此外,请参阅下面的部署小节。
从 Kafka 读取数据
为流式查询创建 Kafka 源
# Subscribe to 1 topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to 1 topic, with headers
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.option("includeHeaders", "true") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
# Subscribe to multiple topics
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to 1 topic, with headers
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("includeHeaders", "true")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
.as[(String, String, Array[(String, Array[Byte])])]
// Subscribe to multiple topics
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to 1 topic
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to 1 topic, with headers
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("includeHeaders", "true")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers");
// Subscribe to multiple topics
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to a pattern
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
为批处理查询创建 Kafka 源
如果您的用例更适合批处理,您可以为定义的偏移量范围创建 Dataset/DataFrame。
# Subscribe to 1 topic defaults to the earliest and latest offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to multiple topics, specifying explicit Kafka offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern, at the earliest and latest offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to 1 topic defaults to the earliest and latest offsets
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to multiple topics, specifying explicit Kafka offsets
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
.option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to a pattern, at the earliest and latest offsets
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
源中的每一行具有以下 schema
列 | 类型 |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | timestamp |
timestampType | int |
headers (可选) | array |
对于 Kafka 源,批处理和流式查询都必须设置以下选项。
选项 | value | 含义 |
---|---|---|
assign | json string {"topicA":[0,1],"topicB":[2,4]} | 要消费的特定 TopicPartitions。对于 Kafka 源,只能指定“assign”、“subscribe”或“subscribePattern”选项之一。 |
subscribe | 逗号分隔的 topics 列表 | 要订阅的 topic 列表。对于 Kafka 源,只能指定“assign”、“subscribe”或“subscribePattern”选项之一。 |
subscribePattern | Java 正则表达式字符串 | 用于订阅 topic(s) 的模式。对于 Kafka 源,只能指定“assign”、“subscribe”或“subscribePattern”选项之一。 |
kafka.bootstrap.servers | 逗号分隔的 host:port 列表 | Kafka “bootstrap.servers” 配置。 |
以下配置是可选的
选项 | value | 默认 | 查询类型 | 含义 |
---|---|---|---|---|
startingTimestamp | 时间戳字符串,例如 "1000" | 无(下一个优先级是 startingOffsetsByTimestamp ) |
流式和批处理 | 查询启动时的时间戳起始点,一个字符串,指定订阅主题中所有分区的起始时间戳。请参阅下面的时间戳 offset 选项的详细信息。如果 Kafka 没有返回匹配的 offset,则行为将遵循选项 startingOffsetsByTimestampStrategy 的值。
注1: 注2:对于流式查询,这仅适用于启动新查询时,并且恢复将始终从查询中断处继续。查询期间新发现的分区将从最早处开始。 |
startingOffsetsByTimestamp | json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ | 无(下一个优先级是 startingOffsets ) |
流式和批处理 | 查询启动时的时间戳起始点,一个 json 字符串,为每个 TopicPartition 指定一个起始时间戳。请参阅下面的时间戳 offset 选项的详细信息。如果 Kafka 没有返回匹配的 offset,则行为将遵循选项 startingOffsetsByTimestampStrategy 的值。
注1: 注2:对于流式查询,这仅适用于启动新查询时,并且恢复将始终从查询中断处继续。查询期间新发现的分区将从最早处开始。 |
startingOffsets | "earliest"(最早),"latest"(最新,仅限流式),或 json 字符串 """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ | 流式为“latest”,批处理为“earliest” | 流式和批处理 | 查询启动时的起始点,可以是“earliest”(从最早的 offset 开始),“latest”(从最新的 offset 开始),或者一个 json 字符串,为每个 TopicPartition 指定一个起始 offset。在 json 中,-2 可用于表示最早的 offset,-1 表示最新的 offset。注意:对于批处理查询,不允许使用 latest(无论是隐式还是通过在 json 中使用 -1)。对于流式查询,这仅适用于启动新查询时,并且恢复将始终从查询中断处继续。查询期间新发现的分区将从最早处开始。 |
endingTimestamp | 时间戳字符串,例如 "1000" | 无(下一个优先级是 endingOffsetsByTimestamp ) |
批处理查询 | 批处理查询结束时的终点,一个 json 字符串,指定订阅主题中所有分区的结束时间戳。请参阅下面的时间戳 offset 选项的详细信息。如果 Kafka 没有返回匹配的 offset,则 offset 将设置为最新。 注:
|
endingOffsetsByTimestamp | json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ | 无(下一个优先级是 endingOffsets ) |
批处理查询 | 批处理查询结束时的终点,一个 json 字符串,为每个 TopicPartition 指定一个结束时间戳。请参阅下面的时间戳 offset 选项的详细信息。如果 Kafka 没有返回匹配的 offset,则 offset 将设置为最新。 注: |
endingOffsets | latest 或 json 字符串 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} | latest | 批处理查询 | 批处理查询结束时的终点,可以是“latest”(指最新),或者一个 json 字符串,为每个 TopicPartition 指定一个结束 offset。在 json 中,-1 可用于表示最新的 offset,-2(最早的)作为 offset 是不允许的。 |
failOnDataLoss | true 或 false | true | 流式和批处理 | 当数据可能丢失时(例如,topic 被删除,或 offset 超出范围)是否使查询失败。这可能是一个误报。当它不按预期工作时,您可以禁用它。 |
kafkaConsumer.pollTimeoutMs | long | 120000 | 流式和批处理 | 在 executor 中从 Kafka 拉取数据时的超时时间(毫秒)。如果未定义,则回退到 spark.network.timeout 。 |
fetchOffset.numRetries | int | 3 | 流式和批处理 | 在放弃获取 Kafka offsets 之前重试的次数。 |
fetchOffset.retryIntervalMs | long | 10 | 流式和批处理 | 重试获取 Kafka offsets 前等待的毫秒数 |
maxOffsetsPerTrigger | long | 无 | 流式查询 | 每个触发间隔处理的最大 offset 数量的速率限制。指定的总 offset 数量将按比例分配给不同数据量的 topicPartitions。 |
minOffsetsPerTrigger | long | 无 | 流式查询 | 每个触发间隔要处理的最小 offset 数量。指定的总 offset 数量将按比例分配给不同数据量的 topicPartitions。注意,如果超过 maxTriggerDelay,即使可用 offset 数量未达到 minOffsetsPerTrigger,也会触发一次触发。 |
maxTriggerDelay | 带单位的时间 | 15m | 流式查询 | 在源中有数据可用的情况下,两次触发之间触发器可以延迟的最大时间量。此选项仅在设置了 minOffsetsPerTrigger 时适用。 |
minPartitions | int | 无 | 流式和批处理 | 从 Kafka 读取所需的最少分区数。默认情况下,Spark 将 topicPartitions 与从 Kafka 消费的 Spark 分区进行 1 对 1 映射。如果您将此选项设置为大于您的 topicPartitions 的值,Spark 将把大型 Kafka 分区拆分为更小的块。请注意,此配置类似于一个 hint (提示):Spark 任务的数量将大约为 minPartitions 。它可能会更少或更多,具体取决于舍入误差或未收到任何新数据的 Kafka 分区。 |
maxRecordsPerPartition | long | 无 | 流式和批处理 | 限制分区中存在的最大记录数。默认情况下,Spark 将 topicPartitions 与从 Kafka 消费的 Spark 分区进行 1 对 1 映射。如果您设置此选项,Spark 将把 Kafka 分区拆分为更小的块,以便每个分区最多包含 maxRecordsPerPartition 条记录。当 minPartitions 和 maxRecordsPerPartition 都设置时,分区数将大约为 (recordsPerPartition / maxRecordsPerPartition) 和 minPartitions 中的最大值。在这种情况下,Spark 将根据 maxRecordsPerPartition 拆分分区,如果最终分区计数小于 minPartitions ,它将再次根据 minPartitions 拆分分区。 |
groupIdPrefix | string | spark-kafka-source | 流式和批处理 | 结构化流查询生成的消费者组标识符 (group.id ) 的前缀。如果设置了 "kafka.group.id",此选项将被忽略。 |
kafka.group.id | string | 无 | 流式和批处理 | 从 Kafka 读取时在 Kafka 消费者中使用的 Kafka 组 ID。请谨慎使用此选项。默认情况下,每个查询都会生成一个唯一的组 ID 用于读取数据。这确保了每个 Kafka 源都有自己的消费者组,不会受到任何其他消费者的干扰,因此可以读取其订阅主题的所有分区。在某些情况下(例如,基于 Kafka 组的授权),您可能希望使用特定的授权组 ID 来读取数据。您可以选择性地设置组 ID。但是,请务必谨慎操作,因为它可能导致意外行为。同时运行的查询(包括批处理和流式)或具有相同组 ID 的源可能会相互干扰,导致每个查询只能读取部分数据。当查询快速启动/重启时也可能发生这种情况。为了尽量减少此类问题,请将 Kafka 消费者会话超时(通过设置选项 "kafka.session.timeout.ms")设置得非常小。设置此选项后,将忽略选项 "groupIdPrefix"。 |
includeHeaders | boolean | false | 流式和批处理 | 是否在行中包含 Kafka 标头。 |
startingOffsetsByTimestampStrategy | "error" 或 "latest" | "error" | 流式和批处理 | 当按时间戳指定的起始 offset(无论是全局还是按分区)与 Kafka 返回的 offset 不匹配时,将使用此策略。以下是策略名称和相应的描述
“error”:使查询失败,最终用户必须处理需要手动步骤的变通方法。 “latest”:为这些分区分配最新 offset,以便 Spark 可以在后续的微批次中从这些分区读取更新的记录。 |
时间戳 offset 选项的详细信息
每个分区返回的 offset 是时间戳大于或等于相应分区中给定时间戳的最早 offset。如果 Kafka 未返回匹配的 offset,则不同选项的行为会有所不同——请查看每个选项的描述。
Spark 简单地将时间戳信息传递给 KafkaConsumer.offsetsForTimes
,并且不解释或推断该值。有关 KafkaConsumer.offsetsForTimes
的更多详细信息,请参阅 javadoc。此外,此处 timestamp
的含义可能根据 Kafka 配置 (log.message.timestamp.type
) 而有所不同:请参阅 Kafka 文档了解更多详细信息。
时间戳 offset 选项需要 Kafka 0.10.1.0 或更高版本。
获取 Offset
在 Spark 3.0 及更早版本中,Spark 使用 KafkaConsumer
获取 offset,这可能导致驱动程序无限等待。在 Spark 3.1 中,新增了一个配置选项 spark.sql.streaming.kafka.useDeprecatedOffsetFetching
(默认:false
),它允许 Spark 使用基于 AdminClient
的新 offset 获取机制。(将其设置为 true
以使用旧的 KafkaConsumer
获取 offset 机制。)
当使用新机制时,适用以下情况。
首先,新方法支持 Kafka broker 0.11.0.0+
。
在 Spark 3.0 及更早版本中,安全的 Kafka 处理需要从驱动程序角度提供以下 ACL
- Topic 资源描述操作
- Topic 资源读取操作
- Group 资源读取操作
自 Spark 3.1 起,可以使用 AdminClient
而不是 KafkaConsumer
获取 offset,为此需要从驱动程序角度提供以下 ACL
- Topic 资源描述操作
由于驱动程序中的 AdminClient
不连接到消费者组,基于 group.id
的授权将不再起作用(executor 从未进行过基于组的授权)。值得一提的是,executor 端的行为与之前完全相同(组前缀和覆盖仍然有效)。
消费者缓存
初始化 Kafka 消费者非常耗时,特别是在处理时间是关键因素的流式场景中。因此,Spark 通过利用 Apache Commons Pool 在 executor 上池化 Kafka 消费者。
缓存键由以下信息构建
- Topic 名称
- Topic 分区
- 组 ID
以下属性可用于配置消费者池
属性名 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.kafka.consumer.cache.capacity | 64 | 缓存的最大消费者数量。请注意,这是一个软限制。 | 3.0.0 |
spark.kafka.consumer.cache.timeout | 5m (5 分钟) | 消费者在池中闲置的最短时间,超过此时间后才有资格被回收器驱逐。 | 3.0.0 |
spark.kafka.consumer.cache.evictorThreadRunInterval | 1m (1 分钟) | 消费者池的空闲回收线程运行之间的时间间隔。当为非正值时,将不会运行空闲回收线程。 | 3.0.0 |
spark.kafka.consumer.cache.jmx.enable | false | 为此配置实例创建的池启用或禁用 JMX。池的统计信息可通过 JMX 实例获得。JMX 名称的前缀设置为“kafka010-cached-simple-kafka-consumer-pool”。 | 3.0.0 |
池的大小受 spark.kafka.consumer.cache.capacity
限制,但它作为“软限制”工作,以避免阻塞 Spark 任务。
空闲回收线程会定期移除超过给定超时时间未使用的消费者。如果在借用时达到此阈值,它会尝试移除当前未使用的最少使用条目。
如果无法移除,则池将继续增长。在最坏的情况下,池将增长到 executor 中可以运行的最大并发任务数(即,任务槽数)。
如果任务因任何原因失败,出于安全原因,新任务将使用新创建的 Kafka 消费者执行。同时,我们使池中所有具有相同缓存键的消费者失效,以移除在失败执行中使用的消费者。任何其他任务正在使用的消费者不会被关闭,但当它们返回池中时也将失效。
除了消费者之外,Spark 还单独池化从 Kafka 获取的记录,以便从 Spark 的角度来看 Kafka 消费者是无状态的,并最大限度地提高池化效率。它利用与 Kafka 消费者池相同的缓存键。请注意,由于特性差异,它不利用 Apache Commons Pool。
以下属性可用于配置获取的数据池
属性名 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.kafka.consumer.fetchedData.cache.timeout | 5m (5 分钟) | 获取的数据在池中闲置的最短时间,超过此时间后才有资格被回收器驱逐。 | 3.0.0 |
spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval | 1m (1 分钟) | 获取的数据池的空闲回收线程运行之间的时间间隔。当为非正值时,将不会运行空闲回收线程。 | 3.0.0 |
将数据写入 Kafka
在此,我们描述了将流式查询和批处理查询写入 Apache Kafka 的支持。请注意,Apache Kafka 仅支持至少一次写入语义。因此,当将流式查询或批处理查询写入 Kafka 时,某些记录可能会重复;例如,如果 Kafka 需要重试一个未被 Broker 确认的消息,即使该 Broker 已经接收并写入了该消息记录,也可能发生这种情况。由于 Kafka 的这些写入语义,结构化流无法阻止此类重复的发生。但是,如果查询写入成功,则可以假定查询输出至少被写入了一次。一种可能的解决方案是在读取写入的数据时消除重复项,即引入一个主(唯一)键,该键可用于在读取时执行去重操作。
要写入 Kafka 的 DataFrame 在 schema 中应具有以下列
列 | 类型 |
---|---|
key (可选) | string 或 binary |
value (必需) | string 或 binary |
headers (可选) | array |
topic (*可选) | string |
partition (可选) | int |
* 如果未指定“topic”配置选项,则 topic 列是必需的。
value 列是唯一必需的选项。如果未指定 key 列,则会自动添加一个值为 null
的 key 列(有关如何处理值为 null
的 key 值的 Kafka 语义,请参阅相关文档)。如果存在 topic 列,则在将给定行写入 Kafka 时,其值将用作 topic,除非设置了“topic”配置选项,即“topic”配置选项会覆盖 topic 列。如果未指定“partition”列(或其值为 null
),则分区将由 Kafka 生产者计算。可以通过设置 kafka.partitioner.class
选项在 Spark 中指定 Kafka 分区器。如果未指定,将使用 Kafka 默认分区器。
对于 Kafka sink,批处理和流式查询都必须设置以下选项。
选项 | value | 含义 |
---|---|---|
kafka.bootstrap.servers | 逗号分隔的 host:port 列表 | Kafka “bootstrap.servers” 配置。 |
以下配置是可选的
选项 | value | 默认 | 查询类型 | 含义 |
---|---|---|---|---|
topic | string | 无 | 流式和批处理 | 设置所有行将被写入 Kafka 的 topic。此选项会覆盖数据中可能存在的任何 topic 列。 |
includeHeaders | boolean | false | 流式和批处理 | 是否在行中包含 Kafka 标头。 |
为流式查询创建 Kafka Sink
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()
# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start();
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start();
将批处理查询的输出写入 Kafka
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.save()
# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save();
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save();
生产者缓存
鉴于 Kafka 生产者实例被设计为线程安全的,Spark 初始化一个 Kafka 生产者实例并在任务之间共享使用相同的缓存键。
缓存键由以下信息构建
- Kafka 生产者配置
这包括授权配置,当使用委托令牌时,Spark 会自动包含该配置。即使我们考虑授权,您也可以预期在相同的 Kafka 生产者配置中将使用相同的 Kafka 生产者实例。当委托令牌续订时,它将使用不同的 Kafka 生产者;旧委托令牌的 Kafka 生产者实例将根据缓存策略被逐出。
以下属性可用于配置生产者池
属性名 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.kafka.producer.cache.timeout | 10m (10 分钟) | 生产者在池中闲置的最短时间,超过此时间后才有资格被回收器驱逐。 | 2.2.1 |
spark.kafka.producer.cache.evictorThreadRunInterval | 1m (1 分钟) | 生产者池的空闲回收线程运行之间的时间间隔。当为非正值时,将不会运行空闲回收线程。 | 3.0.0 |
空闲回收线程会定期移除超过给定超时时间未使用的生产者。请注意,生产者是共享并并发使用的,因此最后使用的时间戳由生产者实例返回且引用计数为 0 的那一刻决定。
Kafka 特有配置
Kafka 自己的配置可以通过 DataStreamReader.option
和 kafka.
前缀来设置,例如 stream.option("kafka.bootstrap.servers", "host:port")
。有关可能的 Kafka 参数,请参阅 Kafka 消费者配置文档(与读取数据相关的参数)和 Kafka 生产者配置文档(与写入数据相关的参数)。
请注意,不能设置以下 Kafka 参数,否则 Kafka 源或 sink 将抛出异常
- group.id:Kafka 源将自动为每个查询创建一个唯一的组 ID。用户可以通过可选的源选项
groupIdPrefix
设置自动生成的 group.id 的前缀,默认值为“spark-kafka-source”。您也可以设置“kafka.group.id”以强制 Spark 使用特定的组 ID,但是,请阅读此选项的警告并谨慎使用。 - auto.offset.reset:设置源选项
startingOffsets
来指定从何处开始。结构化流在内部管理要消费的 offset,而不是依赖 Kafka Consumer 来完成。这将确保在动态订阅新主题/分区时不会丢失数据。请注意,startingOffsets
仅在启动新的流式查询时适用,并且恢复将始终从查询中断处继续。请注意,当流式应用程序消费的 offset 不再存在于 Kafka 中时(例如,主题被删除,offset 超出范围,或 offset 在保留期后被移除),offset 不会被重置,流式应用程序将出现数据丢失。在极端情况下,例如流式应用程序的吞吐量无法赶上 Kafka 的保留速度时,当批次的 offset 范围完全不在 Kafka 中时,批次的输入行可能会逐渐减少直到为零。启用failOnDataLoss
选项可以要求结构化流在此类情况下使查询失败。 - key.deserializer:Key 始终使用 ByteArrayDeserializer 反序列化为字节数组。使用 DataFrame 操作显式反序列化 key。
- value.deserializer:Value 始终使用 ByteArrayDeserializer 反序列化为字节数组。使用 DataFrame 操作显式反序列化 value。
- key.serializer:Key 始终使用 ByteArraySerializer 或 StringSerializer 序列化。使用 DataFrame 操作将 key 显式序列化为字符串或字节数组。
- value.serializer:Value 始终使用 ByteArraySerializer 或 StringSerializer 序列化。使用 DataFrame 操作将 value 显式序列化为字符串或字节数组。
- enable.auto.commit:Kafka 源不提交任何 offset。
- interceptor.classes:Kafka 源始终将 key 和 value 读取为字节数组。使用 ConsumerInterceptor 不安全,因为它可能会中断查询。
部署
与任何 Spark 应用程序一样,spark-submit
用于启动您的应用程序。spark-sql-kafka-0-10_2.13
及其依赖项可以直接使用 --packages
添加到 spark-submit
,例如:
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0 ...
要在 spark-shell
上进行实验,您也可以直接使用 --packages
添加 spark-sql-kafka-0-10_2.13
及其依赖项,
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0 ...
有关提交具有外部依赖项的应用程序的更多详细信息,请参阅应用程序提交指南。
安全性
Kafka 0.9.0.0 引入了多项功能,提高了集群的安全性。有关这些可能性的详细描述,请参阅 Kafka 安全文档。
值得注意的是,安全性是可选的,并且默认情况下是关闭的。
Spark 支持以下几种针对 Kafka 集群进行身份验证的方式
- 委托令牌(在 Kafka broker 1.1.0 中引入)
- JAAS 登录配置
委托令牌
通过这种方式,应用程序可以通过 Spark 参数进行配置,并且可能不需要 JAAS 登录配置(Spark 可以使用 Kafka 的动态 JAAS 配置功能)。有关委托令牌的更多信息,请参阅 Kafka 委托令牌文档。
该过程由 Spark 的 Kafka 委托令牌提供者启动。当设置了 spark.kafka.clusters.${cluster}.auth.bootstrap.servers
时,Spark 会按以下优先顺序考虑登录选项:
- JAAS 登录配置,请参阅以下示例。
-
Keytab 文件,例如:
./bin/spark-submit \ --keytab <KEYTAB_FILE> \ --principal <PRINCIPAL> \ --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=<KAFKA_SERVERS> \ ...
-
Kerberos 凭证缓存,例如:
./bin/spark-submit \ --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=<KAFKA_SERVERS> \ ...
可以通过将 spark.security.credentials.kafka.enabled
设置为 false
来关闭 Kafka 委托令牌提供程序(默认:true
)。
Spark 可以配置为使用以下身份验证协议来获取令牌(它必须与 Kafka broker 配置匹配)
- SASL SSL (默认)
- SSL
- SASL PLAINTEXT (用于测试)
成功获取委托令牌后,Spark 会将其分发到各个节点并相应地续订。委托令牌使用 SCRAM
登录模块进行身份验证,因此必须配置相应的 spark.kafka.clusters.${cluster}.sasl.token.mechanism
(默认:SCRAM-SHA-512
)。此外,此参数必须与 Kafka broker 配置匹配。
当 executor 上有委托令牌可用时,Spark 会按以下优先顺序考虑登录选项
- JAAS 登录配置,请参阅以下示例。
- 委托令牌,更多详细信息请参阅
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex
参数。
如果以上都不适用,则假定为不安全连接。
配置
可以从多个集群获取委托令牌,${cluster}
是一个任意的唯一标识符,用于帮助分组不同的配置。
属性名 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.kafka.clusters.${cluster}.auth.bootstrap.servers |
无 | 用于建立与 Kafka 集群初始连接的逗号分隔的 host/port 对列表。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 | 3.0.0 |
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex |
.* | 用于匹配应用程序中源和 sink 的 bootstrap.servers 配置的正则表达式。如果服务器地址与此正则表达式匹配,则连接时将使用从相应 bootstrap 服务器获取的委托令牌。如果多个集群匹配该地址,将抛出异常,并且查询不会启动。Kafka 的安全和不安全监听器绑定到不同的端口。当两者都使用时,安全监听器端口必须是正则表达式的一部分。 |
3.0.0 |
spark.kafka.clusters.${cluster}.security.protocol |
SASL_SSL | 用于与 broker 通信的协议。更多详细信息请参阅 Kafka 文档。协议默认应用于所有 bootstrap.servers 配置匹配的源和 sink(更多详细信息请参阅 spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex ),并且可以通过在源或 sink 上设置 kafka.security.protocol 来覆盖。 |
3.0.0 |
spark.kafka.clusters.${cluster}.sasl.kerberos.service.name |
kafka | Kafka 运行时的 Kerberos 主体名称。这可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 | 3.0.0 |
spark.kafka.clusters.${cluster}.ssl.truststore.type |
无 | 信任存储文件的文件格式。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 | 3.2.0 |
spark.kafka.clusters.${cluster}.ssl.truststore.location |
无 | 信任存储文件的位置。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 | 3.0.0 |
spark.kafka.clusters.${cluster}.ssl.truststore.password |
无 | 信任存储文件的存储密码。此项可选,仅当配置了 spark.kafka.clusters.${cluster}.ssl.truststore.location 时才需要。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 |
3.0.0 |
spark.kafka.clusters.${cluster}.ssl.keystore.type |
无 | 密钥库文件的文件格式。对于客户端,此项可选。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 | 3.2.0 |
spark.kafka.clusters.${cluster}.ssl.keystore.location |
无 | 密钥库文件的位置。对于客户端,此项可选,可用于客户端的双向认证。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 | 3.0.0 |
spark.kafka.clusters.${cluster}.ssl.keystore.password |
无 | 密钥库文件的存储密码。此项可选,仅当配置了 spark.kafka.clusters.${cluster}.ssl.keystore.location 时才需要。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 |
3.0.0 |
spark.kafka.clusters.${cluster}.ssl.key.password |
无 | 密钥库文件中私钥的密码。对于客户端,此项可选。更多详细信息请参阅 Kafka 文档。仅用于获取委托令牌。 | 3.0.0 |
spark.kafka.clusters.${cluster}.sasl.token.mechanism |
SCRAM-SHA-512 | 用于使用委托令牌的客户端连接的 SASL 机制。由于 SCRAM 登录模块用于身份验证,因此此处必须设置兼容的机制。更多详细信息请参阅 Kafka 文档 (sasl.mechanism )。仅用于使用委托令牌对 Kafka broker 进行身份验证。 |
3.0.0 |
Kafka 特有配置
Kafka 自己的配置可以使用 kafka.
前缀设置,例如 --conf spark.kafka.clusters.${cluster}.kafka.retries=1
。有关可能的 Kafka 参数,请参阅 Kafka adminclient 配置文档。
注意事项
- 尚未支持为代理用户获取委托令牌(KAFKA-6945)。
JAAS 登录配置
JAAS 登录配置必须放置在 Spark 尝试访问 Kafka 集群的所有节点上。这提供了应用任何自定义身份验证逻辑的可能性,但维护成本更高。这可以通过多种方式完成。一种可能性是提供额外的 JVM 参数,例如:
./bin/spark-submit \
--driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
--conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
...