JDBC 连接其他数据库

Spark SQL 还包含一个数据源,可以使用 JDBC 从其他数据库读取数据。此功能应优先于使用 JdbcRDD。这是因为结果将作为 DataFrame 返回,并且可以轻松地在 Spark SQL 中进行处理或与其他数据源进行联接。JDBC 数据源也更易于从 Java 或 Python 中使用,因为它不需要用户提供 ClassTag。(请注意,这与 Spark SQL JDBC 服务器不同,Spark SQL JDBC 服务器允许其他应用程序使用 Spark SQL 运行查询)。

要开始使用,您需要将特定数据库的 JDBC 驱动程序包含在 spark 类路径中。例如,要从 Spark Shell 连接到 postgres,您需要运行以下命令

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

数据源选项

Spark 支持以下不区分大小写的 JDBC 选项。JDBC 的数据源选项可以通过

对于连接属性,用户可以在数据源选项中指定 JDBC 连接属性。userpassword 通常作为连接属性提供,用于登录数据源。

属性名称默认值含义范围
url (无) JDBC URL,格式为 jdbc:subprotocol:subname,用于连接。源特定的连接属性可以在 URL 中指定。例如:jdbc:postgresql://127.0.0.1/test?user=fred&password=secret 读/写
dbtable (无) 要从中读取或写入的 JDBC 表。请注意,在读取路径中使用时,SQL 查询的 FROM 子句中有效的任何内容都可以使用。例如,您可以使用括号中的子查询,而不是完整的表。不允许同时指定 dbtablequery 选项。 读/写
query (无) 用于将数据读取到 Spark 的查询。指定的查询将用括号括起来,并在 FROM 子句中用作子查询。Spark 还将为子查询子句分配别名。例如,Spark 将向 JDBC 源发出以下形式的查询。

SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

使用此选项时,有一些限制。
  1. 不允许同时指定 dbtablequery 选项。
  2. 不允许同时指定 querypartitionColumn 选项。当需要指定 partitionColumn 选项时,可以使用 dbtable 选项指定子查询,并且可以使用作为 dbtable 部分提供的子查询别名来限定分区列。
    示例
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("query", "select c1, c2 from t1")
    .load()
读/写
prepareQuery (无) 一个前缀,它将与 query 一起形成最终查询。由于指定的 query 将在 FROM 子句中用括号括起来作为子查询,并且某些数据库不支持子查询中的所有子句,因此 prepareQuery 属性提供了一种运行此类复杂查询的方法。例如,Spark 将向 JDBC 源发出以下形式的查询。

<prepareQuery> SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

以下是一些示例。
  1. MSSQL Server 不接受子查询中的 WITH 子句,但可以将此类查询拆分为 prepareQueryquery
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("prepareQuery", "WITH t AS (SELECT x, y FROM tbl)")
    .option("query", "SELECT * FROM t WHERE x > 10")
    .load()
  2. MSSQL Server 不接受子查询中的临时表子句,但可以将此类查询拆分为 prepareQueryquery
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("prepareQuery", "(SELECT * INTO #TempTable FROM (SELECT * FROM tbl) t)")
    .option("query", "SELECT * FROM #TempTable")
    .load()
