Hive 表

Spark SQL 也支持读写存储在 Apache Hive 中的数据。然而,由于 Hive 有大量的依赖项,这些依赖项不包含在默认的 Spark 发行版中。如果在 classpath 中找到 Hive 依赖项,Spark 将自动加载它们。请注意,这些 Hive 依赖项也必须存在于所有工作节点上,因为它们需要访问 Hive 序列化和反序列化库 (SerDes) 才能访问存储在 Hive 中的数据。

Hive 的配置通过将您的 hive-site.xmlcore-site.xml(用于安全配置)和 hdfs-site.xml(用于 HDFS 配置)文件放置在 conf/ 中来完成。

在使用 Hive 时,必须实例化支持 Hive 的 SparkSession,包括连接到持久的 Hive metastore、支持 Hive serdes 和 Hive 用户定义函数。没有现有 Hive 部署的用户仍然可以启用 Hive 支持。当未通过 hive-site.xml 配置时,上下文会在当前目录中自动创建 metastore_db,并创建一个由 spark.sql.warehouse.dir 配置的目录,该目录默认为 Spark 应用程序启动时当前目录中的 spark-warehouse 目录。请注意,hive-site.xml 中的 hive.metastore.warehouse.dir 属性自 Spark 2.0.0 起已弃用。相反,请使用 spark.sql.warehouse.dir 指定数据仓库中数据库的默认位置。您可能需要授予启动 Spark 应用程序的用户写入权限。

from os.path import abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key|  value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...

# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# |    500 |
# +--------+

# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...

# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# |  2| val_2|  2| val_2|
# |  4| val_4|  4| val_4|
# |  5| val_5|  5| val_5|
# ...
在 Spark 仓库中查找完整的示例代码,路径为 "examples/src/main/python/sql/hive.py"。
import java.io.File

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...

// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// |  0|
// |  1|
// |  2|
// ... Order may vary, as spark processes the partitions in parallel.

// Turn on flag for Hive Dynamic Partitioning
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// |  value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...

spark.stop()
在 Spark 仓库中查找完整的示例代码,路径为 "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala"。
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static class Record implements Serializable {
  private int key;
  private String value;

  public int getKey() {
    return key;
  }

  public void setKey(int key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}

// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
    (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
    Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");

// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...
在 Spark 仓库中查找完整的示例代码,路径为 "examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java"。

在使用 Hive 时,必须实例化支持 Hive 的 SparkSession。这增加了在 MetaStore 中查找表以及使用 HiveQL 编写查询的支持。

# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- collect(sql("FROM src SELECT key, value"))
在 Spark 仓库中查找完整的示例代码,路径为 "examples/src/main/r/RSparkSQLExample.R"。

指定 Hive 表的存储格式

当您创建 Hive 表时,需要定义该表如何从文件系统读取/写入数据,即“输入格式”和“输出格式”。您还需要定义该表如何将数据反序列化为行,或将行序列化为数据,即“serde”。以下选项可用于指定存储格式(“serde”、“输入格式”、“输出格式”),例如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。默认情况下,我们将以纯文本形式读取表文件。请注意,目前创建表时不支持 Hive 存储处理器,您可以在 Hive 端使用存储处理器创建表,然后使用 Spark SQL 读取它。

属性名称含义
fileFormat fileFormat 是一种存储格式规范的集合,包括“serde”、“输入格式”和“输出格式”。目前我们支持 6 种 fileFormat:'sequencefile'、'rcfile'、'orc'、'parquet'、'textfile' 和 'avro'。
inputFormat, outputFormat 这两个选项指定相应的 InputFormatOutputFormat 类名称作为字符串字面量,例如 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。这两个选项必须成对出现,并且如果您已经指定了 fileFormat 选项,则不能再指定它们。
serde 此选项指定 serde 类的名称。当指定 fileFormat 选项时,如果给定的 fileFormat 已经包含了 serde 信息,则不要指定此选项。目前 "sequencefile"、"textfile" 和 "rcfile" 不包含 serde 信息,您可以将此选项与这 3 种 fileFormat 一起使用。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这些选项只能与 "textfile" fileFormat 一起使用。它们定义了如何将分隔文件读取到行中。

所有其他使用 OPTIONS 定义的属性都将被视为 Hive serde 属性。

与不同版本的 Hive Metastore 交互

Spark SQL 的 Hive 支持最重要的部分之一是与 Hive metastore 的交互,这使得 Spark SQL 能够访问 Hive 表的元数据。从 Spark 1.4.0 开始,可以使用 Spark SQL 的单个二进制构建来查询不同版本的 Hive metastore,使用下面描述的配置。请注意,无论用于与 metastore 通信的 Hive 版本如何,Spark SQL 内部都将针对内置 Hive 进行编译,并使用这些类进行内部执行(serdes、UDF、UDAF 等)。

以下选项可用于配置用于检索元数据的 Hive 版本

属性名称默认值含义起始版本
spark.sql.hive.metastore.version 2.3.10 Hive metastore 的版本。可用选项为 2.0.02.3.103.0.03.1.3,以及 4.0.04.0.1 1.4.0
spark.sql.hive.metastore.jars 内置 用于实例化 HiveMetastoreClient 的 jar 包位置。此属性可以是以下四种选项之一
  1. 内置
  2. 使用 Hive 2.3.10,当启用 -Phive 时,它与 Spark assembly 捆绑在一起。选择此选项时,spark.sql.hive.metastore.version 必须是 2.3.10 或未定义。
  3. maven
  4. 使用从 Maven 仓库下载的指定版本的 Hive jar 包。此配置通常不建议用于生产部署。
  5. 路径
  6. 使用由 spark.sql.hive.metastore.jars.path 以逗号分隔格式配置的 Hive jar 包。支持本地或远程路径。提供的 jar 包应与 spark.sql.hive.metastore.version 版本相同。
  7. JVM 标准格式的 classpath。此 classpath 必须包含所有 Hive 及其依赖项,包括正确版本的 Hadoop。提供的 jar 包应与 spark.sql.hive.metastore.version 版本相同。这些 jar 包仅需存在于驱动程序上,但如果您在 YARN 集群模式下运行,则必须确保它们与您的应用程序一起打包。
1.4.0
spark.sql.hive.metastore.jars.path (空) 用于实例化 HiveMetastoreClient 的 jar 包的逗号分隔路径。此配置仅在 spark.sql.hive.metastore.jars 设置为 path 时有用。
路径可以是以下任何格式
  1. file://path/to/jar/foo.jar
  2. hdfs://nameservice/path/to/jar/foo.jar
  3. /path/to/jar/(没有 URI 方案的路径,遵循 conf fs.defaultFS 的 URI 方案)
  4. [http/https/ftp]://path/to/jar/foo.jar
请注意,1、2 和 3 支持通配符。例如
  1. file://path/to/jar/*,file://path2/to/jar/*/*.jar
  2. hdfs://nameservice/path/to/jar/*,hdfs://nameservice2/path/to/jar/*/*.jar
3.1.0
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc

一个逗号分隔的类前缀列表,这些类应使用 Spark SQL 和特定版本的 Hive 之间共享的类加载器进行加载。需要共享的类的一个示例是与 metastore 通信所需的 JDBC 驱动程序。其他需要共享的类是那些与已共享的类进行交互的类。例如,log4j 使用的自定义 appender。

1.4.0
spark.sql.hive.metastore.barrierPrefixes (空)

一个逗号分隔的类前缀列表,这些类应该针对 Spark SQL 正在通信的每个 Hive 版本显式重新加载。例如,在通常会共享的前缀(即 org.apache.spark.*)中声明的 Hive UDF。

1.4.0