Hive 表
Spark SQL 也支持读写存储在 Apache Hive 中的数据。然而,由于 Hive 有大量的依赖项,这些依赖项不包含在默认的 Spark 发行版中。如果在 classpath 中找到 Hive 依赖项,Spark 将自动加载它们。请注意,这些 Hive 依赖项也必须存在于所有工作节点上,因为它们需要访问 Hive 序列化和反序列化库 (SerDes) 才能访问存储在 Hive 中的数据。
Hive 的配置通过将您的 hive-site.xml、core-site.xml(用于安全配置)和 hdfs-site.xml(用于 HDFS 配置)文件放置在 conf/ 中来完成。
在使用 Hive 时,必须实例化支持 Hive 的 SparkSession,包括连接到持久的 Hive metastore、支持 Hive serdes 和 Hive 用户定义函数。没有现有 Hive 部署的用户仍然可以启用 Hive 支持。当未通过 hive-site.xml 配置时,上下文会在当前目录中自动创建 metastore_db,并创建一个由 spark.sql.warehouse.dir 配置的目录,该目录默认为 Spark 应用程序启动时当前目录中的 spark-warehouse 目录。请注意,hive-site.xml 中的 hive.metastore.warehouse.dir 属性自 Spark 2.0.0 起已弃用。相反,请使用 spark.sql.warehouse.dir 指定数据仓库中数据库的默认位置。您可能需要授予启动 Spark 应用程序的用户写入权限。
from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.enableHiveSupport() \
.getOrCreate()
# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key| value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...
# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# | 500 |
# +--------+
# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...
# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")
# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# | 2| val_2| 2| val_2|
# | 4| val_4| 4| val_4|
# | 5| val_5| 5| val_5|
# ...import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
case class Record(key: Int, value: String)
// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...
// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// | 0|
// | 1|
// | 2|
// ... Order may vary, as spark processes the partitions in parallel.
// Turn on flag for Hive Dynamic Partitioning
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// | value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...
spark.stop()import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public static class Record implements Serializable {
private int key;
private String value;
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate();
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
(MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
Encoders.STRING());
stringsDS.show();
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
Record record = new Record();
record.setKey(key);
record.setValue("val_" + key);
records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");
// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// ...在使用 Hive 时,必须实例化支持 Hive 的 SparkSession。这增加了在 MetaStore 中查找表以及使用 HiveQL 编写查询的支持。
# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- collect(sql("FROM src SELECT key, value"))指定 Hive 表的存储格式
当您创建 Hive 表时,需要定义该表如何从文件系统读取/写入数据,即“输入格式”和“输出格式”。您还需要定义该表如何将数据反序列化为行,或将行序列化为数据,即“serde”。以下选项可用于指定存储格式(“serde”、“输入格式”、“输出格式”),例如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。默认情况下,我们将以纯文本形式读取表文件。请注意,目前创建表时不支持 Hive 存储处理器,您可以在 Hive 端使用存储处理器创建表,然后使用 Spark SQL 读取它。
| 属性名称 | 含义 |
|---|---|
fileFormat |
fileFormat 是一种存储格式规范的集合,包括“serde”、“输入格式”和“输出格式”。目前我们支持 6 种 fileFormat:'sequencefile'、'rcfile'、'orc'、'parquet'、'textfile' 和 'avro'。 |
inputFormat, outputFormat |
这两个选项指定相应的 InputFormat 和 OutputFormat 类名称作为字符串字面量,例如 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。这两个选项必须成对出现,并且如果您已经指定了 fileFormat 选项,则不能再指定它们。 |
serde |
此选项指定 serde 类的名称。当指定 fileFormat 选项时,如果给定的 fileFormat 已经包含了 serde 信息,则不要指定此选项。目前 "sequencefile"、"textfile" 和 "rcfile" 不包含 serde 信息,您可以将此选项与这 3 种 fileFormat 一起使用。 |
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim |
这些选项只能与 "textfile" fileFormat 一起使用。它们定义了如何将分隔文件读取到行中。 |
所有其他使用 OPTIONS 定义的属性都将被视为 Hive serde 属性。
与不同版本的 Hive Metastore 交互
Spark SQL 的 Hive 支持最重要的部分之一是与 Hive metastore 的交互,这使得 Spark SQL 能够访问 Hive 表的元数据。从 Spark 1.4.0 开始,可以使用 Spark SQL 的单个二进制构建来查询不同版本的 Hive metastore,使用下面描述的配置。请注意,无论用于与 metastore 通信的 Hive 版本如何,Spark SQL 内部都将针对内置 Hive 进行编译,并使用这些类进行内部执行(serdes、UDF、UDAF 等)。
以下选项可用于配置用于检索元数据的 Hive 版本
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.sql.hive.metastore.version |
2.3.10 |
Hive metastore 的版本。可用选项为 2.0.0 到 2.3.10,3.0.0 到 3.1.3,以及 4.0.0 到 4.0.1。 |
1.4.0 |
spark.sql.hive.metastore.jars |
内置 |
用于实例化 HiveMetastoreClient 的 jar 包位置。此属性可以是以下四种选项之一
-Phive 时,它与 Spark assembly 捆绑在一起。选择此选项时,spark.sql.hive.metastore.version 必须是 2.3.10 或未定义。spark.sql.hive.metastore.jars.path 以逗号分隔格式配置的 Hive jar 包。支持本地或远程路径。提供的 jar 包应与 spark.sql.hive.metastore.version 版本相同。 |
1.4.0 |
spark.sql.hive.metastore.jars.path |
(空) |
用于实例化 HiveMetastoreClient 的 jar 包的逗号分隔路径。此配置仅在 spark.sql.hive.metastore.jars 设置为 path 时有用。路径可以是以下任何格式
|
3.1.0 |
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc, |
一个逗号分隔的类前缀列表,这些类应使用 Spark SQL 和特定版本的 Hive 之间共享的类加载器进行加载。需要共享的类的一个示例是与 metastore 通信所需的 JDBC 驱动程序。其他需要共享的类是那些与已共享的类进行交互的类。例如,log4j 使用的自定义 appender。 |
1.4.0 |
spark.sql.hive.metastore.barrierPrefixes |
(空) |
一个逗号分隔的类前缀列表,这些类应该针对 Spark SQL 正在通信的每个 Hive 版本显式重新加载。例如,在通常会共享的前缀(即 |
1.4.0 |