JDBC 连接到其他数据库
Spark SQL 还包括一个数据源,可以使用 JDBC 从其他数据库读取数据。 应该优先使用此功能,而不是使用 JdbcRDD。 这是因为结果作为 DataFrame 返回,它们可以在 Spark SQL 中轻松处理或与其他数据源连接。 JDBC 数据源也更容易从 Java 或 Python 中使用,因为它不需要用户提供 ClassTag。(请注意,这与 Spark SQL JDBC 服务器不同,后者允许其他应用程序使用 Spark SQL 运行查询)。
要开始使用,您需要将特定数据库的 JDBC 驱动程序包含在 spark 类路径中。 例如,要从 Spark Shell 连接到 postgres,您可以运行以下命令
./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
数据源选项
Spark 支持以下不区分大小写的 JDBC 选项。JDBC 的数据源选项可以通过以下方式设置
.option
/.options
方法DataFrameReader
DataFrameWriter
- CREATE TABLE USING DATA_SOURCE 的
OPTIONS
子句
对于连接属性,用户可以在数据源选项中指定 JDBC 连接属性。user
和 password
通常作为连接属性提供,用于登录到数据源。
属性名称 | 默认值 | 含义 | 范围 |
---|---|---|---|
url |
(无) | 形式为 jdbc:subprotocol:subname 的 JDBC URL,用于连接到。 特定于源的连接属性可以在 URL 中指定。例如,jdbc:postgresql://localhost/test?user=fred&password=secret |
读/写 |
dbtable |
(无) | 应该从中读取或写入的 JDBC 表。 请注意,在读取路径中使用时,可以使用 SQL 查询的 FROM 子句中有效的任何内容。 例如,您也可以使用括号中的子查询,而不是完整的表。 不允许同时指定 dbtable 和 query 选项。 |
读/写 |
query |
(无) | 一个将用于将数据读入 Spark 的查询。 指定的查询将用括号括起来,并在 FROM 子句中用作子查询。 Spark 还将为子查询子句分配一个别名。 例如,spark 将向 JDBC 源发出以下形式的查询。SELECT <columns> FROM (<user_specified_query>) spark_gen_alias 以下是使用此选项的一些限制。
|
读/写 |
prepareQuery |
(无) | 将与 query 一起形成最终查询的前缀。 由于指定的 query 将在 FROM 子句中用括号括起来作为子查询,并且某些数据库不支持子查询中的所有子句,因此 prepareQuery 属性提供了一种运行此类复杂查询的方法。 例如,spark 将向 JDBC 源发出以下形式的查询。<prepareQuery> SELECT <columns> FROM (<user_specified_query>) spark_gen_alias 以下是一些示例。
|
读/写 |
driver |
(无) | 用于连接到此 URL 的 JDBC 驱动程序的类名。 | 读/写 |
partitionColumn, lowerBound, upperBound |
(无) | 如果指定了其中任何一个选项,则必须指定所有这些选项。 此外,必须指定 numPartitions 。 它们描述了从多个 worker 并行读取时如何对表进行分区。 partitionColumn 必须是相关表中的数字、日期或时间戳列。 请注意,lowerBound 和 upperBound 仅用于决定分区步幅,而不是用于过滤表中的行。 因此,表中的所有行都将被分区和返回。 此选项仅适用于读取。示例 spark.read.format("jdbc")
|
read |
numPartitions |
(无) | 可用于表读取和写入中的并行度的最大分区数。 这也决定了并发 JDBC 连接的最大数量。 如果要写入的分区数超过此限制,我们会通过在写入之前调用 coalesce(numPartitions) 将其减少到此限制。 |
读/写 |
queryTimeout |
0 |
驱动程序将等待 Statement 对象执行到给定秒数的秒数。 零表示没有限制。 在写入路径中,此选项取决于 JDBC 驱动程序如何实现 API setQueryTimeout ,例如,h2 JDBC 驱动程序检查每个查询的超时时间,而不是整个 JDBC 批处理。 |
读/写 |
fetchsize |
0 |
JDBC 提取大小,它决定了每次往返要提取的行数。 这可以帮助 JDBC 驱动程序上的性能,这些驱动程序默认设置为较低的提取大小(例如,Oracle,10 行)。 | read |
batchsize |
1000 |
JDBC 批处理大小,它决定了每次往返要插入的行数。 这可以帮助 JDBC 驱动程序上的性能。 此选项仅适用于写入。 | write |
isolationLevel |
READ_UNCOMMITTED |
事务隔离级别,适用于当前连接。 它可以是 NONE 、READ_COMMITTED 、READ_UNCOMMITTED 、REPEATABLE_READ 或 SERIALIZABLE 之一,对应于 JDBC Connection 对象定义的标准事务隔离级别,默认值为 READ_UNCOMMITTED 。 请参阅 java.sql.Connection 中的文档。 |
write |
sessionInitStatement |
(无) | 在每个数据库会话打开到远程数据库之后,以及在开始读取数据之前,此选项会执行自定义 SQL 语句(或 PL/SQL 块)。 使用此选项可实现会话初始化代码。 示例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""") |
read |
truncate |
false |
这是一个 JDBC 写入器相关选项。 启用 SaveMode.Overwrite 后,此选项会导致 Spark 截断现有表,而不是删除并重新创建它。 这样做效率更高,并且可以防止删除表元数据(例如,索引)。 但是,在某些情况下它不起作用,例如,当新数据具有不同的模式时。 如果出现故障,用户应关闭 truncate 选项以再次使用 DROP TABLE 。 此外,由于 DBMS 之间 TRUNCATE TABLE 的行为不同,因此使用它并不总是安全的。 MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect 和 OracleDialect 支持此功能,而 PostgresDialect 和默认 JDBCDirect 不支持。 对于未知和不受支持的 JDBCDirect,用户选项 truncate 将被忽略。 | write |
cascadeTruncate |
相关 JDBC 数据库的默认级联截断行为,在每个 JDBCDialect 中的 isCascadeTruncate 中指定 |
这是一个与 JDBC 写入器相关的选项。如果启用并且 JDBC 数据库(目前是 PostgreSQL 和 Oracle)支持,则此选项允许执行 TRUNCATE TABLE t CASCADE (在 PostgreSQL 的情况下,执行 TRUNCATE TABLE ONLY t CASCADE 以防止意外截断子表)。 这将影响其他表,因此应谨慎使用。 |
write |
createTableOptions |
|
这是一个与 JDBC 写入器相关的选项。如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如,CREATE TABLE t (name string) ENGINE=InnoDB. )。 |
write |
createTableColumnTypes |
(无) | 创建表时要使用的数据库列数据类型,而不是默认类型。 数据类型信息应以与 CREATE TABLE 列语法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)") 。 指定的类型应该是有效的 Spark SQL 数据类型。 |
write |
customSchema |
(无) | 用于从 JDBC 连接器读取数据的自定义 schema。 例如,"id DECIMAL(38, 0), name STRING" 。 您还可以指定部分字段,其他字段使用默认类型映射。 例如,"id DECIMAL(38, 0)" 。 列名应与 JDBC 表的相应列名相同。 用户可以指定 Spark SQL 的对应数据类型,而不是使用默认类型。 |
read |
pushDownPredicate |
true |
用于启用或禁用将谓词下推到 JDBC 数据源的选项。 默认值为 true,在这种情况下,Spark 会尽可能将过滤器下推到 JDBC 数据源。 否则,如果设置为 false,则不会将任何过滤器下推到 JDBC 数据源,因此所有过滤器将由 Spark 处理。 当谓词过滤由 Spark 执行速度快于由 JDBC 数据源执行时,通常会关闭谓词下推。 | read |
pushDownAggregate |
true |
用于启用或禁用 V2 JDBC 数据源中的聚合下推的选项。 默认值为 true,在这种情况下,Spark 会将聚合下推到 JDBC 数据源。 否则,如果设置为 false,则聚合将不会下推到 JDBC 数据源。 当聚合由 Spark 执行速度快于由 JDBC 数据源执行时,通常会关闭聚合下推。 请注意,只有在所有聚合函数和相关过滤器都可以下推时,才能下推聚合。 如果 numPartitions 等于 1 或 group by 键与 partitionColumn 相同,Spark 将完全将聚合下推到数据源,而不会对数据源输出应用最终聚合。 否则,Spark 将对数据源输出应用最终聚合。 |
read |
pushDownLimit |
true |
用于启用或禁用将 LIMIT 下推到 V2 JDBC 数据源的选项。 LIMIT 下推还包括 LIMIT + SORT,也称为 Top N 运算符。 默认值为 true,在这种情况下,Spark 会将 LIMIT 或带有 SORT 的 LIMIT 下推到 JDBC 数据源。 否则,如果设置为 false,则 LIMIT 或带有 SORT 的 LIMIT 不会下推到 JDBC 数据源。 如果 numPartitions 大于 1,即使 LIMIT 或带有 SORT 的 LIMIT 被下推,Spark 仍然会对来自数据源的结果应用 LIMIT 或带有 SORT 的 LIMIT。 否则,如果 LIMIT 或带有 SORT 的 LIMIT 被下推且 numPartitions 等于 1,Spark 将不会对来自数据源的结果应用 LIMIT 或带有 SORT 的 LIMIT。 |
read |
pushDownOffset |
true |
用于启用或禁用将 OFFSET 下推到 V2 JDBC 数据源的选项。 默认值为 true,在这种情况下,Spark 会将 OFFSET 下推到 JDBC 数据源。 否则,如果设置为 false,Spark 将不会尝试将 OFFSET 下推到 JDBC 数据源。 如果 pushDownOffset 为 true 且 numPartitions 等于 1,则 OFFSET 将下推到 JDBC 数据源。 否则,OFFSET 将不会下推,并且 Spark 仍然会对来自数据源的结果应用 OFFSET。 |
read |
pushDownTableSample |
true |
用于启用或禁用将 TABLESAMPLE 下推到 V2 JDBC 数据源的选项。 默认值为 true,在这种情况下,Spark 会将 TABLESAMPLE 下推到 JDBC 数据源。 否则,如果值设置为 false,则 TABLESAMPLE 不会下推到 JDBC 数据源。 | read |
keytab |
(无) | Kerberos keytab 文件的位置(必须通过 spark-submit 的 --files 选项或手动预先上传到所有节点)以供 JDBC 客户端使用。 找到路径信息后,Spark 认为 keytab 是手动分发的,否则假设为 --files 。 如果同时定义了 keytab 和 principal ,则 Spark 尝试进行 Kerberos 身份验证。 |
读/写 |
principal |
(无) | 指定 JDBC 客户端的 Kerberos 主体名称。 如果同时定义了 keytab 和 principal ,则 Spark 尝试进行 Kerberos 身份验证。 |
读/写 |
refreshKrb5Config |
false |
此选项控制是否在 JDBC 客户端建立新连接之前刷新 Kerberos 配置。 如果要刷新配置,请设置为 true,否则设置为 false。 默认值为 false。 请注意,如果将此选项设置为 true 并尝试建立多个连接,则可能会发生争用情况。 一种可能的情况如下所示。
|
读/写 |
connectionProvider |
(无) | 用于连接到此 URL 的 JDBC 连接提供程序的名称,例如 db2 、mssql 。 必须是与 JDBC 数据源一起加载的提供程序之一。 用于在多个提供程序可以处理指定的驱动程序和选项时消除歧义。 选择的提供程序不得被 spark.sql.sources.disabledJdbcConnProviderList 禁用。 |
读/写 |
preferTimestampNTZ |
false | 当此选项设置为 true 时,TIMESTAMP WITHOUT TIME ZONE 类型将被推断为 Spark 的 TimestampNTZ 类型。 否则,它将被解释为 Spark 的 Timestamp 类型(相当于 TIMESTAMP WITH LOCAL TIME ZONE)。 此设置专门影响 TIMESTAMP WITHOUT TIME ZONE 数据类型的推断。 无论此设置如何,TIMESTAMP WITH LOCAL TIME ZONE 和 TIMESTAMP WITH TIME ZONE 数据类型都始终如一地解释为 Spark 的 Timestamp 类型。 |
read |
请注意,JDBC 驱动程序并非总是支持使用 keytab 的 Kerberos 身份验证。
在使用 keytab
和 principal
配置选项之前,请确保满足以下要求
- 包含的 JDBC 驱动程序版本支持使用 keytab 的 Kerberos 身份验证。
- 有一个内置的连接提供程序,支持使用的数据库。
以下数据库有内置连接提供程序
- DB2
- MariaDB
- MS Sql
- Oracle
- PostgreSQL
如果不满足要求,请考虑使用 JdbcConnectionProvider
开发者 API 来处理自定义身份验证。
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.load()
jdbcDF2 = spark.read \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.option("customSchema", "id DECIMAL(38, 0), name STRING") \
.load()
# Saving data to a JDBC source
jdbcDF.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()
jdbcDF2.write \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Specifying create table column data types on write
jdbcDF.write \
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
在 Spark 代码库的 "examples/src/main/python/sql/datasource.py" 中查找完整的示例代码。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
在 Spark 代码库的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 中查找完整的示例代码。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load();
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Saving data to a JDBC source
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();
jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Specifying create table column data types on write
jdbcDF.write()
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
在 Spark 代码库的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 中查找完整的示例代码。
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
在 Spark 代码库的 "examples/src/main/r/RSparkSQLExample.R" 中查找完整的示例代码。
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:postgresql:dbserver",
dbtable "schema.tablename",
user 'username',
password 'password'
)
INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable