Parquet 文件

Parquet 是一种列式格式,受到许多其他数据处理系统的支持。 Spark SQL 提供了对 Parquet 文件的读写支持,可以自动保留原始数据的模式。 读取 Parquet 文件时,出于兼容性考虑,所有列都会自动转换为可为空。

以编程方式加载数据

使用上面示例中的数据

peopleDF = spark.read.json("examples/src/main/resources/people.json")

# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")

# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+
在 Spark 仓库中的 "examples/src/main/python/sql/datasource.py" 中查找完整的示例代码。
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 中查找完整的示例代码。
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 中查找完整的示例代码。
df <- read.df("examples/src/main/resources/people.json", "json")

# SparkDataFrame can be saved as Parquet files, maintaining the schema information.
write.parquet(df, "people.parquet")

# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile <- read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
createOrReplaceTempView(parquetFile, "parquetFile")
teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
head(teenagers)
##     name
## 1 Justin

# We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with "Name:"
schema <- structType(structField("name", "string"))
teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema)
for (teenName in collect(teenNames)$name) {
  cat(teenName, "\n")
}
## Name: Michael
## Name: Andy
## Name: Justin
在 Spark 仓库中的 "examples/src/main/r/RSparkSQLExample.R" 中查找完整的示例代码。
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable

分区发现

表分区是 Hive 等系统中常用的优化方法。在分区表中,数据通常存储在不同的目录中,分区列的值编码在每个分区目录的路径中。所有内置文件源(包括 Text/CSV/JSON/ORC/Parquet)都能够自动发现和推断分区信息。例如,我们可以使用以下目录结构将之前使用的所有人口数据存储到分区表中,并使用两个额外的列 gendercountry 作为分区列

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

通过将 path/to/table 传递给 SparkSession.read.parquetSparkSession.read.load,Spark SQL 将自动从路径中提取分区信息。现在,返回的 DataFrame 的模式变为

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

请注意,分区列的数据类型是自动推断的。目前,支持数值数据类型、日期、时间戳和字符串类型。有时,用户可能不希望自动推断分区列的数据类型。对于这些用例,可以通过 spark.sql.sources.partitionColumnTypeInference.enabled 配置自动类型推断,默认为 true。禁用类型推断后,字符串类型将用于分区列。

从 Spark 1.6.0 开始,分区发现默认仅查找给定路径下的分区。对于上面的示例,如果用户将 path/to/table/gender=male 传递给 SparkSession.read.parquetSparkSession.read.load,则 gender 将不被视为分区列。如果用户需要指定分区发现应开始的基本路径,则可以在数据源选项中设置 basePath。例如,当 path/to/table/gender=male 是数据的路径,并且用户将 basePath 设置为 path/to/table/ 时,gender 将成为分区列。

模式合并

与 Protocol Buffer、Avro 和 Thrift 一样,Parquet 也支持模式演变。 用户可以从一个简单的模式开始,然后根据需要逐渐向模式中添加更多列。 这样,用户最终可能会得到多个具有不同但相互兼容的模式的 Parquet 文件。 Parquet 数据源现在能够自动检测这种情况并合并所有这些文件的模式。

由于模式合并是一项相对昂贵的操作,并且在大多数情况下不是必需的,因此我们从 1.5.0 开始默认将其关闭。 您可以通过以下方式启用它

  1. 在读取 Parquet 文件时,将数据源选项 mergeSchema 设置为 true(如下面的示例所示),或者
  2. 将全局 SQL 选项 spark.sql.parquet.mergeSchema 设置为 true
from pyspark.sql import Row

# spark is from the previous example.
# Create a simple DataFrame, stored into a partition directory
sc = spark.sparkContext

squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
                                  .map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
                                .map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")

# Read the partitioned table
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.
# root
#  |-- double: long (nullable = true)
#  |-- single: long (nullable = true)
#  |-- triple: long (nullable = true)
#  |-- key: integer (nullable = true)
在 Spark 仓库中的 "examples/src/main/python/sql/datasource.py" 中查找完整的示例代码。
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 中查找完整的示例代码。
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public static class Square implements Serializable {
  private int value;
  private int square;

  // Getters and setters...

}

public static class Cube implements Serializable {
  private int value;
  private int cube;

  // Getters and setters...

}

List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
  Square square = new Square();
  square.setValue(value);
  square.setSquare(value * value);
  squares.add(square);
}

// Create a simple DataFrame, store into a partition directory
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");

List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
  Cube cube = new Cube();
  cube.setValue(value);
  cube.setCube(value * value * value);
  cubes.add(cube);
}

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");

// Read the partitioned table
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 中查找完整的示例代码。
df1 <- createDataFrame(data.frame(single=c(12, 29), double=c(19, 23)))
df2 <- createDataFrame(data.frame(double=c(19, 23), triple=c(23, 18)))

# Create a simple DataFrame, stored into a partition directory
write.df(df1, "data/test_table/key=1", "parquet", "overwrite")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
write.df(df2, "data/test_table/key=2", "parquet", "overwrite")

# Read the partitioned table
df3 <- read.df("data/test_table", "parquet", mergeSchema = "true")
printSchema(df3)
# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths
## root
##  |-- single: double (nullable = true)
##  |-- double: double (nullable = true)
##  |-- triple: double (nullable = true)
##  |-- key: integer (nullable = true)
在 Spark 仓库中的 "examples/src/main/r/RSparkSQLExample.R" 中查找完整的示例代码。

Hive metastore Parquet 表转换

从 Hive metastore Parquet 表读取数据并写入到非分区 Hive metastore Parquet 表时,Spark SQL 将尝试使用其自己的 Parquet 支持而不是 Hive SerDe,以获得更好的性能。 此行为由 spark.sql.hive.convertMetastoreParquet 配置控制,默认情况下处于启用状态。

Hive/Parquet 模式协调

从表模式处理的角度来看,Hive 和 Parquet 之间存在两个主要差异。

  1. Hive 不区分大小写,而 Parquet 区分大小写
  2. Hive 认为所有列都可为空,而 Parquet 中的可空性很重要

由于这个原因,在将 Hive metastore Parquet 表转换为 Spark SQL Parquet 表时,我们必须协调 Hive metastore 模式与 Parquet 模式。 协调规则如下

  1. 在两个模式中具有相同名称的字段必须具有相同的数据类型,而不管可空性如何。协调后的字段应具有 Parquet 端的数据类型,以便尊重可空性。

  2. 协调后的模式仅包含 Hive metastore 模式中定义的那些字段。

    • 协调后的模式中将删除任何仅出现在 Parquet 模式中的字段。
    • 任何仅出现在 Hive metastore 模式中的字段都将作为可空字段添加到协调后的模式中。

元数据刷新

Spark SQL 缓存 Parquet 元数据以获得更好的性能。 启用 Hive metastore Parquet 表转换后,还会缓存这些转换后的表的元数据。 如果这些表由 Hive 或其他外部工具更新,则需要手动刷新它们以确保元数据一致。

# spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");
refreshTable("my_table")
REFRESH TABLE my_table;

列式加密

自 Spark 3.2 以来,Apache Parquet 1.12+ 的 Parquet 表支持列式加密。

Parquet 使用信封加密实践,其中文件部分使用“数据加密密钥”(DEK)加密,DEK 使用“主加密密钥”(MEK)加密。 DEK 由 Parquet 为每个加密文件/列随机生成。 MEK 在用户选择的密钥管理服务 (KMS) 中生成、存储和管理。 Parquet Maven 存储库有一个带有模拟 KMS 实现的 jar,允许仅使用 spark-shell 运行列加密和解密,而无需部署 KMS 服务器(下载 parquet-hadoop-tests.jar 文件并将其放置在 Spark jars 文件夹中)

# Set hadoop configuration properties, e.g. using configuration properties of
# the Spark job:
# --conf spark.hadoop.parquet.encryption.kms.client.class=\
#           "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS"\
# --conf spark.hadoop.parquet.encryption.key.list=\
#           "keyA:AAECAwQFBgcICQoLDA0ODw== ,  keyB:AAECAAECAAECAAECAAECAA=="\
# --conf spark.hadoop.parquet.crypto.factory.class=\
#           "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory"

# Write encrypted dataframe files.
# Column "square" will be protected with master key "keyA".
# Parquet file footers will be protected with master key "keyB"
squaresDF.write\
   .option("parquet.encryption.column.keys" , "keyA:square")\
   .option("parquet.encryption.footer.key" , "keyB")\
   .parquet("/path/to/table.parquet.encrypted")

# Read encrypted dataframe files
df2 = spark.read.parquet("/path/to/table.parquet.encrypted")
sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" ,
                           "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")

// Explicit master keys (base64 encoded) - required only for mock InMemoryKMS
sc.hadoopConfiguration.set("parquet.encryption.key.list" ,
                   "keyA:AAECAwQFBgcICQoLDA0ODw== ,  keyB:AAECAAECAAECAAECAAECAA==")

