JDBC 连接到其他数据库

Spark SQL 还包括一个数据源,可以使用 JDBC 从其他数据库读取数据。 应该优先使用此功能,而不是使用 JdbcRDD。 这是因为结果作为 DataFrame 返回,它们可以在 Spark SQL 中轻松处理或与其他数据源连接。 JDBC 数据源也更容易从 Java 或 Python 中使用,因为它不需要用户提供 ClassTag。(请注意,这与 Spark SQL JDBC 服务器不同,后者允许其他应用程序使用 Spark SQL 运行查询)。

要开始使用,您需要将特定数据库的 JDBC 驱动程序包含在 spark 类路径中。 例如,要从 Spark Shell 连接到 postgres,您可以运行以下命令

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

数据源选项

Spark 支持以下不区分大小写的 JDBC 选项。JDBC 的数据源选项可以通过以下方式设置

对于连接属性,用户可以在数据源选项中指定 JDBC 连接属性。userpassword 通常作为连接属性提供,用于登录到数据源。

属性名称默认值含义范围
url (无) 形式为 jdbc:subprotocol:subname 的 JDBC URL,用于连接到。 特定于源的连接属性可以在 URL 中指定。例如,jdbc:postgresql://localhost/test?user=fred&password=secret 读/写
dbtable (无) 应该从中读取或写入的 JDBC 表。 请注意,在读取路径中使用时,可以使用 SQL 查询的 FROM 子句中有效的任何内容。 例如,您也可以使用括号中的子查询,而不是完整的表。 不允许同时指定 dbtablequery 选项。 读/写
query (无) 一个将用于将数据读入 Spark 的查询。 指定的查询将用括号括起来,并在 FROM 子句中用作子查询。 Spark 还将为子查询子句分配一个别名。 例如,spark 将向 JDBC 源发出以下形式的查询。

SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

以下是使用此选项的一些限制。
  1. 不允许同时指定 dbtablequery 选项。
  2. 不允许同时指定 querypartitionColumn 选项。 当指定 partitionColumn 选项是必需的,子查询可以使用 dbtable 选项代替指定,并且分区列可以使用作为 dbtable 的一部分提供的子查询别名来限定。
    示例
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("query", "select c1, c2 from t1")
    .load()
读/写
prepareQuery (无) 将与 query 一起形成最终查询的前缀。 由于指定的 query 将在 FROM 子句中用括号括起来作为子查询,并且某些数据库不支持子查询中的所有子句,因此 prepareQuery 属性提供了一种运行此类复杂查询的方法。 例如,spark 将向 JDBC 源发出以下形式的查询。

<prepareQuery> SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

以下是一些示例。
  1. MSSQL Server 不接受子查询中的 WITH 子句,但可以将此类查询拆分为 prepareQueryquery
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("prepareQuery", "WITH t AS (SELECT x, y FROM tbl)")
    .option("query", "SELECT * FROM t WHERE x > 10")
    .load()
  2. MSSQL Server 不接受子查询中的临时表子句,但可以将此类查询拆分为 prepareQueryquery
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("prepareQuery", "(SELECT * INTO #TempTable FROM (SELECT * FROM tbl) t)")
    .option("query", "SELECT * FROM #TempTable")
    .load()
