CSV 文件
Spark SQL 提供了 spark.read().csv("file_name")
用于将 CSV 格式的文件或文件目录读入 Spark DataFrame,以及 dataframe.write().csv("path")
用于写入 CSV 文件。函数 option()
可用于自定义读写行为,例如控制头部、分隔符、字符集等行为。
# spark is from the previous example
sc = spark.sparkContext
# A CSV dataset is pointed to by path.
# The path can be either a single CSV file or a directory of CSV files
path = "examples/src/main/resources/people.csv"
df = spark.read.csv(path)
df.show()
# +------------------+
# | _c0|
# +------------------+
# | name;age;job|
# |Jorge;30;Developer|
# | Bob;32;Developer|
# +------------------+
# Read a csv with delimiter, the default delimiter is ","
df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
# +-----+---+---------+
# | _c0|_c1| _c2|
# +-----+---+---------+
# | name|age| job|
# |Jorge| 30|Developer|
# | Bob| 32|Developer|
# +-----+---+---------+
# Read a csv with delimiter and a header
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
# +-----+---+---------+
# | name|age| job|
# +-----+---+---------+
# |Jorge| 30|Developer|
# | Bob| 32|Developer|
# +-----+---+---------+
# You can also use options() to use multiple options
df4 = spark.read.options(delimiter=";", header=True).csv(path)
# "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")
# Read all files in a folder, please make sure only CSV files should present in the folder.
folderPath = "examples/src/main/resources"
df5 = spark.read.csv(folderPath)
df5.show()
# Wrong schema because non-CSV files are read
# +-----------+
# | _c0|
# +-----------+
# |238val_238|
# | 86val_86|
# |311val_311|
# | 27val_27|
# |165val_165|
# +-----------+
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/sql/datasource.py"。
// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
val path = "examples/src/main/resources/people.csv"
val df = spark.read.csv(path)
df.show()
// +------------------+
// | _c0|
// +------------------+
// | name;age;job|
// |Jorge;30;Developer|
// | Bob;32;Developer|
// +------------------+
// Read a csv with delimiter, the default delimiter is ","
val df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
// +-----+---+---------+
// | _c0|_c1| _c2|
// +-----+---+---------+
// | name|age| job|
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// Read a csv with delimiter and a header
val df3 = spark.read.option("delimiter", ";").option("header", "true").csv(path)
df3.show()
// +-----+---+---------+
// | name|age| job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// You can also use options() to use multiple options
val df4 = spark.read.options(Map("delimiter" -> ";", "header" -> "true")).csv(path)
// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")
// Read all files in a folder, please make sure only CSV files should present in the folder.
val folderPath = "examples/src/main/resources";
val df5 = spark.read.csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// | _c0|
// +-----------+
// |238val_238|
// | 86val_86|
// |311val_311|
// | 27val_27|
// |165val_165|
// +-----------+
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
String path = "examples/src/main/resources/people.csv";
Dataset<Row> df = spark.read().csv(path);
df.show();
// +------------------+
// | _c0|
// +------------------+
// | name;age;job|
// |Jorge;30;Developer|
// | Bob;32;Developer|
// +------------------+
// Read a csv with delimiter, the default delimiter is ","
Dataset<Row> df2 = spark.read().option("delimiter", ";").csv(path);
df2.show();
// +-----+---+---------+
// | _c0|_c1| _c2|
// +-----+---+---------+
// | name|age| job|
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// Read a csv with delimiter and a header
Dataset<Row> df3 = spark.read().option("delimiter", ";").option("header", "true").csv(path);
df3.show();
// +-----+---+---------+
// | name|age| job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// You can also use options() to use multiple options
java.util.Map<String, String> optionsMap = new java.util.HashMap<String, String>();
optionsMap.put("delimiter",";");
optionsMap.put("header","true");
Dataset<Row> df4 = spark.read().options(optionsMap).csv(path);
// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write().csv("output");
// Read all files in a folder, please make sure only CSV files should present in the folder.
String folderPath = "examples/src/main/resources";
Dataset<Row> df5 = spark.read().csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// | _c0|
// +-----------+
// |238val_238|
// | 86val_86|
// |311val_311|
// | 27val_27|
// |165val_165|
// +-----------+
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"。
数据源选项
CSV 的数据源选项可以通过以下方式设置:
- DataFrameReader 的
.option
/.options
方法DataFrameReader
DataFrameWriter
DataStreamReader
DataStreamWriter
- 以下内置函数
from_csv
to_csv
schema_of_csv
- CREATE TABLE USING DATA_SOURCE 中的
OPTIONS
子句
属性名 | 默认值 | 含义 | 范围 |
---|---|---|---|
sep delimiter |
, | 为每个字段和值设置一个分隔符。此分隔符可以是一个或多个字符。 | 读/写 |
extension |
csv | 设置输出文件的文件扩展名。仅限于字母。长度必须等于 3。 | 写 |
encoding charset |
UTF-8 | 读取时,根据给定编码类型解码 CSV 文件。写入时,指定保存的 CSV 文件的编码 (charset)。CSV 内置函数会忽略此选项。 | 读/写 |
quote |
" | 设置一个字符,用于转义引用值,其中分隔符可以是值的一部分。对于读取,如果要关闭引号,需要设置空字符串而不是 null 。对于写入,如果设置了空字符串,则使用 u0000 (空字符)。 |
读/写 |
quoteAll |
false | 一个标志,指示是否所有值都应始终用引号括起来。默认情况下,只转义包含引号字符的值。 | 写 |
escape |
\ | 设置一个字符,用于转义已引用值内部的引号。 | 读/写 |
escapeQuotes |
true | 一个标志,指示包含引号的值是否应始终用引号括起来。默认情况下,转义所有包含引号字符的值。 | 写 |
comment |
设置一个字符,用于跳过以此字符开头的行。默认情况下,此功能被禁用。 | 读 | |
header |
false | 读取时,使用第一行作为列名。写入时,将列名作为第一行写入。请注意,如果给定路径是字符串 RDD,此头部选项将删除所有与头部相同的行(如果存在)。CSV 内置函数会忽略此选项。 | 读/写 |
inferSchema |
false | 从数据自动推断输入模式。这需要对数据进行一次额外的遍历。CSV 内置函数会忽略此选项。 | 读 |
preferDate |
true | 在模式推断 (inferSchema ) 期间,如果字符串列的值满足 dateFormat 选项或默认日期格式,则尝试将其推断为 Date 类型。对于包含日期和时间戳混合的列,如果未指定时间戳格式,则尝试将其推断为 TimestampType ,否则推断为 StringType 。 |
读 |
enforceSchema |
true | 如果设置为 true ,则指定的或推断的模式将强制应用于数据源文件,CSV 文件中的头部将被忽略。如果此选项设置为 false ,则在 header 选项设置为 true 的情况下,模式将对照 CSV 文件中的所有头部进行验证。模式中的字段名和 CSV 头部中的列名根据其位置并考虑 spark.sql.caseSensitive 进行检查。尽管默认值为 true,但建议禁用 enforceSchema 选项以避免不正确的结果。CSV 内置函数会忽略此选项。 |
读 |
ignoreLeadingWhiteSpace |
false (用于读取), true (用于写入) |
一个标志,指示是否应跳过读取/写入值中的前导空格。 | 读/写 |
ignoreTrailingWhiteSpace |
false (用于读取), true (用于写入) |
一个标志,指示是否应跳过读取/写入值中的尾随空格。 | 读/写 |
nullValue |
设置空值的字符串表示。从 2.0.1 版本开始,此 nullValue 参数适用于所有支持的类型,包括字符串类型。 |
读/写 | |
nanValue |
NaN | 设置非数值的字符串表示。 | 读 |
positiveInf |
Inf | 设置正无穷大值的字符串表示。 | 读 |
negativeInf |
-Inf | 设置负无穷大值的字符串表示。 | 读 |
dateFormat |
yyyy-MM-dd | 设置指示日期格式的字符串。自定义日期格式遵循 日期时间模式 中的格式。这适用于日期类型。 | 读/写 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] | 设置指示时间戳格式的字符串。自定义日期格式遵循 日期时间模式 中的格式。这适用于时间戳类型。 | 读/写 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] | 设置指示不带时区时间戳格式的字符串。自定义日期格式遵循 日期时间模式 中的格式。这适用于不带时区的时间戳类型,请注意,在写入或读取此数据类型时不支持区域偏移和时区组件。 | 读/写 |
enableDateTimeParsingFallback |
如果时间解析器策略具有旧版设置或未提供自定义日期或时间戳模式,则启用此选项。 | 允许在值不匹配设置模式时回退到向后兼容(Spark 1.x 和 2.0)的日期和时间戳解析行为。 | 读 |
maxColumns |
20480 | 定义记录可以拥有的列数的硬限制。 | 读 |
maxCharsPerColumn |
-1 | 定义读取的任何给定值允许的最大字符数。默认值为 -1,表示长度不限。 | 读 |
mode |
PERMISSIVE | 允许在解析期间处理损坏记录的模式。它支持以下不区分大小写的模式。请注意,Spark 尝试在列剪枝下仅解析 CSV 中所需的列。因此,损坏的记录可能因所需的字段集而异。此行为可以通过 spark.sql.csv.parser.columnPruning.enabled (默认启用)进行控制。
|
读 |
columnNameOfCorruptRecord |
(spark.sql.columnNameOfCorruptRecord 配置的值) |
允许重命名由 PERMISSIVE 模式创建的包含格式错误字符串的新字段。这会覆盖 spark.sql.columnNameOfCorruptRecord 。 |
读 |
multiLine |
false | 允许一行跨越多行,通过将引用值内的换行符解析为值本身的一部分。CSV 内置函数会忽略此选项。 | 读 |
charToEscapeQuoteEscaping |
escape 或 \0 |
设置一个字符,用于转义引号字符的转义。当转义字符和引号字符不同时,默认值为转义字符,否则为 \0 。 |
读/写 |
samplingRatio |
1.0 | 定义用于模式推断的行比例。CSV 内置函数会忽略此选项。 | 读 |
emptyValue |
(用于读取), "" (用于写入) |
设置空值的字符串表示。 | 读/写 |
locale |
en-US | 将语言标签设置为 IETF BCP 47 格式的区域设置。例如,这在解析日期和时间戳时使用。 | 读 |
lineSep |
\r , \r\n 和 \n (用于读取), \n (用于写入) |
定义用于解析/写入的行分隔符。最大长度为 1 个字符。CSV 内置函数会忽略此选项。 | 读/写 |
unescapedQuoteHandling |
STOP_AT_DELIMITER | 定义 CsvParser 如何处理带有未转义引号的值。
|
读 |
compression codec |
(无) | 保存到文件时使用的压缩编解码器。这可以是已知的不区分大小写的短名称之一(none , bzip2 , gzip , lz4 , snappy 和 deflate )。CSV 内置函数会忽略此选项。 |
写 |
timeZone |
(spark.sql.session.timeZone 配置的值) |
设置指示时区 ID 的字符串,用于格式化 JSON 数据源或分区值中的时间戳。支持以下 timeZone 格式:
|
读/写 |
其他通用选项可以在 通用文件源选项 中找到。