JSON 文件

Spark SQL 可以自动推断 JSON 数据集的模式,并将其加载为 DataFrame。此转换可以通过在 JSON 文件上使用 SparkSession.read.json 来完成。

请注意,作为 json 文件 提供的文件并非典型的 JSON 文件。每行必须包含一个独立、自包含的有效 JSON 对象。有关更多信息,请参阅 JSON Lines 文本格式(也称为按行分隔的 JSON)

对于常规的多行 JSON 文件,请将 multiLine 参数设置为 True

# spark is from the previous example.
sc = spark.sparkContext

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files
path = "examples/src/main/resources/people.json"
peopleDF = spark.read.json(path)

# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

# SQL statements can be run by using the sql methods provided by spark
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+

# Alternatively, a DataFrame can be created for a JSON dataset represented by
# an RDD[String] storing one JSON object per string
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
# +---------------+----+
# |        address|name|
# +---------------+----+
# |[Columbus,Ohio]| Yin|
# +---------------+----+
完整示例代码请参见 Spark 仓库中的“examples/src/main/python/sql/datasource.py”。

Spark SQL 可以自动推断 JSON 数据集的模式,并将其加载为 Dataset[Row]。此转换可以通过在 Dataset[String] 或 JSON 文件上使用 SparkSession.read.json() 来完成。

请注意,作为 json 文件 提供的文件并非典型的 JSON 文件。每行必须包含一个独立、自包含的有效 JSON 对象。有关更多信息,请参阅 JSON Lines 文本格式(也称为按行分隔的 JSON)

对于常规的多行 JSON 文件,请将 multiLine 选项设置为 true

// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
完整示例代码请参见 Spark 仓库中的“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”。

Spark SQL 可以自动推断 JSON 数据集的模式,并将其加载为 Dataset<Row>。此转换可以通过在 Dataset<String> 或 JSON 文件上使用 SparkSession.read().json() 来完成。

请注意,作为 json 文件 提供的文件并非典型的 JSON 文件。每行必须包含一个独立、自包含的有效 JSON 对象。有关更多信息,请参阅 JSON Lines 文本格式(也称为按行分隔的 JSON)