// Activate Parquet encryption, driven by Hadoop properties
sc.hadoopConfiguration.set("parquet.crypto.factory.class" ,
                   "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")

// Write encrypted dataframe files.
// Column "square" will be protected with master key "keyA".
// Parquet file footers will be protected with master key "keyB"
squaresDF.write.
   option("parquet.encryption.column.keys" , "keyA:square").
   option("parquet.encryption.footer.key" , "keyB").
parquet("/path/to/table.parquet.encrypted")

// Read encrypted dataframe files
val df2 = spark.read.parquet("/path/to/table.parquet.encrypted")
sc.hadoopConfiguration().set("parquet.encryption.kms.client.class" ,
   "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS");

// Explicit master keys (base64 encoded) - required only for mock InMemoryKMS
sc.hadoopConfiguration().set("parquet.encryption.key.list" ,
   "keyA:AAECAwQFBgcICQoLDA0ODw== ,  keyB:AAECAAECAAECAAECAAECAA==");

// Activate Parquet encryption, driven by Hadoop properties
sc.hadoopConfiguration().set("parquet.crypto.factory.class" ,
   "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory");

// Write encrypted dataframe files.
// Column "square" will be protected with master key "keyA".
// Parquet file footers will be protected with master key "keyB"
squaresDF.write().
   option("parquet.encryption.column.keys" , "keyA:square").
   option("parquet.encryption.footer.key" , "keyB").
   parquet("/path/to/table.parquet.encrypted");

// Read encrypted dataframe files
Dataset<Row> df2 = spark.read().parquet("/path/to/table.parquet.encrypted");

KMS 客户端

InMemoryKMS 类仅用于说明和简单演示 Parquet 加密功能。不应在实际部署中使用它。主加密密钥必须保存在用户组织中部署的生产级 KMS 系统中并进行管理。使用 Parquet 加密推出 Spark 需要实现 KMS 服务器的客户端类。 Parquet 提供了一个插件 接口,用于开发此类,

public interface KmsClient {
  // Wraps a key - encrypts it with the master key.
  public String wrapKey(byte[] keyBytes, String masterKeyIdentifier);

  // Decrypts (unwraps) a key with the master key.
  public byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier);

  // Use of initialization parameters is optional.
  public void initialize(Configuration configuration, String kmsInstanceID,
                         String kmsInstanceURL, String accessToken);
}

在 parquet-mr 存储库中可以找到一个针对开源 KMS 的此类 示例。 生产 KMS 客户端应与组织的安全管理员合作设计,并由具有访问控制管理经验的开发人员构建。 创建此类后,可以通过 parquet.encryption.kms.client.class 参数将其传递给应用程序,并由普通 Spark 用户利用,如上面的加密 dataframe 写入/读取示例所示。

注意:默认情况下,Parquet 实现“双信封加密”模式,最大限度地减少 Spark executor 与 KMS 服务器的交互。 在此模式下,DEK 使用“密钥加密密钥”(KEK,由 Parquet 随机生成)加密。 KEK 使用 KMS 中的 MEK 加密; 结果和 KEK 本身缓存在 Spark executor 内存中。 对常规信封加密感兴趣的用户可以通过将 parquet.encryption.double.wrapping 参数设置为 false 来切换到它。 有关 Parquet 加密参数的更多详细信息,请访问 parquet-hadoop 配置 页面

数据源选项

可以通过以下方式设置 Parquet 的数据源选项

属性名称默认值含义范围
datetimeRebaseMode spark.sql.parquet.datetimeRebaseModeInRead 配置的值) datetimeRebaseMode 选项允许为从朱利安历到公历的 DATETIMESTAMP_MILLISTIMESTAMP_MICROS 逻辑类型的值指定基准模式。
目前支持的模式有
  • EXCEPTION:在读取两个日历之间含糊不清的古代日期/时间戳时失败。
  • CORRECTED:加载日期/时间戳,不进行基准调整。
  • LEGACY:执行从朱利安历到公历的古代日期/时间戳的基准调整。
读取
int96RebaseMode spark.sql.parquet.int96RebaseModeInRead 配置的值) int96RebaseMode 选项允许为从朱利安历到公历的 INT96 时间戳指定基准模式。
目前支持的模式有
  • EXCEPTION:在读取两个日历之间含糊不清的古代 INT96 时间戳时失败。
  • CORRECTED:加载 INT96 时间戳,不进行基准调整。
  • LEGACY:执行从儒略历到普罗列公历的古老时间戳的重新定位。
