通用加载/保存函数
最简单的形式是,默认数据源(parquet
,除非由 spark.sql.sources.default
配置)将用于所有操作。
df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
手动指定选项
您还可以手动指定将要使用的数据源以及您想要传递给数据源的任何额外选项。数据源由其完全限定名指定(例如,org.apache.spark.sql.parquet
),但对于内置源,您也可以使用其简短名称(json
、parquet
、jdbc
、orc
、libsvm
、csv
、text
)。从任何数据源类型加载的 DataFrame 可以使用此语法转换为其他类型。
请参阅 API 文档以了解内置源的可用选项,例如,org.apache.spark.sql.DataFrameReader
和 org.apache.spark.sql.DataFrameWriter
。在那里记录的选项也应该适用于非 Scala Spark API(例如 PySpark)。对于其他格式,请参阅特定格式的 API 文档。
要加载 JSON 文件,您可以使用
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
Dataset<Row> peopleDF =
spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")
要加载 CSV 文件,您可以使用
df = spark.read.load("examples/src/main/resources/people.csv",
format="csv", sep=";", inferSchema="true", header="true")
val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv")
Dataset<Row> peopleDFCsv = spark.read().format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv");
df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferSchema = TRUE, header = TRUE)
namesAndAges <- select(df, "name", "age")
额外的选项在写入操作期间也会使用。例如,您可以控制 ORC 数据源的布隆过滤器和字典编码。以下 ORC 示例将仅为 favorite_color
创建布隆过滤器并使用字典编码。对于 Parquet,也存在 parquet.bloom.filter.enabled
和 parquet.enable.dictionary
。要查找有关额外 ORC/Parquet 选项的更多详细信息,请访问官方 Apache ORC / Parquet 网站。
ORC 数据源
df = spark.read.orc("examples/src/main/resources/users.orc")
(df.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc"))
usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc")
usersDF.write().format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc");
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
Parquet 数据源
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet"))
usersDF.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet")
usersDF.write().format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet");
df <- read.df("examples/src/main/resources/users.parquet", "parquet")
write.parquet(df, "users_with_options.parquet", parquet.bloom.filter.enabled#favorite_color = true, parquet.bloom.filter.expected.ndv#favorite_color = 1000000, parquet.enable.dictionary = true, parquet.page.write-checksum.enabled = false)
直接在文件上运行 SQL
除了使用读取 API 将文件加载到 DataFrame 并查询它之外,您还可以直接使用 SQL 查询该文件。
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Dataset<Row> sqlDF =
spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
保存模式
保存操作可以选择接受一个 SaveMode
,它指定如果存在现有数据,如何处理这些数据。重要的是要意识到这些保存模式不使用任何锁定并且不是原子的。此外,在执行 Overwrite
时,将在写入新数据之前删除数据。
Scala/Java | 任何语言 | 意义 |
---|---|---|
SaveMode.ErrorIfExists (默认) |
"error" or "errorifexists" (默认) |
将 DataFrame 保存到数据源时,如果数据已存在,则预计会抛出异常。 |
SaveMode.Append |
"append" |
将 DataFrame 保存到数据源时,如果数据/表已存在,则预计 DataFrame 的内容将附加到现有数据。 |
SaveMode.Overwrite |
"overwrite" |
覆盖模式意味着,将 DataFrame 保存到数据源时,如果数据/表已存在,则预计现有数据将被 DataFrame 的内容覆盖。 |
SaveMode.Ignore |
"ignore" |
忽略模式意味着,将 DataFrame 保存到数据源时,如果数据已存在,则预计保存操作不会保存 DataFrame 的内容,也不会更改现有数据。这类似于 SQL 中的 CREATE TABLE IF NOT EXISTS 。 |
保存到持久表
DataFrame
也可以使用 saveAsTable
命令保存为 Hive 元存储中的持久表。请注意,使用此功能不需要现有的 Hive 部署。Spark 将为您创建一个默认的本地 Hive 元存储(使用 Derby)。与 createOrReplaceTempView
命令不同,saveAsTable
将物化 DataFrame 的内容并在 Hive 元存储中创建一个指向数据的指针。只要您保持与同一元存储的连接,持久表即使在 Spark 程序重新启动后仍将存在。可以通过在 SparkSession
上调用 table
方法并使用表的名称来创建持久表的 DataFrame。
对于基于文件的 data source,例如 text、parquet、json 等,您可以通过 path
选项指定自定义表路径,例如 df.write.option("path", "/some/path").saveAsTable("t")
。当表被删除时,自定义表路径不会被删除,表数据仍然存在。如果没有指定自定义表路径,Spark 将将数据写入仓库目录下的默认表路径。当表被删除时,默认表路径也会被删除。
从 Spark 2.1 开始,持久数据源表在 Hive 元存储中存储了每个分区元数据。这带来了几个好处
- 由于元存储只能返回查询所需的必要分区,因此不再需要在第一次查询表时发现所有分区。
- Hive DDL(如
ALTER TABLE PARTITION ... SET LOCATION
)现在可用于使用数据源 API 创建的表。
请注意,在创建外部数据源表(具有 path
选项的表)时,默认情况下不会收集分区信息。要同步元存储中的分区信息,您可以调用 MSCK REPAIR TABLE
。
分桶、排序和分区
对于基于文件的 data source,还可以对输出进行分桶和排序或分区。分桶和排序仅适用于持久表
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
而分区可以在使用 Dataset API 时与 save
和 saveAsTable
一起使用。
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
usersDF
.write()
.partitionBy("favorite_color")
.format("parquet")
.save("namesPartByColor.parquet");
可以对单个表使用分区和分桶
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed"))
usersDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed")
usersDF
.write()
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed");
partitionBy
创建一个目录结构,如 分区发现 部分所述。因此,它对基数高的列的适用性有限。相反,bucketBy
将数据分布在固定数量的桶中,并且可以在唯一值数量不受限制时使用。