JSON 文件
Spark SQL 可以自动推断 JSON 数据集的 schema 并将其加载为 DataFrame。 此转换可以使用 SparkSession.read.json
在 JSON 文件上完成。
请注意,作为JSON文件提供的文件不是典型的JSON文件。 每行必须包含一个单独的、自包含的有效JSON对象。 有关更多信息,请参阅 JSON Lines 文本格式,也称为换行符分隔的 JSON。
对于常规的多行 JSON 文件,请将 multiLine
参数设置为 True
。
# spark is from the previous example.
sc = spark.sparkContext
# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files
path = "examples/src/main/resources/people.json"
peopleDF = spark.read.json(path)
# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
# SQL statements can be run by using the sql methods provided by spark
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
# +------+
# | name|
# +------+
# |Justin|
# +------+
# Alternatively, a DataFrame can be created for a JSON dataset represented by
# an RDD[String] storing one JSON object per string
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
# +---------------+----+
# | address|name|
# +---------------+----+
# |[Columbus,Ohio]| Yin|
# +---------------+----+
Spark SQL 可以自动推断 JSON 数据集的 schema 并将其加载为 Dataset[Row]
。此转换可以使用 SparkSession.read.json()
在 Dataset[String]
或 JSON 文件上完成。
请注意,作为JSON文件提供的文件不是典型的JSON文件。 每行必须包含一个单独的、自包含的有效JSON对象。 有关更多信息,请参阅 JSON Lines 文本格式,也称为换行符分隔的 JSON。
对于常规的多行 JSON 文件,请将 multiLine
选项设置为 true
。
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
Spark SQL 可以自动推断 JSON 数据集的 schema 并将其加载为 Dataset<Row>
。此转换可以使用 SparkSession.read().json()
在 Dataset<String>
或 JSON 文件上完成。
请注意,作为JSON文件提供的文件不是典型的JSON文件。 每行必须包含一个单独的、自包含的有效JSON对象。 有关更多信息,请参阅 JSON Lines 文本格式,也称为换行符分隔的 JSON。
对于常规的多行 JSON 文件,请将 multiLine
选项设置为 true
。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");
// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset<String> storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
Spark SQL 可以使用 read.json()
函数自动推断 JSON 数据集的 schema 并将其加载为 DataFrame。该函数从 JSON 文件目录加载数据,其中文件的每一行都是一个 JSON 对象。
请注意,作为JSON文件提供的文件不是典型的JSON文件。 每行必须包含一个单独的、自包含的有效JSON对象。 有关更多信息,请参阅 JSON Lines 文本格式,也称为换行符分隔的 JSON。
对于常规的多行 JSON 文件,请将命名参数 multiLine
设置为 TRUE
。
# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files.
path <- "examples/src/main/resources/people.json"
# Create a DataFrame from the file(s) pointed to by path
people <- read.json(path)
# The inferred schema can be visualized using the printSchema() method.
printSchema(people)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)
# Register this DataFrame as a table.
createOrReplaceTempView(people, "people")
# SQL statements can be run by using the sql methods.
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
## 1 Justin
CREATE TEMPORARY VIEW jsonTable
USING org.apache.spark.sql.json
OPTIONS (
path "examples/src/main/resources/people.json"
)
SELECT * FROM jsonTable
数据源选项
JSON 的数据源选项可以通过以下方式设置
.option
/.options
方法DataFrameReader
DataFrameWriter
DataStreamReader
DataStreamWriter
- 以下内置函数
from_json
to_json
schema_of_json
OPTIONS
子句,请参见 CREATE TABLE USING DATA_SOURCE
属性名 | 默认值 | 含义 | 范围 |
---|---|---|---|
timeZone |
(spark.sql.session.timeZone 配置的值) |
设置一个字符串,指示用于格式化 JSON 数据源或分区值中时间戳的时区 ID。 支持以下 timeZone 格式
|
读/写 |
primitivesAsString |
false |
将所有基本类型值推断为字符串类型。 | 读 |
prefersDecimal |
false |
将所有浮点值推断为 decimal 类型。 如果这些值不适合 decimal 类型,则将它们推断为 double 类型。 | 读 |
allowComments |
false |
忽略 JSON 记录中的 Java/C++ 风格注释。 | 读 |
allowUnquotedFieldNames |
false |
允许不带引号的 JSON 字段名称。 | 读 |
allowSingleQuotes |
true |
除了双引号外,还允许使用单引号。 | 读 |
allowNumericLeadingZeros |
false |
允许数字中包含前导零(例如 00012)。 | 读 |
allowBackslashEscapingAnyCharacter |
false |
允许接受使用反斜杠转义机制对所有字符进行引号。 | 读 |
mode |
PERMISSIVE |
允许一种模式来处理解析期间损坏的记录。
|
读 |
columnNameOfCorruptRecord |
(spark.sql.columnNameOfCorruptRecord 配置的值) |
允许重命名由 PERMISSIVE 模式创建的具有格式错误的字符串的新字段。 这会覆盖 spark.sql.columnNameOfCorruptRecord。 |
读 |
dateFormat |
yyyy-MM-dd |
设置一个字符串,指示日期格式。 自定义日期格式遵循 datetime pattern 的格式。 这适用于日期类型。 | 读/写 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
设置一个字符串,指示时间戳格式。 自定义日期格式遵循 datetime pattern 的格式。 这适用于时间戳类型。 | 读/写 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] | 设置一个字符串,指示没有时区的时间戳格式。 自定义日期格式遵循 Datetime Patterns 的格式。 这适用于没有时区的时间戳类型,请注意,写入或读取此数据类型时不支持区域偏移量和时区组件。 | 读/写 |
enableDateTimeParsingFallback |
如果时间解析器策略具有旧版设置,或者未提供自定义日期或时间戳模式,则启用。 | 如果值与设置的模式不匹配,则允许回退到解析日期和时间戳的向后兼容(Spark 1.x 和 2.0)行为。 | 读 |
multiLine |
false |
解析每个文件的一个记录,该记录可能跨越多行。 JSON 内置函数忽略此选项。 | 读 |
allowUnquotedControlChars |
false |
允许 JSON 字符串包含不带引号的控制字符(值小于 32 的 ASCII 字符,包括制表符和换行符)或不允许包含。 | 读 |
encoding |
当 multiLine 设置为 true 时自动检测 (用于读取), UTF-8 (用于写入) |
对于读取,允许强制为 JSON 文件设置标准基本或扩展编码之一。 例如 UTF-16BE, UTF-32LE。 对于写入,指定保存的 json 文件的编码(字符集)。 JSON 内置函数忽略此选项。 | 读/写 |
lineSep |
\r , \r\n , \n (用于读取), \n (用于写入) |
定义应用于解析的行分隔符。 JSON 内置函数忽略此选项。 | 读/写 |
samplingRatio |
1.0 |
定义用于 schema 推断的输入 JSON 对象的比例。 | 读 |
dropFieldIfAllNull |
false |
在 schema 推断期间是否忽略所有 null 值或空数组的列。 | 读 |
locale |
en-US |
设置一个语言环境作为 IETF BCP 47 格式的语言标记。 例如,在解析日期和时间戳时使用 locale 。 |
读 |
allowNonNumericNumbers |
true |
允许 JSON 解析器将一组“非数字”(NaN)标记识别为合法的浮点数值。
|
读 |
compression |
(none) | 保存到文件时使用的压缩编解码器。 这可以是已知的不区分大小写的缩短名称之一(none、bzip2、gzip、lz4、snappy 和 deflate)。 JSON 内置函数忽略此选项。 | write |
ignoreNullFields |
(spark.sql.jsonGenerator.ignoreNullFields 配置的值) |
生成 JSON 对象时是否忽略空字段。 | write |
其他通用选项可以在 通用文件源选项 中找到。