通用加载/保存函数

最简单的形式是,默认数据源(parquet,除非由 spark.sql.sources.default 配置)将用于所有操作。

df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
在 Spark 仓库的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
在 Spark 仓库的“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”中找到完整的示例代码。
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
在 Spark 仓库的“examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java”中找到完整的示例代码。
df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
在 Spark 仓库的“examples/src/main/r/RSparkSQLExample.R”中找到完整的示例代码。

手动指定选项

您还可以手动指定将要使用的数据源以及您想要传递给数据源的任何额外选项。数据源由其完全限定名指定(例如,org.apache.spark.sql.parquet),但对于内置源,您也可以使用其简短名称(jsonparquetjdbcorclibsvmcsvtext)。从任何数据源类型加载的 DataFrame 可以使用此语法转换为其他类型。

请参阅 API 文档以了解内置源的可用选项,例如,org.apache.spark.sql.DataFrameReaderorg.apache.spark.sql.DataFrameWriter。在那里记录的选项也应该适用于非 Scala Spark API(例如 PySpark)。对于其他格式,请参阅特定格式的 API 文档。

要加载 JSON 文件,您可以使用

df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
在 Spark 仓库的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
在 Spark 仓库的“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”中找到完整的示例代码。
Dataset<Row> peopleDF =
  spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
在 Spark 仓库的“examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java”中找到完整的示例代码。
df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")
在 Spark 仓库的“examples/src/main/r/RSparkSQLExample.R”中找到完整的示例代码。

要加载 CSV 文件,您可以使用

df = spark.read.load("examples/src/main/resources/people.csv",
                     format="csv", sep=";", inferSchema="true", header="true")
在 Spark 仓库的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
val peopleDFCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv")
在 Spark 仓库的“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”中找到完整的示例代码。
Dataset<Row> peopleDFCsv = spark.read().format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv");
在 Spark 仓库的“examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java”中找到完整的示例代码。
df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferSchema = TRUE, header = TRUE)
namesAndAges <- select(df, "name", "age")
在 Spark 仓库的“examples/src/main/r/RSparkSQLExample.R”中找到完整的示例代码。

额外的选项在写入操作期间也会使用。例如,您可以控制 ORC 数据源的布隆过滤器和字典编码。以下 ORC 示例将仅为 favorite_color 创建布隆过滤器并使用字典编码。对于 Parquet,也存在 parquet.bloom.filter.enabledparquet.enable.dictionary。要查找有关额外 ORC/Parquet 选项的更多详细信息,请访问官方 Apache ORC / Parquet 网站。

ORC 数据源

df = spark.read.orc("examples/src/main/resources/users.orc")
(df.write.format("orc")
    .option("orc.bloom.filter.columns", "favorite_color")
    .option("orc.dictionary.key.threshold", "1.0")
    .option("orc.column.encoding.direct", "name")
    .save("users_with_options.orc"))
在 Spark 仓库的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
usersDF.write.format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .option("orc.column.encoding.direct", "name")
  .save("users_with_options.orc")
在 Spark 仓库的“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”中找到完整的示例代码。
usersDF.write().format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .option("orc.column.encoding.direct", "name")
  .save("users_with_options.orc");
在 Spark 仓库的“examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java”中找到完整的示例代码。
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
在 Spark 仓库的“examples/src/main/r/RSparkSQLExample.R”中找到完整的示例代码。
CREATE TABLE users_with_options (
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING ORC
OPTIONS (
  orc.bloom.filter.columns 'favorite_color',
  orc.dictionary.key.threshold '1.0',
  orc.column.encoding.direct 'name'
)

Parquet 数据源

df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df.write.format("parquet")
    .option("parquet.bloom.filter.enabled#favorite_color", "true")
    .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
    .option("parquet.enable.dictionary", "true")
    .option("parquet.page.write-checksum.enabled", "false")
    .save("users_with_options.parquet"))
在 Spark 仓库的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
usersDF.write.format("parquet")
  .option("parquet.bloom.filter.enabled#favorite_color", "true")
  .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
  .option("parquet.enable.dictionary", "true")
  .option("parquet.page.write-checksum.enabled", "false")
  .save("users_with_options.parquet")
在 Spark 仓库的“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”中找到完整的示例代码。
usersDF.write().format("parquet")
    .option("parquet.bloom.filter.enabled#favorite_color", "true")
    .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
    .option("parquet.enable.dictionary", "true")
    .option("parquet.page.write-checksum.enabled", "false")
    .save("users_with_options.parquet");
