Parquet 文件
Parquet 是一种列式格式,受许多其他数据处理系统支持。Spark SQL 提供对读取和写入 Parquet 文件的支持,这些文件会自动保留原始数据的模式。读取 Parquet 文件时,出于兼容性原因,所有列都会自动转换为可为空的。
以编程方式加载数据
使用上述示例中的数据
peopleDF = spark.read.json("examples/src/main/resources/people.json")
# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")
# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")
# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# | name|
# +------+
# |Justin|
# +------+
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");
// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
df <- read.df("examples/src/main/resources/people.json", "json")
# SparkDataFrame can be saved as Parquet files, maintaining the schema information.
write.parquet(df, "people.parquet")
# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile <- read.parquet("people.parquet")
# Parquet files can also be used to create a temporary view and then used in SQL statements.
createOrReplaceTempView(parquetFile, "parquetFile")
teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
## 1 Justin
# We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with "Name:"
schema <- structType(structField("name", "string"))
teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema)
for (teenName in collect(teenNames)$name) {
cat(teenName, "\n")
}
## Name: Michael
## Name: Andy
## Name: Justin
分区发现
表分区是 Hive 等系统中常用的优化方法。在分区表中,数据通常存储在不同的目录中,分区列值编码在每个分区目录的路径中。所有内置文件源(包括 Text/CSV/JSON/ORC/Parquet)都能够自动发现和推断分区信息。例如,我们可以使用以下目录结构将我们之前使用过的人口数据存储到分区表中,其中有两个额外的列,gender
和 country
作为分区列
通过将 path/to/table
传递给 SparkSession.read.parquet
或 SparkSession.read.load
,Spark SQL 将自动从路径中提取分区信息。现在返回的 DataFrame 的模式变为
请注意,分区列的数据类型是自动推断的。目前,支持数字数据类型、日期、时间戳和字符串类型。有时用户可能不希望自动推断分区列的数据类型。对于这些用例,可以通过 spark.sql.sources.partitionColumnTypeInference.enabled
配置自动类型推断,默认值为 true
。当类型推断被禁用时,将使用字符串类型作为分区列。
从 Spark 1.6.0 开始,分区发现默认情况下只查找给定路径下的分区。对于上面的示例,如果用户将 path/to/table/gender=male
传递给 SparkSession.read.parquet
或 SparkSession.read.load
,gender
将不会被视为分区列。如果用户需要指定分区发现应从其开始的基路径,则可以在数据源选项中设置 basePath
。例如,当 path/to/table/gender=male
是数据的路径,用户将 basePath
设置为 path/to/table/
时,gender
将是一个分区列。
模式合并
与 Protocol Buffer、Avro 和 Thrift 一样,Parquet 也支持模式演变。用户可以从简单的模式开始,并根据需要逐渐向模式添加更多列。这样,用户最终可能会得到多个具有不同但相互兼容模式的 Parquet 文件。Parquet 数据源现在能够自动检测这种情况并合并所有这些文件的模式。
由于模式合并是一个相对昂贵的操作,并且在大多数情况下不是必需的,因此我们从 1.5.0 开始默认将其关闭。您可以通过以下方式启用它:
- 在读取 Parquet 文件时(如以下示例所示)将数据源选项
mergeSchema
设置为true
,或 - 将全局 SQL 选项
spark.sql.parquet.mergeSchema
设置为true
。
from pyspark.sql import Row
# spark is from the previous example.
# Create a simple DataFrame, stored into a partition directory
sc = spark.sparkContext
squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
.map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")
# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
.map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")
# Read the partitioned table
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.
# root
# |-- double: long (nullable = true)
# |-- single: long (nullable = true)
# |-- triple: long (nullable = true)
# |-- key: integer (nullable = true)
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._
// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public static class Square implements Serializable {
private int value;
private int square;
// Getters and setters...
}
public static class Cube implements Serializable {
private int value;
private int cube;
// Getters and setters...
}
List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
Square square = new Square();
square.setValue(value);
square.setSquare(value * value);
squares.add(square);
}
// Create a simple DataFrame, store into a partition directory
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");
List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
Cube cube = new Cube();
cube.setValue(value);
cube.setCube(value * value * value);
cubes.add(cube);
}
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");
// Read the partitioned table
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
df1 <- createDataFrame(data.frame(single=c(12, 29), double=c(19, 23)))
df2 <- createDataFrame(data.frame(double=c(19, 23), triple=c(23, 18)))
# Create a simple DataFrame, stored into a partition directory
write.df(df1, "data/test_table/key=1", "parquet", "overwrite")
# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
write.df(df2, "data/test_table/key=2", "parquet", "overwrite")
# Read the partitioned table
df3 <- read.df("data/test_table", "parquet", mergeSchema = "true")
printSchema(df3)
# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths
## root
## |-- single: double (nullable = true)
## |-- double: double (nullable = true)
## |-- triple: double (nullable = true)
## |-- key: integer (nullable = true)
Hive 元存储 Parquet 表转换
从 Hive 元存储 Parquet 表读取并写入非分区 Hive 元存储 Parquet 表时,Spark SQL 将尝试使用自己的 Parquet 支持而不是 Hive SerDe,以获得更好的性能。此行为由 spark.sql.hive.convertMetastoreParquet
配置控制,默认情况下处于启用状态。
Hive/Parquet 模式协调
从表模式处理的角度来看,Hive 和 Parquet 之间有两个主要区别。
- Hive 不区分大小写,而 Parquet 区分大小写
- Hive 将所有列视为可为空的,而 Parquet 中的可空性很重要
由于这个原因,我们在将 Hive 元存储 Parquet 表转换为 Spark SQL Parquet 表时,必须协调 Hive 元存储模式与 Parquet 模式。协调规则是
-
在两种模式中具有相同名称的字段必须具有相同的数据类型,无论可空性如何。协调后的字段应具有 Parquet 侧的数据类型,以便尊重可空性。
-
协调后的模式包含 Hive 元存储模式中定义的那些字段。
- 仅出现在 Parquet 模式中的任何字段在协调后的模式中都会被删除。
- 仅出现在 Hive 元存储模式中的任何字段都将作为可为空的字段添加到协调后的模式中。
元数据刷新
Spark SQL 缓存 Parquet 元数据以提高性能。当启用 Hive 元存储 Parquet 表转换时,也会缓存那些已转换表的元数据。如果这些表被 Hive 或其他外部工具更新,您需要手动刷新它们以确保元数据一致。
列式加密
从 Spark 3.2 开始,支持使用 Apache Parquet 1.12+ 对 Parquet 表进行列式加密。
Parquet 使用信封加密实践,其中文件部分使用“数据加密密钥”(DEK)加密,而 DEK 使用“主加密密钥”(MEK)加密。DEK 由 Parquet 为每个加密文件/列随机生成。MEK 由用户选择的密钥管理服务 (KMS) 生成、存储和管理。Parquet Maven 存储库 包含一个带有模拟 KMS 实现的 jar,允许仅使用 spark-shell 运行列加密和解密,而无需部署 KMS 服务器(下载 parquet-hadoop-tests.jar
文件并将其放置在 Spark jars
文件夹中)
KMS 客户端
InMemoryKMS 类仅用于说明和简单演示 Parquet 加密功能。它不应在实际部署中使用。主加密密钥必须保存在生产级 KMS 系统中并由该系统管理,该系统部署在用户的组织中。使用 Parquet 加密推出 Spark 需要为 KMS 服务器实现一个客户端类。Parquet 提供了一个插件 接口 用于开发此类类,
在 parquet-mr 存储库中可以找到一个针对开源 KMS 的此类类的 示例。生产 KMS 客户端应与组织的安全管理员合作设计,并由具有访问控制管理经验的开发人员构建。创建此类类后,可以通过 parquet.encryption.kms.client.class
参数将其传递给应用程序,并由普通 Spark 用户利用,如上面加密的 DataFrame 写入/读取示例所示。
注意:默认情况下,Parquet 实现“双信封加密”模式,最大限度地减少了 Spark 执行器与 KMS 服务器的交互。在此模式下,DEK 使用“密钥加密密钥”(KEK,由 Parquet 随机生成)加密。KEK 在 KMS 中使用 MEK 加密;结果和 KEK 本身缓存在 Spark 执行器内存中。有兴趣使用常规信封加密的用户可以通过将 parquet.encryption.double.wrapping
参数设置为 false
切换到它。有关 Parquet 加密参数的更多详细信息,请访问 parquet-hadoop 配置 页面。
数据源选项
Parquet 的数据源选项可以通过以下方式设置:
- 的
.option
/.options
方法DataFrameReader
DataFrameWriter
DataStreamReader
DataStreamWriter
- CREATE TABLE USING DATA_SOURCE 中的
OPTIONS
子句
属性名称 | 默认值 | 含义 | 范围 |
---|---|---|---|
datetimeRebaseMode |
(spark.sql.parquet.datetimeRebaseModeInRead 配置的值) |
datetimeRebaseMode 选项允许指定从儒略历到公元纪年的 DATE 、TIMESTAMP_MILLIS 、TIMESTAMP_MICROS 逻辑类型的值的重新基准模式。当前支持的模式是
|
read |
int96RebaseMode |
(spark.sql.parquet.int96RebaseModeInRead 配置的值) |
int96RebaseMode 选项允许指定将 INT96 时间戳从儒略历重新调整到公历的模式。当前支持的模式是
|
read |
mergeSchema |
(spark.sql.parquet.mergeSchema 配置的值) |
设置是否应该合并从所有 Parquet 部分文件中收集的模式。这将覆盖 spark.sql.parquet.mergeSchema 。 |
read |
compression |
snappy |
保存到文件时使用的压缩编解码器。这可以是已知的区分大小写的简短名称之一(none、uncompressed、snappy、gzip、lzo、brotli、lz4 和 zstd)。这将覆盖 spark.sql.parquet.compression.codec 。 |
write |
其他通用选项可以在 通用文件源选项 中找到。
配置
可以使用 SparkSession
上的 setConf
方法或通过使用 SQL 运行 SET key=value
命令来配置 Parquet。
属性名称 | 默认值 | 含义 | 自版本 |
---|---|---|---|
spark.sql.parquet.binaryAsString |
false | 某些其他生成 Parquet 的系统,特别是 Impala、Hive 和旧版本的 Spark SQL,在写入 Parquet 模式时不区分二进制数据和字符串。此标志告诉 Spark SQL 将二进制数据解释为字符串,以提供与这些系统的兼容性。 | 1.1.1 |
spark.sql.parquet.int96AsTimestamp |
true | 某些生成 Parquet 的系统,特别是 Impala 和 Hive,将时间戳存储到 INT96 中。此标志告诉 Spark SQL 将 INT96 数据解释为时间戳,以提供与这些系统的兼容性。 | 1.3.0 |
spark.sql.parquet.int96TimestampConversion |
false | 这控制在将 INT96 数据转换为时间戳时是否应应用时间戳调整,用于 Impala 写入的数据。这是必要的,因为 Impala 以与 Hive 和 Spark 不同的时区偏移量存储 INT96 数据。 | 2.3.0 |
spark.sql.parquet.outputTimestampType |
INT96 | 设置 Spark 将数据写入 Parquet 文件时使用的 Parquet 时间戳类型。INT96 是 Parquet 中非标准但常用的时间戳类型。TIMESTAMP_MICROS 是 Parquet 中的标准时间戳类型,它存储自 Unix 纪元以来的微秒数。TIMESTAMP_MILLIS 也是标准的,但精度为毫秒,这意味着 Spark 必须截断其时间戳值的微秒部分。 | 2.3.0 |
spark.sql.parquet.compression.codec |
snappy | 设置写入 Parquet 文件时使用的压缩编解码器。如果在表特定选项/属性中指定了 compression 或 parquet.compression ,则优先级将是 compression 、parquet.compression 、spark.sql.parquet.compression.codec 。可接受的值包括:none、uncompressed、snappy、gzip、lzo、brotli、lz4、zstd。请注意,brotli 需要安装 BrotliCodec 。 |
1.1.1 |
spark.sql.parquet.filterPushdown |
true | 设置为 true 时启用 Parquet 过滤器下推优化。 | 1.2.0 |
spark.sql.parquet.aggregatePushdown |
false | 如果为 true,则聚合将被下推到 Parquet 以进行优化。支持 MIN、MAX 和 COUNT 作为聚合表达式。对于 MIN/MAX,支持布尔值、整数、浮点数和日期类型。对于 COUNT,支持所有数据类型。如果任何 Parquet 文件页脚缺少统计信息,则会抛出异常。 | 3.3.0 |
spark.sql.hive.convertMetastoreParquet |
true | 设置为 false 时,Spark SQL 将使用 Hive SerDe 用于 parquet 表,而不是内置支持。 | 1.1.1 |
spark.sql.parquet.mergeSchema |
false |
设置为 true 时,Parquet 数据源会合并从所有数据文件中收集的模式,否则模式将从摘要文件或随机数据文件(如果不存在摘要文件)中选择。 |
1.5.0 |
spark.sql.parquet.respectSummaryFiles |
false | 设置为 true 时,我们假设 Parquet 的所有部分文件与摘要文件一致,并且在合并模式时会忽略它们。否则,如果为 false(默认值),我们将合并所有部分文件。这应该被视为专家级选项,在了解其确切含义之前不应启用。 | 1.5.0 |
spark.sql.parquet.writeLegacyFormat |
false | 如果为 true,则数据将以 Spark 1.4 及更早版本的方式写入。例如,十进制值将以 Apache Parquet 的固定长度字节数组格式写入,其他系统(如 Apache Hive 和 Apache Impala)使用这种格式。如果为 false,则将使用 Parquet 中的较新格式。例如,小数将以基于整数的格式写入。如果 Parquet 输出旨在用于不支持这种较新格式的系统,则设置为 true。 | 1.6.0 |
spark.sql.parquet.enableVectorizedReader |
true | 启用矢量化 parquet 解码。 | 2.0.0 |
spark.sql.parquet.enableNestedColumnVectorizedReader |
true | 启用嵌套列(例如,结构、列表、映射)的矢量化 Parquet 解码。要求启用 spark.sql.parquet.enableVectorizedReader 。 |
3.3.0 |
spark.sql.parquet.recordLevelFilter.enabled |
false | 如果为 true,则使用下推过滤器启用 Parquet 的本机记录级过滤。此配置仅在启用 spark.sql.parquet.filterPushdown 且未使用矢量化读取器时有效。可以通过将 spark.sql.parquet.enableVectorizedReader 设置为 false 来确保未使用矢量化读取器。 |
2.3.0 |
spark.sql.parquet.columnarReaderBatchSize |
4096 | parquet 矢量化读取器批次中包含的行数。应仔细选择此数字,以最大程度地减少开销并避免读取数据时的 OOM。 | 2.4.0 |
spark.sql.parquet.fieldId.write.enabled |
true | 字段 ID 是 Parquet 模式规范的本机字段。启用后,Parquet 写入器将填充 Spark 模式中的字段 ID 元数据(如果存在)到 Parquet 模式。 | 3.3.0 |
spark.sql.parquet.fieldId.read.enabled |
false | 字段 ID 是 Parquet 模式规范的本机字段。启用后,Parquet 读取器将使用请求的 Spark 模式中的字段 ID(如果存在)来查找 Parquet 字段,而不是使用列名。 | 3.3.0 |
spark.sql.parquet.fieldId.read.ignoreMissing |
false | 当 Parquet 文件没有字段 ID 但 Spark 读取模式使用字段 ID 读取时,如果启用此标志,我们将静默返回 null,否则会出错。 | 3.3.0 |
spark.sql.parquet.timestampNTZ.enabled |
true | 启用 Parquet 读取和写入的 TIMESTAMP_NTZ 支持。启用后,TIMESTAMP_NTZ 值将作为 Parquet 时间戳列写入,其中注释 isAdjustedToUTC = false,并且以类似的方式推断。禁用后,此类值将作为 TIMESTAMP_LTZ 读取,并且必须转换为 TIMESTAMP_LTZ 以进行写入。 |
3.4.0 |
spark.sql.parquet.datetimeRebaseModeInRead | EXCEPTION |
将 DATE 、TIMESTAMP_MILLIS 、TIMESTAMP_MICROS 逻辑类型的值从儒略历重新调整到公历的模式
|
3.0.0 |
spark.sql.parquet.datetimeRebaseModeInWrite | EXCEPTION |
将 DATE 、TIMESTAMP_MILLIS 、TIMESTAMP_MICROS 逻辑类型的值从公历重新调整到儒略历的模式
|
3.0.0 |
spark.sql.parquet.int96RebaseModeInRead | EXCEPTION |
将 INT96 时间戳类型的值从儒略历重新调整到公历的模式
|
3.1.0 |
spark.sql.parquet.int96RebaseModeInWrite | EXCEPTION |
将 INT96 时间戳类型的值从公历重新调整到儒略历的模式
|
3.1.0 |