Protobuf 数据源指南

自 Spark 3.4.0 版本起,Spark SQL 提供对读写 Protobuf 数据的内置支持。

部署

spark-protobuf 模块是外部模块,默认不包含在 spark-submitspark-shell 中。

与所有 Spark 应用程序一样,使用 spark-submit 启动您的应用程序。spark-protobuf_2.13 及其依赖项可以使用 --packages 直接添加到 spark-submit,例如,

./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.13:4.0.0 ...

要在 spark-shell 上进行实验,您也可以使用 --packages 直接添加 org.apache.spark:spark-protobuf_2.13 及其依赖项,

./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.13:4.0.0 ...

有关提交带有外部依赖项的应用程序的更多详细信息,请参阅应用程序提交指南

to_protobuf() 和 from_protobuf()

spark-protobuf 包提供了函数 to_protobuf 用于将列编码为 Protobuf 格式的二进制数据,以及 from_protobuf() 用于将 Protobuf 二进制数据解码为列。这两个函数将一列转换为另一列,并且输入/输出 SQL 数据类型可以是复杂类型或原始类型。

在从 Kafka 等流式源读取或写入数据时,使用 Protobuf 消息作为列非常有用。每个 Kafka 键值记录都会附带一些元数据,例如摄取到 Kafka 的时间戳、在 Kafka 中的偏移量等。

Spark SQL 模式是根据传递给 from_protobufto_protobuf 的 Protobuf 描述符文件或 Protobuf 类生成的。指定的 Protobuf 类或 Protobuf 描述符文件必须与数据匹配,否则行为是未定义的:它可能会失败或返回任意结果。

此 div 仅用于使 markdown 编辑器/查看器正常显示,不会在网页上显示 ```python
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

# from_protobuf and to_protobuf provide two schema choices. Via Protobuf descriptor file,
# or via shaded Java class.
# give input .proto protobuf schema
# syntax = "proto3"
# message AppEvent {
#   string name = 1;
#   int64 id = 2;
#   string context = 3;
# }
df = spark
  .readStream
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

# 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
output = df
  .select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event"))
  .where('event.name == "alice"')
  .select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))

# Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.
output = df
  .select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
  .where('event.name == "alice"')

output.printSchema()
# root
#  |--event: struct (nullable = true)
#  |   |-- name : string (nullable = true)
#  |   |-- id: long (nullable = true)
#  |   |-- context: string (nullable = true)

output = output
  .select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))

query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")
  .start()
```
此 div 仅用于使 markdown 编辑器/查看器正常显示,不会在网页上显示 ```scala
import org.apache.spark.sql.protobuf.functions._

// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
val output = df
  .select(from_protobuf($"value", "AppEvent", descriptorFilePath) as $"event")
  .where("event.name == \"alice\"")
  .select(to_protobuf($"user", "AppEvent", descriptorFilePath) as $"event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
var output = df
  .select(from_protobuf($"value", "org.example.protos..AppEvent") as $"event")
  .where("event.name == \"alice\"")

output.printSchema()
// root
//  |--event: struct (nullable = true)
//  |    |-- name : string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)

output = output.select(to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as $"event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
```
此 div 仅用于使 markdown 编辑器/查看器正常显示,不会在网页上显示 ```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;

// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
Dataset<Row> output = df
  .select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
  .where("event.name == \"alice\"")
  .select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));

// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
Dataset<Row> output = df
  .select(
    from_protobuf(col("value"),
    "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
  .where("event.name == \"alice\"")

output.printSchema()
// root
//  |--event: struct (nullable = true)
//  |    |-- name : string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)

output = output.select(
  to_protobuf(col("event"),
  "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
```

Protobuf -> Spark SQL 转换支持的类型

目前 Spark 支持读取 Protobuf 消息下的Protobuf 标量类型枚举类型嵌套类型映射类型。除了这些类型之外,spark-protobuf 还引入了对 Protobuf OneOf 字段的支持。这允许您处理可能具有多组字段但每次只能存在一组字段的消息。这对于您正在处理的数据不总是相同格式的情况很有用,并且您需要能够处理具有不同字段集的消息而不会遇到错误。

Protobuf 类型Spark SQL 类型
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
bytes BinaryType
Message StructType
repeated ArrayType
map MapType
OneOf Struct

它还支持读取以下 Protobuf 类型:TimestampDuration

