CSV 文件

Spark SQL 提供了 spark.read().csv("file_name") 来读取 CSV 格式的文件或文件目录到 Spark DataFrame 中,以及 dataframe.write().csv("path") 来写入到 CSV 文件。 函数 option() 可以用来定制读取或写入的行为,例如控制 header,分隔符,字符集等等。

# spark is from the previous example
sc = spark.sparkContext

# A CSV dataset is pointed to by path.
# The path can be either a single CSV file or a directory of CSV files
path = "examples/src/main/resources/people.csv"

df = spark.read.csv(path)
df.show()
# +------------------+
# |               _c0|
# +------------------+
# |      name;age;job|
# |Jorge;30;Developer|
# |  Bob;32;Developer|
# +------------------+

# Read a csv with delimiter, the default delimiter is ","
df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
# +-----+---+---------+
# |  _c0|_c1|      _c2|
# +-----+---+---------+
# | name|age|      job|
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# Read a csv with delimiter and a header
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
# +-----+---+---------+
# | name|age|      job|
# +-----+---+---------+
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# You can also use options() to use multiple options
df4 = spark.read.options(delimiter=";", header=True).csv(path)

# "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

# Read all files in a folder, please make sure only CSV files should present in the folder.
folderPath = "examples/src/main/resources"
df5 = spark.read.csv(folderPath)
df5.show()
# Wrong schema because non-CSV files are read
# +-----------+
# |        _c0|
# +-----------+
# |238val_238|
# |  86val_86|
# |311val_311|
# |  27val_27|
# |165val_165|
# +-----------+
在 Spark 代码仓库的 "examples/src/main/python/sql/datasource.py" 中查找完整的示例代码。
// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
val path = "examples/src/main/resources/people.csv"

val df = spark.read.csv(path)
df.show()
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+

// Read a csv with delimiter, the default delimiter is ","
val df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// Read a csv with delimiter and a header
val df3 = spark.read.option("delimiter", ";").option("header", "true").csv(path)
df3.show()
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// You can also use options() to use multiple options
val df4 = spark.read.options(Map("delimiter"->";", "header"->"true")).csv(path)

// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

// Read all files in a folder, please make sure only CSV files should present in the folder.
val folderPath = "examples/src/main/resources";
val df5 = spark.read.csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
在 Spark 代码仓库的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 中查找完整的示例代码。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
String path = "examples/src/main/resources/people.csv";

Dataset<Row> df = spark.read().csv(path);
df.show();
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+

// Read a csv with delimiter, the default delimiter is ","
Dataset<Row> df2 = spark.read().option("delimiter", ";").csv(path);
df2.show();
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// Read a csv with delimiter and a header
Dataset<Row> df3 = spark.read().option("delimiter", ";").option("header", "true").csv(path);
df3.show();
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// You can also use options() to use multiple options
java.util.Map<String, String> optionsMap = new java.util.HashMap<String, String>();
optionsMap.put("delimiter",";");
optionsMap.put("header","true");
Dataset<Row> df4 = spark.read().options(optionsMap).csv(path);

// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write().csv("output");

// Read all files in a folder, please make sure only CSV files should present in the folder.
String folderPath = "examples/src/main/resources";
Dataset<Row> df5 = spark.read().csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
在 Spark 代码仓库的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 中查找完整的示例代码。

数据源选项

CSV 的数据源选项可以通过以下方式设置

属性名默认值含义作用域
sep , 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 读取/写入
encoding UTF-8 对于读取,使用给定的编码类型解码 CSV 文件。 对于写入,指定保存的 CSV 文件的编码 (字符集)。 CSV 内置函数忽略此选项。 读取/写入
quote " 设置一个用于转义引用值的单字符,其中分隔符可以是值的一部分。 对于读取,如果要关闭引号,则需要设置的不是 null 而是空字符串。 对于写入,如果设置了空字符串,则使用 u0000(空字符)。 读取/写入
quoteAll false 一个标志,指示是否应始终将所有值括在引号中。 默认值是仅转义包含引号字符的值。 写入
escape \ 设置一个用于转义已经引用的值中的引号的单字符。 读取/写入
escapeQuotes true 一个标志,指示是否应始终将包含引号的值括在引号中。 默认值是转义所有包含引号字符的值。 写入
comment 设置一个用于跳过以此字符开头的行的单字符。 默认情况下,它是禁用的。 读取
header false 对于读取,使用第一行作为列名。 对于写入,将列名作为第一行写入。 请注意,如果给定的路径是字符串的 RDD,则此 header 选项将删除所有与 header 相同的行(如果存在)。 CSV 内置函数忽略此选项。 读取/写入
inferSchema false 从数据自动推断输入模式。 它需要对数据进行一次额外的传递。 CSV 内置函数忽略此选项。 读取
preferDate true 在模式推断期间 (inferSchema),如果值满足 dateFormat 选项或默认日期格式,则尝试将包含日期的字符串列推断为 Date。 对于包含日期和时间戳混合的列,如果未指定时间戳格式,则尝试将它们推断为 TimestampType,否则将它们推断为 StringType 读取
enforceSchema true 如果设置为 true,则指定的或推断的模式将被强制应用于数据源文件,并且 CSV 文件中的 header 将被忽略。 如果该选项设置为 false,则在将 header 选项设置为 true 时,将针对 CSV 文件中的所有 header 验证该模式。 模式中的字段名称和 CSV header 中的列名通过它们的位置进行检查,同时考虑 spark.sql.caseSensitive。 虽然默认值为 true,但建议禁用 enforceSchema 选项以避免不正确的结果。 CSV 内置函数忽略此选项。 读取
ignoreLeadingWhiteSpace false (对于读取), true (对于写入) 一个标志,指示是否应跳过正在读取/写入的值中的前导空格。 读取/写入
ignoreTrailingWhiteSpace false (对于读取), true (对于写入) 一个标志,指示是否应跳过正在读取/写入的值中的尾随空格。 读取/写入
nullValue 设置 null 值的字符串表示形式。 从 2.0.1 开始,此 nullValue 参数适用于所有支持的类型,包括字符串类型。 读取/写入
nanValue NaN 设置非数字值的字符串表示形式。 读取
positiveInf Inf 设置正无穷值的字符串表示形式。 读取
negativeInf -Inf 设置负无穷值的字符串表示形式。 读取
dateFormat yyyy-MM-dd 设置指示日期格式的字符串。 自定义日期格式遵循 Datetime Patterns 中的格式。 这适用于日期类型。 读取/写入
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] 设置指示时间戳格式的字符串。 自定义日期格式遵循 Datetime Patterns 中的格式。 这适用于时间戳类型。 读取/写入
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 设置指示不带时区的时间戳格式的字符串。 自定义日期格式遵循 Datetime Patterns 中的格式。 这适用于不带时区的时间戳类型,请注意,写入或读取此数据类型时不支持区域偏移和时区组件。 读取/写入
enableDateTimeParsingFallback 如果时间解析器策略具有旧版设置,或者未提供自定义日期或时间戳模式,则启用。 如果值与设置的模式不匹配,则允许回退到解析日期和时间戳的向后兼容(Spark 1.x 和 2.0)行为。 读取
maxColumns 20480 定义记录可以拥有的列数的硬性限制。 读取
maxCharsPerColumn -1 定义允许读取的任何给定值的最大字符数。 默认情况下,它是 -1,表示长度不受限制 读取
mode PERMISSIVE 允许一种用于处理解析期间的损坏记录的模式。 它支持以下不区分大小写的模式。 请注意,Spark 尝试在列修剪下仅解析 CSV 中所需的列。 因此,损坏的记录可能因所需的字段集而异。 此行为可以通过 spark.sql.csv.parser.columnPruning.enabled 控制(默认情况下启用)。
  • PERMISSIVE: 当它遇到损坏的记录时,将格式错误的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将格式错误的字段设置为 null。 要保留损坏的记录,用户可以在用户定义的模式中设置一个名为 columnNameOfCorruptRecord 的字符串类型字段。 如果模式没有该字段,它会在解析期间删除损坏的记录。 令牌少于/多于模式的记录不是 CSV 的损坏记录。 当它遇到一个令牌少于模式长度的记录时,会将 null 设置为额外的字段。 当记录的令牌多于模式的长度时,它会删除额外的令牌。
  • DROPMALFORMED: 忽略整个损坏的记录。 CSV 内置函数不支持此模式。
  • FAILFAST: 当它遇到损坏的记录时,抛出一个异常。
读取
columnNameOfCorruptRecord (spark.sql.columnNameOfCorruptRecord 配置的值) 允许重命名由 PERMISSIVE 模式创建的具有格式错误字符串的新字段。 这将覆盖 spark.sql.columnNameOfCorruptRecord 读取
multiLine false 解析每个文件的一条记录,该记录可能跨越多行。 CSV 内置函数忽略此选项。 读取
charToEscapeQuoteEscaping escape\0 设置一个用于转义引号字符的转义的单字符。 当转义字符和引号字符不同时,默认值为转义字符,否则为 \0 读取/写入
samplingRatio 1.0 定义用于模式推断的行的一部分。 CSV 内置函数忽略此选项。 读取
emptyValue (对于读取), "" (对于写入) 设置空值的字符串表示形式。 读取/写入
locale en-US 以 IETF BCP 47 格式设置语言标记的区域设置。 例如,这在解析日期和时间戳时使用。 读取
lineSep \r, \r\n and \n (对于读取), \n (对于写入) 定义应用于解析/写入的行分隔符。 最大长度为 1 个字符。 CSV 内置函数忽略此选项。 读取/写入
unescapedQuoteHandling STOP_AT_DELIMITER 定义 CsvParser 如何处理带有未转义引号的值。
  • STOP_AT_CLOSING_QUOTE: 如果在输入中找到未转义的引号,则累积引号字符并继续将该值解析为带引号的值,直到找到结束引号为止。
  • BACK_TO_DELIMITER: 如果在输入中找到未转义的引号,则将该值视为未带引号的值。 这将使解析器累积当前解析值的所有字符,直到找到分隔符为止。 如果在该值中未找到分隔符,则解析器将继续累积来自输入的字符,直到找到分隔符或行尾为止。
  • STOP_AT_DELIMITER:如果在输入中找到未转义的引号,则将该值视为未加引号的值。这将使解析器累积所有字符,直到在输入中找到分隔符或行尾。
  • SKIP_VALUE:如果在输入中找到未转义的引号,则将跳过为给定值解析的内容,并生成 nullValue 中设置的值。
  • RAISE_ERROR:如果在输入中找到未转义的引号,则会抛出 TextParsingException。
读取
压缩 (compression) (无) (none) 保存到文件时使用的压缩编解码器。这可以是已知的不区分大小写的缩写名称之一(nonebzip2gziplz4snappydeflate)。 CSV 内置函数会忽略此选项。 写入

其他通用选项可以在 通用文件源选项 中找到。