读/写
driver (无) 用于连接到此 URL 的 JDBC 驱动程序的类名。 读/写
partitionColumn, lowerBound, upperBound (无) 如果指定了任何一个选项,则必须指定所有这些选项。此外,必须指定 numPartitions。它们描述了在从多个工作程序并行读取时如何对表进行分区。partitionColumn 必须是所讨论表中的数字、日期或时间戳列。请注意,lowerBoundupperBound 仅用于确定分区跨度,而不是用于过滤表中的行。因此,表中的所有行都将被分区并返回。此选项仅适用于读取。
示例
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select c1, c2 from t1) as subq")
.option("partitionColumn", "c1")
.option("lowerBound", "1")
.option("upperBound", "100")
.option("numPartitions", "3")
.load()
读取
numPartitions (无) 用于表读取和写入并行处理的最大分区数。这也决定了并发 JDBC 连接的最大数量。如果要写入的分区数超过此限制,我们将在写入之前通过调用 coalesce(numPartitions) 将其减少到此限制。 读/写
queryTimeout 0 驱动程序等待 Statement 对象执行到给定秒数的秒数。零表示没有限制。在写入路径中,此选项取决于 JDBC 驱动程序如何实现 API setQueryTimeout,例如,h2 JDBC 驱动程序检查每个查询的超时,而不是整个 JDBC 批处理。 读/写
fetchsize 0 JDBC 获取大小,它决定每次往返获取的行数。这可以帮助提高默认获取大小较低的 JDBC 驱动程序的性能(例如,Oracle 的获取大小为 10 行)。 读取
batchsize 1000 JDBC 批处理大小,它决定每次往返插入的行数。这可以帮助提高 JDBC 驱动程序的性能。此选项仅适用于写入。 写入
isolationLevel READ_UNCOMMITTED 事务隔离级别,它适用于当前连接。它可以是 NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READSERIALIZABLE 之一,对应于 JDBC 的 Connection 对象定义的标准事务隔离级别,默认值为 READ_UNCOMMITTED。请参阅 java.sql.Connection 中的文档。 写入
sessionInitStatement (无) 在每个数据库会话打开到远程数据库并开始读取数据之前,此选项将执行自定义 SQL 语句(或 PL/SQL 块)。使用它来实现会话初始化代码。示例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""") 读取
truncate false 这是一个与 JDBC 写入器相关的选项。当启用 SaveMode.Overwrite 时,此选项会导致 Spark 截断现有表,而不是删除并重新创建它。这可能更有效,并且可以防止表元数据(例如,索引)被删除。但是,在某些情况下它将不起作用,例如,当新数据具有不同的模式时。在发生故障的情况下,用户应关闭 truncate 选项以再次使用 DROP TABLE。此外,由于不同 DBMS 中 TRUNCATE TABLE 的行为不同,因此使用它并不总是安全的。MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect 和 OracleDialect 支持此选项,而 PostgresDialect 和默认 JDBCDirect 不支持。对于未知和不支持的 JDBCDirect,用户选项 truncate 将被忽略。写入
cascadeTruncate 所讨论的 JDBC 数据库的默认级联截断行为,在每个 JDBCDialect 中指定为 isCascadeTruncate 这是一个与 JDBC 写入器相关的选项。如果启用并受 JDBC 数据库支持(目前为 PostgreSQL 和 Oracle),此选项允许执行 TRUNCATE TABLE t CASCADE(在 PostgreSQL 的情况下,执行 TRUNCATE TABLE ONLY t CASCADE 以防止意外截断后代表)。这将影响其他表,因此应谨慎使用。 写入
createTableOptions 这是一个与 JDBC 写入器相关的选项。如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如,CREATE TABLE t (name string) ENGINE=InnoDB.)。 写入
createTableColumnTypes (无) 在创建表时使用的数据库列数据类型,而不是默认数据类型。数据类型信息应以与 CREATE TABLE 列语法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)")。指定的类型应为有效的 spark sql 数据类型。 写入
customSchema (无) 用于从 JDBC 连接器读取数据的自定义模式。例如,"id DECIMAL(38, 0), name STRING"。您也可以指定部分字段,其他字段使用默认类型映射。例如,"id DECIMAL(38, 0)"。列名应与 JDBC 表的对应列名相同。用户可以指定 Spark SQL 的对应数据类型,而不是使用默认值。 读取
pushDownPredicate true 启用或禁用谓词下推到 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 会尽可能地将过滤器下推到 JDBC 数据源。否则,如果设置为 false,则不会将任何过滤器下推到 JDBC 数据源,因此所有过滤器都将由 Spark 处理。当 Spark 比 JDBC 数据源更快地执行谓词过滤时,通常会关闭谓词下推。 读取
pushDownAggregate true 启用或禁用 V2 JDBC 数据源中聚合下推的选项。默认值为 true,在这种情况下,Spark 会将聚合下推到 JDBC 数据源。否则,如果设置为 false,则不会将聚合下推到 JDBC 数据源。当 Spark 比 JDBC 数据源更快地执行聚合时,通常会关闭聚合下推。请注意,只有当所有聚合函数和相关过滤器都可以下推时,才能下推聚合。如果 numPartitions 等于 1 或分组键与 partitionColumn 相同,Spark 将完全将聚合下推到数据源,并且不会对数据源输出应用最终聚合。否则,Spark 将对数据源输出应用最终聚合。 读取
pushDownLimit true 启用或禁用 LIMIT 下推到 V2 JDBC 数据源的选项。LIMIT 下推还包括 LIMIT + SORT,也称为 Top N 运算符。默认值为 true,在这种情况下,Spark 将 LIMIT 或带有 SORT 的 LIMIT 下推到 JDBC 数据源。否则,如果设置为 false,则不会将 LIMIT 或带有 SORT 的 LIMIT 下推到 JDBC 数据源。如果 numPartitions 大于 1,即使将 LIMIT 或带有 SORT 的 LIMIT 下推,Spark 仍然会对来自数据源的结果应用 LIMIT 或带有 SORT 的 LIMIT。否则,如果将 LIMIT 或带有 SORT 的 LIMIT 下推并且 numPartitions 等于 1,Spark 不会对来自数据源的结果应用 LIMIT 或带有 SORT 的 LIMIT。 读取
pushDownOffset true 启用或禁用 OFFSET 下推到 V2 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 会将 OFFSET 下推到 JDBC 数据源。否则,如果设置为 false,Spark 不会尝试将 OFFSET 下推到 JDBC 数据源。如果 pushDownOffset 为 true 且 numPartitions 等于 1,OFFSET 将被下推到 JDBC 数据源。否则,OFFSET 不会被下推,Spark 仍然会对来自数据源的结果应用 OFFSET。 读取
pushDownTableSample true 启用或禁用 TABLESAMPLE 下推到 V2 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 将 TABLESAMPLE 下推到 JDBC 数据源。否则,如果值设置为 false,则不会将 TABLESAMPLE 下推到 JDBC 数据源。 读取
keytab (无) JDBC 客户端的 Kerberos keytab 文件的位置(必须通过 spark-submit 的 --files 选项或手动预先上传到所有节点)。当找到路径信息时,Spark 认为 keytab 是手动分发的,否则假设 --files。如果同时定义了 keytabprincipal,则 Spark 会尝试进行 Kerberos 身份验证。 读/写
principal (无) 指定 JDBC 客户端的 Kerberos 主体名称。如果同时定义了 keytabprincipal,则 Spark 会尝试进行 Kerberos 身份验证。 读/写
refreshKrb5Config false 此选项控制在建立新连接之前是否要为 JDBC 客户端刷新 Kerberos 配置。如果要刷新配置,则设置为 true,否则设置为 false。默认值为 false。请注意,如果将此选项设置为 true 并尝试建立多个连接,则可能会发生竞争条件。一种可能的情况如下所示。
  1. refreshKrb5Config 标志设置为安全上下文 1
  2. JDBC 连接提供程序用于相应的 DBMS
  3. krb5.conf 已修改,但 JVM 尚未意识到它必须重新加载
  4. Spark 成功地为安全上下文 1 进行了身份验证
  5. JVM 从修改后的 krb5.conf 加载安全上下文 2
  6. Spark 恢复了先前保存的安全上下文 1
  7. 修改后的 krb5.conf 内容已消失
读/写
connectionProvider (无) 用于连接到此 URL 的 JDBC 连接提供程序的名称,例如 db2mssql。必须是使用 JDBC 数据源加载的提供程序之一。用于在多个提供程序可以处理指定的驱动程序和选项时进行区分。所选提供程序不得被 spark.sql.sources.disabledJdbcConnProviderList 禁用。 读/写
preferTimestampNTZ false 当选项设置为 true 时,所有时间戳都被推断为 TIMESTAMP WITHOUT TIME ZONE。否则,时间戳将以本地时区读取。 读取

请注意,JDBC 驱动程序并不总是支持使用 keytab 进行 Kerberos 身份验证。
在使用 keytabprincipal 配置选项之前,请确保满足以下要求

以下数据库存在内置连接提供程序

如果未满足要求,请考虑使用 JdbcConnectionProvider 开发人员 API 来处理自定义身份验证。

# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \
    .load()

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
在 Spark 存储库的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala”中找到完整的示例代码。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Specifying create table column data types on write
jdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java”中找到完整的示例代码。
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")

# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
在 Spark 存储库的“examples/src/main/r/RSparkSQLExample.R”中找到完整的示例代码。
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename",
  user 'username',
  password 'password'
)

INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable