CSV 文件

Spark SQL 提供了 spark.read().csv("file_name") 来将 CSV 格式的文件或文件目录读取到 Spark DataFrame 中,并提供了 dataframe.write().csv("path") 来写入 CSV 文件。函数 option() 可用于自定义读取或写入的行为,例如控制标题、分隔符、字符集等的行为。

# spark is from the previous example
sc = spark.sparkContext

# A CSV dataset is pointed to by path.
# The path can be either a single CSV file or a directory of CSV files
path = "examples/src/main/resources/people.csv"

df = spark.read.csv(path)
df.show()
# +------------------+
# |               _c0|
# +------------------+
# |      name;age;job|
# |Jorge;30;Developer|
# |  Bob;32;Developer|
# +------------------+

# Read a csv with delimiter, the default delimiter is ","
df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
# +-----+---+---------+
# |  _c0|_c1|      _c2|
# +-----+---+---------+
# | name|age|      job|
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# Read a csv with delimiter and a header
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
# +-----+---+---------+
# | name|age|      job|
# +-----+---+---------+
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# You can also use options() to use multiple options
df4 = spark.read.options(delimiter=";", header=True).csv(path)

# "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

# Read all files in a folder, please make sure only CSV files should present in the folder.
folderPath = "examples/src/main/resources"
df5 = spark.read.csv(folderPath)
df5.show()
# Wrong schema because non-CSV files are read
# +-----------+
# |        _c0|
# +-----------+
# |238val_238|
# |  86val_86|
# |311val_311|
# |  27val_27|
# |165val_165|
# +-----------+
完整的示例代码可以在 Spark 仓库的 "examples/src/main/python/sql/datasource.py" 中找到。
// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
val path = "examples/src/main/resources/people.csv"

val df = spark.read.csv(path)
df.show()
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+

// Read a csv with delimiter, the default delimiter is ","
val df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// Read a csv with delimiter and a header
val df3 = spark.read.option("delimiter", ";").option("header", "true").csv(path)
df3.show()
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// You can also use options() to use multiple options
val df4 = spark.read.options(Map("delimiter"->";", "header"->"true")).csv(path)

// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")

// Read all files in a folder, please make sure only CSV files should present in the folder.
val folderPath = "examples/src/main/resources";
val df5 = spark.read.csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
完整的示例代码可以在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 中找到。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
String path = "examples/src/main/resources/people.csv";

Dataset<Row> df = spark.read().csv(path);
df.show();
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+

// Read a csv with delimiter, the default delimiter is ","
Dataset<Row> df2 = spark.read().option("delimiter", ";").csv(path);
df2.show();
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// Read a csv with delimiter and a header
Dataset<Row> df3 = spark.read().option("delimiter", ";").option("header", "true").csv(path);
df3.show();
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+

// You can also use options() to use multiple options
java.util.Map<String, String> optionsMap = new java.util.HashMap<String, String>();
optionsMap.put("delimiter",";");
optionsMap.put("header","true");
Dataset<Row> df4 = spark.read().options(optionsMap).csv(path);

// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write().csv("output");

// Read all files in a folder, please make sure only CSV files should present in the folder.
String folderPath = "examples/src/main/resources";
Dataset<Row> df5 = spark.read().csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
完整的示例代码可以在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 中找到。

数据源选项

CSV 的数据源选项可以通过以下方式设置

