Protobuf 数据源指南
- 部署
- to_protobuf() 和 from_protobuf()
- Protobuf -> Spark SQL 转换支持的类型
- Spark SQL -> Protobuf 转换支持的类型
- 处理 Protobuf 字段中的循环引用
- 数据源选项
自 Spark 3.4.0 版本起,Spark SQL 提供对读写 Protobuf 数据的内置支持。
部署
spark-protobuf
模块是外部模块,默认不包含在 spark-submit
或 spark-shell
中。
与所有 Spark 应用程序一样,使用 spark-submit
启动您的应用程序。spark-protobuf_2.13
及其依赖项可以使用 --packages
直接添加到 spark-submit
,例如,
./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.13:4.0.0 ...
要在 spark-shell
上进行实验,您也可以使用 --packages
直接添加 org.apache.spark:spark-protobuf_2.13
及其依赖项,
./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.13:4.0.0 ...
有关提交带有外部依赖项的应用程序的更多详细信息,请参阅应用程序提交指南。
to_protobuf() 和 from_protobuf()
spark-protobuf 包提供了函数 to_protobuf
用于将列编码为 Protobuf 格式的二进制数据,以及 from_protobuf()
用于将 Protobuf 二进制数据解码为列。这两个函数将一列转换为另一列,并且输入/输出 SQL 数据类型可以是复杂类型或原始类型。
在从 Kafka 等流式源读取或写入数据时,使用 Protobuf 消息作为列非常有用。每个 Kafka 键值记录都会附带一些元数据,例如摄取到 Kafka 的时间戳、在 Kafka 中的偏移量等。
- 如果包含数据的“value”字段是 Protobuf 格式,您可以使用
from_protobuf()
提取数据、丰富数据、清理数据,然后将其再次推送到 Kafka 下游或写入不同的数据汇。 to_protobuf()
可用于将结构体转换为 Protobuf 消息。当您希望在将数据写入 Kafka 时将多列重新编码为单列时,此方法特别有用。
Spark SQL 模式是根据传递给 from_protobuf
和 to_protobuf
的 Protobuf 描述符文件或 Protobuf 类生成的。指定的 Protobuf 类或 Protobuf 描述符文件必须与数据匹配,否则行为是未定义的:它可能会失败或返回任意结果。
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
# from_protobuf and to_protobuf provide two schema choices. Via Protobuf descriptor file,
# or via shaded Java class.
# give input .proto protobuf schema
# syntax = "proto3"
# message AppEvent {
# string name = 1;
# int64 id = 2;
# string context = 3;
# }
df = spark
.readStream
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
# 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
output = df
.select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event"))
.where('event.name == "alice"')
.select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))
# Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.
output = df
.select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
.where('event.name == "alice"')
output.printSchema()
# root
# |--event: struct (nullable = true)
# | |-- name : string (nullable = true)
# | |-- id: long (nullable = true)
# | |-- context: string (nullable = true)
output = output
.select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("topic", "topic2")
.start()
import org.apache.spark.sql.protobuf.functions._
// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
// string name = 1;
// int64 id = 2;
// string context = 3;
// }
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
val output = df
.select(from_protobuf($"value", "AppEvent", descriptorFilePath) as $"event")
.where("event.name == \"alice\"")
.select(to_protobuf($"user", "AppEvent", descriptorFilePath) as $"event")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
var output = df
.select(from_protobuf($"value", "org.example.protos..AppEvent") as $"event")
.where("event.name == \"alice\"")
output.printSchema()
// root
// |--event: struct (nullable = true)
// | |-- name : string (nullable = true)
// | |-- id: long (nullable = true)
// | |-- context: string (nullable = true)
output = output.select(to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as $"event")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;
// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
// string name = 1;
// int64 id = 2;
// string context = 3;
// }
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
Dataset<Row> output = df
.select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
.where("event.name == \"alice\"")
.select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));
// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
Dataset<Row> output = df
.select(
from_protobuf(col("value"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
.where("event.name == \"alice\"")
output.printSchema()
// root
// |--event: struct (nullable = true)
// | |-- name : string (nullable = true)
// | |-- id: long (nullable = true)
// | |-- context: string (nullable = true)
output = output.select(
to_protobuf(col("event"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();
Protobuf -> Spark SQL 转换支持的类型
目前 Spark 支持读取 Protobuf 消息下的Protobuf 标量类型、枚举类型、嵌套类型和映射类型。除了这些类型之外,spark-protobuf
还引入了对 Protobuf OneOf
字段的支持。这允许您处理可能具有多组字段但每次只能存在一组字段的消息。这对于您正在处理的数据不总是相同格式的情况很有用,并且您需要能够处理具有不同字段集的消息而不会遇到错误。
Protobuf 类型 | Spark SQL 类型 |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
string | StringType |
enum | StringType |
bytes | BinaryType |
Message | StructType |
repeated | ArrayType |
map | MapType |
OneOf | Struct |
它还支持读取以下 Protobuf 类型:Timestamp 和 Duration
Protobuf 逻辑类型 | Protobuf 模式 | Spark SQL 类型 |
---|---|---|
duration | MessageType{seconds: Long, nanos: Int} | DayTimeIntervalType |
timestamp | MessageType{seconds: Long, nanos: Int} | TimestampType |
Spark SQL -> Protobuf 转换支持的类型
Spark 支持将所有 Spark SQL 类型写入 Protobuf。对于大多数类型,从 Spark 类型到 Protobuf 类型的映射是直接的(例如,IntegerType 转换为 int);
Spark SQL 类型 | Protobuf 类型 |
---|---|
BooleanType | boolean |
IntegerType | int |
LongType | long |
FloatType | float |
DoubleType | double |
StringType | string |
StringType | enum |
BinaryType | bytes |
StructType | message |
ArrayType | repeated |
MapType | map |
处理 Protobuf 字段中的循环引用
处理 Protobuf 数据时可能出现的一个常见问题是循环引用。在 Protobuf 中,当一个字段引用自身或引用另一个引用原始字段的字段时,就会发生循环引用。这可能导致解析数据时出现问题,因为它可能导致无限循环或其他意外行为。为了解决此问题,最新版本的 spark-protobuf 引入了一个新功能:通过字段类型检查循环引用的能力。这允许用户使用 recursive.fields.max.depth
选项来指定解析模式时允许的最大递归级别数。默认情况下,spark-protobuf
通过将 recursive.fields.max.depth
设置为 -1 来禁止递归字段。但是,如果需要,您可以将此选项设置为 1 到 10。
将 recursive.fields.max.depth
设置为 1 将丢弃所有递归字段;设置为 2 允许递归一次;设置为 3 允许递归两次。不允许 recursive.fields.max.depth
值大于 10,因为它可能导致性能问题甚至栈溢出。
以下 Protobuf 消息的 SQL 模式将根据 recursive.fields.max.depth
的值而异。
syntax = "proto3"
message Person {
string name = 1;
Person bff = 2
}
// The protobuf schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursive.fields.max.depth` value.
1: struct<name: string>
2: struct<name: string, bff: struct<name: string>>
3: struct<name: string, bff: struct<name: string, bff: struct<name: string>>>
...
数据源选项
Protobuf 数据源选项可以通过以下方式设置
- 以下内置函数
from_protobuf
to_protobuf
属性名 | 默认值 | 含义 | 范围 |
---|---|---|---|
mode |
FAILFAST |
允许一种在解析过程中处理损坏记录的模式。
|
读取 |
recursive.fields.max.depth |
-1 |
指定解析模式时允许的最大递归级别数。有关更多详细信息,请参阅处理 Protobuf 字段中的循环引用部分。 | 读取 |
convert.any.fields.to.json |
false |
启用将 Protobuf Any 字段转换为 JSON。应谨慎启用此选项。JSON 转换和处理效率低下。此外,模式安全性也会降低,导致下游处理容易出错。 |
读取 |
emit.default.values |
false |
反序列化 Protobuf 到 Spark 结构体时是否呈现零值字段。当序列化的 Protobuf 中某个字段为空时,此库默认会将其反序列化为 null ,此选项可以控制是否呈现特定类型的零值。 |
读取 |
enums.as.ints |
false |
是否将枚举字段呈现为其整数值。当此选项设置为 false 时,枚举字段将被映射到 StringType ,值为枚举名称;当设置为 true 时,枚举字段将被映射到 IntegerType ,值为其整数值。 |
读取 |
upcast.unsigned.ints |
false |
是否将无符号整数向上转型为更大的类型。将此选项设置为 true 时,uint32 使用 LongType ,uint64 使用 Decimal(20, 0) ,这样它们的表示可以包含大的无符号值而不会溢出。 |
读取 |
unwrap.primitive.wrapper.types |
false |
反序列化时是否解包知名原始包装类型的结构体表示。默认情况下,原始类型的包装类型(即 google.protobuf.Int32Value、google.protobuf.Int64Value 等)将被反序列化为结构体。 | 读取 |
retain.empty.message.types |
false |
是否在模式中保留空 Protobuf 消息类型的字段。由于 Spark 不允许写入空的 StructType ,因此空 Protobuf 消息类型默认将被丢弃。将此选项设置为 true 将向空 Protobuf 消息插入一个虚拟列(__dummy_field_in_empty_struct ),以便保留空消息字段。 |
读取 |