Protobuf 逻辑类型Protobuf 模式Spark SQL 类型
duration MessageType{seconds: Long, nanos: Int} DayTimeIntervalType
timestamp MessageType{seconds: Long, nanos: Int} TimestampType

Spark SQL -> Protobuf 转换支持的类型

Spark 支持将所有 Spark SQL 类型写入 Protobuf。对于大多数类型,从 Spark 类型到 Protobuf 类型的映射是直接的(例如,IntegerType 转换为 int);

Spark SQL 类型Protobuf 类型
BooleanType boolean
IntegerType int
LongType long
FloatType float
DoubleType double
StringType string
StringType enum
BinaryType bytes
StructType message
ArrayType repeated
MapType map

处理 Protobuf 字段中的循环引用

处理 Protobuf 数据时可能出现的一个常见问题是循环引用。在 Protobuf 中,当一个字段引用自身或引用另一个引用原始字段的字段时,就会发生循环引用。这可能导致解析数据时出现问题,因为它可能导致无限循环或其他意外行为。为了解决此问题,最新版本的 spark-protobuf 引入了一个新功能:通过字段类型检查循环引用的能力。这允许用户使用 recursive.fields.max.depth 选项来指定解析模式时允许的最大递归级别数。默认情况下,spark-protobuf 通过将 recursive.fields.max.depth 设置为 -1 来禁止递归字段。但是,如果需要,您可以将此选项设置为 1 到 10。

recursive.fields.max.depth 设置为 1 将丢弃所有递归字段;设置为 2 允许递归一次;设置为 3 允许递归两次。不允许 recursive.fields.max.depth 值大于 10,因为它可能导致性能问题甚至栈溢出。

以下 Protobuf 消息的 SQL 模式将根据 recursive.fields.max.depth 的值而异。

此 div 仅用于使 markdown 编辑器/查看器正常显示,不会在网页上显示 ```protobuf
syntax = "proto3"
message Person {
  string name = 1;
  Person bff = 2
}

// The protobuf schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursive.fields.max.depth` value.

1: struct<name: string>
2: struct<name: string, bff: struct<name: string>>
3: struct<name: string, bff: struct<name: string, bff: struct<name: string>>>
...
```

数据源选项

Protobuf 数据源选项可以通过以下方式设置

属性名默认值含义范围
mode FAILFAST 允许一种在解析过程中处理损坏记录的模式。
  • PERMISSIVE:当遇到损坏记录时,将所有字段设置为 null
  • DROPMALFORMED:忽略整个损坏记录。Protobuf 内置函数不支持此模式。
  • FAILFAST:当遇到损坏记录时抛出异常。
读取
recursive.fields.max.depth -1 指定解析模式时允许的最大递归级别数。有关更多详细信息,请参阅处理 Protobuf 字段中的循环引用部分。 读取
convert.any.fields.to.json false 启用将 Protobuf Any 字段转换为 JSON。应谨慎启用此选项。JSON 转换和处理效率低下。此外,模式安全性也会降低,导致下游处理容易出错。 读取
emit.default.values false 反序列化 Protobuf 到 Spark 结构体时是否呈现零值字段。当序列化的 Protobuf 中某个字段为空时,此库默认会将其反序列化为 null,此选项可以控制是否呈现特定类型的零值。 读取
enums.as.ints false 是否将枚举字段呈现为其整数值。当此选项设置为 false 时,枚举字段将被映射到 StringType,值为枚举名称;当设置为 true 时,枚举字段将被映射到 IntegerType,值为其整数值。 读取
upcast.unsigned.ints false 是否将无符号整数向上转型为更大的类型。将此选项设置为 true 时,uint32 使用 LongTypeuint64 使用 Decimal(20, 0),这样它们的表示可以包含大的无符号值而不会溢出。 读取
unwrap.primitive.wrapper.types false 反序列化时是否解包知名原始包装类型的结构体表示。默认情况下,原始类型的包装类型(即 google.protobuf.Int32Value、google.protobuf.Int64Value 等)将被反序列化为结构体。 读取
retain.empty.message.types false 是否在模式中保留空 Protobuf 消息类型的字段。由于 Spark 不允许写入空的 StructType,因此空 Protobuf 消息类型默认将被丢弃。将此选项设置为 true 将向空 Protobuf 消息插入一个虚拟列(__dummy_field_in_empty_struct),以便保留空消息字段。 读取