通用加载/保存函数
最简单的形式是,默认数据源(parquet
,除非通过 spark.sql.sources.default
进行配置)将用于所有操作。
df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
手动指定选项
您还可以手动指定要使用的数据源以及要传递给数据源的任何额外选项。数据源由其完全限定名称指定(即 org.apache.spark.sql.parquet
),但对于内置源,您也可以使用其短名称(json
、parquet
、jdbc
、orc
、libsvm
、csv
、text
)。从任何数据源类型加载的 DataFrames 都可以使用此语法转换为其他类型。
请参考 API 文档,了解内置源的可用选项,例如 org.apache.spark.sql.DataFrameReader
和 org.apache.spark.sql.DataFrameWriter
。那里记录的选项也应该适用于非 Scala Spark API(例如 PySpark)。对于其他格式,请参阅特定格式的 API 文档。
要加载 JSON 文件,您可以使用
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
Dataset<Row> peopleDF =
spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")
要加载 CSV 文件,您可以使用
df = spark.read.load("examples/src/main/resources/people.csv",
format="csv", sep=";", inferSchema="true", header="true")
val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv")
Dataset<Row> peopleDFCsv = spark.read().format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv");
df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferSchema = TRUE, header = TRUE)
namesAndAges <- select(df, "name", "age")
额外的选项也用于写入操作。 例如,您可以控制 ORC 数据源的 bloom 过滤器和字典编码。 以下 ORC 示例将创建 bloom 过滤器,并且仅对 favorite_color
使用字典编码。 对于 Parquet,也存在 parquet.bloom.filter.enabled
和 parquet.enable.dictionary
。 要查找有关 ORC/Parquet 额外选项的更多详细信息,请访问 Apache 官方 ORC / Parquet 网站。
ORC 数据源
df = spark.read.orc("examples/src/main/resources/users.orc")
(df.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc"))
usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc")
usersDF.write().format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc");
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
CREATE TABLE users_with_options (
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING ORC
OPTIONS (
orc.bloom.filter.columns 'favorite_color',
orc.dictionary.key.threshold '1.0',
orc.column.encoding.direct 'name'
)
Parquet 数据源
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet"))
usersDF.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet")
usersDF.write().format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet");
df <- read.df("examples/src/main/resources/users.parquet", "parquet")
write.parquet(df, "users_with_options.parquet", parquet.bloom.filter.enabled#favorite_color = true, parquet.bloom.filter.expected.ndv#favorite_color = 1000000, parquet.enable.dictionary = true, parquet.page.write-checksum.enabled = false)
CREATE TABLE users_with_options (
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING parquet
OPTIONS (
`parquet.bloom.filter.enabled#favorite_color` true,
`parquet.bloom.filter.expected.ndv#favorite_color` 1000000,
parquet.enable.dictionary true,
parquet.page.write-checksum.enabled true
)
直接在文件上运行 SQL
除了使用 read API 将文件加载到 DataFrame 并查询它之外,您还可以使用 SQL 直接查询该文件。
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Dataset<Row> sqlDF =
spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
保存模式
保存操作可以选择采用 SaveMode
,它指定如果存在现有数据,如何处理。 重要的是要意识到这些保存模式不利用任何锁定并且不是原子的。 此外,在执行 Overwrite
时,数据将在写入新数据之前被删除。
Scala/Java | 任何语言 | 含义 |
---|---|---|
SaveMode.ErrorIfExists (默认) |
"error" 或 "errorifexists" (默认) |
将 DataFrame 保存到数据源时,如果数据已存在,则应抛出异常。 |
SaveMode.Append |
"append" |
将 DataFrame 保存到数据源时,如果数据/表已存在,则 DataFrame 的内容应附加到现有数据。 |
SaveMode.Overwrite |
"overwrite" |
覆盖模式意味着将 DataFrame 保存到数据源时,如果数据/表已存在,则现有数据应被 DataFrame 的内容覆盖。 |
SaveMode.Ignore |
"ignore" |
忽略模式意味着将 DataFrame 保存到数据源时,如果数据已存在,则保存操作不应保存 DataFrame 的内容,也不应更改现有数据。 这类似于 SQL 中的 CREATE TABLE IF NOT EXISTS 。 |
保存到持久表
DataFrames
也可以使用 saveAsTable
命令作为持久表保存到 Hive 元存储中。 请注意,不需要现有的 Hive 部署即可使用此功能。 Spark 将为您创建一个默认的本地 Hive 元存储(使用 Derby)。 与 createOrReplaceTempView
命令不同,saveAsTable
将实现 DataFrame 的内容,并在 Hive 元存储中创建一个指向数据的指针。 只要您保持与同一元存储的连接,即使您的 Spark 程序重新启动,持久表仍然存在。 可以通过在具有表名称的 SparkSession
上调用 table
方法来创建持久表的 DataFrame。
对于基于文件的数据源,例如 text、parquet、json 等,您可以通过 path
选项指定自定义表路径,例如 df.write.option("path", "/some/path").saveAsTable("t")
。 当表被删除时,自定义表路径不会被删除,表数据仍然存在。 如果未指定自定义表路径,Spark 会将数据写入仓库目录下的默认表路径。 当表被删除时,默认表路径也会被删除。
从 Spark 2.1 开始,持久性数据源表在 Hive 元存储中存储每个分区的元数据。 这带来了几个好处
- 由于元存储只能返回查询所需的必要分区,因此不再需要在对表进行第一次查询时发现所有分区。
- Hive DDL(例如
ALTER TABLE PARTITION ... SET LOCATION
)现在可用于使用 Datasource API 创建的表。
请注意,默认情况下,在创建外部数据源表(具有 path
选项的表)时,不会收集分区信息。 要同步元存储中的分区信息,您可以调用 MSCK REPAIR TABLE
。
分桶、排序和分区
对于基于文件的数据源,也可以对输出进行分桶和排序或分区。 分桶和排序仅适用于持久表
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
CREATE TABLE users_bucketed_by_name(
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING parquet
CLUSTERED BY(name) INTO 42 BUCKETS;
当使用 Dataset API 时,分区可以与 save
和 saveAsTable
一起使用。
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
usersDF
.write()
.partitionBy("favorite_color")
.format("parquet")
.save("namesPartByColor.parquet");
CREATE TABLE users_by_favorite_color(
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING csv PARTITIONED BY(favorite_color);
可以为一个表同时使用分区和分桶
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed"))
usersDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed")
usersDF
.write()
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed");
CREATE TABLE users_bucketed_and_partitioned(
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING parquet
PARTITIONED BY (favorite_color)
CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS;
partitionBy
创建一个目录结构,如 分区发现 部分中所述。 因此,它对具有高基数的列的适用性有限。 相比之下,bucketBy
将数据分布到固定数量的桶中,并且可以在唯一值的数量不受限制时使用。