Hive 表
Spark SQL 还支持读取和写入存储在 Apache Hive 中的数据。但是,由于 Hive 有大量的依赖项,这些依赖项未包含在默认的 Spark 发行版中。如果在类路径中找到 Hive 依赖项,Spark 将自动加载它们。请注意,这些 Hive 依赖项也必须存在于所有工作节点上,因为它们需要访问 Hive 序列化和反序列化库 (SerDes) 才能访问存储在 Hive 中的数据。
Hive 的配置是通过将您的 hive-site.xml
、core-site.xml
(用于安全配置)和 hdfs-site.xml
(用于 HDFS 配置)文件放在 conf/
中完成的。
在使用 Hive 时,必须使用 Hive 支持实例化 SparkSession
,包括连接到持久 Hive 元数据存储、支持 Hive SerDes 和 Hive 用户定义函数。没有现有 Hive 部署的用户仍然可以启用 Hive 支持。当未由 hive-site.xml
配置时,上下文会自动在当前目录中创建 metastore_db
,并在由 spark.sql.warehouse.dir
配置的目录中创建目录,该目录默认情况下是 Spark 应用程序启动的当前目录中的 spark-warehouse
目录。请注意,hive-site.xml
中的 hive.metastore.warehouse.dir
属性已从 Spark 2.0.0 开始弃用。相反,请使用 spark.sql.warehouse.dir
指定仓库中数据库的默认位置。您可能需要授予启动 Spark 应用程序的用户写入权限。
from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.enableHiveSupport() \
.getOrCreate()
# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key| value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...
# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# | 500 |
# +--------+
# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...
# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")
# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# | 2| val_2| 2| val_2|
# | 4| val_4| 4| val_4|
# | 5| val_5| 5| val_5|
# ...
import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
case class Record(key: Int, value: String)
// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...
// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// | 0|
// | 1|
// | 2|
// ... Order may vary, as spark processes the partitions in parallel.
// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// | value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...
spark.stop()
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public static class Record implements Serializable {
private int key;
private String value;
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate();
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
(MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
Encoders.STRING());
stringsDS.show();
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
Record record = new Record();
record.setKey(key);
record.setValue("val_" + key);
records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");
// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// ...
在使用 Hive 时,必须使用 Hive 支持实例化 SparkSession
。这增加了在 MetaStore 中查找表和使用 HiveQL 编写查询的支持。
# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- collect(sql("FROM src SELECT key, value"))
为 Hive 表指定存储格式
创建 Hive 表时,需要定义此表如何从文件系统读取/写入数据,即“输入格式”和“输出格式”。您还需要定义此表如何将数据反序列化为行,或将行序列化为数据,即“serde”。以下选项可用于指定存储格式(“serde”、“输入格式”、“输出格式”),例如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')
。默认情况下,我们将以纯文本形式读取表文件。请注意,在创建表时不支持 Hive 存储处理程序,您可以在 Hive 端使用存储处理程序创建表,并使用 Spark SQL 读取它。
属性名称 | 含义 |
---|---|
fileFormat |
fileFormat 是一种存储格式规范包,包括“serde”、“输入格式”和“输出格式”。目前我们支持 6 种 fileFormat:'sequencefile'、'rcfile'、'orc'、'parquet'、'textfile' 和 'avro'。 |
inputFormat、outputFormat |
这两个选项指定相应的 InputFormat 和 OutputFormat 类的名称,作为字符串文字,例如 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat 。这两个选项必须成对出现,并且如果已指定 fileFormat 选项,则不能指定它们。 |
serde |
此选项指定 serde 类的名称。当指定 fileFormat 选项时,如果给定的 fileFormat 已经包含 serde 信息,则不要指定此选项。目前,“sequencefile”、“textfile”和“rcfile”不包含 serde 信息,您可以将此选项与这 3 种 fileFormat 一起使用。 |
fieldDelim、escapeDelim、collectionDelim、mapkeyDelim、lineDelim |
这些选项只能与“textfile”fileFormat 一起使用。它们定义如何将分隔文件读取到行中。 |
使用 OPTIONS
定义的所有其他属性将被视为 Hive serde 属性。
与不同版本的 Hive 元数据存储交互
Spark SQL Hive 支持最重要的部分之一是与 Hive 元数据存储的交互,这使 Spark SQL 能够访问 Hive 表的元数据。从 Spark 1.4.0 开始,可以使用单个 Spark SQL 二进制构建来查询不同版本的 Hive 元数据存储,使用下面描述的配置。请注意,无论用于与元数据存储通信的 Hive 版本如何,Spark SQL 在内部都会针对内置 Hive 进行编译,并使用这些类进行内部执行(serdes、UDF、UDAF 等)。
以下选项可用于配置用于检索元数据的 Hive 版本
属性名称 | 默认值 | 含义 | 自版本 |
---|---|---|---|
spark.sql.hive.metastore.version |
2.3.9 |
Hive 元数据存储的版本。可用选项为 0.12.0 到 2.3.9 以及 3.0.0 到 3.1.3 。 |
1.4.0 |
spark.sql.hive.metastore.jars |
内置 |
用于实例化 HiveMetastoreClient 的 jar 的位置。此属性可以是以下四个选项之一
-Phive 时。选择此选项时,spark.sql.hive.metastore.version 必须为 2.3.9 或未定义。spark.sql.hive.metastore.jars.path 以逗号分隔的格式配置的 Hive jar。支持本地或远程路径。提供的 jar 应与 spark.sql.hive.metastore.version 版本相同。 |
1.4.0 |
spark.sql.hive.metastore.jars.path |
(空) |
用于实例化 HiveMetastoreClient 的 jar 的逗号分隔路径。此配置仅在将 spark.sql.hive.metastore.jars 设置为 path 时有用。这些路径可以是以下任何格式
|
3.1.0 |
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc, |
一个逗号分隔的类前缀列表,应使用 Spark SQL 和特定版本的 Hive 之间共享的类加载器加载。需要共享的类的示例是与元数据存储通信所需的 JDBC 驱动程序。需要共享的其他类是那些与已共享的类交互的类。例如,由 log4j 使用的自定义追加器。 |
1.4.0 |
spark.sql.hive.metastore.barrierPrefixes |
(空) |
一个逗号分隔的类前缀列表,应明确地为 Spark SQL 与之通信的每个版本的 Hive 重新加载。例如,在通常会共享的前缀中声明的 Hive UDF(即 |
1.4.0 |