Apache Avro 数据源指南

从 Spark 2.4 版本开始,Spark SQL 提供了对读取和写入 Apache Avro 数据的内置支持。

部署

spark-avro 模块是外部的,默认情况下不包含在 spark-submitspark-shell 中。

与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。 spark-avro_2.12 及其依赖项可以直接使用 --packages 添加到 spark-submit 中,例如:

./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.5.1 ...

为了在 spark-shell 上进行实验,您也可以使用 --packages 直接添加 org.apache.spark:spark-avro_2.12 及其依赖项:

./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.5.1 ...

有关使用外部依赖项提交应用程序的更多详细信息,请参阅 应用程序提交指南

加载和保存函数

由于 spark-avro 模块是外部的,因此 DataFrameReaderDataFrameWriter 中没有 .avro API。

要以 Avro 格式加载/保存数据,您需要将数据源选项 format 指定为 avro(或 org.apache.spark.sql.avro)。

df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");
df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")

to_avro() 和 from_avro()

Avro 包提供了函数 to_avro 用于将列以 Avro 格式编码为二进制,以及 from_avro() 用于将 Avro 二进制数据解码为列。这两个函数都将一列转换为另一列,输入/输出 SQL 数据类型可以是复杂类型或基本类型。

当从像 Kafka 这样的流式源读取或写入时,使用 Avro 记录作为列很有用。每个 Kafka 键值记录将被一些元数据增强,例如摄取到 Kafka 的时间戳、Kafka 中的偏移量等。

from pyspark.sql.avro.functions import from_avro, to_avro

# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()

df = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1")\
  .load()

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

query = output\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")\
  .start()
import org.apache.spark.sql.avro.functions._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro($"value", jsonFormatSchema) as $"user")
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as $"value")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;

// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
Dataset<Row> output = df
  .select(from_avro(col("value"), jsonFormatSchema).as("user"))
  .where("user.favorite_color == \"red\"")
  .select(to_avro(col("user.name")).as("value"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")

df <- read.stream(
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  subscribe = "topic1"
)

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.

output <- select(
  filter(
    select(df, alias(from_avro("value", jsonFormatSchema), "user")),
    column("user.favorite_color") == "red"
  ),
  alias(to_avro("user.name"), "value")
)

write.stream(
  output,
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  topic = "topic2"
)

数据源选项

Avro 的数据源选项可以通过以下方式设置:

属性名称默认值含义范围自版本
avroSchema None 用户提供的 JSON 格式的可选模式。
  • 在读取 Avro 文件或调用函数 from_avro 时,此选项可以设置为一个演变的模式,该模式与实际的 Avro 模式兼容但不同。反序列化模式将与演变的模式一致。例如,如果我们设置一个包含一个带有默认值的附加列的演变模式,则 Spark 中的读取结果也将包含新列。请注意,在使用此选项与 from_avro 时,您仍然需要将实际的 Avro 模式作为参数传递给函数。
  • 在写入 Avro 时,如果预期的输出 Avro 模式与 Spark 转换的模式不匹配,则可以设置此选项。例如,一列的预期模式是“enum”类型,而不是默认转换模式中的“string”类型。
读取、写入和函数 from_avro 2.4.0
recordName topLevelRecord 写入结果中的顶级记录名称,这是 Avro 规范中必需的。 写入 2.4.0
recordNamespace "" 写入结果中的记录命名空间。 写入 2.4.0
ignoreExtension true 该选项控制在读取时忽略没有 .avro 扩展名的文件。
如果启用该选项,将加载所有文件(有和没有 .avro 扩展名的文件)。
该选项已弃用,将在将来的版本中删除。请使用通用数据源选项 pathGlobFilter 来过滤文件名。
读取 2.4.0
compression snappy compression 选项允许指定写入中使用的压缩编解码器。
当前支持的编解码器是 uncompressedsnappydeflatebzip2xzzstandard
如果未设置该选项,将考虑配置 spark.sql.avro.compression.codec 配置。
写入 2.4.0
mode FAILFAST mode 选项允许指定函数 from_avro 的解析模式。
当前支持的模式是:
  • FAILFAST:在处理损坏的记录时抛出异常。
  • PERMISSIVE:损坏的记录将被处理为 null 结果。因此,数据模式将强制为完全可空,这可能与用户提供的模式不同。
函数 from_avro 2.4.0
datetimeRebaseMode (spark.sql.avro.datetimeRebaseModeInRead 配置的值) datetimeRebaseMode 选项允许指定从儒略历到公历的 datetimestamp-microstimestamp-millis 逻辑类型的值的重新基准模式。
当前支持的模式是:
  • EXCEPTION:在读取两个日历之间存在歧义的古代日期/时间戳时失败。
  • CORRECTED:加载日期/时间戳而不重新基准。
  • LEGACY:在读取 Avro 文件时,将古代日期/时间戳从儒略历重新基准到公历。
读取和函数 from_avro 3.2.0
positionalFieldMatching false 这可以与 `avroSchema` 选项一起使用,以调整匹配提供的 Avro 模式中的字段与 SQL 模式中的字段的行为。默认情况下,匹配将使用字段名称执行,忽略其位置。如果将此选项设置为“true”,则匹配将基于字段的位置。 读取和写入 3.2.0
enableStableIdentifiersForUnionType false 如果设置为 true,Avro 模式将被反序列化为 Spark SQL 模式,并且 Avro Union 类型将被转换为结构,其中字段名称与其各自的类型保持一致。生成的字段名称将转换为小写,例如 member_int 或 member_string。如果两个用户定义的类型名称或用户定义的类型名称和内置类型名称在不区分大小写的情况下相同,则会引发异常。但是,在其他情况下,字段名称可以被唯一标识。 读取 3.5.0

配置

Avro 的配置可以通过 SparkSession 上的 setConf 方法完成,也可以通过使用 SQL 运行 SET key=value 命令完成。

属性名称默认值含义自版本
spark.sql.legacy.replaceDatabricksSparkAvro.enabled true 如果设置为 true,数据源提供程序 com.databricks.spark.avro 将被映射到内置但外部的 Avro 数据源模块,以实现向后兼容性。
注意:该 SQL 配置已在 Spark 3.2 中弃用,可能会在将来删除。
2.4.0
spark.sql.avro.compression.codec snappy 用于写入 AVRO 文件的压缩编解码器。支持的编解码器:uncompressed、deflate、snappy、bzip2、xz 和 zstandard。默认编解码器是 snappy。 2.4.0
spark.sql.avro.deflate.level -1 用于写入 AVRO 文件的 deflate 编解码器的压缩级别。有效值必须在 1 到 9(包括)或 -1 的范围内。默认值为 -1,对应于当前实现中的级别 6。 2.4.0
spark.sql.avro.datetimeRebaseModeInRead EXCEPTION 从儒略历到公历的 datetimestamp-microstimestamp-millis 逻辑类型的值的重新基准模式
  • EXCEPTION:如果 Spark 看到两个日历之间存在歧义的古代日期/时间戳,它将失败读取。
  • CORRECTED:Spark 不会重新基准,并按原样读取日期/时间戳。
  • LEGACY:在读取 Avro 文件时,Spark 将将日期/时间戳从旧的混合(儒略历 + 公历)日历重新基准到公历。
此配置仅在 Avro 文件的编写器信息(如 Spark、Hive)未知时有效。
3.0.0
spark.sql.avro.datetimeRebaseModeInWrite EXCEPTION 从公历到儒略历的 datetimestamp-microstimestamp-millis 逻辑类型的值的重新基准模式
  • EXCEPTION:如果 Spark 看到两个日历之间存在歧义的古代日期/时间戳,它将失败写入。
  • CORRECTED:Spark 不会重新基准,并按原样写入日期/时间戳。
  • LEGACY: 当写入 Avro 文件时,Spark 将从先验格里高利历法重新调整日期/时间戳到传统的混合(儒略历 + 格里高利历)历法。
3.0.0
spark.sql.avro.filterPushdown.enabled true 当为 true 时,启用对 Avro 数据源的过滤器下推。 3.1.0

与 Databricks spark-avro 的兼容性

此 Avro 数据源模块最初来自 Databricks 的开源存储库 spark-avro,并且与之兼容。

默认情况下,使用 SQL 配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled 启用,数据源提供程序 com.databricks.spark.avro 将映射到此内置 Avro 模块。对于在目录元存储中使用 Provider 属性为 com.databricks.spark.avro 创建的 Spark 表,如果使用此内置 Avro 模块,则映射对于加载这些表至关重要。

请注意,在 Databricks 的 spark-avro 中,为快捷函数 .avro() 创建了隐式类 AvroDataFrameWriterAvroDataFrameReader。在此内置但外部模块中,这两个隐式类都被删除。请改用 DataFrameWriterDataFrameReader 中的 .format("avro"),它应该干净且足够好。

如果您更喜欢使用您自己的 spark-avro jar 文件构建,您可以简单地禁用配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled,并在部署应用程序时使用选项 --jars。有关更多详细信息,请阅读应用程序提交指南中的 高级依赖项管理 部分。

支持的 Avro 到 Spark SQL 转换类型

目前,Spark 支持读取 Avro 记录下的所有 基本类型复杂类型

Avro 类型Spark SQL 类型
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
fixed BinaryType
bytes BinaryType
record StructType
array ArrayType
map MapType
union 见下文

除了上面列出的类型之外,它还支持读取 union 类型。以下三种类型被认为是基本 union 类型

  1. union(int, long) 将映射到 LongType。
  2. union(float, double) 将映射到 DoubleType。
  3. union(something, null),其中 something 是任何支持的 Avro 类型。这将映射到与 something 相同的 Spark SQL 类型,并且 nullable 设置为 true。所有其他 union 类型都被认为是复杂的。它们将映射到 StructType,其中字段名称为 member0、member1 等,与 union 的成员一致。这与在 Avro 和 Parquet 之间转换时的行为一致。

它还支持读取以下 Avro 逻辑类型

Avro 逻辑类型Avro 类型Spark SQL 类型
date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal fixed DecimalType
decimal bytes DecimalType

目前,它忽略 Avro 文件中存在的文档、别名和其他属性。

支持的 Spark SQL 到 Avro 转换类型

Spark 支持将所有 Spark SQL 类型写入 Avro。对于大多数类型,从 Spark 类型到 Avro 类型的映射很简单(例如,IntegerType 被转换为 int);但是,有一些特殊情况,如下所示

Spark SQL 类型Avro 类型Avro 逻辑类型
ByteType int
ShortType int
BinaryType bytes
DateType int date
TimestampType long timestamp-micros
DecimalType fixed decimal

您还可以使用选项 avroSchema 指定整个输出 Avro 模式,以便 Spark SQL 类型可以转换为其他 Avro 类型。以下转换默认情况下不应用,需要用户指定的 Avro 模式

Spark SQL 类型Avro 类型Avro 逻辑类型
BinaryType fixed
StringType enum
TimestampType long timestamp-millis
DecimalType bytes decimal