Apache Avro 数据源指南
- 部署
- 加载和保存函数
- to_avro() 和 from_avro()
- 数据源选项
- 配置
- 与 Databricks spark-avro 的兼容性
- 支持的 Avro 到 Spark SQL 转换类型
- 支持的 Spark SQL 到 Avro 转换类型
从 Spark 2.4 版本开始,Spark SQL 提供了对读取和写入 Apache Avro 数据的内置支持。
部署
spark-avro
模块是外部的,默认情况下不包含在 spark-submit
或 spark-shell
中。
与任何 Spark 应用程序一样,spark-submit
用于启动您的应用程序。 spark-avro_2.12
及其依赖项可以直接使用 --packages
添加到 spark-submit
中,例如:
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.5.1 ...
为了在 spark-shell
上进行实验,您也可以使用 --packages
直接添加 org.apache.spark:spark-avro_2.12
及其依赖项:
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.5.1 ...
有关使用外部依赖项提交应用程序的更多详细信息,请参阅 应用程序提交指南。
加载和保存函数
由于 spark-avro
模块是外部的,因此 DataFrameReader
或 DataFrameWriter
中没有 .avro
API。
要以 Avro 格式加载/保存数据,您需要将数据源选项 format
指定为 avro
(或 org.apache.spark.sql.avro
)。
to_avro() 和 from_avro()
Avro 包提供了函数 to_avro
用于将列以 Avro 格式编码为二进制,以及 from_avro()
用于将 Avro 二进制数据解码为列。这两个函数都将一列转换为另一列,输入/输出 SQL 数据类型可以是复杂类型或基本类型。
当从像 Kafka 这样的流式源读取或写入时,使用 Avro 记录作为列很有用。每个 Kafka 键值记录将被一些元数据增强,例如摄取到 Kafka 的时间戳、Kafka 中的偏移量等。
- 如果包含您的数据的“value”字段是 Avro,您可以使用
from_avro()
提取您的数据,对其进行丰富,清理,然后将其再次推送到下游到 Kafka 或将其写入文件。 to_avro()
可用于将结构转换为 Avro 记录。当您希望在将数据写入 Kafka 时将多个列重新编码为单个列时,此方法特别有用。
数据源选项
Avro 的数据源选项可以通过以下方式设置:
DataFrameReader
或DataFrameWriter
上的.option
方法。- 函数
from_avro
中的options
参数。
属性名称 | 默认值 | 含义 | 范围 | 自版本 |
---|---|---|---|---|
avroSchema |
None | 用户提供的 JSON 格式的可选模式。
|
读取、写入和函数 from_avro |
2.4.0 |
recordName |
topLevelRecord | 写入结果中的顶级记录名称,这是 Avro 规范中必需的。 | 写入 | 2.4.0 |
recordNamespace |
"" | 写入结果中的记录命名空间。 | 写入 | 2.4.0 |
ignoreExtension |
true | 该选项控制在读取时忽略没有 .avro 扩展名的文件。如果启用该选项,将加载所有文件(有和没有 .avro 扩展名的文件)。该选项已弃用,将在将来的版本中删除。请使用通用数据源选项 pathGlobFilter 来过滤文件名。 |
读取 | 2.4.0 |
compression |
snappy | compression 选项允许指定写入中使用的压缩编解码器。当前支持的编解码器是 uncompressed 、snappy 、deflate 、bzip2 、xz 和 zstandard 。如果未设置该选项,将考虑配置 spark.sql.avro.compression.codec 配置。 |
写入 | 2.4.0 |
mode |
FAILFAST | mode 选项允许指定函数 from_avro 的解析模式。当前支持的模式是:
|
函数 from_avro |
2.4.0 |
datetimeRebaseMode |
(spark.sql.avro.datetimeRebaseModeInRead 配置的值) |
datetimeRebaseMode 选项允许指定从儒略历到公历的 date 、timestamp-micros 、timestamp-millis 逻辑类型的值的重新基准模式。当前支持的模式是:
|
读取和函数 from_avro |
3.2.0 |
positionalFieldMatching |
false | 这可以与 `avroSchema` 选项一起使用,以调整匹配提供的 Avro 模式中的字段与 SQL 模式中的字段的行为。默认情况下,匹配将使用字段名称执行,忽略其位置。如果将此选项设置为“true”,则匹配将基于字段的位置。 | 读取和写入 | 3.2.0 |
enableStableIdentifiersForUnionType |
false | 如果设置为 true,Avro 模式将被反序列化为 Spark SQL 模式,并且 Avro Union 类型将被转换为结构,其中字段名称与其各自的类型保持一致。生成的字段名称将转换为小写,例如 member_int 或 member_string。如果两个用户定义的类型名称或用户定义的类型名称和内置类型名称在不区分大小写的情况下相同,则会引发异常。但是,在其他情况下,字段名称可以被唯一标识。 | 读取 | 3.5.0 |
配置
Avro 的配置可以通过 SparkSession 上的 setConf
方法完成,也可以通过使用 SQL 运行 SET key=value
命令完成。
属性名称 | 默认值 | 含义 | 自版本 |
---|---|---|---|
spark.sql.legacy.replaceDatabricksSparkAvro.enabled | true | 如果设置为 true,数据源提供程序 com.databricks.spark.avro 将被映射到内置但外部的 Avro 数据源模块,以实现向后兼容性。注意:该 SQL 配置已在 Spark 3.2 中弃用,可能会在将来删除。 |
2.4.0 |
spark.sql.avro.compression.codec | snappy | 用于写入 AVRO 文件的压缩编解码器。支持的编解码器:uncompressed、deflate、snappy、bzip2、xz 和 zstandard。默认编解码器是 snappy。 | 2.4.0 |
spark.sql.avro.deflate.level | -1 | 用于写入 AVRO 文件的 deflate 编解码器的压缩级别。有效值必须在 1 到 9(包括)或 -1 的范围内。默认值为 -1,对应于当前实现中的级别 6。 | 2.4.0 |
spark.sql.avro.datetimeRebaseModeInRead | EXCEPTION |
从儒略历到公历的 date 、timestamp-micros 、timestamp-millis 逻辑类型的值的重新基准模式
|
3.0.0 |
spark.sql.avro.datetimeRebaseModeInWrite | EXCEPTION |
从公历到儒略历的 date 、timestamp-micros 、timestamp-millis 逻辑类型的值的重新基准模式
|
3.0.0 |
spark.sql.avro.filterPushdown.enabled | true | 当为 true 时,启用对 Avro 数据源的过滤器下推。 | 3.1.0 |
与 Databricks spark-avro 的兼容性
此 Avro 数据源模块最初来自 Databricks 的开源存储库 spark-avro,并且与之兼容。
默认情况下,使用 SQL 配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled
启用,数据源提供程序 com.databricks.spark.avro
将映射到此内置 Avro 模块。对于在目录元存储中使用 Provider
属性为 com.databricks.spark.avro
创建的 Spark 表,如果使用此内置 Avro 模块,则映射对于加载这些表至关重要。
请注意,在 Databricks 的 spark-avro 中,为快捷函数 .avro()
创建了隐式类 AvroDataFrameWriter
和 AvroDataFrameReader
。在此内置但外部模块中,这两个隐式类都被删除。请改用 DataFrameWriter
或 DataFrameReader
中的 .format("avro")
,它应该干净且足够好。
如果您更喜欢使用您自己的 spark-avro
jar 文件构建,您可以简单地禁用配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled
,并在部署应用程序时使用选项 --jars
。有关更多详细信息,请阅读应用程序提交指南中的 高级依赖项管理 部分。
支持的 Avro 到 Spark SQL 转换类型
目前,Spark 支持读取 Avro 记录下的所有 基本类型 和 复杂类型。
Avro 类型 | Spark SQL 类型 |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
string | StringType |
enum | StringType |
fixed | BinaryType |
bytes | BinaryType |
record | StructType |
array | ArrayType |
map | MapType |
union | 见下文 |
除了上面列出的类型之外,它还支持读取 union
类型。以下三种类型被认为是基本 union
类型
union(int, long)
将映射到 LongType。union(float, double)
将映射到 DoubleType。union(something, null)
,其中 something 是任何支持的 Avro 类型。这将映射到与 something 相同的 Spark SQL 类型,并且 nullable 设置为 true。所有其他 union 类型都被认为是复杂的。它们将映射到 StructType,其中字段名称为 member0、member1 等,与 union 的成员一致。这与在 Avro 和 Parquet 之间转换时的行为一致。
它还支持读取以下 Avro 逻辑类型
Avro 逻辑类型 | Avro 类型 | Spark SQL 类型 |
---|---|---|
date | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimal | fixed | DecimalType |
decimal | bytes | DecimalType |
目前,它忽略 Avro 文件中存在的文档、别名和其他属性。
支持的 Spark SQL 到 Avro 转换类型
Spark 支持将所有 Spark SQL 类型写入 Avro。对于大多数类型,从 Spark 类型到 Avro 类型的映射很简单(例如,IntegerType 被转换为 int);但是,有一些特殊情况,如下所示
Spark SQL 类型 | Avro 类型 | Avro 逻辑类型 |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | bytes | |
DateType | int | date |
TimestampType | long | timestamp-micros |
DecimalType | fixed | decimal |
您还可以使用选项 avroSchema
指定整个输出 Avro 模式,以便 Spark SQL 类型可以转换为其他 Avro 类型。以下转换默认情况下不应用,需要用户指定的 Avro 模式
Spark SQL 类型 | Avro 类型 | Avro 逻辑类型 |
---|---|---|
BinaryType | fixed | |
StringType | enum | |
TimestampType | long | timestamp-millis |
DecimalType | bytes | decimal |