在 Spark 仓库的“examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java”中找到完整的示例代码。
df <- read.df("examples/src/main/resources/users.parquet", "parquet")
write.parquet(df, "users_with_options.parquet", parquet.bloom.filter.enabled#favorite_color = true, parquet.bloom.filter.expected.ndv#favorite_color = 1000000, parquet.enable.dictionary = true, parquet.page.write-checksum.enabled = false)
在 Spark 仓库的“examples/src/main/r/RSparkSQLExample.R”中找到完整的示例代码。
CREATE TABLE users_with_options (
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
OPTIONS (
  `parquet.bloom.filter.enabled#favorite_color` true,
  `parquet.bloom.filter.expected.ndv#favorite_color` 1000000,
  parquet.enable.dictionary true,
  parquet.page.write-checksum.enabled true
)

直接在文件上运行 SQL

除了使用读取 API 将文件加载到 DataFrame 并查询它之外,您还可以直接使用 SQL 查询该文件。

df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
在 Spark 仓库的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
在 Spark 仓库的“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”中找到完整的示例代码。
Dataset<Row> sqlDF =
  spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
在 Spark 仓库的“examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java”中找到完整的示例代码。
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
在 Spark 仓库的“examples/src/main/r/RSparkSQLExample.R”中找到完整的示例代码。

保存模式

保存操作可以选择接受一个 SaveMode,它指定如果存在现有数据,如何处理这些数据。重要的是要意识到这些保存模式不使用任何锁定并且不是原子的。此外,在执行 Overwrite 时,将在写入新数据之前删除数据。

Scala/Java任何语言意义
SaveMode.ErrorIfExists(默认) "error" or "errorifexists"(默认) 将 DataFrame 保存到数据源时,如果数据已存在,则预计会抛出异常。
SaveMode.Append "append" 将 DataFrame 保存到数据源时,如果数据/表已存在,则预计 DataFrame 的内容将附加到现有数据。
SaveMode.Overwrite "overwrite" 覆盖模式意味着,将 DataFrame 保存到数据源时,如果数据/表已存在,则预计现有数据将被 DataFrame 的内容覆盖。
SaveMode.Ignore "ignore" 忽略模式意味着,将 DataFrame 保存到数据源时,如果数据已存在,则预计保存操作不会保存 DataFrame 的内容,也不会更改现有数据。这类似于 SQL 中的 CREATE TABLE IF NOT EXISTS

保存到持久表

DataFrame 也可以使用 saveAsTable 命令保存为 Hive 元存储中的持久表。请注意,使用此功能不需要现有的 Hive 部署。Spark 将为您创建一个默认的本地 Hive 元存储(使用 Derby)。与 createOrReplaceTempView 命令不同,saveAsTable 将物化 DataFrame 的内容并在 Hive 元存储中创建一个指向数据的指针。只要您保持与同一元存储的连接,持久表即使在 Spark 程序重新启动后仍将存在。可以通过在 SparkSession 上调用 table 方法并使用表的名称来创建持久表的 DataFrame。

对于基于文件的 data source,例如 text、parquet、json 等,您可以通过 path 选项指定自定义表路径,例如 df.write.option("path", "/some/path").saveAsTable("t")。当表被删除时,自定义表路径不会被删除,表数据仍然存在。如果没有指定自定义表路径,Spark 将将数据写入仓库目录下的默认表路径。当表被删除时,默认表路径也会被删除。

从 Spark 2.1 开始,持久数据源表在 Hive 元存储中存储了每个分区元数据。这带来了几个好处

请注意,在创建外部数据源表(具有 path 选项的表)时,默认情况下不会收集分区信息。要同步元存储中的分区信息,您可以调用 MSCK REPAIR TABLE

分桶、排序和分区

对于基于文件的 data source,还可以对输出进行分桶和排序或分区。分桶和排序仅适用于持久表

df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
在 Spark 仓库的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
在 Spark 仓库的“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”中找到完整的示例代码。
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
在 Spark 仓库的“examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java”中找到完整的示例代码。
CREATE TABLE users_bucketed_by_name(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
CLUSTERED BY(name) INTO 42 BUCKETS;

而分区可以在使用 Dataset API 时与 savesaveAsTable 一起使用。

df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
在 Spark 仓库的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
在 Spark 仓库的“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”中找到完整的示例代码。
usersDF
  .write()
  .partitionBy("favorite_color")
  .format("parquet")
  .save("namesPartByColor.parquet");
在 Spark 仓库的“examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java”中找到完整的示例代码。
CREATE TABLE users_by_favorite_color(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING csv PARTITIONED BY(favorite_color);

可以对单个表使用分区和分桶

df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
    .write
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("users_partitioned_bucketed"))
在 Spark 仓库的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
usersDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed")
在 Spark 仓库的“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”中找到完整的示例代码。
usersDF
  .write()
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed");
在 Spark 仓库的“examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java”中找到完整的示例代码。
CREATE TABLE users_bucketed_and_partitioned(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
PARTITIONED BY (favorite_color)
CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS;

partitionBy 创建一个目录结构,如 分区发现 部分所述。因此,它对基数高的列的适用性有限。相反,bucketBy 将数据分布在固定数量的桶中,并且可以在唯一值数量不受限制时使用。