XML 文件

Spark SQL 提供了 spark.read().xml("file_1_path","file_2_path") 来将 XML 格式的文件或文件目录读取到 Spark DataFrame 中,并提供了 dataframe.write().xml("path") 来写入 XML 文件。必须指定 rowTag 选项,以指示映射到 DataFrame 行的 XML 元素。option() 函数可用于自定义读写行为,例如控制 XML 属性、XSD 验证、压缩等行为。

# Primitive types (Int, String, etc) and Product types (case classes) encoders are
# supported by importing this when creating a Dataset.
# An XML dataset is pointed to by path.
# The path can be either a single xml file or more xml files
path = "examples/src/main/resources/people.xml"
peopleDF = spark.read.option("rowTag", "person").format("xml").load(path)

# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

# SQL statements can be run by using the sql methods provided by spark
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+

# Alternatively, a DataFrame can be created for an XML dataset represented by a Dataset[String]
xmlStrings = ["""
      <person>
          <name>laglangyue</name>
          <job>Developer</job>
          <age>28</age>
      </person>
    """]
xmlRDD = spark.sparkContext.parallelize(xmlStrings)
otherPeople = spark.read \
    .option("rowTag", "person") \
    .xml(xmlRDD)
otherPeople.show()
# +---+---------+----------+
# |age|      job|      name|
# +---+---------+----------+
# | 28|Developer|laglangyue|
# +---+---------+----------+
完整的示例代码可在 Spark 仓库中的 "examples/src/main/python/sql/datasource.py" 找到。
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._
// An XML dataset is pointed to by path.
// The path can be either a single xml file or more xml files
val path = "examples/src/main/resources/people.xml"
val peopleDF = spark.read.option("rowTag", "person").xml(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a XML dataset represented by a Dataset[String]
val otherPeopleDataset = spark.createDataset(
  """
    |<person>
    |    <name>laglangyue</name>
    |    <job>Developer</job>
    |    <age>28</age>
    |</person>
    |""".stripMargin :: Nil)
val otherPeople = spark.read
  .option("rowTag", "person")
  .xml(otherPeopleDataset)
otherPeople.show()
// +---+---------+----------+
// |age|      job|      name|
// +---+---------+----------+
// | 28|Developer|laglangyue|
// +---+---------+----------+
完整的示例代码可在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 找到。
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.

// An XML dataset is pointed to by path.
// The path can be either a single xml file or more xml files
String path = "examples/src/main/resources/people.xml";
Dataset<Row> peopleDF = spark.read().option("rowTag", "person").xml(path);

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema();
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagerNamesDF = spark.sql(
        "SELECT name FROM people WHERE age BETWEEN 13 AND 19");
teenagerNamesDF.show();
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for an XML dataset represented by a Dataset[String]
List<String> xmlData = Collections.singletonList(
        "<person>" +
        "<name>laglangyue</name><job>Developer</job><age>28</age>" +
        "</person>");
Dataset<String> otherPeopleDataset = spark.createDataset(Lists.newArrayList(xmlData),
        Encoders.STRING());

Dataset<Row> otherPeople = spark.read()
    .option("rowTag", "person")
    .xml(otherPeopleDataset);
otherPeople.show();
// +---+---------+----------+
// |age|      job|      name|
// +---+---------+----------+
// | 28|Developer|laglangyue|
// +---+---------+----------+
完整的示例代码可在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 找到。

数据源选项

XML 数据源选项可以通过以下方式设置

属性名称默认值含义范围
rowTag XML 文件中用作行的行标签。例如,在以下 XML 中:<books><book></book>...</books>,适当的值将是 book。这是读写操作的必选选项。 读/写
samplingRatio 1.0 定义用于推断模式的行分数。XML 内置函数会忽略此选项。
excludeAttribute false 是否排除元素中的属性。
mode PERMISSIVE 允许一种在解析过程中处理损坏记录的模式。
  • PERMISSIVE:当遇到损坏记录时,将格式不正确的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将格式不正确的字段设置为 null。为了保留损坏记录,用户可以在用户定义的模式中设置一个名为 columnNameOfCorruptRecord 的字符串类型字段。如果模式没有该字段,它将在解析过程中丢弃损坏记录。在推断模式时,它会在输出模式中隐式添加一个 columnNameOfCorruptRecord 字段。
  • DROPMALFORMED:忽略所有损坏的记录。XML 内置函数不支持此模式。
  • FAILFAST:当遇到损坏记录时抛出异常。
inferSchema true 如果为 true,则尝试为每个生成的 DataFrame 列推断适当的类型。如果为 false,则所有生成的列都为字符串类型。
columnNameOfCorruptRecord spark.sql.columnNameOfCorruptRecord 允许重命名由 PERMISSIVE 模式创建的具有格式不正确字符串的新字段。
attributePrefix _ 用于区分属性和元素的属性前缀。这将是字段名称的前缀。读取 XML 时可以为空,但写入时不行。 读/写
valueTag _VALUE 当元素中包含没有子项的属性时,用于值的标签。 读/写
encoding UTF-8 对于读取,根据给定的编码类型解码 XML 文件。对于写入,指定保存的 XML 文件的编码(字符集)。XML 内置函数会忽略此选项。 读/写
ignoreSurroundingSpaces true 定义是否应跳过从读取值中环绕的空格。
rowValidationXSDPath null 指向可选 XSD 文件的路径,该文件用于单独验证每一行的 XML。未能通过验证的行将像上面一样被视为解析错误。XSD 不会以其他方式影响提供的或推断的模式。
ignoreNamespace false 如果为 true,则忽略 XML 元素和属性上的命名空间前缀。例如,标签 <abc:author> 和 <def:author> 都将被视为 <author>。请注意,目前无法在 rowTag 元素上忽略命名空间,只能忽略其子元素。请注意,即使为 false,XML 解析通常也不是命名空间感知的。
timeZone (spark.sql.session.timeZone 配置的值) 设置一个字符串,指示用于格式化 XML 数据源或分区值中时间戳的时区 ID。支持以下 timeZone 格式
  • 基于区域的时区 ID:它应采用“区域/城市”的形式,例如“America/Los_Angeles”。
  • 区域偏移量:它应采用“(+|-)HH:mm”的格式,例如“-08:00”或“+01:00”,同时支持“UTC”和“Z”作为“+00:00”的别名。
不建议使用“CST”等其他短名称,因为它们可能存在歧义。
读/写
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] 设置一个字符串,指示时间戳格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于时间戳类型。 读/写
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 设置一个字符串,指示无时区时间戳格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于无时区时间戳类型,请注意在写入或读取此数据类型时不支持区域偏移量和时区组件。 读/写
dateFormat yyyy-MM-dd 设置一个字符串,指示日期格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于日期类型。 读/写
locale en-US 以 IETF BCP 47 格式设置区域设置作为语言标签。例如,在解析日期和时间戳时使用区域设置。 读/写
rootTag ROWS XML 文件的根标签。例如,在以下 XML 中:<books><book></book>...</books>,适当的值将是 books。通过指定一个像“books”这样的值,它可以包含基本属性。
declaration version="1.0" encoding="UTF-8" standalone="yes" 在每个输出 XML 文件开头、rootTag 之前要写入的 XML 声明内容。例如,值 foo 将导致被写入。设置为空字符串以抑制
arrayElementName item 写入时包含数组值列的每个元素的 XML 元素的名称。
nullValue null 设置 null 值的字符串表示形式。默认为字符串 null。当此值为 null 时,不写入字段的属性和元素。 读/写
wildcardColName xs_any 所提供模式中存在的列的名称,该列被解释为“通配符”。它必须是字符串类型或字符串数组类型。它将匹配任何未被模式匹配的 XML 子元素。子元素的 XML 将成为该列的字符串值。如果是一个数组,则所有未匹配的元素将作为字符串数组返回。顾名思义,它旨在模仿 XSD 的 xs:any 类型。
compression none 保存到文件时使用的压缩编解码器。这可以是已知的不区分大小写的缩写名称之一(none、bzip2、gzip、lz4、snappy 和 deflate)。XML 内置函数会忽略此选项。
validateName true 如果为 true,则在 XML 元素名称验证失败时抛出错误。例如,SQL 字段名可以有空格,但 XML 元素名不能。

其他通用选项可在 通用文件源选项 中找到。