JDBC 连接其他数据库
Spark SQL 还包含一个数据源,可以使用 JDBC 从其他数据库读取数据。此功能应优先于使用 JdbcRDD。这是因为结果将作为 DataFrame 返回,并且可以轻松地在 Spark SQL 中进行处理或与其他数据源进行联接。JDBC 数据源也更易于从 Java 或 Python 中使用,因为它不需要用户提供 ClassTag。(请注意,这与 Spark SQL JDBC 服务器不同,Spark SQL JDBC 服务器允许其他应用程序使用 Spark SQL 运行查询)。
要开始使用,您需要将特定数据库的 JDBC 驱动程序包含在 spark 类路径中。例如,要从 Spark Shell 连接到 postgres,您需要运行以下命令
数据源选项
Spark 支持以下不区分大小写的 JDBC 选项。JDBC 的数据源选项可以通过
- 的
.option
/.options
方法DataFrameReader
DataFrameWriter
OPTIONS
子句在 CREATE TABLE USING DATA_SOURCE
对于连接属性,用户可以在数据源选项中指定 JDBC 连接属性。user
和 password
通常作为连接属性提供,用于登录数据源。
属性名称 | 默认值 | 含义 | 范围 |
---|---|---|---|
url |
(无) | JDBC URL,格式为 jdbc:subprotocol:subname ,用于连接。源特定的连接属性可以在 URL 中指定。例如:jdbc:postgresql://127.0.0.1/test?user=fred&password=secret |
读/写 |
dbtable |
(无) | 要从中读取或写入的 JDBC 表。请注意,在读取路径中使用时,SQL 查询的 FROM 子句中有效的任何内容都可以使用。例如,您可以使用括号中的子查询,而不是完整的表。不允许同时指定 dbtable 和 query 选项。 |
读/写 |
query |
(无) | 用于将数据读取到 Spark 的查询。指定的查询将用括号括起来,并在 FROM 子句中用作子查询。Spark 还将为子查询子句分配别名。例如,Spark 将向 JDBC 源发出以下形式的查询。SELECT <columns> FROM (<user_specified_query>) spark_gen_alias 使用此选项时,有一些限制。
|
读/写 |
prepareQuery |
(无) | 一个前缀,它将与 query 一起形成最终查询。由于指定的 query 将在 FROM 子句中用括号括起来作为子查询,并且某些数据库不支持子查询中的所有子句,因此 prepareQuery 属性提供了一种运行此类复杂查询的方法。例如,Spark 将向 JDBC 源发出以下形式的查询。<prepareQuery> SELECT <columns> FROM (<user_specified_query>) spark_gen_alias 以下是一些示例。
|
读/写 |
driver |
(无) | 用于连接到此 URL 的 JDBC 驱动程序的类名。 | 读/写 |
partitionColumn, lowerBound, upperBound |
(无) | 如果指定了任何一个选项,则必须指定所有这些选项。此外,必须指定 numPartitions 。它们描述了在从多个工作程序并行读取时如何对表进行分区。partitionColumn 必须是所讨论表中的数字、日期或时间戳列。请注意,lowerBound 和 upperBound 仅用于确定分区跨度,而不是用于过滤表中的行。因此,表中的所有行都将被分区并返回。此选项仅适用于读取。示例 spark.read.format("jdbc")
|
读取 |
numPartitions |
(无) | 用于表读取和写入并行处理的最大分区数。这也决定了并发 JDBC 连接的最大数量。如果要写入的分区数超过此限制,我们将在写入之前通过调用 coalesce(numPartitions) 将其减少到此限制。 |
读/写 |
queryTimeout |
0 |
驱动程序等待 Statement 对象执行到给定秒数的秒数。零表示没有限制。在写入路径中,此选项取决于 JDBC 驱动程序如何实现 API setQueryTimeout ,例如,h2 JDBC 驱动程序检查每个查询的超时,而不是整个 JDBC 批处理。 |
读/写 |
fetchsize |
0 |
JDBC 获取大小,它决定每次往返获取的行数。这可以帮助提高默认获取大小较低的 JDBC 驱动程序的性能(例如,Oracle 的获取大小为 10 行)。 | 读取 |
batchsize |
1000 |
JDBC 批处理大小,它决定每次往返插入的行数。这可以帮助提高 JDBC 驱动程序的性能。此选项仅适用于写入。 | 写入 |
isolationLevel |
READ_UNCOMMITTED |
事务隔离级别,它适用于当前连接。它可以是 NONE 、READ_COMMITTED 、READ_UNCOMMITTED 、REPEATABLE_READ 或 SERIALIZABLE 之一,对应于 JDBC 的 Connection 对象定义的标准事务隔离级别,默认值为 READ_UNCOMMITTED 。请参阅 java.sql.Connection 中的文档。 |
写入 |
sessionInitStatement |
(无) | 在每个数据库会话打开到远程数据库并开始读取数据之前,此选项将执行自定义 SQL 语句(或 PL/SQL 块)。使用它来实现会话初始化代码。示例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""") |
读取 |
truncate |
false |
这是一个与 JDBC 写入器相关的选项。当启用 SaveMode.Overwrite 时,此选项会导致 Spark 截断现有表,而不是删除并重新创建它。这可能更有效,并且可以防止表元数据(例如,索引)被删除。但是,在某些情况下它将不起作用,例如,当新数据具有不同的模式时。在发生故障的情况下,用户应关闭 truncate 选项以再次使用 DROP TABLE 。此外,由于不同 DBMS 中 TRUNCATE TABLE 的行为不同,因此使用它并不总是安全的。MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect 和 OracleDialect 支持此选项,而 PostgresDialect 和默认 JDBCDirect 不支持。对于未知和不支持的 JDBCDirect,用户选项 truncate 将被忽略。 | 写入 |
cascadeTruncate |
所讨论的 JDBC 数据库的默认级联截断行为,在每个 JDBCDialect 中指定为 isCascadeTruncate |
这是一个与 JDBC 写入器相关的选项。如果启用并受 JDBC 数据库支持(目前为 PostgreSQL 和 Oracle),此选项允许执行 TRUNCATE TABLE t CASCADE (在 PostgreSQL 的情况下,执行 TRUNCATE TABLE ONLY t CASCADE 以防止意外截断后代表)。这将影响其他表,因此应谨慎使用。 |
写入 |
createTableOptions |
|
这是一个与 JDBC 写入器相关的选项。如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如,CREATE TABLE t (name string) ENGINE=InnoDB. )。 |
写入 |
createTableColumnTypes |
(无) | 在创建表时使用的数据库列数据类型,而不是默认数据类型。数据类型信息应以与 CREATE TABLE 列语法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)" )。指定的类型应为有效的 spark sql 数据类型。 |
写入 |
customSchema |
(无) | 用于从 JDBC 连接器读取数据的自定义模式。例如,"id DECIMAL(38, 0), name STRING" 。您也可以指定部分字段,其他字段使用默认类型映射。例如,"id DECIMAL(38, 0)" 。列名应与 JDBC 表的对应列名相同。用户可以指定 Spark SQL 的对应数据类型,而不是使用默认值。 |
读取 |
pushDownPredicate |
true |
启用或禁用谓词下推到 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 会尽可能地将过滤器下推到 JDBC 数据源。否则,如果设置为 false,则不会将任何过滤器下推到 JDBC 数据源,因此所有过滤器都将由 Spark 处理。当 Spark 比 JDBC 数据源更快地执行谓词过滤时,通常会关闭谓词下推。 | 读取 |
pushDownAggregate |
true |
启用或禁用 V2 JDBC 数据源中聚合下推的选项。默认值为 true,在这种情况下,Spark 会将聚合下推到 JDBC 数据源。否则,如果设置为 false,则不会将聚合下推到 JDBC 数据源。当 Spark 比 JDBC 数据源更快地执行聚合时,通常会关闭聚合下推。请注意,只有当所有聚合函数和相关过滤器都可以下推时,才能下推聚合。如果 numPartitions 等于 1 或分组键与 partitionColumn 相同,Spark 将完全将聚合下推到数据源,并且不会对数据源输出应用最终聚合。否则,Spark 将对数据源输出应用最终聚合。 |
读取 |
pushDownLimit |
true |
启用或禁用 LIMIT 下推到 V2 JDBC 数据源的选项。LIMIT 下推还包括 LIMIT + SORT,也称为 Top N 运算符。默认值为 true,在这种情况下,Spark 将 LIMIT 或带有 SORT 的 LIMIT 下推到 JDBC 数据源。否则,如果设置为 false,则不会将 LIMIT 或带有 SORT 的 LIMIT 下推到 JDBC 数据源。如果 numPartitions 大于 1,即使将 LIMIT 或带有 SORT 的 LIMIT 下推,Spark 仍然会对来自数据源的结果应用 LIMIT 或带有 SORT 的 LIMIT。否则,如果将 LIMIT 或带有 SORT 的 LIMIT 下推并且 numPartitions 等于 1,Spark 不会对来自数据源的结果应用 LIMIT 或带有 SORT 的 LIMIT。 |
读取 |
pushDownOffset |
true |
启用或禁用 OFFSET 下推到 V2 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 会将 OFFSET 下推到 JDBC 数据源。否则,如果设置为 false,Spark 不会尝试将 OFFSET 下推到 JDBC 数据源。如果 pushDownOffset 为 true 且 numPartitions 等于 1,OFFSET 将被下推到 JDBC 数据源。否则,OFFSET 不会被下推,Spark 仍然会对来自数据源的结果应用 OFFSET。 |
读取 |
pushDownTableSample |
true |
启用或禁用 TABLESAMPLE 下推到 V2 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 将 TABLESAMPLE 下推到 JDBC 数据源。否则,如果值设置为 false,则不会将 TABLESAMPLE 下推到 JDBC 数据源。 | 读取 |
keytab |
(无) | JDBC 客户端的 Kerberos keytab 文件的位置(必须通过 spark-submit 的 --files 选项或手动预先上传到所有节点)。当找到路径信息时,Spark 认为 keytab 是手动分发的,否则假设 --files 。如果同时定义了 keytab 和 principal ,则 Spark 会尝试进行 Kerberos 身份验证。 |
读/写 |
principal |
(无) | 指定 JDBC 客户端的 Kerberos 主体名称。如果同时定义了 keytab 和 principal ,则 Spark 会尝试进行 Kerberos 身份验证。 |
读/写 |
refreshKrb5Config |
false |
此选项控制在建立新连接之前是否要为 JDBC 客户端刷新 Kerberos 配置。如果要刷新配置,则设置为 true,否则设置为 false。默认值为 false。请注意,如果将此选项设置为 true 并尝试建立多个连接,则可能会发生竞争条件。一种可能的情况如下所示。
|
读/写 |
connectionProvider |
(无) | 用于连接到此 URL 的 JDBC 连接提供程序的名称,例如 db2 、mssql 。必须是使用 JDBC 数据源加载的提供程序之一。用于在多个提供程序可以处理指定的驱动程序和选项时进行区分。所选提供程序不得被 spark.sql.sources.disabledJdbcConnProviderList 禁用。 |
读/写 |
preferTimestampNTZ |
false | 当选项设置为 true 时,所有时间戳都被推断为 TIMESTAMP WITHOUT TIME ZONE。否则,时间戳将以本地时区读取。 |
读取 |
请注意,JDBC 驱动程序并不总是支持使用 keytab 进行 Kerberos 身份验证。
在使用 keytab
和 principal
配置选项之前,请确保满足以下要求
- 包含的 JDBC 驱动程序版本支持使用 keytab 进行 Kerberos 身份验证。
- 存在一个支持所用数据库的内置连接提供程序。
以下数据库存在内置连接提供程序
- DB2
- MariaDB
- MS Sql
- Oracle
- PostgreSQL
如果未满足要求,请考虑使用 JdbcConnectionProvider
开发人员 API 来处理自定义身份验证。
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.load()
jdbcDF2 = spark.read \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.option("customSchema", "id DECIMAL(38, 0), name STRING") \
.load()
# Saving data to a JDBC source
jdbcDF.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()
jdbcDF2.write \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Specifying create table column data types on write
jdbcDF.write \
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
在 Spark 存储库的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”中找到完整的示例代码。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load();
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Saving data to a JDBC source
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();
jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Specifying create table column data types on write
jdbcDF.write()
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java”中找到完整的示例代码。
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
在 Spark 存储库的“examples/src/main/r/RSparkSQLExample.R”中找到完整的示例代码。