Apache Avro 数据源指南
- 部署
- 加载和保存函数
- to_avro() 和 from_avro()
- 数据源选项
- 配置
- 与 Databricks spark-avro 的兼容性
- Avro 到 Spark SQL 转换支持的类型
- Spark SQL 到 Avro 转换支持的类型
- 处理 Avro 字段的循环引用
自 Spark 2.4 版本发布以来,Spark SQL 提供了对读写 Apache Avro 数据的内置支持。
部署
spark-avro
模块是外部模块,默认不包含在 spark-submit
或 spark-shell
中。
与任何 Spark 应用程序一样,使用 spark-submit
启动应用程序。spark-avro_2.13
及其依赖项可以使用 --packages
直接添加到 spark-submit
中,例如:
./bin/spark-submit --packages org.apache.spark:spark-avro_2.13:4.0.0 ...
要在 spark-shell
上进行实验,您也可以使用 --packages
直接添加 org.apache.spark:spark-avro_2.13
及其依赖项:
./bin/spark-shell --packages org.apache.spark:spark-avro_2.13:4.0.0 ...
有关提交带有外部依赖项的应用程序的更多详细信息,请参阅应用程序提交指南。
加载和保存函数
由于 spark-avro
模块是外部模块,DataFrameReader
或 DataFrameWriter
中没有 .avro
API。
要以 Avro 格式加载/保存数据,您需要将数据源选项 format
指定为 avro
(或 org.apache.spark.sql.avro
)。
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");
df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")
to_avro() 和 from_avro()
Avro 包提供了函数 to_avro
用于将列编码为 Avro 格式的二进制数据,以及 from_avro()
用于将 Avro 二进制数据解码为列。这两个函数都将一列转换为另一列,并且输入/输出 SQL 数据类型可以是复杂类型或原始类型。
在从 Kafka 等流式源读取或写入数据时,将 Avro 记录用作列非常有用。每个 Kafka 键值记录都将通过一些元数据进行增强,例如摄取到 Kafka 的时间戳、Kafka 中的偏移量等。
- 如果包含数据的“value”字段采用 Avro 格式,您可以使用
from_avro()
提取数据,对其进行丰富和清理,然后再次将其推送到 Kafka 或写入文件。 to_avro()
可用于将结构体转换为 Avro 记录。当您希望在将数据写入 Kafka 时将多列重新编码为单列时,此方法特别有用。
from pyspark.sql.avro.functions import from_avro, to_avro
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("subscribe", "topic1")\
.load()
# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
query = output\
.writeStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("topic", "topic2")\
.start()
import org.apache.spark.sql.avro.functions._
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
.select(from_avro($"value", jsonFormatSchema) as $"user")
.where("user.favorite_color == \"red\"")
.select(to_avro($"user.name") as $"value")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;
// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
Dataset<Row> output = df
.select(from_avro(col("value"), jsonFormatSchema).as("user"))
.where("user.favorite_color == \"red\"")
.select(to_avro(col("user.name")).as("value"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")
df <- read.stream(
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
subscribe = "topic1"
)
# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output <- select(
filter(
select(df, alias(from_avro("value", jsonFormatSchema), "user")),
column("user.favorite_color") == "red"
),
alias(to_avro("user.name"), "value")
)
write.stream(
output,
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
topic = "topic2"
)
CREATE TABLE t AS
SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s
FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1);
DECLARE avro_schema STRING;
SET VARIABLE avro_schema =
'{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }';
SELECT TO_AVRO(s, avro_schema) AS RESULT FROM t;
SELECT FROM_AVRO(result, avro_schema, MAP()).u FROM (
SELECT TO_AVRO(s, avro_schema) AS RESULT FROM t);
DROP TEMPORARY VARIABLE avro_schema;
DROP TABLE t;
数据源选项
Avro 数据源选项可以通过以下方式设置:
DataFrameReader
或DataFrameWriter
上的.option
方法。- 函数
from_avro
中的options
参数。
属性名称 | 默认值 | 含义 | 范围 | 起始版本 |
---|---|---|---|---|
avroSchema |
无 | 用户以 JSON 格式提供的可选模式。
|
读取、写入和函数 from_avro |
2.4.0 |
recordName |
topLevelRecord | 写入结果中的顶层记录名称,这是 Avro 规范中要求的。 | 写入 | 2.4.0 |
recordNamespace |
"" | 写入结果中的记录命名空间。 | 写入 | 2.4.0 |
ignoreExtension |
true | 该选项控制在读取时忽略没有 .avro 扩展名的文件。如果启用此选项,则加载所有文件(无论是否带有 .avro 扩展名)。该选项已弃用,并将在未来的版本中移除。请使用通用数据源选项 pathGlobFilter 来过滤文件名。 |
读取 | 2.4.0 |
compression |
snappy | 选项 compression 允许指定写入时使用的压缩编解码器。目前支持的编解码器有 uncompressed 、snappy 、deflate 、bzip2 、xz 和 zstandard 。如果未设置此选项,则会考虑配置 spark.sql.avro.compression.codec 。 |
写入 | 2.4.0 |
mode |
FAILFAST | 选项 mode 允许为函数 from_avro 指定解析模式。目前支持的模式有:
|
函数 from_avro |
2.4.0 |
datetimeRebaseMode |
(spark.sql.avro.datetimeRebaseModeInRead 配置的值) |
选项 datetimeRebaseMode 允许指定 date 、timestamp-micros 、timestamp-millis 逻辑类型的值从儒略历到修正格里高利历的重基模式。目前支持的模式有:
|
读取和函数 from_avro |
3.2.0 |
positionalFieldMatching |
false | 此选项可与 `avroSchema` 选项结合使用,以调整将提供的 Avro 模式中的字段与 SQL 模式中的字段匹配的行为。默认情况下,匹配将使用字段名进行,忽略其位置。如果此选项设置为“true”,则匹配将基于字段的位置。 | 读取和写入 | 3.2.0 |
enableStableIdentifiersForUnionType |
false | 如果设置为 true,Avro 模式将被反序列化为 Spark SQL 模式,并且 Avro Union 类型将转换为一个结构,其中字段名与其各自的类型保持一致。结果字段名将转换为小写,例如 member_int 或 member_string。如果两个用户定义类型名称或一个用户定义类型名称和一个内置类型名称(不区分大小写)相同,将抛出异常。然而,在其他情况下,字段名可以唯一标识。 | 读取 | 3.5.0 |
stableIdentifierPrefixForUnionType |
member_ | 当启用 `enableStableIdentifiersForUnionType` 时,此选项允许配置 Avro Union 类型字段的前缀。 | 读取 | 4.0.0 |
recursiveFieldMaxDepth |
-1 | 如果此选项指定为负数或设置为 0,则不允许递归字段。将其设置为 1 会丢弃所有递归字段,设置为 2 允许递归字段递归一次,设置为 3 允许递归两次,依此类推,最多允许 15 次。不允许超过 15 的值,以避免无意中创建非常大的模式。如果 Avro 消息的深度超出此限制,则返回的 Spark 结构将在递归限制后被截断。使用示例可在处理 Avro 字段的循环引用部分找到。 | 读取 | 4.0.0 |
配置
Avro 的配置可以通过 spark.conf.set
或使用 SQL 运行 SET key=value
命令来完成。
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.sql.legacy.replaceDatabricksSparkAvro.enabled | true | 如果设置为 true,数据源提供程序 com.databricks.spark.avro 将映射到内置但外部的 Avro 数据源模块以实现向后兼容性。注意:此 SQL 配置在 Spark 3.2 中已弃用,并可能在未来版本中移除。 |
2.4.0 |
spark.sql.avro.compression.codec | snappy | 写入 AVRO 文件时使用的压缩编解码器。支持的编解码器:uncompressed, deflate, snappy, bzip2, xz 和 zstandard。默认编解码器是 snappy。 | 2.4.0 |
spark.sql.avro.deflate.level | -1 | 写入 AVRO 文件时 deflate 编解码器的压缩级别。有效值必须在 1 到 9(包含)或 -1 的范围内。默认值为 -1,在当前实现中对应于级别 6。 | 2.4.0 |
spark.sql.avro.xz.level | 6 | 写入 AVRO 文件时 xz 编解码器的压缩级别。有效值必须在 1 到 9(包含)的范围内。当前实现中的默认值为 6。 | 4.0.0 |
spark.sql.avro.zstandard.level | 3 | 写入 AVRO 文件时 zstandard 编解码器的压缩级别。当前实现中的默认值为 3。 | 4.0.0 |
spark.sql.avro.zstandard.bufferPool.enabled | false | 如果为 true,则在写入 AVRO 文件时启用 ZSTD JNI 库的缓冲区池。 | 4.0.0 |
spark.sql.avro.datetimeRebaseModeInRead | EXCEPTION |
date 、timestamp-micros 、timestamp-millis 逻辑类型的值从儒略历到修正格里高利历的重基模式
|
3.0.0 |
spark.sql.avro.datetimeRebaseModeInWrite | EXCEPTION |
date 、timestamp-micros 、timestamp-millis 逻辑类型的值从修正格里高利历到儒略历的重基模式
|
3.0.0 |
spark.sql.avro.filterPushdown.enabled | true | 如果为 true,则启用 Avro 数据源的谓词下推。 | 3.1.0 |
与 Databricks spark-avro 的兼容性
此 Avro 数据源模块最初来源于 Databricks 的开源仓库 spark-avro,并与其兼容。
默认情况下,当 SQL 配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled
启用时,数据源提供程序 com.databricks.spark.avro
会映射到此内置 Avro 模块。对于在目录元存储中以 Provider
属性为 com.databricks.spark.avro
创建的 Spark 表,如果您正在使用此内置 Avro 模块,则此映射对于加载这些表至关重要。
请注意,在 Databricks 的 spark-avro 中,为快捷函数 .avro()
创建了隐式类 AvroDataFrameWriter
和 AvroDataFrameReader
。在此内置但外部模块中,这两个隐式类均已移除。请改用 DataFrameWriter
或 DataFrameReader
中的 .format("avro")
,这应该更简洁且足够好用。
如果您更喜欢使用自己构建的 spark-avro
jar 文件,您可以简单地禁用配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled
,并在部署应用程序时使用 --jars
选项。有关更多详细信息,请阅读应用程序提交指南中的高级依赖项管理部分。
Avro 到 Spark SQL 转换支持的类型
目前 Spark 支持读取 Avro 记录下的所有原始类型和复杂类型。
Avro 类型 | Spark SQL 类型 |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
string | StringType |
enum | StringType |
fixed | BinaryType |
bytes | BinaryType |
record | StructType |
array | ArrayType |
map | MapType |
union | 见下文 |
除了上面列出的类型之外,它还支持读取 union
类型。以下三种类型被认为是基本的 union
类型:
union(int, long)
将映射到 LongType。union(float, double)
将映射到 DoubleType。union(something, null)
,其中 something 是任何受支持的 Avro 类型。这将映射到与 something 相同的 Spark SQL 类型,且可为空性设置为 true。所有其他 union 类型都被视为复杂类型。它们将映射到 StructType,其中字段名为 member0、member1 等,与 union 的成员一致。这与 Avro 和 Parquet 之间转换时的行为一致。
它还支持读取以下 Avro 逻辑类型:
Avro 逻辑类型 | Avro 类型 | Spark SQL 类型 |
---|---|---|
date | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimal | fixed | DecimalType |
decimal | bytes | DecimalType |
目前,它忽略 Avro 文件中存在的文档、别名和其他属性。
Spark SQL 到 Avro 转换支持的类型
Spark 支持将所有 Spark SQL 类型写入 Avro。对于大多数类型,Spark 类型到 Avro 类型的映射是直接的(例如 IntegerType 转换为 int);但是,下面列出了一些特殊情况:
Spark SQL 类型 | Avro 类型 | Avro 逻辑类型 |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | bytes | |
DateType | int | date |
TimestampType | long | timestamp-micros |
DecimalType | fixed | decimal |
您还可以使用选项 avroSchema
指定整个输出 Avro 模式,以便 Spark SQL 类型可以转换为其他 Avro 类型。以下转换默认不应用,需要用户指定的 Avro 模式:
Spark SQL 类型 | Avro 类型 | Avro 逻辑类型 |
---|---|---|
BinaryType | fixed | |
StringType | enum | |
TimestampType | long | timestamp-millis |
DecimalType | bytes | decimal |
处理 Avro 字段的循环引用
在 Avro 中,当字段的类型在其中一个父记录中定义时,就会发生循环引用。这可能在解析数据时导致问题,因为它可能导致无限循环或其他意外行为。要读取具有循环引用模式的 Avro 数据,用户可以使用 recursiveFieldMaxDepth
选项来指定解析模式时允许的最大递归级别数。默认情况下,Spark Avro 数据源通过将 recursiveFieldMaxDepth
设置为 -1 来禁止递归字段。但是,如果需要,您可以将此选项设置为 1 到 15。
将 recursiveFieldMaxDepth
设置为 1 会丢弃所有递归字段,设置为 2 允许递归一次,设置为 3 允许递归两次。不允许 recursiveFieldMaxDepth
值大于 15,因为这可能导致性能问题甚至栈溢出。
下面 Avro 消息的 SQL 模式将根据 recursiveFieldMaxDepth
的值而变化。
{
"type": "record",
"name": "Node",
"fields": [
{"name": "Id", "type": "int"},
{"name": "Next", "type": ["null", "Node"]}
]
}
// The Avro schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursiveFieldMaxDepth` value.
1: struct<Id: int>
2: struct<Id: int, Next: struct<Id: int>>
3: struct<Id: int, Next: struct<Id: int, Next: struct<Id: int>>>