CSV 文件
Spark SQL 提供了 spark.read().csv("file_name")
来读取 CSV 格式的文件或文件目录到 Spark DataFrame 中,以及 dataframe.write().csv("path")
来写入到 CSV 文件。 函数 option()
可以用来定制读取或写入的行为,例如控制 header,分隔符,字符集等等。
# spark is from the previous example
sc = spark.sparkContext
# A CSV dataset is pointed to by path.
# The path can be either a single CSV file or a directory of CSV files
path = "examples/src/main/resources/people.csv"
df = spark.read.csv(path)
df.show()
# +------------------+
# | _c0|
# +------------------+
# | name;age;job|
# |Jorge;30;Developer|
# | Bob;32;Developer|
# +------------------+
# Read a csv with delimiter, the default delimiter is ","
df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
# +-----+---+---------+
# | _c0|_c1| _c2|
# +-----+---+---------+
# | name|age| job|
# |Jorge| 30|Developer|
# | Bob| 32|Developer|
# +-----+---+---------+
# Read a csv with delimiter and a header
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
# +-----+---+---------+
# | name|age| job|
# +-----+---+---------+
# |Jorge| 30|Developer|
# | Bob| 32|Developer|
# +-----+---+---------+
# You can also use options() to use multiple options
df4 = spark.read.options(delimiter=";", header=True).csv(path)
# "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")
# Read all files in a folder, please make sure only CSV files should present in the folder.
folderPath = "examples/src/main/resources"
df5 = spark.read.csv(folderPath)
df5.show()
# Wrong schema because non-CSV files are read
# +-----------+
# | _c0|
# +-----------+
# |238val_238|
# | 86val_86|
# |311val_311|
# | 27val_27|
# |165val_165|
# +-----------+
在 Spark 代码仓库的 "examples/src/main/python/sql/datasource.py" 中查找完整的示例代码。
// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
val path = "examples/src/main/resources/people.csv"
val df = spark.read.csv(path)
df.show()
// +------------------+
// | _c0|
// +------------------+
// | name;age;job|
// |Jorge;30;Developer|
// | Bob;32;Developer|
// +------------------+
// Read a csv with delimiter, the default delimiter is ","
val df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
// +-----+---+---------+
// | _c0|_c1| _c2|
// +-----+---+---------+
// | name|age| job|
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// Read a csv with delimiter and a header
val df3 = spark.read.option("delimiter", ";").option("header", "true").csv(path)
df3.show()
// +-----+---+---------+
// | name|age| job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// You can also use options() to use multiple options
val df4 = spark.read.options(Map("delimiter"->";", "header"->"true")).csv(path)
// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write.csv("output")
// Read all files in a folder, please make sure only CSV files should present in the folder.
val folderPath = "examples/src/main/resources";
val df5 = spark.read.csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// | _c0|
// +-----------+
// |238val_238|
// | 86val_86|
// |311val_311|
// | 27val_27|
// |165val_165|
// +-----------+
在 Spark 代码仓库的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 中查找完整的示例代码。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// A CSV dataset is pointed to by path.
// The path can be either a single CSV file or a directory of CSV files
String path = "examples/src/main/resources/people.csv";
Dataset<Row> df = spark.read().csv(path);
df.show();
// +------------------+
// | _c0|
// +------------------+
// | name;age;job|
// |Jorge;30;Developer|
// | Bob;32;Developer|
// +------------------+
// Read a csv with delimiter, the default delimiter is ","
Dataset<Row> df2 = spark.read().option("delimiter", ";").csv(path);
df2.show();
// +-----+---+---------+
// | _c0|_c1| _c2|
// +-----+---+---------+
// | name|age| job|
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// Read a csv with delimiter and a header
Dataset<Row> df3 = spark.read().option("delimiter", ";").option("header", "true").csv(path);
df3.show();
// +-----+---+---------+
// | name|age| job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// You can also use options() to use multiple options
java.util.Map<String, String> optionsMap = new java.util.HashMap<String, String>();
optionsMap.put("delimiter",";");
optionsMap.put("header","true");
Dataset<Row> df4 = spark.read().options(optionsMap).csv(path);
// "output" is a folder which contains multiple csv files and a _SUCCESS file.
df3.write().csv("output");
// Read all files in a folder, please make sure only CSV files should present in the folder.
String folderPath = "examples/src/main/resources";
Dataset<Row> df5 = spark.read().csv(folderPath);
df5.show();
// Wrong schema because non-CSV files are read
// +-----------+
// | _c0|
// +-----------+
// |238val_238|
// | 86val_86|
// |311val_311|
// | 27val_27|
// |165val_165|
// +-----------+
在 Spark 代码仓库的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 中查找完整的示例代码。
数据源选项
CSV 的数据源选项可以通过以下方式设置
- the
.option
/.options
methods ofDataFrameReader
DataFrameWriter
DataStreamReader
DataStreamWriter
- 以下内置函数
from_csv
to_csv
schema_of_csv
OPTIONS
子句 在 CREATE TABLE USING DATA_SOURCE 中
属性名 | 默认值 | 含义 | 作用域 |
---|---|---|---|
sep |
, | 设置每个字段和值的分隔符。此分隔符可以是一个或多个字符。 | 读取/写入 |
encoding |
UTF-8 | 对于读取,使用给定的编码类型解码 CSV 文件。 对于写入,指定保存的 CSV 文件的编码 (字符集)。 CSV 内置函数忽略此选项。 | 读取/写入 |
quote |
" | 设置一个用于转义引用值的单字符,其中分隔符可以是值的一部分。 对于读取,如果要关闭引号,则需要设置的不是 null 而是空字符串。 对于写入,如果设置了空字符串,则使用 u0000 (空字符)。 |
读取/写入 |
quoteAll |
false | 一个标志,指示是否应始终将所有值括在引号中。 默认值是仅转义包含引号字符的值。 | 写入 |
escape |
\ | 设置一个用于转义已经引用的值中的引号的单字符。 | 读取/写入 |
escapeQuotes |
true | 一个标志,指示是否应始终将包含引号的值括在引号中。 默认值是转义所有包含引号字符的值。 | 写入 |
comment |
设置一个用于跳过以此字符开头的行的单字符。 默认情况下,它是禁用的。 | 读取 | |
header |
false | 对于读取,使用第一行作为列名。 对于写入,将列名作为第一行写入。 请注意,如果给定的路径是字符串的 RDD,则此 header 选项将删除所有与 header 相同的行(如果存在)。 CSV 内置函数忽略此选项。 | 读取/写入 |
inferSchema |
false | 从数据自动推断输入模式。 它需要对数据进行一次额外的传递。 CSV 内置函数忽略此选项。 | 读取 |
preferDate |
true | 在模式推断期间 (inferSchema ),如果值满足 dateFormat 选项或默认日期格式,则尝试将包含日期的字符串列推断为 Date 。 对于包含日期和时间戳混合的列,如果未指定时间戳格式,则尝试将它们推断为 TimestampType ,否则将它们推断为 StringType 。 |
读取 |
enforceSchema |
true | 如果设置为 true ,则指定的或推断的模式将被强制应用于数据源文件,并且 CSV 文件中的 header 将被忽略。 如果该选项设置为 false ,则在将 header 选项设置为 true 时,将针对 CSV 文件中的所有 header 验证该模式。 模式中的字段名称和 CSV header 中的列名通过它们的位置进行检查,同时考虑 spark.sql.caseSensitive 。 虽然默认值为 true,但建议禁用 enforceSchema 选项以避免不正确的结果。 CSV 内置函数忽略此选项。 |
读取 |
ignoreLeadingWhiteSpace |
false (对于读取), true (对于写入) |
一个标志,指示是否应跳过正在读取/写入的值中的前导空格。 | 读取/写入 |
ignoreTrailingWhiteSpace |
false (对于读取), true (对于写入) |
一个标志,指示是否应跳过正在读取/写入的值中的尾随空格。 | 读取/写入 |
nullValue |
设置 null 值的字符串表示形式。 从 2.0.1 开始,此 nullValue 参数适用于所有支持的类型,包括字符串类型。 |
读取/写入 | |
nanValue |
NaN | 设置非数字值的字符串表示形式。 | 读取 |
positiveInf |
Inf | 设置正无穷值的字符串表示形式。 | 读取 |
negativeInf |
-Inf | 设置负无穷值的字符串表示形式。 | 读取 |
dateFormat |
yyyy-MM-dd | 设置指示日期格式的字符串。 自定义日期格式遵循 Datetime Patterns 中的格式。 这适用于日期类型。 | 读取/写入 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] | 设置指示时间戳格式的字符串。 自定义日期格式遵循 Datetime Patterns 中的格式。 这适用于时间戳类型。 | 读取/写入 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] | 设置指示不带时区的时间戳格式的字符串。 自定义日期格式遵循 Datetime Patterns 中的格式。 这适用于不带时区的时间戳类型,请注意,写入或读取此数据类型时不支持区域偏移和时区组件。 | 读取/写入 |
enableDateTimeParsingFallback |
如果时间解析器策略具有旧版设置,或者未提供自定义日期或时间戳模式,则启用。 | 如果值与设置的模式不匹配,则允许回退到解析日期和时间戳的向后兼容(Spark 1.x 和 2.0)行为。 | 读取 |
maxColumns |
20480 | 定义记录可以拥有的列数的硬性限制。 | 读取 |
maxCharsPerColumn |
-1 | 定义允许读取的任何给定值的最大字符数。 默认情况下,它是 -1,表示长度不受限制 | 读取 |
mode |
PERMISSIVE | 允许一种用于处理解析期间的损坏记录的模式。 它支持以下不区分大小写的模式。 请注意,Spark 尝试在列修剪下仅解析 CSV 中所需的列。 因此,损坏的记录可能因所需的字段集而异。 此行为可以通过 spark.sql.csv.parser.columnPruning.enabled 控制(默认情况下启用)。
|
读取 |
columnNameOfCorruptRecord |
(spark.sql.columnNameOfCorruptRecord 配置的值) |
允许重命名由 PERMISSIVE 模式创建的具有格式错误字符串的新字段。 这将覆盖 spark.sql.columnNameOfCorruptRecord 。 |
读取 |
multiLine |
false | 解析每个文件的一条记录,该记录可能跨越多行。 CSV 内置函数忽略此选项。 | 读取 |
charToEscapeQuoteEscaping |
escape 或 \0 |
设置一个用于转义引号字符的转义的单字符。 当转义字符和引号字符不同时,默认值为转义字符,否则为 \0 。 |
读取/写入 |
samplingRatio |
1.0 | 定义用于模式推断的行的一部分。 CSV 内置函数忽略此选项。 | 读取 |
emptyValue |
(对于读取), "" (对于写入) |
设置空值的字符串表示形式。 | 读取/写入 |
locale |
en-US | 以 IETF BCP 47 格式设置语言标记的区域设置。 例如,这在解析日期和时间戳时使用。 | 读取 |
lineSep |
\r , \r\n and \n (对于读取), \n (对于写入) |
定义应用于解析/写入的行分隔符。 最大长度为 1 个字符。 CSV 内置函数忽略此选项。 | 读取/写入 |
unescapedQuoteHandling |
STOP_AT_DELIMITER | 定义 CsvParser 如何处理带有未转义引号的值。
|
读取 |
压缩 (compression) |
(无) (none) | 保存到文件时使用的压缩编解码器。这可以是已知的不区分大小写的缩写名称之一(none 、bzip2 、gzip 、lz4 、snappy 和 deflate )。 CSV 内置函数会忽略此选项。 |
写入 |
其他通用选项可以在 通用文件源选项 中找到。