Hive 表

Spark SQL 也支持读取和写入存储在 Apache Hive 中的数据。 但是,由于 Hive 具有大量的依赖项,因此默认的 Spark 发行版中不包含这些依赖项。 如果可以在类路径上找到 Hive 依赖项,Spark 将自动加载它们。 请注意,这些 Hive 依赖项也必须存在于所有工作节点上,因为它们需要访问 Hive 序列化和反序列化库 (SerDes) 才能访问存储在 Hive 中的数据。

Hive 的配置是通过将您的 hive-site.xmlcore-site.xml (用于安全配置) 和 hdfs-site.xml (用于 HDFS 配置) 文件放在 conf/ 中来完成的。

使用 Hive 时,必须实例化带有 Hive 支持的 SparkSession,包括连接到持久 Hive Metastore、支持 Hive serdes 和 Hive 用户定义函数。 没有现有 Hive 部署的用户仍然可以启用 Hive 支持。 如果未通过 hive-site.xml 进行配置,则上下文会自动在当前目录中创建 metastore_db,并创建一个由 spark.sql.warehouse.dir 配置的目录,该目录默认为 Spark 应用程序启动的当前目录中的 spark-warehouse 目录。 请注意,自 Spark 2.0.0 起,hive.metastore.warehouse.dir 属性在 hive-site.xml 中已弃用。 请改用 spark.sql.warehouse.dir 来指定仓库中数据库的默认位置。 您可能需要授予启动 Spark 应用程序的用户写权限。

from os.path import abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key|  value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...

# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# |    500 |
# +--------+

# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...

# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# |  2| val_2|  2| val_2|
# |  4| val_4|  4| val_4|
# |  5| val_5|  5| val_5|
# ...
在 Spark 仓库的 "examples/src/main/python/sql/hive.py" 中找到完整的示例代码。
import java.io.File

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...

// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// |  0|
// |  1|
// |  2|
// ... Order may vary, as spark processes the partitions in parallel.

// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// |  value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...

spark.stop()
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" 中找到完整的示例代码。
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static class Record implements Serializable {
  private int key;
  private String value;

  public int getKey() {
    return key;
  }

  public void setKey(int key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}

// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
    (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
    Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");

// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java" 中找到完整的示例代码。

使用 Hive 时,必须实例化带有 Hive 支持的 SparkSession。 这增加了对在 MetaStore 中查找表和使用 HiveQL 编写查询的支持。

# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- collect(sql("FROM src SELECT key, value"))
在 Spark 仓库的 "examples/src/main/r/RSparkSQLExample.R" 中找到完整的示例代码。

为 Hive 表指定存储格式

创建 Hive 表时,需要定义此表应如何从/向文件系统读取/写入数据,即“输入格式”和“输出格式”。 您还需要定义此表应如何将数据反序列化为行,或将行序列化为数据,即“serde”。 以下选项可用于指定存储格式(“serde”、“输入格式”、“输出格式”),例如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。 默认情况下,我们将读取表文件作为纯文本。 请注意,创建表时尚不支持 Hive 存储处理程序,您可以在 Hive 端使用存储处理程序创建表,并使用 Spark SQL 读取它。

属性名称含义
fileFormat fileFormat 是一种存储格式规范的包,包括“serde”、“输入格式”和“输出格式”。 目前我们支持 6 种 fileFormat:'sequencefile'、'rcfile'、'orc'、'parquet'、'textfile' 和 'avro'。
inputFormat, outputFormat 这两个选项将对应的 InputFormatOutputFormat 类的名称指定为字符串文字,例如 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。 这两个选项必须成对出现,如果已经指定了 fileFormat 选项,则不能指定它们。
serde 此选项指定 serde 类的名称。 如果指定了 fileFormat 选项,则如果给定的 fileFormat 已经包含 serde 的信息,则不要指定此选项。 目前,“sequencefile”、“textfile”和“rcfile”不包含 serde 信息,您可以将此选项与这 3 种 fileFormat 结合使用。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这些选项只能与 "textfile" fileFormat 结合使用。 它们定义了如何将分隔文件读取为行。

使用 OPTIONS 定义的所有其他属性将被视为 Hive serde 属性。

与不同版本的 Hive Metastore 交互

Spark SQL 的 Hive 支持最重要的部分之一是与 Hive Metastore 的交互,这使 Spark SQL 能够访问 Hive 表的元数据。 从 Spark 1.4.0 开始,可以使用 Spark SQL 的单个二进制版本来查询不同版本的 Hive Metastore,使用下面描述的配置。 请注意,无论用于与 Metastore 通信的 Hive 版本如何,Spark SQL 内部都会针对内置 Hive 进行编译,并使用这些类进行内部执行(serdes、UDF、UDAF 等)。

以下选项可用于配置用于检索元数据的 Hive 版本

属性名称默认值含义自从版本
spark.sql.hive.metastore.version 2.3.9 Hive Metastore 的版本。 可用选项为 0.12.02.3.93.0.03.1.3 1.4.0
spark.sql.hive.metastore.jars builtin 应用于实例化 HiveMetastoreClient 的 jars 的位置。 此属性可以是以下四个选项之一
  1. builtin
  2. 使用 Hive 2.3.9,当启用 -Phive 时,它与 Spark 程序集捆绑在一起。 选择此选项时,spark.sql.hive.metastore.version 必须为 2.3.9 或未定义。
  3. maven
  4. 使用从 Maven 存储库下载的指定版本的 Hive jars。 通常不建议将此配置用于生产部署。
  5. path
  6. 使用由 spark.sql.hive.metastore.jars.path 配置的 Hive jars,以逗号分隔的格式。 支持本地路径和远程路径。 提供的 jars 应与 spark.sql.hive.metastore.version 的版本相同。
  7. 用于 JVM 的标准格式的类路径。 此类路径必须包含 Hive 及其所有依赖项,包括正确版本的 Hadoop。 提供的 jars 应与 spark.sql.hive.metastore.version 的版本相同。 这些 jars 只需要存在于驱动程序上,但如果您在 yarn 集群模式下运行,则必须确保它们与您的应用程序一起打包。
1.4.0
spark.sql.hive.metastore.jars.path (空) 用于实例化 HiveMetastoreClient 的 jars 的逗号分隔路径。 仅当 spark.sql.hive.metastore.jars 设置为 path 时,此配置才有用。
路径可以是以下任何格式
  1. file://path/to/jar/foo.jar
  2. hdfs://nameservice/path/to/jar/foo.jar
  3. /path/to/jar/(没有 URI 模式的路径遵循 conf fs.defaultFS 的 URI 模式)
  4. [http/https/ftp]://path/to/jar/foo.jar
请注意,1、2 和 3 支持通配符。 例如
  1. file://path/to/jar/*,file://path2/to/jar/*/*.jar
  2. hdfs://nameservice/path/to/jar/*,hdfs://nameservice2/path/to/jar/*/*.jar
3.1.0
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc

应使用在 Spark SQL 和特定版本的 Hive 之间共享的类加载器加载的类前缀的逗号分隔列表。 应共享的类的一个示例是与 Metastore 通信所需的 JDBC 驱动程序。 需要共享的其他类是与已共享的类交互的类。 例如,log4j 使用的自定义追加器。

1.4.0
spark.sql.hive.metastore.barrierPrefixes (空)

应该为 Spark SQL 与之通信的每个 Hive 版本显式重新加载的类前缀的逗号分隔列表。 例如,在通常会共享的前缀中声明的 Hive UDF(即 org.apache.spark.*)。

1.4.0