通用加载/保存函数

在最简单的形式中,所有操作都将使用默认数据源(parquet,除非通过 spark.sql.sources.default 另行配置)。

users_df = spark.read.load("examples/src/main/resources/users.parquet")
users_df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/sql/datasource.py"。
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"。
df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/r/RSparkSQLExample.R"。

手动指定选项

您还可以手动指定将使用的数据源以及您希望传递给数据源的任何额外选项。数据源通过其完全限定名(即 org.apache.spark.sql.parquet)指定,但对于内置数据源,您也可以使用其短名称(jsonparquetjdbcorclibsvmcsvtext)。从任何数据源类型加载的 DataFrame 都可以使用此语法转换为其他类型。

请参阅 API 文档以了解内置数据源的可用选项,例如 org.apache.spark.sql.DataFrameReaderorg.apache.spark.sql.DataFrameWriter。这些文档中说明的选项也应适用于非 Scala Spark API(例如 PySpark)。对于其他格式,请参阅特定格式的 API 文档。

要加载 JSON 文件,您可以使用

people_df = spark.read.load("examples/src/main/resources/people.json", format="json")
people_df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/sql/datasource.py"。
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。
Dataset<Row> peopleDF =
  spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"。
df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/r/RSparkSQLExample.R"。

要加载 CSV 文件,您可以使用

people_df = spark.read.load(
    "examples/src/main/resources/people.csv",
    format="csv",
    sep=";",
    inferSchema="true",
    header="true"
)
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/sql/datasource.py"。
val peopleDFCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。
Dataset<Row> peopleDFCsv = spark.read().format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv");
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"。
df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferSchema = TRUE, header = TRUE)
namesAndAges <- select(df, "name", "age")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/r/RSparkSQLExample.R"。

额外选项也用于写入操作。例如,您可以控制 ORC 数据源的布隆过滤器和字典编码。以下 ORC 示例将创建布隆过滤器并仅对 favorite_color 使用字典编码。对于 Parquet,也存在 parquet.bloom.filter.enabledparquet.enable.dictionary。要查找有关 ORC/Parquet 额外选项的更详细信息,请访问 Apache 官方 ORC / Parquet 网站。

ORC 数据源

users_df = spark.read.orc("examples/src/main/resources/users.orc")
(users_df.write.format("orc")
    .option("orc.bloom.filter.columns", "favorite_color")
    .option("orc.dictionary.key.threshold", "1.0")
    .option("orc.column.encoding.direct", "name")
    .save("users_with_options.orc"))
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/sql/datasource.py"。
usersDF.write.format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .option("orc.column.encoding.direct", "name")
  .save("users_with_options.orc")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。
usersDF.write().format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .option("orc.column.encoding.direct", "name")
  .save("users_with_options.orc");
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"。
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/r/RSparkSQLExample.R"。
CREATE TABLE users_with_options (
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING ORC
OPTIONS (
  orc.bloom.filter.columns 'favorite_color',
  orc.dictionary.key.threshold '1.0',
  orc.column.encoding.direct 'name'
)

Parquet 数据源

users_df = spark.read.parquet("examples/src/main/resources/users.parquet")
(users_df.write.format("parquet")
    .option("parquet.bloom.filter.enabled#favorite_color", "true")
    .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
    .option("parquet.enable.dictionary", "true")
    .option("parquet.page.write-checksum.enabled", "false")
    .save("users_with_options.parquet"))
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/sql/datasource.py"。
usersDF.write.format("parquet")
  .option("parquet.bloom.filter.enabled#favorite_color", "true")
  .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
  .option("parquet.enable.dictionary", "true")
  .option("parquet.page.write-checksum.enabled", "false")
  .save("users_with_options.parquet")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。
usersDF.write().format("parquet")
    .option("parquet.bloom.filter.enabled#favorite_color", "true")
    .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
    .option("parquet.enable.dictionary", "true")
    .option("parquet.page.write-checksum.enabled", "false")
    .save("users_with_options.parquet");
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"。
df <- read.df("examples/src/main/resources/users.parquet", "parquet")
write.parquet(df, "users_with_options.parquet", parquet.bloom.filter.enabled#favorite_color = true, parquet.bloom.filter.expected.ndv#favorite_color = 1000000, parquet.enable.dictionary = true, parquet.page.write-checksum.enabled = false)
完整示例代码请参见 Spark 仓库中的 "examples/src/main/r/RSparkSQLExample.R"。
CREATE TABLE users_with_options (
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
OPTIONS (
  `parquet.bloom.filter.enabled#favorite_color` true,
  `parquet.bloom.filter.expected.ndv#favorite_color` 1000000,
  parquet.enable.dictionary true,
  parquet.page.write-checksum.enabled true
)

直接对文件运行 SQL

除了使用 read API 将文件加载到 DataFrame 并查询它之外,您还可以直接使用 SQL 查询该文件。

df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/sql/datasource.py"。
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。
Dataset<Row> sqlDF =
  spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"。
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/r/RSparkSQLExample.R"。
SELECT * FROM parquet.`examples/src/main/resources/users.parquet`

保存模式

保存操作可以选择接受一个 SaveMode,它指定如何在数据存在时处理现有数据。重要的是要认识到这些保存模式不利用任何锁定,也不是原子的。此外,执行 Overwrite 时,数据将在写入新数据之前被删除。

Scala/Java任何语言含义
SaveMode.ErrorIfExists(默认) "error" 或 "errorifexists"(默认) 将 DataFrame 保存到数据源时,如果数据已存在,则预期会抛出异常。
SaveMode.Append "append" 将 DataFrame 保存到数据源时,如果数据/表已存在,则 DataFrame 的内容预期会附加到现有数据中。
SaveMode.Overwrite "overwrite" 覆盖模式意味着将 DataFrame 保存到数据源时,如果数据/表已存在,则预期现有数据将被 DataFrame 的内容覆盖。
SaveMode.Ignore "ignore" 忽略模式意味着将 DataFrame 保存到数据源时,如果数据已存在,则保存操作预期不会保存 DataFrame 的内容,也不会更改现有数据。这类似于 SQL 中的 CREATE TABLE IF NOT EXISTS

保存到持久表

DataFrames 也可以使用 saveAsTable 命令作为持久表保存到 Hive 元数据存储中。请注意,使用此功能不需要现有 Hive 部署。Spark 将为您创建一个默认的本地 Hive 元数据存储(使用 Derby)。与 createOrReplaceTempView 命令不同,saveAsTable 将实现 DataFrame 的内容并在 Hive 元数据存储中创建指向数据的指针。即使您的 Spark 程序重新启动后,只要您保持与同一元数据存储的连接,持久表仍然会存在。可以通过在 SparkSession 上调用 table 方法并提供表名来创建持久表的 DataFrame。

对于基于文件的数据库源(例如 text、parquet、json 等),您可以通过 path 选项指定自定义表路径,例如 df.write.option("path", "/some/path").saveAsTable("t")。当表被删除时,自定义表路径不会被删除,表数据仍然存在。如果未指定自定义表路径,Spark 会将数据写入仓库目录下的默认表路径。当表被删除时,默认表路径也会被删除。

从 Spark 2.1 开始,持久数据源表将每个分区元数据存储在 Hive 元数据存储中。这带来了几个好处:

请注意,创建外部数据源表(带有 path 选项的表)时,默认不会收集分区信息。要同步元数据存储中的分区信息,您可以调用 MSCK REPAIR TABLE

分桶、排序和分区

对于基于文件的数据库源,还可以对输出进行分桶和排序或分区。分桶和排序仅适用于持久表,

people_df = spark.read.json("examples/src/main/resources/people.json")
people_df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/sql/datasource.py"。
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"。
CREATE TABLE people_bucketed
USING json
CLUSTERED BY(name) INTO 42 BUCKETS
AS SELECT * FROM json.`examples/src/main/resources/people.json`;

而分区可以在使用 Dataset API 时与 savesaveAsTable 一起使用。

users_df = spark.read.load("examples/src/main/resources/users.parquet")
users_df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/sql/datasource.py"。
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。
usersDF
  .write()
  .partitionBy("favorite_color")
  .format("parquet")
  .save("namesPartByColor.parquet");
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"。
CREATE TABLE users_by_favorite_color
USING parquet
PARTITIONED BY(favorite_color)
AS SELECT * FROM parquet.`examples/src/main/resources/users.parquet`;

对于单个表,可以同时使用分区和分桶。

users_df = spark.read.parquet("examples/src/main/resources/users.parquet")
(users_df.write
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("users_partitioned_bucketed"))
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/sql/datasource.py"。
usersDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed")
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。
usersDF
  .write()
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed");
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"。
CREATE TABLE users_partitioned_bucketed
USING parquet
PARTITIONED BY (favorite_color)
CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS
AS SELECT * FROM parquet.`examples/src/main/resources/users.parquet`;

partitionBy 会创建一个目录结构,如 分区发现 部分所述。因此,它对高基数字段的适用性有限。相比之下,bucketBy 将数据分布到固定数量的桶中,并且可以在唯一值数量无限时使用。