读取
mergeSchema spark.sql.parquet.mergeSchema 配置的值) 设置是否应合并从所有 Parquet 部分文件中收集的模式。 这将覆盖 spark.sql.parquet.mergeSchema 读取
compression snappy 保存到文件时使用的压缩编解码器。 这可以是已知的不区分大小写的缩短名称之一(none、uncompressed、snappy、gzip、lzo、brotli、lz4 和 zstd)。 这将覆盖 spark.sql.parquet.compression.codec write

其他通用选项可以在 通用文件源选项 中找到

配置

Parquet 的配置可以使用 SparkSession 上的 setConf 方法或通过使用 SQL 运行 SET key=value 命令来完成。

属性名称默认值含义Since Version
spark.sql.parquet.binaryAsString false 一些其他产生 Parquet 的系统,特别是 Impala、Hive 和旧版本的 Spark SQL,在写出 Parquet 模式时,不会区分二进制数据和字符串。 此标志告诉 Spark SQL 将二进制数据解释为字符串,以提供与这些系统的兼容性。 1.1.1
spark.sql.parquet.int96AsTimestamp true 一些产生 Parquet 的系统,特别是 Impala 和 Hive,将 Timestamp 存储到 INT96 中。 此标志告诉 Spark SQL 将 INT96 数据解释为时间戳,以提供与这些系统的兼容性。 1.3.0
spark.sql.parquet.int96TimestampConversion false 这控制在转换为时间戳时,是否应将时间戳调整应用于 INT96 数据,用于 Impala 写入的数据。 这是必要的,因为 Impala 存储 INT96 数据时使用与 Hive & Spark 不同的时区偏移。 2.3.0
spark.sql.parquet.outputTimestampType INT96 设置 Spark 将数据写入 Parquet 文件时使用的 Parquet 时间戳类型。 INT96 是一种非标准但在 Parquet 中常用的时间戳类型。 TIMESTAMP_MICROS 是 Parquet 中的标准时间戳类型,它存储自 Unix epoch 以来经过的微秒数。 TIMESTAMP_MILLIS 也是标准类型,但精度为毫秒,这意味着 Spark 必须截断其时间戳值的微秒部分。 2.3.0
spark.sql.parquet.compression.codec snappy 设置写入 Parquet 文件时使用的压缩编解码器。 如果在特定于表的选项/属性中指定了 compressionparquet.compression,则优先级将为 compressionparquet.compressionspark.sql.parquet.compression.codec。 可接受的值包括:none、uncompressed、snappy、gzip、lzo、brotli、lz4、zstd。 请注意,brotli 需要安装 BrotliCodec 1.1.1
spark.sql.parquet.filterPushdown true 设置为 true 时,启用 Parquet 过滤器下推优化。 1.2.0
spark.sql.parquet.aggregatePushdown false 如果为 true,则会将聚合下推到 Parquet 以进行优化。 支持 MIN、MAX 和 COUNT 作为聚合表达式。 对于 MIN/MAX,支持布尔值、整数、浮点数和日期类型。 对于 COUNT,支持所有数据类型。 如果任何 Parquet 文件页脚中缺少统计信息,则会抛出异常。 3.3.0
spark.sql.hive.convertMetastoreParquet true 设置为 false 时,Spark SQL 将对 Parquet 表使用 Hive SerDe,而不是内置支持。 1.1.1
spark.sql.parquet.mergeSchema false

如果为 true,Parquet 数据源会合并从所有数据文件收集的模式,否则,如果不存在摘要文件,则从摘要文件或随机数据文件中选取模式。

