Apache Avro 数据源指南

自 Spark 2.4 版本以来,Spark SQL 提供了对读写 Apache Avro 数据的内置支持。

部署

spark-avro 模块是外部模块,默认情况下不包含在 spark-submitspark-shell 中。

与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。 可以使用 --packagesspark-avro_2.12 及其依赖项直接添加到 spark-submit,例如:

./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.5.5 ...

为了在 spark-shell 上进行实验,您还可以使用 --packages 直接添加 org.apache.spark:spark-avro_2.12 及其依赖项:

./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.5.5 ...

有关使用外部依赖项提交应用程序的更多详细信息,请参阅应用程序提交指南

加载和保存函数

由于 spark-avro 模块是外部模块,因此 DataFrameReaderDataFrameWriter 中没有 .avro API。

要以 Avro 格式加载/保存数据,您需要将数据源选项 format 指定为 avro (或 org.apache.spark.sql.avro)。

df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");
df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")

to_avro() 和 from_avro()

Avro 包提供了函数 to_avro 以将列编码为 Avro 格式的二进制文件,以及 from_avro() 以将 Avro 二进制数据解码为列。 这两个函数都将一个列转换为另一个列,并且输入/输出 SQL 数据类型可以是复杂类型或原始类型。

当从 Kafka 等流式源读取数据或写入数据时,使用 Avro 记录作为列非常有用。每个 Kafka 键值记录都将使用一些元数据进行扩充,例如摄取到 Kafka 中的时间戳、Kafka 中的偏移量等。

from pyspark.sql.avro.functions import from_avro, to_avro

# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()

df = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1")\
  .load()

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

query = output\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")\
  .start()
import org.apache.spark.sql.avro.functions._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro($"value", jsonFormatSchema) as $"user")
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as $"value")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;

// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
Dataset<Row> output = df
  .select(from_avro(col("value"), jsonFormatSchema).as("user"))
  .where("user.favorite_color == \"red\"")
  .select(to_avro(col("user.name")).as("value"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")

df <- read.stream(
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  subscribe = "topic1"
)

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.

output <- select(
  filter(
    select(df, alias(from_avro("value", jsonFormatSchema), "user")),
    column("user.favorite_color") == "red"
  ),
  alias(to_avro("user.name"), "value")
)

write.stream(
  output,
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  topic = "topic2"
)

数据源选项

Avro 的数据源选项可以通过以下方式设置:

属性名称默认值含义范围起始版本
avroSchema 用户提供的 JSON 格式的可选模式。
  • 读取 Avro 文件或调用函数 from_avro 时,此选项可以设置为演化的模式,该模式与实际 Avro 模式兼容但不同。 反序列化模式将与演化的模式一致。 例如,如果我们设置一个包含具有默认值的附加列的演化模式,则 Spark 中的读取结果也将包含新列。 请注意,将此选项与 from_avro 一起使用时,您仍然需要将实际的 Avro 模式作为参数传递给该函数。
  • 写入 Avro 时,如果期望的输出 Avro 模式与 Spark 转换的模式不匹配,则可以设置此选项。 例如,一列的预期模式是“枚举”类型,而不是默认转换模式中的“字符串”类型。
读取、写入和函数 from_avro 2.4.0
recordName topLevelRecord 写入结果中的顶级记录名称,Avro 规范中是必需的。 写入 2.4.0
recordNamespace "" 写入结果中的记录命名空间。 写入 2.4.0
ignoreExtension true 该选项控制在读取时忽略没有 .avro 扩展名的文件。
如果启用该选项,则加载所有文件(带有和不带有 .avro 扩展名)。
该选项已被弃用,并且将在以后的版本中删除。 请使用常规数据源选项 pathGlobFilter 过滤文件名。
读取 2.4.0
compression snappy compression 选项允许指定在写入时使用的压缩编解码器。
当前支持的编解码器为 uncompressedsnappydeflatebzip2xzzstandard
如果未设置该选项,则会考虑配置 spark.sql.avro.compression.codec
写入 2.4.0
mode FAILFAST mode 选项允许为函数 from_avro 指定解析模式。
当前支持的模式为
  • FAILFAST:在处理损坏的记录时引发异常。
  • PERMISSIVE:损坏的记录被处理为 null 结果。 因此,数据模式被迫完全可为空,这可能与用户提供的模式不同。
函数 from_avro 2.4.0
datetimeRebaseMode spark.sql.avro.datetimeRebaseModeInRead 配置的值) datetimeRebaseMode 选项允许指定用于从 Julian 到 Proleptic Gregorian 日历的 datetimestamp-microstimestamp-millis 逻辑类型的值的重新计算模式。
当前支持的模式为
  • EXCEPTION:在读取两个日历之间不明确的古代日期/时间戳时失败。
  • CORRECTED:加载日期/时间戳,不进行重新计算。
  • LEGACY:执行从 Julian 到 Proleptic Gregorian 日历的古代日期/时间戳的重新计算。
读取和函数 from_avro 3.2.0
positionalFieldMatching false 它可以与 `avroSchema` 选项结合使用,以调整将提供的 Avro 模式中的字段与 SQL 模式中的字段进行匹配的行为。 默认情况下,匹配将使用字段名称执行,忽略其位置。 如果此选项设置为“true”,则匹配将基于字段的位置。 读取和写入 3.2.0
enableStableIdentifiersForUnionType false 如果设置为 true,Avro 模式将被反序列化为 Spark SQL 模式,并且 Avro Union 类型将被转换为一个结构,其中字段名称与其各自的类型保持一致。生成的字段名称将转换为小写,例如 member_int 或 member_string。如果两个用户定义的类型名称或一个用户定义的类型名称和一个内置类型名称相同(不区分大小写),则会引发异常。但是,在其他情况下,可以唯一地标识字段名称。 读取 3.5.0

配置

可以使用 SparkSession 上的 setConf 方法或使用 SQL 运行 SET key=value 命令来完成 Avro 的配置。

属性名称默认值含义起始版本
spark.sql.legacy.replaceDatabricksSparkAvro.enabled true 如果设置为 true,则数据源提供程序 com.databricks.spark.avro 将映射到内置但外部的 Avro 数据源模块以实现向后兼容。
注意: SQL 配置已在 Spark 3.2 中弃用,并且将来可能会被删除。
2.4.0
spark.sql.avro.compression.codec snappy 用于写入 AVRO 文件的压缩编解码器。 支持的编解码器:uncompressed、deflate、snappy、bzip2、xz 和 zstandard。 默认编解码器为 snappy。 2.4.0
spark.sql.avro.deflate.level -1 用于写入 AVRO 文件的 deflate 编解码器的压缩级别。 有效值必须在 1 到 9(包括 1 和 9)或 -1 的范围内。 默认值为 -1,在当前实现中对应于 6 级。 2.4.0
spark.sql.avro.datetimeRebaseModeInRead EXCEPTION 用于从 Julian 到 Proleptic Gregorian 日历的 datetimestamp-microstimestamp-millis 逻辑类型的值的重新计算模式
  • EXCEPTION: 如果 Spark 遇到两种日历之间存在歧义的古老日期/时间戳,则读取将会失败。
  • CORRECTED: Spark 不会进行 rebase 操作,并按原样读取日期/时间戳。
  • LEGACY: 当读取 Avro 文件时,Spark 会将日期/时间戳从旧版混合日历(儒略历 + 格里高利历)重新调整为普罗列期格里高利历。
只有当 Avro 文件的写入者信息(如 Spark, Hive)未知时,此配置才有效。
3.0.0
spark.sql.avro.datetimeRebaseModeInWrite EXCEPTION 用于将 datetimestamp-microstimestamp-millis 逻辑类型的值从普罗列期格里高利历重新调整为儒略历的 rebase 模式
  • EXCEPTION: 如果 Spark 遇到两种日历之间存在歧义的古老日期/时间戳,则写入将会失败。
  • CORRECTED: Spark 不会进行 rebase 操作,并按原样写入日期/时间戳。
  • LEGACY: 当写入 Avro 文件时,Spark 会将日期/时间戳从普罗列期格里高利历重新调整为旧版混合日历(儒略历 + 格里高利历)。
3.0.0
spark.sql.avro.filterPushdown.enabled true 当为 true 时,启用将 filter 下推到 Avro 数据源。 3.1.0

与 Databricks spark-avro 的兼容性

此 Avro 数据源模块最初来自 Databricks 的开源仓库 spark-avro 并与之兼容。

默认情况下,当启用 SQL 配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled 时,数据源提供程序 com.databricks.spark.avro 会映射到此内置 Avro 模块。 对于使用 Provider 属性作为 com.databricks.spark.avro 在 catalog meta store 中创建的 Spark 表,如果您使用此内置 Avro 模块,则此映射对于加载这些表至关重要。

请注意,在 Databricks 的 spark-avro 中,为快捷功能 .avro() 创建了隐式类 AvroDataFrameWriterAvroDataFrameReader。 在这个内置但外部的模块中,这两个隐式类都被移除了。 请改用 DataFrameWriterDataFrameReader 中的 .format("avro"),这应该足够简洁和良好。

如果您更喜欢使用您自己构建的 spark-avro jar 文件,您可以简单地禁用配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled,并在部署应用程序时使用选项 --jars。 有关更多详细信息,请阅读应用程序提交指南中的 高级依赖项管理 部分。

Avro -> Spark SQL 转换支持的类型

目前,Spark 支持读取 Avro 记录下的所有 原始类型复杂类型

Avro 类型Spark SQL 类型
boolean BooleanType(布尔类型)
int IntegerType(整数类型)
long LongType(长整型)
float FloatType(浮点类型)
double DoubleType(双精度浮点类型)
string StringType(字符串类型)
enum StringType(字符串类型)
fixed BinaryType(二进制类型)
bytes BinaryType(二进制类型)
record StructType(结构体类型)
array ArrayType(数组类型)
map MapType(映射类型)
union 见下文

除了上面列出的类型之外,它还支持读取 union 类型。 以下三种类型被认为是基本 union 类型

  1. union(int, long) 将被映射到 LongType。
  2. union(float, double) 将被映射到 DoubleType。
  3. union(something, null),其中 something 是任何支持的 Avro 类型。 这将被映射到与 something 相同的 Spark SQL 类型,并将 nullable 设置为 true。 所有其他 union 类型都被认为是复杂的。 它们将被映射到 StructType,其中字段名称为 member0、member1 等,与 union 的成员一致。 这与 Avro 和 Parquet 之间转换时的行为一致。

它还支持读取以下 Avro 逻辑类型

Avro 逻辑类型Avro 类型Spark SQL 类型
date int DateType(日期类型)
timestamp-millis long TimestampType(时间戳类型)
timestamp-micros long TimestampType(时间戳类型)
decimal fixed DecimalType(十进制类型)
decimal bytes DecimalType(十进制类型)

目前,它忽略 Avro 文件中存在的文档、别名和其他属性。

Spark SQL -> Avro 转换支持的类型

Spark 支持将所有 Spark SQL 类型写入 Avro。 对于大多数类型,从 Spark 类型到 Avro 类型的映射非常简单(例如,IntegerType 转换为 int); 但是,有一些特殊情况,如下所示

Spark SQL 类型Avro 类型Avro 逻辑类型
ByteType(字节类型) int
ShortType(短整型) int
BinaryType(二进制类型) bytes
DateType(日期类型) int date
TimestampType(时间戳类型) long timestamp-micros
DecimalType(十进制类型) fixed decimal

您还可以使用选项 avroSchema 指定整个输出 Avro schema,以便可以将 Spark SQL 类型转换为其他 Avro 类型。 以下转换默认情况下不应用,需要用户指定的 Avro schema

Spark SQL 类型Avro 类型Avro 逻辑类型
BinaryType(二进制类型) fixed
StringType(字符串类型) enum
TimestampType(时间戳类型) long timestamp-millis
DecimalType(十进制类型) bytes decimal