XML 文件
Spark SQL 提供了 spark.read().xml("file_1_path","file_2_path")
来将 XML 格式的文件或文件目录读取到 Spark DataFrame 中,并提供了 dataframe.write().xml("path")
来写入 XML 文件。必须指定 rowTag
选项,以指示映射到 DataFrame 行
的 XML 元素。option() 函数可用于自定义读写行为,例如控制 XML 属性、XSD 验证、压缩等行为。
# Primitive types (Int, String, etc) and Product types (case classes) encoders are
# supported by importing this when creating a Dataset.
# An XML dataset is pointed to by path.
# The path can be either a single xml file or more xml files
path = "examples/src/main/resources/people.xml"
peopleDF = spark.read.option("rowTag", "person").format("xml").load(path)
# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
# SQL statements can be run by using the sql methods provided by spark
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
# +------+
# | name|
# +------+
# |Justin|
# +------+
# Alternatively, a DataFrame can be created for an XML dataset represented by a Dataset[String]
xmlStrings = ["""
<person>
<name>laglangyue</name>
<job>Developer</job>
<age>28</age>
</person>
"""]
xmlRDD = spark.sparkContext.parallelize(xmlStrings)
otherPeople = spark.read \
.option("rowTag", "person") \
.xml(xmlRDD)
otherPeople.show()
# +---+---------+----------+
# |age| job| name|
# +---+---------+----------+
# | 28|Developer|laglangyue|
# +---+---------+----------+
完整的示例代码可在 Spark 仓库中的 "examples/src/main/python/sql/datasource.py" 找到。
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._
// An XML dataset is pointed to by path.
// The path can be either a single xml file or more xml files
val path = "examples/src/main/resources/people.xml"
val peopleDF = spark.read.option("rowTag", "person").xml(path)
// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for a XML dataset represented by a Dataset[String]
val otherPeopleDataset = spark.createDataset(
"""
|<person>
| <name>laglangyue</name>
| <job>Developer</job>
| <age>28</age>
|</person>
|""".stripMargin :: Nil)
val otherPeople = spark.read
.option("rowTag", "person")
.xml(otherPeopleDataset)
otherPeople.show()
// +---+---------+----------+
// |age| job| name|
// +---+---------+----------+
// | 28|Developer|laglangyue|
// +---+---------+----------+
完整的示例代码可在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 找到。
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
// An XML dataset is pointed to by path.
// The path can be either a single xml file or more xml files
String path = "examples/src/main/resources/people.xml";
Dataset<Row> peopleDF = spark.read().option("rowTag", "person").xml(path);
// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagerNamesDF = spark.sql(
"SELECT name FROM people WHERE age BETWEEN 13 AND 19");
teenagerNamesDF.show();
// +------+
// | name|
// +------+
// |Justin|
// +------+
// Alternatively, a DataFrame can be created for an XML dataset represented by a Dataset[String]
List<String> xmlData = Collections.singletonList(
"<person>" +
"<name>laglangyue</name><job>Developer</job><age>28</age>" +
"</person>");
Dataset<String> otherPeopleDataset = spark.createDataset(Lists.newArrayList(xmlData),
Encoders.STRING());
Dataset<Row> otherPeople = spark.read()
.option("rowTag", "person")
.xml(otherPeopleDataset);
otherPeople.show();
// +---+---------+----------+
// |age| job| name|
// +---+---------+----------+
// | 28|Developer|laglangyue|
// +---+---------+----------+
完整的示例代码可在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 找到。
数据源选项
XML 数据源选项可以通过以下方式设置
- 的
.option
/.options
方法DataFrameReader
DataFrameWriter
DataStreamReader
DataStreamWriter
- 以下内置函数
from_xml
to_xml
schema_of_xml
OPTIONS
子句在 CREATE TABLE USING DATA_SOURCE
属性名称 | 默认值 | 含义 | 范围 |
---|---|---|---|
rowTag |
XML 文件中用作行的行标签。例如,在以下 XML 中: ,适当的值将是 book。这是读写操作的必选选项。 |
读/写 | |
samplingRatio |
1.0 |
定义用于推断模式的行分数。XML 内置函数会忽略此选项。 | 读 |
excludeAttribute |
false |
是否排除元素中的属性。 | 读 |
mode |
PERMISSIVE |
允许一种在解析过程中处理损坏记录的模式。
|
读 |
inferSchema |
true |
如果为 true,则尝试为每个生成的 DataFrame 列推断适当的类型。如果为 false,则所有生成的列都为字符串类型。 | 读 |
columnNameOfCorruptRecord |
spark.sql.columnNameOfCorruptRecord |
允许重命名由 PERMISSIVE 模式创建的具有格式不正确字符串的新字段。 | 读 |
attributePrefix |
_ |
用于区分属性和元素的属性前缀。这将是字段名称的前缀。读取 XML 时可以为空,但写入时不行。 | 读/写 |
valueTag |
_VALUE |
当元素中包含没有子项的属性时,用于值的标签。 | 读/写 |
encoding |
UTF-8 |
对于读取,根据给定的编码类型解码 XML 文件。对于写入,指定保存的 XML 文件的编码(字符集)。XML 内置函数会忽略此选项。 | 读/写 |
ignoreSurroundingSpaces |
true |
定义是否应跳过从读取值中环绕的空格。 | 读 |
rowValidationXSDPath |
null |
指向可选 XSD 文件的路径,该文件用于单独验证每一行的 XML。未能通过验证的行将像上面一样被视为解析错误。XSD 不会以其他方式影响提供的或推断的模式。 | 读 |
ignoreNamespace |
false |
如果为 true,则忽略 XML 元素和属性上的命名空间前缀。例如,标签 <abc:author> 和 <def:author> 都将被视为 <author>。请注意,目前无法在 rowTag 元素上忽略命名空间,只能忽略其子元素。请注意,即使为 false,XML 解析通常也不是命名空间感知的。 | 读 |
timeZone |
(spark.sql.session.timeZone 配置的值) |
设置一个字符串,指示用于格式化 XML 数据源或分区值中时间戳的时区 ID。支持以下 timeZone 格式
|
读/写 |
timestampFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
设置一个字符串,指示时间戳格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于时间戳类型。 | 读/写 |
timestampNTZFormat |
yyyy-MM-dd'T'HH:mm:ss[.SSS] | 设置一个字符串,指示无时区时间戳格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于无时区时间戳类型,请注意在写入或读取此数据类型时不支持区域偏移量和时区组件。 | 读/写 |
dateFormat |
yyyy-MM-dd |
设置一个字符串,指示日期格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于日期类型。 | 读/写 |
locale |
en-US |
以 IETF BCP 47 格式设置区域设置作为语言标签。例如,在解析日期和时间戳时使用区域设置。 | 读/写 |
rootTag |
ROWS |
XML 文件的根标签。例如,在以下 XML 中: ,适当的值将是 books。通过指定一个像“books”这样的值,它可以包含基本属性。 |
写 |
declaration |
version="1.0" encoding="UTF-8" standalone="yes" |
在每个输出 XML 文件开头、rootTag 之前要写入的 XML 声明内容。例如,值 foo 将导致被写入。设置为空字符串以抑制 | 写 |
arrayElementName |
item |
写入时包含数组值列的每个元素的 XML 元素的名称。 | 写 |
nullValue |
null | 设置 null 值的字符串表示形式。默认为字符串 null。当此值为 null 时,不写入字段的属性和元素。 | 读/写 |
wildcardColName |
xs_any |
所提供模式中存在的列的名称,该列被解释为“通配符”。它必须是字符串类型或字符串数组类型。它将匹配任何未被模式匹配的 XML 子元素。子元素的 XML 将成为该列的字符串值。如果是一个数组,则所有未匹配的元素将作为字符串数组返回。顾名思义,它旨在模仿 XSD 的 xs:any 类型。 | 读 |
compression |
none |
保存到文件时使用的压缩编解码器。这可以是已知的不区分大小写的缩写名称之一(none、bzip2、gzip、lz4、snappy 和 deflate)。XML 内置函数会忽略此选项。 | 写 |
validateName |
true |
如果为 true,则在 XML 元素名称验证失败时抛出错误。例如,SQL 字段名可以有空格,但 XML 元素名不能。 | 写 |
其他通用选项可在 通用文件源选项 中找到。