Protobuf 数据源指南
- 部署
- to_protobuf() 和 from_protobuf()
- Protobuf -> Spark SQL 转换支持的类型
- Spark SQL -> Protobuf 转换支持的类型
- 处理循环引用 protobuf 字段
自 Spark 3.4.0 版本起,Spark SQL 提供对读取和写入 protobuf 数据的内置支持。
部署
spark-protobuf
模块是外部模块,默认情况下不包含在 spark-submit
或 spark-shell
中。
与任何 Spark 应用程序一样,spark-submit
用于启动您的应用程序。spark-protobuf_2.12
及其依赖项可以使用 --packages
直接添加到 spark-submit
,例如:
./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.12:3.5.5 ...
为了在 spark-shell
上进行实验,您也可以使用 --packages
直接添加 org.apache.spark:spark-protobuf_2.12
及其依赖项:
./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.12:3.5.5 ...
有关使用外部依赖项提交应用程序的更多详细信息,请参见应用程序提交指南。
to_protobuf() 和 from_protobuf()
spark-protobuf 包提供了函数 to_protobuf
,用于将列编码为 protobuf 格式的二进制文件,以及 from_protobuf()
,用于将 protobuf 二进制数据解码为列。这两个函数都将一列转换为另一列,并且输入/输出 SQL 数据类型可以是复杂类型或原始类型。
当从 Kafka 等流源读取或写入数据时,使用 protobuf 消息作为列非常有用。每个 Kafka 键值记录都将使用一些元数据进行扩充,例如摄取到 Kafka 中的时间戳、Kafka 中的偏移量等。
- 如果包含数据的“value”字段是 protobuf 格式,则可以使用
from_protobuf()
提取您的数据、丰富数据、清理数据,然后将其向下游推送到 Kafka 再次或将其写入到不同的接收器。 to_protobuf()
可用于将结构转换为 protobuf 消息。 当您想将多个列重新编码为单个列,并将数据写入 Kafka 时,此方法特别有用。
Spark SQL 模式是基于传递给 from_protobuf
和 to_protobuf
的 protobuf 描述符文件或 protobuf 类生成的。 指定的 protobuf 类或 protobuf 描述符文件必须与数据匹配,否则,该行为是未定义的:它可能会失败或返回任意结果。
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
# from_protobuf and to_protobuf provide two schema choices. Via Protobuf descriptor file,
# or via shaded Java class.
# give input .proto protobuf schema
# syntax = "proto3"
# message AppEvent {
# string name = 1;
# int64 id = 2;
# string context = 3;
# }
df = spark
.readStream
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
# 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
output = df
.select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event"))
.where('event.name == "alice"')
.select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))
# Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.
output = df
.select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
.where('event.name == "alice"')
output.printSchema()
# root
# |--event: struct (nullable = true)
# | |-- name : string (nullable = true)
# | |-- id: long (nullable = true)
# | |-- context: string (nullable = true)
output = output
.select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("topic", "topic2")
.start()
import org.apache.spark.sql.protobuf.functions._
// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
// string name = 1;
// int64 id = 2;
// string context = 3;
// }
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
val output = df
.select(from_protobuf($"value", "AppEvent", descriptorFilePath) as $"event")
.where("event.name == \"alice\"")
.select(to_protobuf($"user", "AppEvent", descriptorFilePath) as $"event")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
var output = df
.select(from_protobuf($"value", "org.example.protos..AppEvent") as $"event")
.where("event.name == \"alice\"")
output.printSchema()
// root
// |--event: struct (nullable = true)
// | |-- name : string (nullable = true)
// | |-- id: long (nullable = true)
// | |-- context: string (nullable = true)
output = output.select(to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as $"event")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;
// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
// string name = 1;
// int64 id = 2;
// string context = 3;
// }
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
Dataset<Row> output = df
.select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
.where("event.name == \"alice\"")
.select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));
// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
Dataset<Row> output = df
.select(
from_protobuf(col("value"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
.where("event.name == \"alice\"")
output.printSchema()
// root
// |--event: struct (nullable = true)
// | |-- name : string (nullable = true)
// | |-- id: long (nullable = true)
// | |-- context: string (nullable = true)
output = output.select(
to_protobuf(col("event"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();
Protobuf -> Spark SQL 转换支持的类型
目前,Spark 支持读取 protobuf 标量类型、枚举类型、嵌套类型 和 映射类型(在 Protobuf 消息下)。 除了这些类型之外,spark-protobuf
还引入了对 Protobuf OneOf
字段的支持,这允许您处理可能具有多组字段的消息,但一次只能存在一组。 这对于您处理的数据格式并非始终相同的情况很有用,并且您需要能够处理具有不同字段集的消息,而不会遇到错误。
Protobuf 类型 | Spark SQL 类型 |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
string | StringType |
enum | StringType |
bytes | BinaryType |
Message | StructType |
repeated | ArrayType |
map | MapType |
OneOf | Struct |
它还支持读取以下 Protobuf 类型 Timestamp 和 Duration
Protobuf 逻辑类型 | Protobuf 模式 | Spark SQL 类型 |
---|---|---|
duration | MessageType{seconds: Long, nanos: Int} | DayTimeIntervalType |
timestamp | MessageType{seconds: Long, nanos: Int} | TimestampType |
Spark SQL -> Protobuf 转换支持的类型
Spark 支持将所有 Spark SQL 类型写入 Protobuf。对于大多数类型,从 Spark 类型到 Protobuf 类型的映射非常简单(例如,IntegerType 会转换为 int);
Spark SQL 类型 | Protobuf 类型 |
---|---|
BooleanType | boolean |
IntegerType | int |
LongType | long |
FloatType | float |
DoubleType | double |
StringType | string |
StringType | enum |
BinaryType | bytes |
StructType | message |
ArrayType | repeated |
MapType | map |
处理循环引用 protobuf 字段
使用 Protobuf 数据时可能出现的一个常见问题是循环引用的存在。 在 Protobuf 中,当一个字段引用自身或引用另一个引用原始字段的字段时,就会发生循环引用。 这可能会在解析数据时导致问题,因为它可能导致无限循环或其他意外行为。 为了解决这个问题,最新版本的 spark-protobuf 引入了一个新功能:通过字段类型检查循环引用的能力。 这允许用户使用 recursive.fields.max.depth
选项来指定解析模式时允许的最大递归级别数。 默认情况下,通过将 recursive.fields.max.depth
设置为 -1,spark-protobuf
将不允许递归字段。 但是,您可以根据需要将此选项设置为 0 到 10。
将 recursive.fields.max.depth
设置为 0 会删除所有递归字段,将其设置为 1 允许它递归一次,将其设置为 2 允许它递归两次。 不允许大于 10 的 recursive.fields.max.depth
值,因为它可能导致性能问题甚至堆栈溢出。
以下 protobuf 消息的 SQL 模式将根据 recursive.fields.max.depth
的值而有所不同。
syntax = "proto3"
message Person {
string name = 1;
Person bff = 2
}
// The protobuf schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursive.fields.max.depth` value.
0: struct<name: string, bff: null>
1: struct<name string, bff: <name: string, bff: null>>
2: struct<name string, bff: <name: string, bff: struct<name: string, bff: null>>> ...