Protobuf 数据源指南

自 Spark 3.4.0 版本起,Spark SQL 提供对读取和写入 protobuf 数据的内置支持。

部署

spark-protobuf 模块是外部模块,默认情况下不包含在 spark-submitspark-shell 中。

与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。spark-protobuf_2.12 及其依赖项可以使用 --packages 直接添加到 spark-submit,例如:

./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.12:3.5.5 ...

为了在 spark-shell 上进行实验,您也可以使用 --packages 直接添加 org.apache.spark:spark-protobuf_2.12 及其依赖项:

./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.12:3.5.5 ...

有关使用外部依赖项提交应用程序的更多详细信息,请参见应用程序提交指南

to_protobuf() 和 from_protobuf()

spark-protobuf 包提供了函数 to_protobuf,用于将列编码为 protobuf 格式的二进制文件,以及 from_protobuf(),用于将 protobuf 二进制数据解码为列。这两个函数都将一列转换为另一列,并且输入/输出 SQL 数据类型可以是复杂类型或原始类型。

当从 Kafka 等流源读取或写入数据时,使用 protobuf 消息作为列非常有用。每个 Kafka 键值记录都将使用一些元数据进行扩充,例如摄取到 Kafka 中的时间戳、Kafka 中的偏移量等。

Spark SQL 模式是基于传递给 from_protobufto_protobuf 的 protobuf 描述符文件或 protobuf 类生成的。 指定的 protobuf 类或 protobuf 描述符文件必须与数据匹配,否则,该行为是未定义的:它可能会失败或返回任意结果。

此 div 仅用于使 markdown 编辑器/查看器感到满意,并且不会在 Web 上显示 ```python
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

# from_protobuf and to_protobuf provide two schema choices. Via Protobuf descriptor file,
# or via shaded Java class.
# give input .proto protobuf schema
# syntax = "proto3"
# message AppEvent {
#   string name = 1;
#   int64 id = 2;
#   string context = 3;
# }
df = spark
  .readStream
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

# 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
output = df
  .select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event"))
  .where('event.name == "alice"')
  .select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))

# Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.
output = df
  .select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
  .where('event.name == "alice"')

output.printSchema()
# root
#  |--event: struct (nullable = true)
#  |   |-- name : string (nullable = true)
#  |   |-- id: long (nullable = true)
#  |   |-- context: string (nullable = true)

output = output
  .select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))

query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")
  .start()
```
此 div 仅用于使 markdown 编辑器/查看器感到满意,并且不会在 Web 上显示 ```scala
import org.apache.spark.sql.protobuf.functions._

// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
val output = df
  .select(from_protobuf($"value", "AppEvent", descriptorFilePath) as $"event")
  .where("event.name == \"alice\"")
  .select(to_protobuf($"user", "AppEvent", descriptorFilePath) as $"event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
var output = df
  .select(from_protobuf($"value", "org.example.protos..AppEvent") as $"event")
  .where("event.name == \"alice\"")

output.printSchema()
// root
//  |--event: struct (nullable = true)
//  |    |-- name : string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)

output = output.select(to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as $"event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
```
此 div 仅用于使 markdown 编辑器/查看器感到满意,并且不会在 Web 上显示 ```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;

// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
Dataset<Row> output = df
  .select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
  .where("event.name == \"alice\"")
  .select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));

// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
Dataset<Row> output = df
  .select(
    from_protobuf(col("value"),
    "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
  .where("event.name == \"alice\"")

output.printSchema()
// root
//  |--event: struct (nullable = true)
//  |    |-- name : string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)

output = output.select(
  to_protobuf(col("event"),
  "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
```

Protobuf -> Spark SQL 转换支持的类型

目前,Spark 支持读取 protobuf 标量类型枚举类型嵌套类型映射类型(在 Protobuf 消息下)。 除了这些类型之外,spark-protobuf 还引入了对 Protobuf OneOf 字段的支持,这允许您处理可能具有多组字段的消息,但一次只能存在一组。 这对于您处理的数据格式并非始终相同的情况很有用,并且您需要能够处理具有不同字段集的消息,而不会遇到错误。

Protobuf 类型Spark SQL 类型
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
bytes BinaryType
Message StructType
repeated ArrayType
map MapType
OneOf Struct

它还支持读取以下 Protobuf 类型 TimestampDuration

Protobuf 逻辑类型Protobuf 模式Spark SQL 类型
duration MessageType{seconds: Long, nanos: Int} DayTimeIntervalType
timestamp MessageType{seconds: Long, nanos: Int} TimestampType

Spark SQL -> Protobuf 转换支持的类型

Spark 支持将所有 Spark SQL 类型写入 Protobuf。对于大多数类型,从 Spark 类型到 Protobuf 类型的映射非常简单(例如,IntegerType 会转换为 int);

Spark SQL 类型Protobuf 类型
BooleanType boolean
IntegerType int
LongType long
FloatType float
DoubleType double
StringType string
StringType enum
BinaryType bytes
StructType message
ArrayType repeated
MapType map

处理循环引用 protobuf 字段

使用 Protobuf 数据时可能出现的一个常见问题是循环引用的存在。 在 Protobuf 中,当一个字段引用自身或引用另一个引用原始字段的字段时,就会发生循环引用。 这可能会在解析数据时导致问题,因为它可能导致无限循环或其他意外行为。 为了解决这个问题,最新版本的 spark-protobuf 引入了一个新功能:通过字段类型检查循环引用的能力。 这允许用户使用 recursive.fields.max.depth 选项来指定解析模式时允许的最大递归级别数。 默认情况下,通过将 recursive.fields.max.depth 设置为 -1,spark-protobuf 将不允许递归字段。 但是,您可以根据需要将此选项设置为 0 到 10。

recursive.fields.max.depth 设置为 0 会删除所有递归字段,将其设置为 1 允许它递归一次,将其设置为 2 允许它递归两次。 不允许大于 10 的 recursive.fields.max.depth 值,因为它可能导致性能问题甚至堆栈溢出。

以下 protobuf 消息的 SQL 模式将根据 recursive.fields.max.depth 的值而有所不同。

此 div 仅用于使 markdown 编辑器/查看器感到满意,并且不会在 Web 上显示 ```protobuf
syntax = "proto3"
message Person {
  string name = 1;
  Person bff = 2
}

// The protobuf schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursive.fields.max.depth` value.

0: struct<name: string, bff: null>
1: struct<name string, bff: <name: string, bff: null>>
2: struct<name string, bff: <name: string, bff: struct<name: string, bff: null>>> ...
```