Apache Avro 数据源指南

自 Spark 2.4 版本发布以来,Spark SQL 提供了对读写 Apache Avro 数据的内置支持。

部署

spark-avro 模块是外部模块,默认不包含在 spark-submitspark-shell 中。

与任何 Spark 应用程序一样,使用 spark-submit 启动应用程序。spark-avro_2.13 及其依赖项可以使用 --packages 直接添加到 spark-submit 中,例如:

./bin/spark-submit --packages org.apache.spark:spark-avro_2.13:4.0.0 ...

要在 spark-shell 上进行实验,您也可以使用 --packages 直接添加 org.apache.spark:spark-avro_2.13 及其依赖项:

./bin/spark-shell --packages org.apache.spark:spark-avro_2.13:4.0.0 ...

有关提交带有外部依赖项的应用程序的更多详细信息,请参阅应用程序提交指南

加载和保存函数

由于 spark-avro 模块是外部模块,DataFrameReaderDataFrameWriter 中没有 .avro API。

要以 Avro 格式加载/保存数据,您需要将数据源选项 format 指定为 avro(或 org.apache.spark.sql.avro)。

df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");
df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")

to_avro() 和 from_avro()

Avro 包提供了函数 to_avro 用于将列编码为 Avro 格式的二进制数据,以及 from_avro() 用于将 Avro 二进制数据解码为列。这两个函数都将一列转换为另一列,并且输入/输出 SQL 数据类型可以是复杂类型或原始类型。

在从 Kafka 等流式源读取或写入数据时,将 Avro 记录用作列非常有用。每个 Kafka 键值记录都将通过一些元数据进行增强,例如摄取到 Kafka 的时间戳、Kafka 中的偏移量等。

from pyspark.sql.avro.functions import from_avro, to_avro

# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()

df = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1")\
  .load()

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

query = output\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")\
  .start()
import org.apache.spark.sql.avro.functions._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro($"value", jsonFormatSchema) as $"user")
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as $"value")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;

// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
Dataset<Row> output = df
  .select(from_avro(col("value"), jsonFormatSchema).as("user"))
  .where("user.favorite_color == \"red\"")
  .select(to_avro(col("user.name")).as("value"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")

df <- read.stream(
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  subscribe = "topic1"
)

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.

output <- select(
  filter(
    select(df, alias(from_avro("value", jsonFormatSchema), "user")),
    column("user.favorite_color") == "red"
  ),
  alias(to_avro("user.name"), "value")
)

write.stream(
  output,
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  topic = "topic2"
)
CREATE TABLE t AS
  SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s
  FROM VALUES (1, NULL), (NULL,  'a') tab(member0, member1);
DECLARE avro_schema STRING;
SET VARIABLE avro_schema =
  '{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }';

SELECT TO_AVRO(s, avro_schema) AS RESULT FROM t;

SELECT FROM_AVRO(result, avro_schema, MAP()).u FROM (
  SELECT TO_AVRO(s, avro_schema) AS RESULT FROM t);

DROP TEMPORARY VARIABLE avro_schema;
DROP TABLE t;

数据源选项

Avro 数据源选项可以通过以下方式设置:

属性名称默认值含义范围起始版本
avroSchema 用户以 JSON 格式提供的可选模式。
  • 读取 Avro 文件或调用 `from_avro` 函数时,此选项可设置为演进模式,该模式与实际 Avro 模式兼容但不同。反序列化模式将与演进模式保持一致。例如,如果设置了一个包含一个带有默认值的额外列的演进模式,则 Spark 中的读取结果也将包含新列。请注意,当与 `from_avro` 一起使用此选项时,您仍需将实际 Avro 模式作为参数传递给该函数。
  • 在写入 Avro 时,如果期望的输出 Avro 模式与 Spark 转换的模式不匹配,则可以设置此选项。例如,某个列的期望模式是“enum”类型,而不是默认转换模式中的“string”类型。
读取、写入和函数 from_avro 2.4.0
recordName topLevelRecord 写入结果中的顶层记录名称,这是 Avro 规范中要求的。 写入 2.4.0
recordNamespace "" 写入结果中的记录命名空间。 写入 2.4.0
ignoreExtension true 该选项控制在读取时忽略没有 .avro 扩展名的文件。
如果启用此选项,则加载所有文件(无论是否带有 .avro 扩展名)。
该选项已弃用,并将在未来的版本中移除。请使用通用数据源选项 pathGlobFilter 来过滤文件名。
读取 2.4.0
compression snappy 选项 compression 允许指定写入时使用的压缩编解码器。
目前支持的编解码器有 uncompressedsnappydeflatebzip2xzzstandard
如果未设置此选项,则会考虑配置 spark.sql.avro.compression.codec
写入 2.4.0
mode FAILFAST 选项 mode 允许为函数 from_avro 指定解析模式。
目前支持的模式有:
  • FAILFAST:处理损坏的记录时抛出异常。
  • PERMISSIVE:损坏的记录被处理为空结果。因此,数据模式被强制为完全可为空,这可能与用户提供的模式不同。
函数 from_avro 2.4.0
datetimeRebaseMode spark.sql.avro.datetimeRebaseModeInRead 配置的值) 选项 datetimeRebaseMode 允许指定 datetimestamp-microstimestamp-millis 逻辑类型的值从儒略历到修正格里高利历的重基模式。
目前支持的模式有:
  • EXCEPTION:读取在两种日历之间含义模糊的旧日期/时间戳时失败。
  • CORRECTED:加载日期/时间戳而不进行重基。
  • LEGACY:将旧日期/时间戳从儒略历重基到修正格里高利历。
读取和函数 from_avro 3.2.0
positionalFieldMatching false 此选项可与 `avroSchema` 选项结合使用,以调整将提供的 Avro 模式中的字段与 SQL 模式中的字段匹配的行为。默认情况下,匹配将使用字段名进行,忽略其位置。如果此选项设置为“true”,则匹配将基于字段的位置。 读取和写入 3.2.0
enableStableIdentifiersForUnionType false 如果设置为 true,Avro 模式将被反序列化为 Spark SQL 模式,并且 Avro Union 类型将转换为一个结构,其中字段名与其各自的类型保持一致。结果字段名将转换为小写,例如 member_int 或 member_string。如果两个用户定义类型名称或一个用户定义类型名称和一个内置类型名称(不区分大小写)相同,将抛出异常。然而,在其他情况下,字段名可以唯一标识。 读取 3.5.0
stableIdentifierPrefixForUnionType member_ 当启用 `enableStableIdentifiersForUnionType` 时,此选项允许配置 Avro Union 类型字段的前缀。 读取 4.0.0
recursiveFieldMaxDepth -1 如果此选项指定为负数或设置为 0,则不允许递归字段。将其设置为 1 会丢弃所有递归字段,设置为 2 允许递归字段递归一次,设置为 3 允许递归两次,依此类推,最多允许 15 次。不允许超过 15 的值,以避免无意中创建非常大的模式。如果 Avro 消息的深度超出此限制,则返回的 Spark 结构将在递归限制后被截断。使用示例可在处理 Avro 字段的循环引用部分找到。 读取 4.0.0

配置

Avro 的配置可以通过 spark.conf.set 或使用 SQL 运行 SET key=value 命令来完成。

属性名称默认值含义起始版本
spark.sql.legacy.replaceDatabricksSparkAvro.enabled true 如果设置为 true,数据源提供程序 com.databricks.spark.avro 将映射到内置但外部的 Avro 数据源模块以实现向后兼容性。
注意:此 SQL 配置在 Spark 3.2 中已弃用,并可能在未来版本中移除。
2.4.0
spark.sql.avro.compression.codec snappy 写入 AVRO 文件时使用的压缩编解码器。支持的编解码器:uncompressed, deflate, snappy, bzip2, xz 和 zstandard。默认编解码器是 snappy。 2.4.0
spark.sql.avro.deflate.level -1 写入 AVRO 文件时 deflate 编解码器的压缩级别。有效值必须在 1 到 9(包含)或 -1 的范围内。默认值为 -1,在当前实现中对应于级别 6。 2.4.0
spark.sql.avro.xz.level 6 写入 AVRO 文件时 xz 编解码器的压缩级别。有效值必须在 1 到 9(包含)的范围内。当前实现中的默认值为 6。 4.0.0
spark.sql.avro.zstandard.level 3 写入 AVRO 文件时 zstandard 编解码器的压缩级别。当前实现中的默认值为 3。 4.0.0
spark.sql.avro.zstandard.bufferPool.enabled false 如果为 true,则在写入 AVRO 文件时启用 ZSTD JNI 库的缓冲区池。 4.0.0
spark.sql.avro.datetimeRebaseModeInRead EXCEPTION datetimestamp-microstimestamp-millis 逻辑类型的值从儒略历到修正格里高利历的重基模式
  • EXCEPTION:如果 Spark 遇到在两种日历之间含义模糊的旧日期/时间戳,则读取将失败。
  • CORRECTED:Spark 不会进行重基,并按原样读取日期/时间戳。
  • LEGACY:当读取 Avro 文件时,Spark 会将日期/时间戳从旧的混合(儒略历 + 格里高利历)日历重基到修正格里高利历。
此配置仅在 Avro 文件的写入者信息(如 Spark、Hive)未知时才有效。
3.0.0
spark.sql.avro.datetimeRebaseModeInWrite EXCEPTION datetimestamp-microstimestamp-millis 逻辑类型的值从修正格里高利历到儒略历的重基模式
  • EXCEPTION:如果 Spark 遇到在两种日历之间含义模糊的旧日期/时间戳,则写入将失败。
  • CORRECTED:Spark 不会进行重基,并按原样写入日期/时间戳。
  • LEGACY:当写入 Avro 文件时,Spark 会将日期/时间戳从修正格里高利历重基到旧的混合(儒略历 + 格里高利历)日历。
3.0.0
spark.sql.avro.filterPushdown.enabled true 如果为 true,则启用 Avro 数据源的谓词下推。 3.1.0

与 Databricks spark-avro 的兼容性

此 Avro 数据源模块最初来源于 Databricks 的开源仓库 spark-avro,并与其兼容。

默认情况下,当 SQL 配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled 启用时,数据源提供程序 com.databricks.spark.avro 会映射到此内置 Avro 模块。对于在目录元存储中以 Provider 属性为 com.databricks.spark.avro 创建的 Spark 表,如果您正在使用此内置 Avro 模块,则此映射对于加载这些表至关重要。

请注意,在 Databricks 的 spark-avro 中,为快捷函数 .avro() 创建了隐式类 AvroDataFrameWriterAvroDataFrameReader。在此内置但外部模块中,这两个隐式类均已移除。请改用 DataFrameWriterDataFrameReader 中的 .format("avro"),这应该更简洁且足够好用。

如果您更喜欢使用自己构建的 spark-avro jar 文件,您可以简单地禁用配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled,并在部署应用程序时使用 --jars 选项。有关更多详细信息,请阅读应用程序提交指南中的高级依赖项管理部分。

Avro 到 Spark SQL 转换支持的类型

目前 Spark 支持读取 Avro 记录下的所有原始类型复杂类型

Avro 类型Spark SQL 类型
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
fixed BinaryType
bytes BinaryType
record StructType
array ArrayType
map MapType
union 见下文

除了上面列出的类型之外,它还支持读取 union 类型。以下三种类型被认为是基本的 union 类型:

  1. union(int, long) 将映射到 LongType。
  2. union(float, double) 将映射到 DoubleType。
  3. union(something, null),其中 something 是任何受支持的 Avro 类型。这将映射到与 something 相同的 Spark SQL 类型,且可为空性设置为 true。所有其他 union 类型都被视为复杂类型。它们将映射到 StructType,其中字段名为 member0、member1 等,与 union 的成员一致。这与 Avro 和 Parquet 之间转换时的行为一致。

它还支持读取以下 Avro 逻辑类型

Avro 逻辑类型Avro 类型Spark SQL 类型
date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal fixed DecimalType
decimal bytes DecimalType

目前,它忽略 Avro 文件中存在的文档、别名和其他属性。

Spark SQL 到 Avro 转换支持的类型

Spark 支持将所有 Spark SQL 类型写入 Avro。对于大多数类型,Spark 类型到 Avro 类型的映射是直接的(例如 IntegerType 转换为 int);但是,下面列出了一些特殊情况:

Spark SQL 类型Avro 类型Avro 逻辑类型
ByteType int
ShortType int
BinaryType bytes
DateType int date
TimestampType long timestamp-micros
DecimalType fixed decimal

您还可以使用选项 avroSchema 指定整个输出 Avro 模式,以便 Spark SQL 类型可以转换为其他 Avro 类型。以下转换默认不应用,需要用户指定的 Avro 模式:

Spark SQL 类型Avro 类型Avro 逻辑类型
BinaryType fixed
StringType enum
TimestampType long timestamp-millis
DecimalType bytes decimal

处理 Avro 字段的循环引用

在 Avro 中,当字段的类型在其中一个父记录中定义时,就会发生循环引用。这可能在解析数据时导致问题,因为它可能导致无限循环或其他意外行为。要读取具有循环引用模式的 Avro 数据,用户可以使用 recursiveFieldMaxDepth 选项来指定解析模式时允许的最大递归级别数。默认情况下,Spark Avro 数据源通过将 recursiveFieldMaxDepth 设置为 -1 来禁止递归字段。但是,如果需要,您可以将此选项设置为 1 到 15。

recursiveFieldMaxDepth 设置为 1 会丢弃所有递归字段,设置为 2 允许递归一次,设置为 3 允许递归两次。不允许 recursiveFieldMaxDepth 值大于 15,因为这可能导致性能问题甚至栈溢出。

下面 Avro 消息的 SQL 模式将根据 recursiveFieldMaxDepth 的值而变化。

此 div 仅用于使 Markdown 编辑器/查看器正常工作,不会在网页上显示 ```avro
{
  "type": "record",
  "name": "Node",
  "fields": [
    {"name": "Id", "type": "int"},
    {"name": "Next", "type": ["null", "Node"]}
  ]
}

// The Avro schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursiveFieldMaxDepth` value.

1: struct<Id: int>
2: struct<Id: int, Next: struct<Id: int>>
3: struct<Id: int, Next: struct<Id: int, Next: struct<Id: int>>>
```