读/写
driver (无) 用于连接到此 URL 的 JDBC 驱动程序的类名。 读/写
partitionColumn, lowerBound, upperBound (无) 如果指定了其中任何一个选项,则必须指定所有这些选项。 此外,必须指定 numPartitions。 它们描述了从多个 worker 并行读取时如何对表进行分区。 partitionColumn 必须是相关表中的数字、日期或时间戳列。 请注意,lowerBoundupperBound 仅用于决定分区步幅,而不是用于过滤表中的行。 因此,表中的所有行都将被分区和返回。 此选项仅适用于读取。
示例
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select c1, c2 from t1) as subq")
.option("partitionColumn", "c1")
.option("lowerBound", "1")
.option("upperBound", "100")
.option("numPartitions", "3")
.load()
read
numPartitions (无) 可用于表读取和写入中的并行度的最大分区数。 这也决定了并发 JDBC 连接的最大数量。 如果要写入的分区数超过此限制,我们会通过在写入之前调用 coalesce(numPartitions) 将其减少到此限制。 读/写
queryTimeout 0 驱动程序将等待 Statement 对象执行到给定秒数的秒数。 零表示没有限制。 在写入路径中,此选项取决于 JDBC 驱动程序如何实现 API setQueryTimeout,例如,h2 JDBC 驱动程序检查每个查询的超时时间,而不是整个 JDBC 批处理。 读/写
fetchsize 0 JDBC 提取大小,它决定了每次往返要提取的行数。 这可以帮助 JDBC 驱动程序上的性能,这些驱动程序默认设置为较低的提取大小(例如,Oracle,10 行)。 read
batchsize 1000 JDBC 批处理大小,它决定了每次往返要插入的行数。 这可以帮助 JDBC 驱动程序上的性能。 此选项仅适用于写入。 write
isolationLevel READ_UNCOMMITTED 事务隔离级别,适用于当前连接。 它可以是 NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READSERIALIZABLE 之一,对应于 JDBC Connection 对象定义的标准事务隔离级别,默认值为 READ_UNCOMMITTED。 请参阅 java.sql.Connection 中的文档。 write
sessionInitStatement (无) 在每个数据库会话打开到远程数据库之后,以及在开始读取数据之前,此选项会执行自定义 SQL 语句(或 PL/SQL 块)。 使用此选项可实现会话初始化代码。 示例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""") read
truncate false 这是一个 JDBC 写入器相关选项。 启用 SaveMode.Overwrite 后,此选项会导致 Spark 截断现有表,而不是删除并重新创建它。 这样做效率更高,并且可以防止删除表元数据(例如,索引)。 但是,在某些情况下它不起作用,例如,当新数据具有不同的模式时。 如果出现故障,用户应关闭 truncate 选项以再次使用 DROP TABLE。 此外,由于 DBMS 之间 TRUNCATE TABLE 的行为不同,因此使用它并不总是安全的。 MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect 和 OracleDialect 支持此功能,而 PostgresDialect 和默认 JDBCDirect 不支持。 对于未知和不受支持的 JDBCDirect,用户选项 truncate 将被忽略。write
cascadeTruncate 相关 JDBC 数据库的默认级联截断行为,在每个 JDBCDialect 中的 isCascadeTruncate 中指定 这是一个与 JDBC 写入器相关的选项。如果启用并且 JDBC 数据库(目前是 PostgreSQL 和 Oracle)支持,则此选项允许执行 TRUNCATE TABLE t CASCADE(在 PostgreSQL 的情况下,执行 TRUNCATE TABLE ONLY t CASCADE 以防止意外截断子表)。 这将影响其他表,因此应谨慎使用。 write
createTableOptions 这是一个与 JDBC 写入器相关的选项。如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如,CREATE TABLE t (name string) ENGINE=InnoDB.)。 write
createTableColumnTypes (无) 创建表时要使用的数据库列数据类型,而不是默认类型。 数据类型信息应以与 CREATE TABLE 列语法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)")。 指定的类型应该是有效的 Spark SQL 数据类型。 write
customSchema (无) 用于从 JDBC 连接器读取数据的自定义 schema。 例如,"id DECIMAL(38, 0), name STRING"。 您还可以指定部分字段,其他字段使用默认类型映射。 例如,"id DECIMAL(38, 0)"。 列名应与 JDBC 表的相应列名相同。 用户可以指定 Spark SQL 的对应数据类型,而不是使用默认类型。 read
pushDownPredicate true 用于启用或禁用将谓词下推到 JDBC 数据源的选项。 默认值为 true,在这种情况下,Spark 会尽可能将过滤器下推到 JDBC 数据源。 否则,如果设置为 false,则不会将任何过滤器下推到 JDBC 数据源,因此所有过滤器将由 Spark 处理。 当谓词过滤由 Spark 执行速度快于由 JDBC 数据源执行时,通常会关闭谓词下推。 read
pushDownAggregate true 用于启用或禁用 V2 JDBC 数据源中的聚合下推的选项。 默认值为 true,在这种情况下,Spark 会将聚合下推到 JDBC 数据源。 否则,如果设置为 false,则聚合将不会下推到 JDBC 数据源。 当聚合由 Spark 执行速度快于由 JDBC 数据源执行时,通常会关闭聚合下推。 请注意,只有在所有聚合函数和相关过滤器都可以下推时,才能下推聚合。 如果 numPartitions 等于 1 或 group by 键与 partitionColumn 相同,Spark 将完全将聚合下推到数据源,而不会对数据源输出应用最终聚合。 否则,Spark 将对数据源输出应用最终聚合。 read
pushDownLimit true 用于启用或禁用将 LIMIT 下推到 V2 JDBC 数据源的选项。 LIMIT 下推还包括 LIMIT + SORT,也称为 Top N 运算符。 默认值为 true,在这种情况下,Spark 会将 LIMIT 或带有 SORT 的 LIMIT 下推到 JDBC 数据源。 否则,如果设置为 false,则 LIMIT 或带有 SORT 的 LIMIT 不会下推到 JDBC 数据源。 如果 numPartitions 大于 1,即使 LIMIT 或带有 SORT 的 LIMIT 被下推,Spark 仍然会对来自数据源的结果应用 LIMIT 或带有 SORT 的 LIMIT。 否则,如果 LIMIT 或带有 SORT 的 LIMIT 被下推且 numPartitions 等于 1,Spark 将不会对来自数据源的结果应用 LIMIT 或带有 SORT 的 LIMIT。 read
pushDownOffset true 用于启用或禁用将 OFFSET 下推到 V2 JDBC 数据源的选项。 默认值为 true,在这种情况下,Spark 会将 OFFSET 下推到 JDBC 数据源。 否则,如果设置为 false,Spark 将不会尝试将 OFFSET 下推到 JDBC 数据源。 如果 pushDownOffset 为 true 且 numPartitions 等于 1,则 OFFSET 将下推到 JDBC 数据源。 否则,OFFSET 将不会下推,并且 Spark 仍然会对来自数据源的结果应用 OFFSET。 read
pushDownTableSample true 用于启用或禁用将 TABLESAMPLE 下推到 V2 JDBC 数据源的选项。 默认值为 true,在这种情况下,Spark 会将 TABLESAMPLE 下推到 JDBC 数据源。 否则,如果值设置为 false,则 TABLESAMPLE 不会下推到 JDBC 数据源。 read
keytab (无) Kerberos keytab 文件的位置(必须通过 spark-submit 的 --files 选项或手动预先上传到所有节点)以供 JDBC 客户端使用。 找到路径信息后,Spark 认为 keytab 是手动分发的,否则假设为 --files。 如果同时定义了 keytabprincipal,则 Spark 尝试进行 Kerberos 身份验证。 读/写
principal (无) 指定 JDBC 客户端的 Kerberos 主体名称。 如果同时定义了 keytabprincipal,则 Spark 尝试进行 Kerberos 身份验证。 读/写
refreshKrb5Config false 此选项控制是否在 JDBC 客户端建立新连接之前刷新 Kerberos 配置。 如果要刷新配置,请设置为 true,否则设置为 false。 默认值为 false。 请注意,如果将此选项设置为 true 并尝试建立多个连接,则可能会发生争用情况。 一种可能的情况如下所示。
  1. refreshKrb5Config 标志设置为安全上下文 1
  2. JDBC 连接提供程序用于相应的 DBMS
  3. krb5.conf 已修改,但 JVM 尚未意识到必须重新加载它
  4. Spark 成功验证安全上下文 1
  5. JVM 从修改后的 krb5.conf 加载安全上下文 2
  6. Spark 恢复先前保存的安全上下文 1
  7. 修改后的 krb5.conf 内容刚刚消失
读/写
connectionProvider (无) 用于连接到此 URL 的 JDBC 连接提供程序的名称,例如 db2mssql。 必须是与 JDBC 数据源一起加载的提供程序之一。 用于在多个提供程序可以处理指定的驱动程序和选项时消除歧义。 选择的提供程序不得被 spark.sql.sources.disabledJdbcConnProviderList 禁用。 读/写
preferTimestampNTZ false 当此选项设置为 true 时,TIMESTAMP WITHOUT TIME ZONE 类型将被推断为 Spark 的 TimestampNTZ 类型。 否则,它将被解释为 Spark 的 Timestamp 类型(相当于 TIMESTAMP WITH LOCAL TIME ZONE)。 此设置专门影响 TIMESTAMP WITHOUT TIME ZONE 数据类型的推断。 无论此设置如何,TIMESTAMP WITH LOCAL TIME ZONE 和 TIMESTAMP WITH TIME ZONE 数据类型都始终如一地解释为 Spark 的 Timestamp 类型。 read

请注意,JDBC 驱动程序并非总是支持使用 keytab 的 Kerberos 身份验证。
在使用 keytabprincipal 配置选项之前,请确保满足以下要求

以下数据库有内置连接提供程序

如果不满足要求,请考虑使用 JdbcConnectionProvider 开发者 API 来处理自定义身份验证。

# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \
    .load()

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
在 Spark 代码库的 "examples/src/main/python/sql/datasource.py" 中查找完整的示例代码。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
在 Spark 代码库的 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" 中查找完整的示例代码。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Specifying create table column data types on write
jdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
在 Spark 代码库的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" 中查找完整的示例代码。
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")

# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
在 Spark 代码库的 "examples/src/main/r/RSparkSQLExample.R" 中查找完整的示例代码。
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename",
  user 'username',
  password 'password'
)

INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable