CSV 文件
Spark SQL 提供了 spark.read().csv("file_name")
来将 CSV 格式的文件或文件目录读取到 Spark DataFrame 中,并提供了 dataframe.write().csv("path")
来写入 CSV 文件。函数 option()
可用于自定义读取或写入的行为,例如控制标题、分隔符、字符集等的行为。
# spark is from the previous example
sc = spark.sparkContext
# A CSV dataset is pointed to by path.
# The path can be either a single CSV file or a directory of CSV files
path = "examples/src/main/resources/people.csv"
df = spark.read.csv(path)
df.show()
# +------------------+
# | _c0|
# +------------------+
# | name;age;job|
# |Jorge;30;Developer|
# | Bob;32;Developer|
# +------------------+
# Read a csv with delimiter, the default delimiter is ","
df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
# +-----+---+---------+
# | _c0|_c1| _c2|
# +-----+---+---------+
# | name|age| job|
# |Jorge| 30|Developer|
# | Bob| 32|Developer|
# +-----+---+---------+
# Read a csv with delimiter and a header
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
# +-----+---+---------+
# | name|age| job|
# +-----+---+---------+
# |Jorge| 30|Developer|
# | Bob| 32|Developer|
# +-----+---+---------+
# You can also use options() to use multiple options
df4 = spark.read.options(delimiter=";", header=True).csv(path)
# "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")
# Read all files in a folder, please make sure only CSV files should present in the folder.
folderPath = "examples/src/main/resources"
df5 = spark.read.csv(folderPath)
df5.show()
# Wrong schema because non-CSV files are read
# +-----------+
# | _c0|
# +-----------+
# |238val_238|
# | 86val_86|
# |311val_311|
# | 27val_27|
# |165val_165|
# +-----------+
完整的示例代码可以在 Spark 仓库的 "examples/src/main/python/sql/datasource.py" 中找到。
// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
val path = "examples/src/main/resources/people.csv"
val df = spark.read.csv(path)
df.show()
// +------------------+
// | _c0|
// +------------------+
// | name;age;job|
// |Jorge;30;Developer|
// | Bob;32;Developer|
// +------------------+
// Read a csv with delimiter, the default delimiter is ","
val df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
// +-----+---+---------+
// | _c0|_c1| _c2|
// +-----+---+---------+
// | name|age| job|
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// Read a csv with delimiter and a header
val df3 = spark.read.option("delimiter", ";").option("header", "true").csv(path)
df3.show()
// +-----+---+---------+
// | name|age| job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// You can also use options() to use multiple options
val df4 = spark.read.options(Map("delimiter"->";", "header"->"true")).csv(path)
// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")
// Read all files in a folder, please make sure only CSV files should present in the folder.
val folderPath = "examples/src/main/resources";
val df5 = spark.read.csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// | _c0|
// +-----------+
// |238val_238|
// | 86val_86|
// |311val_311|
// | 27val_27|
// |165val_165|
// +-----------+
完整的示例代码可以在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 中找到。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
String path = "examples/src/main/resources/people.csv";
Dataset<Row> df = spark.read().csv(path);
df.show();
// +------------------+
// | _c0|
// +------------------+
// | name;age;job|
// |Jorge;30;Developer|
// | Bob;32;Developer|
// +------------------+
// Read a csv with delimiter, the default delimiter is ","
Dataset<Row> df2 = spark.read().option("delimiter", ";").csv(path);
df2.show();
// +-----+---+---------+
// | _c0|_c1| _c2|
// +-----+---+---------+
// | name|age| job|
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// Read a csv with delimiter and a header
Dataset<Row> df3 = spark.read().option("delimiter", ";").option("header", "true").csv(path);
df3.show();
// +-----+---+---------+
// | name|age| job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// You can also use options() to use multiple options
java.util.Map<String, String> optionsMap = new java.util.HashMap<String, String>();
optionsMap.put("delimiter",";");
optionsMap.put("header","true");
Dataset<Row> df4 = spark.read().options(optionsMap).csv(path);
// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write().csv("output");
// Read all files in a folder, please make sure only CSV files should present in the folder.
String folderPath = "examples/src/main/resources";
Dataset<Row> df5 = spark.read().csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// | _c0|
// +-----------+
// |238val_238|
// | 86val_86|
// |311val_311|
// | 27val_27|
// |165val_165|
// +-----------+
完整的示例代码可以在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 中找到。
数据源选项
CSV 的数据源选项可以通过以下方式设置
.option
/.options
方法DataFrameReader
DataFrameWriter
DataStreamReader
DataStreamWriter
- 以下内置函数
from_csv
to_csv
schema_of_csv
- CREATE TABLE USING DATA_SOURCE 中的
OPTIONS
子句
属性名称 | 默认值 | 含义 | 范围 |
---|---|---|---|
sep |
, | , | 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 |
读取/写入 |
encoding | UTF-8 | 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 |
对于读取,使用给定的编码类型解码 CSV 文件。对于写入,指定保存的 CSV 文件的编码(字符集)。CSV 内置函数忽略此选项。 |
" | quote | 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 |
" |
设置用于转义带引号的值的单个字符,其中分隔符可以是值的一部分。对于读取,如果要关闭引号,则需要设置为空字符串而不是 null 。对于写入,如果设置为空字符串,则使用 u0000 (空字符)。 |
quoteAll | false |
一个标志,指示是否应始终将所有值括在引号中。默认情况下,仅转义包含引号字符的值。 |
\ | 写入 | 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 |
escape |
\ | 设置用于转义已加引号的值内的引号的单个字符。 | false |
escapeQuotes |
true | 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 | |
comment |
设置用于转义带引号的值的单个字符,其中分隔符可以是值的一部分。对于读取,如果要关闭引号,则需要设置为空字符串而不是 null 。对于写入,如果设置为空字符串,则使用 u0000 (空字符)。 |
空 | 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 |
设置用于跳过以此字符开头的行的单个字符。默认情况下,它是禁用的。 |
设置用于转义带引号的值的单个字符,其中分隔符可以是值的一部分。对于读取,如果要关闭引号,则需要设置为空字符串而不是 null 。对于写入,如果设置为空字符串,则使用 u0000 (空字符)。 |
读取 | 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
header |
\ | false | 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
对于读取,使用第一行作为列名。对于写入,将列名写入第一行。请注意,如果给定的路径是字符串的 RDD,则此标题选项将删除与标题相同的所有行(如果存在)。CSV 内置函数忽略此选项。 |
\ | inferSchema | 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
false |
从数据中自动推断输入架构。它需要对数据进行一次额外的遍历。CSV 内置函数忽略此选项。 | preferDate | 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 |
false |
从数据中自动推断输入架构。它需要对数据进行一次额外的遍历。CSV 内置函数忽略此选项。 | 在架构推断 (inferSchema ) 期间,如果值满足 dateFormat 选项或默认日期格式,则尝试将包含日期的字符串列推断为 Date 。对于包含日期和时间戳混合的列,如果未指定时间戳格式,则尝试将它们推断为 TimestampType ,否则将它们推断为 StringType 。 |
设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 |
enforceSchema |
true | 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 | |
如果将其设置为 |
ignoreLeadingWhiteSpace | false (用于读取),true (用于写入) |
一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
一个标志,指示是否应跳过正在读取/写入的值中的前导空格。 |
ignoreTrailingWhiteSpace | false (用于读取),true (用于写入) |
一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
一个标志,指示是否应跳过正在读取/写入的值中的尾随空格。 |
nullValue | 空 | 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
设置空值的字符串表示形式。从 2.0.1 开始,此 |
nanValue | NaN | 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 |
设置非数字值的字符串表示形式。 |
positiveInf | Inf | 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 |
设置正无穷大值的字符串表示形式。 |
negativeInf | -Inf | 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 |
设置负无穷大值的字符串表示形式。 |
dateFormat | yyyy-MM-dd | 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
设置指示日期格式的字符串。自定义日期格式遵循日期时间模式中的格式。这适用于日期类型。 |
20480 | timestampFormat | 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
-1 | 设置指示时间戳格式的字符串。自定义日期格式遵循日期时间模式中的格式。这适用于时间戳类型。 | 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] | 设置指示没有时区格式的时间戳的字符串。自定义日期格式遵循日期时间模式中的格式。这适用于没有时区类型的时间戳,请注意,在写入或读取此数据类型时,不支持区域偏移量和时区组件。
|
一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
maxColumns |
20480 | 定义记录可以具有的最大列数。 | 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
maxCharsPerColumn |
设置用于转义带引号的值的单个字符,其中分隔符可以是值的一部分。对于读取,如果要关闭引号,则需要设置为空字符串而不是 null 。对于写入,如果设置为空字符串,则使用 u0000 (空字符)。 |
4096 | 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
定义允许读取的任何给定值的最大字符数。默认情况下,它是 -1,表示长度不受限制 |
mode | PERMISSIVE | 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 |
允许一种在解析过程中处理损坏记录的模式。它支持以下不区分大小写的模式。请注意,在列剪枝下,Spark 尝试仅解析 CSV 中的必需列。因此,根据所需的字段集,损坏的记录可能会有所不同。此行为可以通过 |
1.0 | PERMISSIVE :当遇到损坏的记录时,将格式错误的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将格式错误的字段设置为 null 。要保留损坏的记录,用户可以在用户定义的架构中设置一个名为 columnNameOfCorruptRecord 的字符串类型字段。如果架构没有该字段,则在解析过程中会删除损坏的记录。令牌少于/多于架构的记录对于 CSV 而言不是损坏的记录。当遇到令牌少于架构长度的记录时,将为多余的字段设置 null 。当记录的令牌多于架构的长度时,它将删除多余的令牌。 |
一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
|
FAILFAST :当遇到损坏的记录时抛出异常。 |
columnNameOfCorruptRecord | 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 |
( |
允许重命名由 PERMISSIVE 模式创建的具有格式错误字符串的新字段。这将覆盖 spark.sql.columnNameOfCorruptRecord 。 |
multiLine | 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
false |
每个文件解析一条记录,该记录可能跨越多行。CSV 内置函数忽略此选项。 | charToEscapeQuoteEscaping | 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 |
|
设置用于转义引号字符的转义符的单个字符。默认值是转义字符(当转义字符和引号字符不同时),否则为 \0 。 |
samplingRatio
|
一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。 |
locale |
en-US | 以 IETF BCP 47 格式设置语言标签形式的区域设置。例如,这在解析日期和时间戳时使用。 | false |
lineSep