Hive 表
Spark SQL 也支持读取和写入存储在 Apache Hive 中的数据。 但是,由于 Hive 具有大量的依赖项,因此默认的 Spark 发行版中不包含这些依赖项。 如果可以在类路径上找到 Hive 依赖项,Spark 将自动加载它们。 请注意,这些 Hive 依赖项也必须存在于所有工作节点上,因为它们需要访问 Hive 序列化和反序列化库 (SerDes) 才能访问存储在 Hive 中的数据。
Hive 的配置是通过将您的 hive-site.xml
、core-site.xml
(用于安全配置) 和 hdfs-site.xml
(用于 HDFS 配置) 文件放在 conf/
中来完成的。
使用 Hive 时,必须实例化带有 Hive 支持的 SparkSession
,包括连接到持久 Hive Metastore、支持 Hive serdes 和 Hive 用户定义函数。 没有现有 Hive 部署的用户仍然可以启用 Hive 支持。 如果未通过 hive-site.xml
进行配置,则上下文会自动在当前目录中创建 metastore_db
,并创建一个由 spark.sql.warehouse.dir
配置的目录,该目录默认为 Spark 应用程序启动的当前目录中的 spark-warehouse
目录。 请注意,自 Spark 2.0.0 起,hive.metastore.warehouse.dir
属性在 hive-site.xml
中已弃用。 请改用 spark.sql.warehouse.dir
来指定仓库中数据库的默认位置。 您可能需要授予启动 Spark 应用程序的用户写权限。
from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.enableHiveSupport() \
.getOrCreate()
# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key| value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...
# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# | 500 |
# +--------+
# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...
# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")
# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# | 2| val_2| 2| val_2|
# | 4| val_4| 4| val_4|
# | 5| val_5| 5| val_5|
# ...
import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
case class Record(key: Int, value: String)
// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...
// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// | 0|
// | 1|
// | 2|
// ... Order may vary, as spark processes the partitions in parallel.
// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// | value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...
spark.stop()
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public static class Record implements Serializable {
private int key;
private String value;
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate();
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
(MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
Encoders.STRING());
stringsDS.show();
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
Record record = new Record();
record.setKey(key);
record.setValue("val_" + key);
records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");
// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// ...
使用 Hive 时,必须实例化带有 Hive 支持的 SparkSession
。 这增加了对在 MetaStore 中查找表和使用 HiveQL 编写查询的支持。
# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- collect(sql("FROM src SELECT key, value"))
为 Hive 表指定存储格式
创建 Hive 表时,需要定义此表应如何从/向文件系统读取/写入数据,即“输入格式”和“输出格式”。 您还需要定义此表应如何将数据反序列化为行,或将行序列化为数据,即“serde”。 以下选项可用于指定存储格式(“serde”、“输入格式”、“输出格式”),例如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')
。 默认情况下,我们将读取表文件作为纯文本。 请注意,创建表时尚不支持 Hive 存储处理程序,您可以在 Hive 端使用存储处理程序创建表,并使用 Spark SQL 读取它。
属性名称 | 含义 |
---|---|
fileFormat |
fileFormat 是一种存储格式规范的包,包括“serde”、“输入格式”和“输出格式”。 目前我们支持 6 种 fileFormat:'sequencefile'、'rcfile'、'orc'、'parquet'、'textfile' 和 'avro'。 |
inputFormat, outputFormat |
这两个选项将对应的 InputFormat 和 OutputFormat 类的名称指定为字符串文字,例如 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat 。 这两个选项必须成对出现,如果已经指定了 fileFormat 选项,则不能指定它们。 |
serde |
此选项指定 serde 类的名称。 如果指定了 fileFormat 选项,则如果给定的 fileFormat 已经包含 serde 的信息,则不要指定此选项。 目前,“sequencefile”、“textfile”和“rcfile”不包含 serde 信息,您可以将此选项与这 3 种 fileFormat 结合使用。 |
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim |
这些选项只能与 "textfile" fileFormat 结合使用。 它们定义了如何将分隔文件读取为行。 |
使用 OPTIONS
定义的所有其他属性将被视为 Hive serde 属性。
与不同版本的 Hive Metastore 交互
Spark SQL 的 Hive 支持最重要的部分之一是与 Hive Metastore 的交互,这使 Spark SQL 能够访问 Hive 表的元数据。 从 Spark 1.4.0 开始,可以使用 Spark SQL 的单个二进制版本来查询不同版本的 Hive Metastore,使用下面描述的配置。 请注意,无论用于与 Metastore 通信的 Hive 版本如何,Spark SQL 内部都会针对内置 Hive 进行编译,并使用这些类进行内部执行(serdes、UDF、UDAF 等)。
以下选项可用于配置用于检索元数据的 Hive 版本
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.sql.hive.metastore.version |
2.3.9 |
Hive Metastore 的版本。 可用选项为 0.12.0 到 2.3.9 和 3.0.0 到 3.1.3 。 |
1.4.0 |
spark.sql.hive.metastore.jars |
builtin |
应用于实例化 HiveMetastoreClient 的 jars 的位置。 此属性可以是以下四个选项之一
-Phive 时,它与 Spark 程序集捆绑在一起。 选择此选项时,spark.sql.hive.metastore.version 必须为 2.3.9 或未定义。spark.sql.hive.metastore.jars.path 配置的 Hive jars,以逗号分隔的格式。 支持本地路径和远程路径。 提供的 jars 应与 spark.sql.hive.metastore.version 的版本相同。 |
1.4.0 |
spark.sql.hive.metastore.jars.path |
(空) |
用于实例化 HiveMetastoreClient 的 jars 的逗号分隔路径。 仅当 spark.sql.hive.metastore.jars 设置为 path 时,此配置才有用。路径可以是以下任何格式
|
3.1.0 |
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc, |
应使用在 Spark SQL 和特定版本的 Hive 之间共享的类加载器加载的类前缀的逗号分隔列表。 应共享的类的一个示例是与 Metastore 通信所需的 JDBC 驱动程序。 需要共享的其他类是与已共享的类交互的类。 例如,log4j 使用的自定义追加器。 |
1.4.0 |
spark.sql.hive.metastore.barrierPrefixes |
(空) |
应该为 Spark SQL 与之通信的每个 Hive 版本显式重新加载的类前缀的逗号分隔列表。 例如,在通常会共享的前缀中声明的 Hive UDF(即 |
1.4.0 |