CSV 文件

Spark SQL 提供了 spark.read().csv("file_name") 用于将 CSV 格式的文件或文件目录读入 Spark DataFrame,以及 dataframe.write().csv("path") 用于写入 CSV 文件。函数 option() 可用于自定义读写行为,例如控制头部、分隔符、字符集等行为。

# spark is from the previous example
sc = spark.sparkContext

# A CSV dataset is pointed to by path.
# The path can be either a single CSV file or a directory of CSV files
path = "examples/src/main/resources/people.csv"

df = spark.read.csv(path)
df.show()
# +------------------+
# |               _c0|
# +------------------+
# |      name;age;job|
# |Jorge;30;Developer|
# |  Bob;32;Developer|
# +------------------+

# Read a csv with delimiter, the default delimiter is ","
df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
# +-----+---+---------+
# |  _c0|_c1|      _c2|
# +-----+---+---------+
# | name|age|      job|
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# Read a csv with delimiter and a header
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
# +-----+---+---------+
# | name|age|      job|
# +-----+---+---------+
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# You can also use options() to use multiple options
df4 = spark.read.options(delimiter=";", header=True).csv(path)

# "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

# Read all files in a folder, please make sure only CSV files should present in the folder.
folderPath = "examples/src/main/resources"
df5 = spark.read.csv(folderPath)
df5.show()
# Wrong schema because non-CSV files are read
# +-----------+
# |        _c0|
# +-----------+
# |238val_238|
# |  86val_86|
# |311val_311|
# |  27val_27|
# |165val_165|
# +-----------+
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/sql/datasource.py"。
// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
val path = "examples/src/main/resources/people.csv"

val df = spark.read.csv(path)
df.show()
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+

// Read a csv with delimiter, the default delimiter is ","
val df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// Read a csv with delimiter and a header
val df3 = spark.read.option("delimiter", ";").option("header", "true").csv(path)
df3.show()
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// You can also use options() to use multiple options
val df4 = spark.read.options(Map("delimiter" -> ";", "header" -> "true")).csv(path)

// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

// Read all files in a folder, please make sure only CSV files should present in the folder.
val folderPath = "examples/src/main/resources";
val df5 = spark.read.csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
String path = "examples/src/main/resources/people.csv";

Dataset<Row> df = spark.read().csv(path);
df.show();
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+

// Read a csv with delimiter, the default delimiter is ","
Dataset<Row> df2 = spark.read().option("delimiter", ";").csv(path);
df2.show();
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// Read a csv with delimiter and a header
Dataset<Row> df3 = spark.read().option("delimiter", ";").option("header", "true").csv(path);
df3.show();
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// You can also use options() to use multiple options
java.util.Map<String, String> optionsMap = new java.util.HashMap<String, String>();
optionsMap.put("delimiter",";");
optionsMap.put("header","true");
Dataset<Row> df4 = spark.read().options(optionsMap).csv(path);

// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write().csv("output");

// Read all files in a folder, please make sure only CSV files should present in the folder.
String folderPath = "examples/src/main/resources";
Dataset<Row> df5 = spark.read().csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"。

数据源选项

CSV 的数据源选项可以通过以下方式设置:

