Protobuf 数据源指南
- 部署
- to_protobuf() 和 from_protobuf()
- Protobuf -> Spark SQL 转换支持的类型
- Spark SQL -> Protobuf 转换支持的类型
- 处理循环引用的 protobuf 字段
自 Spark 3.4.0 版本起,Spark SQL 提供了对读取和写入 protobuf 数据的内置支持。
部署
spark-protobuf
模块是外部模块,默认情况下不包含在 spark-submit
或 spark-shell
中。
与任何 Spark 应用程序一样,使用 spark-submit
启动应用程序。spark-protobuf_2.12
及其依赖项可以使用 --packages
直接添加到 spark-submit
中,例如:
./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.12:3.5.1 ...
为了在 spark-shell
上进行实验,您还可以使用 --packages
直接添加 org.apache.spark:spark-protobuf_2.12
及其依赖项,
./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.12:3.5.1 ...
有关使用外部依赖项提交应用程序的更多详细信息,请参阅应用程序提交指南。
to_protobuf() 和 from_protobuf()
spark-protobuf 包提供了函数 to_protobuf
将列编码为 protobuf 格式的二进制文件,以及 from_protobuf()
将 protobuf 二进制数据解码为列。这两个函数都将一列转换为另一列,并且输入/输出 SQL 数据类型可以是复杂类型或基本类型。
当从 Kafka 等流式源读取数据或写入数据时,使用 protobuf 消息作为列非常有用。每个 Kafka 键值记录都将添加一些元数据,例如 Kafka 中的摄取时间戳、Kafka 中的偏移量等。
- 如果包含数据的“值”字段采用 protobuf 格式,则可以使用
from_protobuf()
提取数据、丰富数据、清理数据,然后将其下推回 Kafka 或将其写入其他接收器。 to_protobuf()
可用于将结构转换为 protobuf 消息。当您希望在将数据写入 Kafka 时将多列重新编码为一列时,此方法特别有用。
Spark SQL 架构是根据传递给 from_protobuf
和 to_protobuf
的 protobuf 描述符文件或 protobuf 类生成的。指定的 protobuf 类或 protobuf 描述符文件必须与数据匹配,否则行为未定义:它可能会失败或返回任意结果。
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
# from_protobuf and to_protobuf provide two schema choices. Via Protobuf descriptor file,
# or via shaded Java class.
# give input .proto protobuf schema
# syntax = "proto3"
# message AppEvent {
# string name = 1;
# int64 id = 2;
# string context = 3;
# }
df = spark
.readStream
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
# 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
output = df
.select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event"))
.where('event.name == "alice"')
.select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))
# Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.
output = df
.select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
.where('event.name == "alice"')
output.printSchema()
# root
# |--event: struct (nullable = true)
# | |-- name : string (nullable = true)
# | |-- id: long (nullable = true)
# | |-- context: string (nullable = true)
output = output
.select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("topic", "topic2")
.start()
import org.apache.spark.sql.protobuf.functions._
// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
// string name = 1;
// int64 id = 2;
// string context = 3;
// }
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
val output = df
.select(from_protobuf($"value", "AppEvent", descriptorFilePath) as $"event")
.where("event.name == \"alice\"")
.select(to_protobuf($"user", "AppEvent", descriptorFilePath) as $"event")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
var output = df
.select(from_protobuf($"value", "org.example.protos..AppEvent") as $"event")
.where("event.name == \"alice\"")
output.printSchema()
// root
// |--event: struct (nullable = true)
// | |-- name : string (nullable = true)
// | |-- id: long (nullable = true)
// | |-- context: string (nullable = true)
output = output.select(to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as $"event")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;
// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
// string name = 1;
// int64 id = 2;
// string context = 3;
// }
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
Dataset<Row> output = df
.select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
.where("event.name == \"alice\"")
.select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));
// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
Dataset<Row> output = df
.select(
from_protobuf(col("value"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
.where("event.name == \"alice\"")
output.printSchema()
// root
// |--event: struct (nullable = true)
// | |-- name : string (nullable = true)
// | |-- id: long (nullable = true)
// | |-- context: string (nullable = true)
output = output.select(
to_protobuf(col("event"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();
Protobuf -> Spark SQL 转换支持的类型
目前,Spark 支持读取 Protobuf 消息下的protobuf 标量类型、枚举类型、嵌套类型和映射类型。除了这些类型之外,spark-protobuf
还引入了对 Protobuf OneOf
字段的支持,它允许您处理可能具有多组字段但一次只能存在一组字段的消息。这对于您正在处理的数据格式不总是相同并且您需要能够处理具有不同字段集的消息而不会遇到错误的情况非常有用。
Protobuf 类型 | Spark SQL 类型 |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
string | StringType |
enum | StringType |
bytes | BinaryType |
Message | StructType |
repeated | ArrayType |
map | MapType |
OneOf | Struct |
它还支持读取以下 Protobuf 类型:Timestamp 和 Duration
Protobuf 逻辑类型 | Protobuf 架构 | Spark SQL 类型 |
---|---|---|
duration | MessageType{seconds: Long, nanos: Int} | DayTimeIntervalType |
timestamp | MessageType{seconds: Long, nanos: Int} | TimestampType |
Spark SQL -> Protobuf 转换支持的类型
Spark 支持将所有 Spark SQL 类型写入 Protobuf。对于大多数类型,从 Spark 类型到 Protobuf 类型的映射非常简单(例如,IntegerType 转换为 int);
Spark SQL 类型 | Protobuf 类型 |
---|---|
BooleanType | boolean |
IntegerType | int |
LongType | long |
FloatType | float |
DoubleType | double |
StringType | string |
StringType | enum |
BinaryType | bytes |
StructType | message |
ArrayType | repeated |
MapType | map |
处理循环引用的 protobuf 字段
使用 Protobuf 数据时,一个常见问题是存在循环引用。在 Protobuf 中,当一个字段引用自身或引用另一个引用原始字段的字段时,就会发生循环引用。这在解析数据时可能会导致问题,因为它会导致无限循环或其他意外行为。为了解决这个问题,最新版本的 spark-protobuf 引入了一个新功能:能够通过字段类型检查循环引用。这允许用户使用 recursive.fields.max.depth
选项来指定解析架构时允许的最大递归级别数。默认情况下,spark-protobuf
将通过将 recursive.fields.max.depth
设置为 -1 来禁止递归字段。但是,如果需要,您可以将此选项设置为 0 到 10。
将 recursive.fields.max.depth
设置为 0 将删除所有递归字段,将其设置为 1 允许递归一次,将其设置为 2 允许递归两次。recursive.fields.max.depth
值大于 10 是不允许的,因为它会导致性能问题甚至堆栈溢出。
以下 protobuf 消息的 SQL 架构将根据 recursive.fields.max.depth
的值而有所不同。
syntax = "proto3"
message Person {
string name = 1;
Person bff = 2
}
// The protobuf schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursive.fields.max.depth` value.
0: struct<name: string, bff: null>
1: struct<name string, bff: <name: string, bff: null>>
2: struct<name string, bff: <name: string, bff: struct<name: string, bff: null>>> ...