Apache Avro 数据源指南
- 部署
- 加载和保存函数
- to_avro() 和 from_avro()
- 数据源选项
- 配置
- 与 Databricks spark-avro 的兼容性
- Avro -> Spark SQL 转换支持的类型
- Spark SQL -> Avro 转换支持的类型
自 Spark 2.4 版本以来,Spark SQL 提供了对读写 Apache Avro 数据的内置支持。
部署
spark-avro
模块是外部模块,默认情况下不包含在 spark-submit
或 spark-shell
中。
与任何 Spark 应用程序一样,spark-submit
用于启动您的应用程序。 可以使用 --packages
将 spark-avro_2.12
及其依赖项直接添加到 spark-submit
,例如:
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.5.5 ...
为了在 spark-shell
上进行实验,您还可以使用 --packages
直接添加 org.apache.spark:spark-avro_2.12
及其依赖项:
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.5.5 ...
有关使用外部依赖项提交应用程序的更多详细信息,请参阅应用程序提交指南。
加载和保存函数
由于 spark-avro
模块是外部模块,因此 DataFrameReader
或 DataFrameWriter
中没有 .avro
API。
要以 Avro 格式加载/保存数据,您需要将数据源选项 format
指定为 avro
(或 org.apache.spark.sql.avro
)。
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");
df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")
to_avro() 和 from_avro()
Avro 包提供了函数 to_avro
以将列编码为 Avro 格式的二进制文件,以及 from_avro()
以将 Avro 二进制数据解码为列。 这两个函数都将一个列转换为另一个列,并且输入/输出 SQL 数据类型可以是复杂类型或原始类型。
当从 Kafka 等流式源读取数据或写入数据时,使用 Avro 记录作为列非常有用。每个 Kafka 键值记录都将使用一些元数据进行扩充,例如摄取到 Kafka 中的时间戳、Kafka 中的偏移量等。
- 如果包含数据的“value”字段采用 Avro 格式,则可以使用
from_avro()
提取您的数据、丰富数据、清理数据,然后将其向下游推送到 Kafka 或将其写入文件。 to_avro()
可用于将结构体转换为 Avro 记录。当您希望在将数据写出到 Kafka 时将多个列重新编码为一个列时,此方法特别有用。
from pyspark.sql.avro.functions import from_avro, to_avro
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("subscribe", "topic1")\
.load()
# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
query = output\
.writeStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("topic", "topic2")\
.start()
import org.apache.spark.sql.avro.functions._
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
.select(from_avro($"value", jsonFormatSchema) as $"user")
.where("user.favorite_color == \"red\"")
.select(to_avro($"user.name") as $"value")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;
// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
Dataset<Row> output = df
.select(from_avro(col("value"), jsonFormatSchema).as("user"))
.where("user.favorite_color == \"red\"")
.select(to_avro(col("user.name")).as("value"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")
df <- read.stream(
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
subscribe = "topic1"
)
# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output <- select(
filter(
select(df, alias(from_avro("value", jsonFormatSchema), "user")),
column("user.favorite_color") == "red"
),
alias(to_avro("user.name"), "value")
)
write.stream(
output,
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
topic = "topic2"
)
数据源选项
Avro 的数据源选项可以通过以下方式设置:
DataFrameReader
或DataFrameWriter
上的.option
方法。- 函数
from_avro
中的options
参数。
属性名称 | 默认值 | 含义 | 范围 | 起始版本 |
---|---|---|---|---|
avroSchema |
无 | 用户提供的 JSON 格式的可选模式。
|
读取、写入和函数 from_avro |
2.4.0 |
recordName |
topLevelRecord | 写入结果中的顶级记录名称,Avro 规范中是必需的。 | 写入 | 2.4.0 |
recordNamespace |
"" | 写入结果中的记录命名空间。 | 写入 | 2.4.0 |
ignoreExtension |
true | 该选项控制在读取时忽略没有 .avro 扩展名的文件。如果启用该选项,则加载所有文件(带有和不带有 .avro 扩展名)。该选项已被弃用,并且将在以后的版本中删除。 请使用常规数据源选项 pathGlobFilter 过滤文件名。 |
读取 | 2.4.0 |
compression |
snappy | compression 选项允许指定在写入时使用的压缩编解码器。当前支持的编解码器为 uncompressed 、snappy 、deflate 、bzip2 、xz 和 zstandard 。如果未设置该选项,则会考虑配置 spark.sql.avro.compression.codec 。 |
写入 | 2.4.0 |
mode |
FAILFAST | mode 选项允许为函数 from_avro 指定解析模式。当前支持的模式为
|
函数 from_avro |
2.4.0 |
datetimeRebaseMode |
(spark.sql.avro.datetimeRebaseModeInRead 配置的值) |
datetimeRebaseMode 选项允许指定用于从 Julian 到 Proleptic Gregorian 日历的 date 、timestamp-micros 、timestamp-millis 逻辑类型的值的重新计算模式。当前支持的模式为
|
读取和函数 from_avro |
3.2.0 |
positionalFieldMatching |
false | 它可以与 `avroSchema` 选项结合使用,以调整将提供的 Avro 模式中的字段与 SQL 模式中的字段进行匹配的行为。 默认情况下,匹配将使用字段名称执行,忽略其位置。 如果此选项设置为“true”,则匹配将基于字段的位置。 | 读取和写入 | 3.2.0 |
enableStableIdentifiersForUnionType |
false | 如果设置为 true,Avro 模式将被反序列化为 Spark SQL 模式,并且 Avro Union 类型将被转换为一个结构,其中字段名称与其各自的类型保持一致。生成的字段名称将转换为小写,例如 member_int 或 member_string。如果两个用户定义的类型名称或一个用户定义的类型名称和一个内置类型名称相同(不区分大小写),则会引发异常。但是,在其他情况下,可以唯一地标识字段名称。 | 读取 | 3.5.0 |
配置
可以使用 SparkSession 上的 setConf
方法或使用 SQL 运行 SET key=value
命令来完成 Avro 的配置。
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.sql.legacy.replaceDatabricksSparkAvro.enabled | true | 如果设置为 true,则数据源提供程序 com.databricks.spark.avro 将映射到内置但外部的 Avro 数据源模块以实现向后兼容。注意: SQL 配置已在 Spark 3.2 中弃用,并且将来可能会被删除。 |
2.4.0 |
spark.sql.avro.compression.codec | snappy | 用于写入 AVRO 文件的压缩编解码器。 支持的编解码器:uncompressed、deflate、snappy、bzip2、xz 和 zstandard。 默认编解码器为 snappy。 | 2.4.0 |
spark.sql.avro.deflate.level | -1 | 用于写入 AVRO 文件的 deflate 编解码器的压缩级别。 有效值必须在 1 到 9(包括 1 和 9)或 -1 的范围内。 默认值为 -1,在当前实现中对应于 6 级。 | 2.4.0 |
spark.sql.avro.datetimeRebaseModeInRead | EXCEPTION |
用于从 Julian 到 Proleptic Gregorian 日历的 date 、timestamp-micros 、timestamp-millis 逻辑类型的值的重新计算模式
|
3.0.0 |
spark.sql.avro.datetimeRebaseModeInWrite | EXCEPTION |
用于将 date 、timestamp-micros 、timestamp-millis 逻辑类型的值从普罗列期格里高利历重新调整为儒略历的 rebase 模式
|
3.0.0 |
spark.sql.avro.filterPushdown.enabled | true | 当为 true 时,启用将 filter 下推到 Avro 数据源。 | 3.1.0 |
与 Databricks spark-avro 的兼容性
此 Avro 数据源模块最初来自 Databricks 的开源仓库 spark-avro 并与之兼容。
默认情况下,当启用 SQL 配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled
时,数据源提供程序 com.databricks.spark.avro
会映射到此内置 Avro 模块。 对于使用 Provider
属性作为 com.databricks.spark.avro
在 catalog meta store 中创建的 Spark 表,如果您使用此内置 Avro 模块,则此映射对于加载这些表至关重要。
请注意,在 Databricks 的 spark-avro 中,为快捷功能 .avro()
创建了隐式类 AvroDataFrameWriter
和 AvroDataFrameReader
。 在这个内置但外部的模块中,这两个隐式类都被移除了。 请改用 DataFrameWriter
或 DataFrameReader
中的 .format("avro")
,这应该足够简洁和良好。
如果您更喜欢使用您自己构建的 spark-avro
jar 文件,您可以简单地禁用配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled
,并在部署应用程序时使用选项 --jars
。 有关更多详细信息,请阅读应用程序提交指南中的 高级依赖项管理 部分。
Avro -> Spark SQL 转换支持的类型
目前,Spark 支持读取 Avro 记录下的所有 原始类型 和 复杂类型。
Avro 类型 | Spark SQL 类型 |
---|---|
boolean | BooleanType(布尔类型) |
int | IntegerType(整数类型) |
long | LongType(长整型) |
float | FloatType(浮点类型) |
double | DoubleType(双精度浮点类型) |
string | StringType(字符串类型) |
enum | StringType(字符串类型) |
fixed | BinaryType(二进制类型) |
bytes | BinaryType(二进制类型) |
record | StructType(结构体类型) |
array | ArrayType(数组类型) |
map | MapType(映射类型) |
union | 见下文 |
除了上面列出的类型之外,它还支持读取 union
类型。 以下三种类型被认为是基本 union
类型
union(int, long)
将被映射到 LongType。union(float, double)
将被映射到 DoubleType。union(something, null)
,其中 something 是任何支持的 Avro 类型。 这将被映射到与 something 相同的 Spark SQL 类型,并将 nullable 设置为 true。 所有其他 union 类型都被认为是复杂的。 它们将被映射到 StructType,其中字段名称为 member0、member1 等,与 union 的成员一致。 这与 Avro 和 Parquet 之间转换时的行为一致。
它还支持读取以下 Avro 逻辑类型
Avro 逻辑类型 | Avro 类型 | Spark SQL 类型 |
---|---|---|
date | int | DateType(日期类型) |
timestamp-millis | long | TimestampType(时间戳类型) |
timestamp-micros | long | TimestampType(时间戳类型) |
decimal | fixed | DecimalType(十进制类型) |
decimal | bytes | DecimalType(十进制类型) |
目前,它忽略 Avro 文件中存在的文档、别名和其他属性。
Spark SQL -> Avro 转换支持的类型
Spark 支持将所有 Spark SQL 类型写入 Avro。 对于大多数类型,从 Spark 类型到 Avro 类型的映射非常简单(例如,IntegerType 转换为 int); 但是,有一些特殊情况,如下所示
Spark SQL 类型 | Avro 类型 | Avro 逻辑类型 |
---|---|---|
ByteType(字节类型) | int | |
ShortType(短整型) | int | |
BinaryType(二进制类型) | bytes | |
DateType(日期类型) | int | date |
TimestampType(时间戳类型) | long | timestamp-micros |
DecimalType(十进制类型) | fixed | decimal |
您还可以使用选项 avroSchema
指定整个输出 Avro schema,以便可以将 Spark SQL 类型转换为其他 Avro 类型。 以下转换默认情况下不应用,需要用户指定的 Avro schema
Spark SQL 类型 | Avro 类型 | Avro 逻辑类型 |
---|---|---|
BinaryType(二进制类型) | fixed | |
StringType(字符串类型) | enum | |
TimestampType(时间戳类型) | long | timestamp-millis |
DecimalType(十进制类型) | bytes | decimal |