属性名默认值含义范围
sep
delimiter
, 为每个字段和值设置一个分隔符。此分隔符可以是一个或多个字符。 读/写
extension csv 设置输出文件的文件扩展名。仅限于字母。长度必须等于 3。
encoding
charset
UTF-8 读取时,根据给定编码类型解码 CSV 文件。写入时,指定保存的 CSV 文件的编码 (charset)。CSV 内置函数会忽略此选项。 读/写
quote " 设置一个字符,用于转义引用值,其中分隔符可以是值的一部分。对于读取,如果要关闭引号,需要设置空字符串而不是 null。对于写入,如果设置了空字符串,则使用 u0000(空字符)。 读/写
quoteAll false 一个标志,指示是否所有值都应始终用引号括起来。默认情况下,只转义包含引号字符的值。
escape \ 设置一个字符,用于转义已引用值内部的引号。 读/写
escapeQuotes true 一个标志,指示包含引号的值是否应始终用引号括起来。默认情况下,转义所有包含引号字符的值。
comment 设置一个字符,用于跳过以此字符开头的行。默认情况下,此功能被禁用。
header false 读取时,使用第一行作为列名。写入时,将列名作为第一行写入。请注意,如果给定路径是字符串 RDD,此头部选项将删除所有与头部相同的行(如果存在)。CSV 内置函数会忽略此选项。 读/写
inferSchema false 从数据自动推断输入模式。这需要对数据进行一次额外的遍历。CSV 内置函数会忽略此选项。
preferDate true 在模式推断 (inferSchema) 期间,如果字符串列的值满足 dateFormat 选项或默认日期格式,则尝试将其推断为 Date 类型。对于包含日期和时间戳混合的列,如果未指定时间戳格式,则尝试将其推断为 TimestampType,否则推断为 StringType
enforceSchema true 如果设置为 true,则指定的或推断的模式将强制应用于数据源文件,CSV 文件中的头部将被忽略。如果此选项设置为 false,则在 header 选项设置为 true 的情况下,模式将对照 CSV 文件中的所有头部进行验证。模式中的字段名和 CSV 头部中的列名根据其位置并考虑 spark.sql.caseSensitive 进行检查。尽管默认值为 true,但建议禁用 enforceSchema 选项以避免不正确的结果。CSV 内置函数会忽略此选项。
ignoreLeadingWhiteSpace false (用于读取), true (用于写入) 一个标志,指示是否应跳过读取/写入值中的前导空格。 读/写
ignoreTrailingWhiteSpace false (用于读取), true (用于写入) 一个标志,指示是否应跳过读取/写入值中的尾随空格。 读/写
nullValue 设置空值的字符串表示。从 2.0.1 版本开始,此 nullValue 参数适用于所有支持的类型,包括字符串类型。 读/写
nanValue NaN 设置非数值的字符串表示。
positiveInf Inf 设置正无穷大值的字符串表示。
negativeInf -Inf 设置负无穷大值的字符串表示。
dateFormat yyyy-MM-dd 设置指示日期格式的字符串。自定义日期格式遵循 日期时间模式 中的格式。这适用于日期类型。 读/写
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] 设置指示时间戳格式的字符串。自定义日期格式遵循 日期时间模式 中的格式。这适用于时间戳类型。 读/写
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 设置指示不带时区时间戳格式的字符串。自定义日期格式遵循 日期时间模式 中的格式。这适用于不带时区的时间戳类型,请注意,在写入或读取此数据类型时不支持区域偏移和时区组件。 读/写
enableDateTimeParsingFallback 如果时间解析器策略具有旧版设置或未提供自定义日期或时间戳模式,则启用此选项。 允许在值不匹配设置模式时回退到向后兼容(Spark 1.x 和 2.0)的日期和时间戳解析行为。
maxColumns 20480 定义记录可以拥有的列数的硬限制。
maxCharsPerColumn -1 定义读取的任何给定值允许的最大字符数。默认值为 -1,表示长度不限。
mode PERMISSIVE 允许在解析期间处理损坏记录的模式。它支持以下不区分大小写的模式。请注意,Spark 尝试在列剪枝下仅解析 CSV 中所需的列。因此,损坏的记录可能因所需的字段集而异。此行为可以通过 spark.sql.csv.parser.columnPruning.enabled(默认启用)进行控制。
  • PERMISSIVE: 当遇到损坏记录时,将格式错误的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将格式错误的字段设置为 null。为了保留损坏记录,用户可以在用户定义的模式中设置一个名为 columnNameOfCorruptRecord 的字符串类型字段。如果模式没有此字段,则在解析期间删除损坏记录。对于 CSV 而言,令牌数量少于/多于模式的记录不是损坏记录。当记录的令牌数量少于模式长度时,将额外字段设置为 null。当记录的令牌数量多于模式长度时,它会丢弃额外的令牌。
  • DROPMALFORMED: 忽略整个损坏记录。CSV 内置函数不支持此模式。
  • FAILFAST: 当遇到损坏记录时抛出异常。
columnNameOfCorruptRecord (spark.sql.columnNameOfCorruptRecord 配置的值) 允许重命名由 PERMISSIVE 模式创建的包含格式错误字符串的新字段。这会覆盖 spark.sql.columnNameOfCorruptRecord
multiLine false 允许一行跨越多行,通过将引用值内的换行符解析为值本身的一部分。CSV 内置函数会忽略此选项。
charToEscapeQuoteEscaping escape\0 设置一个字符,用于转义引号字符的转义。当转义字符和引号字符不同时,默认值为转义字符,否则为 \0 读/写
samplingRatio 1.0 定义用于模式推断的行比例。CSV 内置函数会忽略此选项。
emptyValue (用于读取), "" (用于写入) 设置空值的字符串表示。 读/写
locale en-US 将语言标签设置为 IETF BCP 47 格式的区域设置。例如,这在解析日期和时间戳时使用。
lineSep \r, \r\n\n (用于读取), \n (用于写入) 定义用于解析/写入的行分隔符。最大长度为 1 个字符。CSV 内置函数会忽略此选项。 读/写
unescapedQuoteHandling STOP_AT_DELIMITER 定义 CsvParser 如何处理带有未转义引号的值。
  • STOP_AT_CLOSING_QUOTE: 如果在输入中发现未转义的引号,则累积引号字符并继续将值解析为引用值,直到找到闭合引号。
  • BACK_TO_DELIMITER: 如果在输入中发现未转义的引号,则将该值视为非引用值。这将使解析器累积当前解析值的所有字符,直到找到分隔符。如果值中未找到分隔符,解析器将继续从输入中累积字符,直到找到分隔符或行尾。
  • STOP_AT_DELIMITER: 如果在输入中发现未转义的引号,则将该值视为非引用值。这将使解析器累积所有字符,直到在输入中找到分隔符或行尾。
  • SKIP_VALUE: 如果在输入中发现未转义的引号,则跳过为给定值解析的内容,并生成 nullValue 中设置的值。
  • RAISE_ERROR: 如果在输入中发现未转义的引号,则抛出 TextParsingException。
compression
codec
(无) 保存到文件时使用的压缩编解码器。这可以是已知的不区分大小写的短名称之一(none, bzip2, gzip, lz4, snappydeflate)。CSV 内置函数会忽略此选项。
timeZone (spark.sql.session.timeZone 配置的值) 设置指示时区 ID 的字符串,用于格式化 JSON 数据源或分区值中的时间戳。支持以下 timeZone 格式:
  • 基于区域的时区 ID:应采用 'area/city' 形式,例如 'America/Los_Angeles'。
  • 时区偏移:应采用 '(+|- )HH:mm' 格式,例如 '-08:00' 或 '+01:00'。此外,支持 'UTC' 和 'Z' 作为 '+00:00' 的别名。
不建议使用 'CST' 等其他短名称,因为它们可能存在歧义。
读/写

其他通用选项可以在 通用文件源选项 中找到。