对于常规的多行 JSON 文件,请将 multiLine 选项设置为 true

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset<String> storing one JSON object per string.
List<String> jsonData = Arrays.asList(
        "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
完整示例代码请参见 Spark 仓库中的“examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java”。

Spark SQL 可以自动推断 JSON 数据集的模式,并将其加载为 DataFrame,使用 read.json() 函数,该函数从 JSON 文件目录加载数据,其中文件的每一行都是一个 JSON 对象。

请注意,作为 json 文件 提供的文件并非典型的 JSON 文件。每行必须包含一个独立、自包含的有效 JSON 对象。有关更多信息,请参阅 JSON Lines 文本格式(也称为按行分隔的 JSON)

对于常规的多行 JSON 文件,请将命名参数 multiLine 设置为 TRUE

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files.
path <- "examples/src/main/resources/people.json"
# Create a DataFrame from the file(s) pointed to by path
people <- read.json(path)

# The inferred schema can be visualized using the printSchema() method.
printSchema(people)
## root
##  |-- age: long (nullable = true)
##  |-- name: string (nullable = true)

# Register this DataFrame as a table.
createOrReplaceTempView(people, "people")

# SQL statements can be run by using the sql methods.
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##     name
## 1 Justin
完整示例代码请参见 Spark 仓库中的“examples/src/main/r/RSparkSQLExample.R”。
CREATE TEMPORARY VIEW jsonTable
USING org.apache.spark.sql.json
OPTIONS (
  path "examples/src/main/resources/people.json"
)

SELECT * FROM jsonTable

数据源选项

JSON 数据源选项可以通过以下方式设置:

属性名称默认值含义范围
timeZone (spark.sql.session.timeZone 配置的值) 设置一个字符串,指示用于格式化 JSON 数据源或分区值中时间戳的时区 ID。timeZone 支持以下格式:
  • 基于区域的时区 ID:应采用“区域/城市”的形式,例如“America/Los_Angeles”。
  • 时区偏移:应采用“(+|-)HH:mm”格式,例如“-08:00”或“+01:00”。“UTC”和“Z”也支持作为“+00:00”的别名。
不建议使用“CST”等其他短名称,因为它们可能存在歧义。
读/写
primitivesAsString false 将所有原始值推断为字符串类型。
prefersDecimal false 将所有浮点值推断为十进制类型。如果值不适合十进制,则将其推断为双精度类型。
allowComments false 忽略 JSON 记录中的 Java/C++ 风格注释。
allowUnquotedFieldNames false 允许不带引号的 JSON 字段名称。
allowSingleQuotes true 允许使用单引号以及双引号。
allowNumericLeadingZeros false 允许数字中包含前导零(例如 00012)。
allowBackslashEscapingAnyCharacter false 允许使用反斜杠引用机制接受所有字符的引用。
mode PERMISSIVE 允许一种处理解析过程中损坏记录的模式。
  • PERMISSIVE:当遇到损坏的记录时,将格式不正确的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将格式不正确的字段设置为 null。为了保留损坏的记录,用户可以在用户定义的模式中设置一个名为 columnNameOfCorruptRecord 的字符串类型字段。如果模式没有该字段,它会在解析过程中丢弃损坏的记录。在推断模式时,它会在输出模式中隐式添加一个 columnNameOfCorruptRecord 字段。
  • DROPMALFORMED:忽略所有损坏的记录。JSON 内置函数不支持此模式。
  • FAILFAST:当遇到损坏的记录时抛出异常。
columnNameOfCorruptRecord (spark.sql.columnNameOfCorruptRecord 配置的值) 允许重命名 PERMISSIVE 模式创建的包含格式不正确字符串的新字段。这将覆盖 spark.sql.columnNameOfCorruptRecord。
dateFormat yyyy-MM-dd 设置一个字符串,指示日期格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于日期类型。 读/写
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] 设置一个字符串,指示时间戳格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于时间戳类型。 读/写
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 设置一个字符串,指示不带时区的时间戳格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于不带时区的时间戳类型,请注意,在写入或读取此数据类型时不支持区域偏移和时区组件。 读/写
enableDateTimeParsingFallback 如果时间解析器策略具有旧版设置或未提供自定义日期或时间戳模式,则启用。 如果值与设置的模式不匹配,则允许回退到解析日期和时间戳的向后兼容(Spark 1.x 和 2.0)行为。
multiLine false 按文件解析一条记录,该记录可能跨多行。JSON 内置函数忽略此选项。
allowUnquotedControlChars false 允许 JSON 字符串包含未引用的控制字符(ASCII 值小于 32 的字符,包括制表符和换行符)或不包含。
encoding multiLine 设置为 true 时(用于读取)自动检测,UTF-8(用于写入) 对于读取,允许强制为 JSON 文件设置一种标准基本或扩展编码。例如 UTF-16BE、UTF-32LE。对于写入,指定保存的 JSON 文件的编码(字符集)。JSON 内置函数忽略此选项。 读/写
lineSep \r, \r\n, \n (用于读取),\n (用于写入) 定义应在解析时使用的行分隔符。JSON 内置函数忽略此选项。 读/写
samplingRatio 1.0 定义用于模式推断的输入 JSON 对象的分数。
dropFieldIfAllNull false 在模式推断期间是否忽略所有空值或空数组的列。
locale en-US 以 IETF BCP 47 格式设置区域设置为语言标签。例如,在解析日期和时间戳时使用 locale
allowNonNumericNumbers true 允许 JSON 解析器将一组“非数字”(NaN)标记识别为合法的浮点数值。
  • +INF:表示正无穷,以及 +InfinityInfinity 的别名。
  • -INF:表示负无穷,别名 -Infinity
  • NaN:表示其他非数字,例如除以零的结果。
compression (无) 保存到文件时使用的压缩编解码器。这可以是已知的不区分大小写的缩写名称之一(none、bzip2、gzip、lz4、snappy 和 deflate)。JSON 内置函数忽略此选项。
ignoreNullFields (spark.sql.jsonGenerator.ignoreNullFields 配置的值) 生成 JSON 对象时是否忽略空字段。
useUnsafeRow (spark.sql.json.useUnsafeRow 配置的值) 在 JSON 解析器中是否使用 UnsafeRow 表示结构结果。

其他通用选项可以在 通用文件源选项 中找到。