属性名称默认值含义范围
sep , , 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。
读取/写入 encoding UTF-8 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。
对于读取,使用给定的编码类型解码 CSV 文件。对于写入,指定保存的 CSV 文件的编码(字符集)。CSV 内置函数忽略此选项。 " quote 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。
" 设置用于转义带引号的值的单个字符,其中分隔符可以是值的一部分。对于读取,如果要关闭引号,则需要设置为空字符串而不是 null。对于写入,如果设置为空字符串,则使用 u0000(空字符)。 quoteAll false
一个标志,指示是否应始终将所有值括在引号中。默认情况下,仅转义包含引号字符的值。 \ 写入 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。
escape \ 设置用于转义已加引号的值内的引号的单个字符。 false
escapeQuotes true 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
comment 设置用于转义带引号的值的单个字符,其中分隔符可以是值的一部分。对于读取,如果要关闭引号,则需要设置为空字符串而不是 null。对于写入,如果设置为空字符串,则使用 u0000(空字符)。 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。
设置用于跳过以此字符开头的行的单个字符。默认情况下,它是禁用的。 设置用于转义带引号的值的单个字符,其中分隔符可以是值的一部分。对于读取,如果要关闭引号,则需要设置为空字符串而不是 null。对于写入,如果设置为空字符串,则使用 u0000(空字符)。 读取 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
header \ false 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
对于读取,使用第一行作为列名。对于写入,将列名写入第一行。请注意,如果给定的路径是字符串的 RDD,则此标题选项将删除与标题相同的所有行(如果存在)。CSV 内置函数忽略此选项。 \ inferSchema 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
false 从数据中自动推断输入架构。它需要对数据进行一次额外的遍历。CSV 内置函数忽略此选项。 preferDate 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。
false 从数据中自动推断输入架构。它需要对数据进行一次额外的遍历。CSV 内置函数忽略此选项。 在架构推断 (inferSchema) 期间,如果值满足 dateFormat 选项或默认日期格式,则尝试将包含日期的字符串列推断为 Date。对于包含日期和时间戳混合的列,如果未指定时间戳格式,则尝试将它们推断为 TimestampType,否则将它们推断为 StringType 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。
enforceSchema true 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。
如果将其设置为 true,则指定的或推断的架构将强制应用于数据源文件,并且 CSV 文件中的标题将被忽略。如果将该选项设置为 false,则在 header 选项设置为 true 的情况下,将针对 CSV 文件中的所有标题验证架构。架构中的字段名称和 CSV 标题中的列名称将通过考虑 spark.sql.caseSensitive 的位置进行检查。尽管默认值为 true,但建议禁用 enforceSchema 选项以避免出现错误结果。CSV 内置函数忽略此选项。 ignoreLeadingWhiteSpace false(用于读取),true(用于写入) 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
一个标志,指示是否应跳过正在读取/写入的值中的前导空格。 ignoreTrailingWhiteSpace false(用于读取),true(用于写入) 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
一个标志,指示是否应跳过正在读取/写入的值中的尾随空格。 nullValue 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
设置空值的字符串表示形式。从 2.0.1 开始,此 nullValue 参数适用于所有受支持的类型,包括字符串类型。 nanValue NaN 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。
设置非数字值的字符串表示形式。 positiveInf Inf 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。
设置正无穷大值的字符串表示形式。 negativeInf -Inf 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。
设置负无穷大值的字符串表示形式。 dateFormat yyyy-MM-dd 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
设置指示日期格式的字符串。自定义日期格式遵循日期时间模式中的格式。这适用于日期类型。 20480 timestampFormat 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] -1 设置指示时间戳格式的字符串。自定义日期格式遵循日期时间模式中的格式。这适用于时间戳类型。 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 设置指示没有时区格式的时间戳的字符串。自定义日期格式遵循日期时间模式中的格式。这适用于没有时区类型的时间戳,请注意,在写入或读取此数据类型时,不支持区域偏移量和时区组件。
  • enableDateTimeParsingFallback
  • 如果时间解析器策略具有旧设置,或者未提供自定义日期或时间戳模式,则启用。
  • 如果值与设置的模式不匹配,则允许回退到解析日期和时间戳的向后兼容(Spark 1.x 和 2.0)行为。
一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
maxColumns 20480 定义记录可以具有的最大列数。 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
maxCharsPerColumn 设置用于转义带引号的值的单个字符,其中分隔符可以是值的一部分。对于读取,如果要关闭引号,则需要设置为空字符串而不是 null。对于写入,如果设置为空字符串,则使用 u0000(空字符)。 4096 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
定义允许读取的任何给定值的最大字符数。默认情况下,它是 -1,表示长度不受限制 mode PERMISSIVE 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。
允许一种在解析过程中处理损坏记录的模式。它支持以下不区分大小写的模式。请注意,在列剪枝下,Spark 尝试仅解析 CSV 中的必需列。因此,根据所需的字段集,损坏的记录可能会有所不同。此行为可以通过 spark.sql.csv.parser.columnPruning.enabled(默认启用)进行控制。 1.0 PERMISSIVE:当遇到损坏的记录时,将格式错误的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将格式错误的字段设置为 null。要保留损坏的记录,用户可以在用户定义的架构中设置一个名为 columnNameOfCorruptRecord 的字符串类型字段。如果架构没有该字段,则在解析过程中会删除损坏的记录。令牌少于/多于架构的记录对于 CSV 而言不是损坏的记录。当遇到令牌少于架构长度的记录时,将为多余的字段设置 null。当记录的令牌多于架构的长度时,它将删除多余的令牌。 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
DROPMALFORMED:忽略整个损坏的记录。CSV 内置函数不支持此模式。 FAILFAST:当遇到损坏的记录时抛出异常。 columnNameOfCorruptRecord 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。
spark.sql.columnNameOfCorruptRecord 配置的值) 允许重命名由 PERMISSIVE 模式创建的具有格式错误字符串的新字段。这将覆盖 spark.sql.columnNameOfCorruptRecord multiLine 一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
false 每个文件解析一条记录,该记录可能跨越多行。CSV 内置函数忽略此选项。 charToEscapeQuoteEscaping 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。
escape\0 设置用于转义引号字符的转义符的单个字符。默认值是转义字符(当转义字符和引号字符不同时),否则为 \0 samplingRatio
  • 1.0
  • 定义用于架构推断的行数比例。CSV 内置函数忽略此选项。
  • emptyValue
  • (用于读取),""(用于写入)
  • 设置空值的字符串表示形式。
一个标志,指示是否应始终将包含引号的值括在引号中。默认情况下,将转义包含引号字符的所有值。
locale en-US 以 IETF BCP 47 格式设置语言标签形式的区域设置。例如,这在解析日期和时间戳时使用。 false

lineSep