Protobuf Data Source Guide

Since Spark 3.4.0, Spark SQL provides built-in support for reading and writing protobuf data.

Deploying

The spark-protobuf module is external and not included in spark-submit or spark-shell by default.

As with any Spark application, spark-submit is used to launch your application. spark-protobuf_2.12 and its dependencies can be added directly to spark-submit using --packages, such as,

./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.12:3.5.1 ...

For experimenting on spark-shell, you can also use --packages to add org.apache.spark:spark-protobuf_2.12 and its dependencies directly,

./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.12:3.5.1 ...

See the Application Submission Guide for more details about submitting applications with external dependencies.

to_protobuf() and from_protobuf()

The spark-protobuf package provides the function to_protobuf to encode a column as binary in protobuf format, and from_protobuf() to decode protobuf binary data into a column. Both functions transform one column into another column, and the input/output SQL data type can be a complex type or a primitive type.

Using protobuf messages as columns is useful when reading from or writing to a streaming source like Kafka. Each Kafka key-value record is augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, and so on.

The Spark SQL schema is generated based on the protobuf descriptor file or protobuf class passed to from_protobuf and to_protobuf. The specified protobuf class or protobuf descriptor file must match the data, otherwise the behavior is undefined: it may fail or return arbitrary results.

```python
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

# from_protobuf and to_protobuf provide two schema choices. Via Protobuf descriptor file,
# or via shaded Java class.
# given input .proto protobuf schema
# syntax = "proto3"
# message AppEvent {
#   string name = 1;
#   int64 id = 2;
#   string context = 3;
# }
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

# 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The Protobuf protoc command can be used to generate a protobuf descriptor file for a given .proto file.
output = df \
  .select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event")) \
  .where('event.name == "alice"') \
  .select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))

# Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.
output = df \
  .select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event")) \
  .where('event.name == "alice"')

output.printSchema()
# root
#  |-- event: struct (nullable = true)
#  |    |-- name: string (nullable = true)
#  |    |-- id: long (nullable = true)
#  |    |-- context: string (nullable = true)

output = output \
  .select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))

query = output \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic2") \
  .start()
```
```scala
import org.apache.spark.sql.protobuf.functions._

// `from_protobuf` and `to_protobuf` provide two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// given input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for a given .proto file.
val output = df
  .select(from_protobuf($"value", "AppEvent", descriptorFilePath) as $"event")
  .where("event.name == \"alice\"")
  .select(to_protobuf($"user", "AppEvent", descriptorFilePath) as $"event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
var output = df
  .select(from_protobuf($"value", "org.example.protos..AppEvent") as $"event")
  .where("event.name == \"alice\"")

output.printSchema()
// root
//  |-- event: struct (nullable = true)
//  |    |-- name: string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)

output = output.select(to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as "event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
```
```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;

// `from_protobuf` and `to_protobuf` provide two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// given input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for a given .proto file.
Dataset<Row> output = df
  .select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
  .where("event.name == \"alice\"")
  .select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));

// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
output = df
  .select(
    from_protobuf(col("value"),
      "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
  .where("event.name == \"alice\"");

output.printSchema();
// root
//  |-- event: struct (nullable = true)
//  |    |-- name: string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)

output = output.select(
  to_protobuf(col("event"),
  "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
```

Supported types for Protobuf -> Spark SQL conversion

Currently Spark supports reading protobuf scalar types, enum types, nested types, and map types under messages of Protobuf. In addition to these types, spark-protobuf also introduces support for Protobuf OneOf fields, which lets you handle messages that may have multiple sets of fields, but where only one set can be present at a time. This is useful when the data you are working with is not always in the same format and you need to be able to handle messages with different sets of fields without running into errors (a short sketch of decoding a OneOf field follows the table below).

| Protobuf type | Spark SQL type |
|---------------|----------------|
| boolean | BooleanType |
| int | IntegerType |
| long | LongType |
| float | FloatType |
| double | DoubleType |
| string | StringType |
| enum | StringType |
| bytes | BinaryType |
| Message | StructType |
| repeated | ArrayType |
| map | MapType |
| OneOf | Struct |
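
As an illustration, the sketch below decodes a hypothetical message that uses a OneOf group. The `Contact` message, the `contact.desc` descriptor path, and the `df` DataFrame with a binary `value` column are assumptions made up for this example, not part of the guide's `AppEvent` schema. Both branches of the OneOf group appear in the resulting struct, and the branch that was not set in a given message decodes as null.

```python
from pyspark.sql.protobuf.functions import from_protobuf

# Hypothetical .proto definition used for this sketch:
# syntax = "proto3";
# message Contact {
#   string name = 1;
#   oneof channel {          // only one of `email` or `phone` is set per message
#     string email = 2;
#     string phone = 3;
#   }
# }
contacts = df.select(
    from_protobuf("value", "Contact", "/path/to/contact.desc").alias("contact"))

contacts.printSchema()
# Expected shape (both OneOf branches appear as nullable fields):
# root
#  |-- contact: struct (nullable = true)
#  |    |-- name: string (nullable = true)
#  |    |-- email: string (nullable = true)
#  |    |-- phone: string (nullable = true)
```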

It also supports reading the following Protobuf types: Timestamp and Duration (a short sketch follows the table below).

| Protobuf logical type | Protobuf schema | Spark SQL type |
|-----------------------|-----------------|----------------|
| duration | MessageType{seconds: Long, nanos: Int} | DayTimeIntervalType |
| timestamp | MessageType{seconds: Long, nanos: Int} | TimestampType |
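
As a sketch of the mapping above, a message that uses the well-known `google.protobuf.Timestamp` and `google.protobuf.Duration` types decodes to the corresponding Spark types. The `Event` message, the descriptor path, and the `df` DataFrame below are assumptions made up for this example.

```python
from pyspark.sql.protobuf.functions import from_protobuf

# Hypothetical .proto used for this sketch:
# syntax = "proto3";
# import "google/protobuf/timestamp.proto";
# import "google/protobuf/duration.proto";
# message Event {
#   google.protobuf.Timestamp occurred_at = 1;
#   google.protobuf.Duration  elapsed     = 2;
# }
#
# The descriptor file is assumed to contain the imported well-known types
# (e.g. generated with protoc's --include_imports flag).
events = df.select(
    from_protobuf("value", "Event", "/path/to/event.desc").alias("event"))

events.printSchema()
# Per the mapping above, `occurred_at` becomes TimestampType and
# `elapsed` becomes DayTimeIntervalType.
```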

Supported types for Spark SQL -> Protobuf conversion

Spark supports writing all Spark SQL types to Protobuf. For most types, the mapping from Spark types to Protobuf types is straightforward (e.g. IntegerType gets converted to int); a short batch example follows the table below.

| Spark SQL type | Protobuf type |
|----------------|---------------|
| BooleanType | boolean |
| IntegerType | int |
| LongType | long |
| FloatType | float |
| DoubleType | double |
| StringType | string |
| StringType | enum |
| BinaryType | bytes |
| StructType | message |
| ArrayType | repeated |
| MapType | map |
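
For instance, a struct column whose fields line up with the `AppEvent` message from the examples above can be encoded with `to_protobuf` in a plain batch job as well. This is a minimal sketch; the literal rows and the reuse of the `descriptorFilePath` placeholder are assumptions for illustration.

```python
from pyspark.sql.functions import struct
from pyspark.sql.protobuf.functions import to_protobuf

# Build rows that match the AppEvent fields (string name, int64 id, string context).
events = spark.createDataFrame(
    [("alice", 1, "signup"), ("bob", 2, "login")],
    "name string, id long, context string")

# Encode the struct into Protobuf binary; the resulting `value` column is
# BinaryType and can be written to Kafka, files, etc.
encoded = events.select(
    to_protobuf(struct("name", "id", "context"), "AppEvent",
                descriptorFilePath).alias("value"))
```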

Handling circular references in protobuf fields

One common issue when working with Protobuf data is the presence of circular references. In Protobuf, a circular reference occurs when a field refers back to itself or to another field that refers back to the original field. This can cause problems when parsing the data, as it can result in infinite loops or other unexpected behavior. To address this, the latest version of spark-protobuf introduces a new feature: the ability to check for circular references through field types. This allows users to use the recursive.fields.max.depth option to specify the maximum number of recursion levels to allow when parsing the schema. By default, spark-protobuf does not permit recursive fields, which corresponds to setting recursive.fields.max.depth to -1. However, you can set this option from 0 to 10 if needed.

Setting recursive.fields.max.depth to 0 drops all recursive fields, setting it to 1 allows one level of recursion, and setting it to 2 allows two levels. A recursive.fields.max.depth value greater than 10 is not allowed, as it can lead to performance issues and even stack overflows.

The SQL schema for the protobuf message below will vary based on the value of recursive.fields.max.depth.

```protobuf
syntax = "proto3"
message Person {
  string name = 1;
  Person bff = 2
}

// The protobuf schema defined above would be converted into a Spark SQL column with the
// following structure, depending on the `recursive.fields.max.depth` value.

0: struct<name: string, bff: null>
1: struct<name: string, bff: struct<name: string, bff: null>>
2: struct<name: string, bff: struct<name: string, bff: struct<name: string, bff: null>>> ...
```
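
The limit is passed through the `options` argument of `from_protobuf`. Below is a minimal sketch in Python, assuming a descriptor file `person.desc` generated from the `Person` message above and a DataFrame `df` with a binary `value` column (both are assumptions for illustration).

```python
from pyspark.sql.protobuf.functions import from_protobuf

# Allow one level of recursion when parsing `Person`; with the default of -1,
# the recursive `bff` field would not be permitted.
parsed = df.select(
    from_protobuf(
        "value", "Person", "/path/to/person.desc",
        options={"recursive.fields.max.depth": "1"}).alias("person"))

# The schema of `person` follows the depth-1 structure shown above:
# struct<name: string, bff: struct<name: string, bff: null>>
parsed.printSchema()
```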