1.5.0
spark.sql.parquet.respectSummaryFiles false 如果为 true,我们假设 Parquet 的所有部分文件都与摘要文件一致,并且在合并模式时会忽略它们。 否则,如果这是 false(默认值),我们将合并所有部分文件。 这应被视为仅供专家使用的选项,并且在了解其确切含义之前不应启用。 1.5.0
spark.sql.parquet.writeLegacyFormat false 如果为 true,则数据将以 Spark 1.4 及更早版本的方式写入。 例如,decimal 值将以 Apache Parquet 的固定长度字节数组格式写入,其他系统(如 Apache Hive 和 Apache Impala)使用该格式。 如果为 false,则将使用 Parquet 中较新的格式。 例如,小数将以基于 int 的格式写入。 如果 Parquet 输出旨在与不支持此较新格式的系统一起使用,请设置为 true。 1.6.0
spark.sql.parquet.enableVectorizedReader true 启用向量化 Parquet 解码。 2.0.0
spark.sql.parquet.enableNestedColumnVectorizedReader true 启用嵌套列(例如,struct、list、map)的向量化 Parquet 解码。 需要启用 spark.sql.parquet.enableVectorizedReader 3.3.0
spark.sql.parquet.recordLevelFilter.enabled false 如果为 true,则启用 Parquet 的本机记录级筛选,并使用下推的过滤器。 仅当启用 spark.sql.parquet.filterPushdown 且未使用向量化读取器时,此配置才有效。 您可以通过将 spark.sql.parquet.enableVectorizedReader 设置为 false 来确保未使用向量化读取器。 2.3.0
spark.sql.parquet.columnarReaderBatchSize 4096 Parquet 向量化读取器批处理中包含的行数。 应仔细选择该数字,以最大程度地减少开销并避免在读取数据时出现 OOM。 2.4.0
spark.sql.parquet.fieldId.write.enabled true Field ID 是 Parquet 模式规范的本机字段。 启用后,Parquet 编写器会将 Spark 模式中的 Field ID 元数据(如果存在)填充到 Parquet 模式中。 3.3.0
spark.sql.parquet.fieldId.read.enabled false Field ID 是 Parquet 模式规范的本机字段。 启用后,Parquet 读取器将使用请求的 Spark 模式中的 Field ID(如果存在)来查找 Parquet 字段,而不是使用列名。 3.3.0
spark.sql.parquet.fieldId.read.ignoreMissing false 当 Parquet 文件没有任何 Field ID,但 Spark 读取模式正在使用 Field ID 进行读取时,如果启用了此标志,我们将静默返回 null,否则会出错。 3.3.0
spark.sql.parquet.inferTimestampNTZ.enabled true 启用后,带有批注 isAdjustedToUTC = false 的 Parquet 时间戳列在模式推断期间被推断为 TIMESTAMP_NTZ 类型。 否则,所有 Parquet 时间戳列都将被推断为 TIMESTAMP_LTZ 类型。 请注意,Spark 在文件写入时将输出模式写入 Parquet 的页脚元数据,并在文件读取时利用它。 因此,此配置仅影响非 Spark 写入的 Parquet 文件的模式推断。 3.4.0
spark.sql.parquet.datetimeRebaseModeInRead EXCEPTION DATETIMESTAMP_MILLISTIMESTAMP_MICROS 逻辑类型的值从儒略历到普罗列公历的重新定位模式
  • EXCEPTION:如果 Spark 看到在两个日历之间模棱两可的古代日期/时间戳,则读取将失败。
  • CORRECTED:Spark 将不进行重新定位,并按原样读取日期/时间戳。
  • LEGACY:读取 Parquet 文件时,Spark 将日期/时间戳从旧的混合(儒略 + 格里高利)日历重新定位到普罗列公历。
如果 Parquet 文件的编写者信息(如 Spark、Hive)未知,则此配置才有效。
3.0.0
spark.sql.parquet.datetimeRebaseModeInWrite EXCEPTION DATETIMESTAMP_MILLISTIMESTAMP_MICROS 逻辑类型的值从普罗列公历到儒略历的重新定位模式
  • EXCEPTION:如果 Spark 看到在两个日历之间模棱两可的古代日期/时间戳,则写入将失败。
  • CORRECTED:Spark 将不进行重新定位,并按原样写入日期/时间戳。
  • LEGACY:写入 Parquet 文件时,Spark 将日期/时间戳从普罗列公历重新定位到旧的混合(儒略 + 格里高利)日历。
3.0.0
spark.sql.parquet.int96RebaseModeInRead EXCEPTION INT96 时间戳类型的值从儒略历到普罗列公历的重新定位模式
  • EXCEPTION:如果 Spark 看到在两个日历之间模棱两可的古代 INT96 时间戳,则读取将失败。
  • CORRECTED:Spark 将不进行重新定位,并按原样读取日期/时间戳。
  • LEGACY:读取 Parquet 文件时,Spark 将 INT96 时间戳从旧的混合(儒略 + 格里高利)日历重新定位到普罗列公历。
如果 Parquet 文件的编写者信息(如 Spark、Hive)未知,则此配置才有效。
3.1.0
spark.sql.parquet.int96RebaseModeInWrite EXCEPTION INT96 时间戳类型的值从普罗列公历到儒略历的重新定位模式
  • EXCEPTION:如果 Spark 看到在两个日历之间模棱两可的古代时间戳,则写入将失败。
  • CORRECTED:Spark 将不进行重新定位,并按原样写入日期/时间戳。
  • LEGACY:写入 Parquet 文件时,Spark 将 INT96 时间戳从普罗列公历重新定位到旧的混合(儒略 + 格里高利)日历。
3.1.0