JDBC 连接其他数据库

Spark SQL 还包含一个数据源,可以使用 JDBC 从其他数据库读取数据。此功能应优先于使用 JdbcRDD。这是因为结果作为 DataFrame 返回,并且可以轻松地在 Spark SQL 中处理或与其他数据源连接。JDBC 数据源也更容易从 Java 或 Python 中使用,因为它不需要用户提供 ClassTag。(请注意,这与 Spark SQL JDBC 服务器不同,后者允许其他应用程序使用 Spark SQL 运行查询)。

要开始使用,您需要在 Spark classpath 中包含特定数据库的 JDBC 驱动程序。例如,要从 Spark Shell 连接到 Postgres,您可以运行以下命令:

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

数据源选项

Spark 支持以下用于 JDBC 的不区分大小写的选项。JDBC 的数据源选项可以通过以下方式设置:

对于连接属性,用户可以在数据源选项中指定 JDBC 连接属性。通常提供 userpassword 作为连接属性以登录数据源。

属性名称默认值含义范围
url (无) 要连接的 JDBC URL,格式为 jdbc:subprotocol:subname。可以在 URL 中指定特定于源的连接属性。例如,jdbc:postgresql://localhost/test?user=fred&password=secret 读/写
dbtable (无) 应从中读取或写入的 JDBC 表。请注意,当在读取路径中使用它时,任何在 SQL 查询的 FROM 子句中有效的内容都可以使用。例如,除了完整的表,您还可以使用括号中的子查询。不允许同时指定 dbtablequery 选项。 读/写
query (无) 将用于将数据读入 Spark 的查询。指定的查询将被括号括起来并用作 FROM 子句中的子查询。Spark 还会为子查询子句分配一个别名。例如,Spark 将向 JDBC Source 发出以下形式的查询。

SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

以下是使用此选项时的一些限制。
  1. 不允许同时指定 dbtablequery 选项。
  2. 不允许同时指定 querypartitionColumn 选项。当需要指定 partitionColumn 选项时,可以使用 dbtable 选项指定子查询,并且分区列可以使用作为 dbtable 一部分提供的子查询别名进行限定。
    示例
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("query", "select c1, c2 from t1")
    .load()
读取
prepareQuery (无) 一个前缀,将与 query 一起构成最终查询。由于指定的 query 将作为子查询在 FROM 子句中加括号,而某些数据库不支持子查询中的所有子句,因此 prepareQuery 属性提供了一种运行此类复杂查询的方法。例如,Spark 将向 JDBC Source 发出以下形式的查询。

<prepareQuery> SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

以下是几个示例。
  1. MSSQL Server 不接受子查询中的 WITH 子句,但可以将此类查询拆分为 prepareQueryquery
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("prepareQuery", "WITH t AS (SELECT x, y FROM tbl)")
    .option("query", "SELECT * FROM t WHERE x > 10")
    .load()
  2. MSSQL Server 不接受子查询中的临时表子句,但可以将此类查询拆分为 prepareQueryquery
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("prepareQuery", "(SELECT * INTO #TempTable FROM (SELECT * FROM tbl) t)")
    .option("query", "SELECT * FROM #TempTable")
    .load()
读/写
driver (无) 用于连接到此 URL 的 JDBC 驱动程序的类名。 读/写
partitionColumn, lowerBound, upperBound (无) 如果指定了其中任何一个,则必须指定所有这些选项。此外,必须指定 numPartitions。它们描述了在从多个工作节点并行读取时如何对表进行分区。partitionColumn 必须是相关表中的数字、日期或时间戳列。请注意,lowerBoundupperBound 仅用于确定分区步长,而不是用于过滤表中的行。因此,表中的所有行都将被分区并返回。此选项仅适用于读取。
示例
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select c1, c2 from t1) as subq")
.option("partitionColumn", "c1")
.option("lowerBound", "1")
.option("upperBound", "100")
.option("numPartitions", "3")
.load()
读取
numPartitions (无) 在表读取和写入中可用于并行度的最大分区数。这还决定了并发 JDBC 连接的最大数量。如果写入的分区数超过此限制,我们会在写入之前通过调用 coalesce(numPartitions) 将其减少到此限制。 读/写
queryTimeout 0 驱动程序将等待 Statement 对象执行到给定秒数的时间。零表示没有限制。在写入路径中,此选项取决于 JDBC 驱动程序如何实现 API setQueryTimeout,例如,h2 JDBC 驱动程序检查每个查询的超时,而不是整个 JDBC 批处理。 读/写
fetchsize 0 JDBC 抓取大小,它决定了每次往返要抓取的行数。这有助于改善默认抓取大小较低的 JDBC 驱动程序(例如 Oracle 默认 10 行)的性能。 读取
batchsize 1000 JDBC 批处理大小,它决定了每次往返要插入的行数。这有助于改善 JDBC 驱动程序的性能。此选项仅适用于写入。 写入
isolationLevel READ_UNCOMMITTED 事务隔离级别,适用于当前连接。它可以是 NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READSERIALIZABLE 之一,对应于 JDBC 的 Connection 对象定义的标准事务隔离级别,默认为 READ_UNCOMMITTED。请参阅 java.sql.Connection 中的文档。 写入
sessionInitStatement (无) 在每个数据库会话打开到远程 DB 并开始读取数据之前,此选项执行自定义 SQL 语句(或 PL/SQL 块)。使用此选项可实现会话初始化代码。示例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""") 读取
truncate false 这是一个与 JDBC 写入器相关的选项。当启用 SaveMode.Overwrite 时,此选项使 Spark 截断现有表而不是删除并重新创建它。这可能更高效,并防止表元数据(例如索引)被移除。但是,在某些情况下它将不起作用,例如当新数据具有不同的模式时。如果出现故障,用户应关闭 truncate 选项以再次使用 DROP TABLE。此外,由于不同 DBMS 之间 TRUNCATE TABLE 的行为不同,使用此选项并不总是安全的。MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect 和 OracleDialect 支持此选项,而 PostgresDialect 和默认 JDBCDialect 不支持。对于未知和不支持的 JDBCDialect,用户选项 truncate 被忽略。 写入
cascadeTruncate 相关 JDBC 数据库的默认级联截断行为,在每个 JDBCDialect 的 isCascadeTruncate 中指定 这是一个与 JDBC 写入器相关的选项。如果启用且 JDBC 数据库支持(目前是 PostgreSQL 和 Oracle),此选项允许执行 TRUNCATE TABLE t CASCADE(对于 PostgreSQL,执行 TRUNCATE TABLE ONLY t CASCADE 以防止意外截断后代表)。这会影响其他表,因此应谨慎使用。 写入
createTableOptions 这是一个与 JDBC 写入器相关的选项。如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如,CREATE TABLE t (name string) ENGINE=InnoDB.)。 写入
createTableColumnTypes (无) 创建表时使用的数据库列数据类型,而不是默认值。数据类型信息应以与 CREATE TABLE 列语法相同的格式指定(例如:"name CHAR(64), comments VARCHAR(1024)")。指定的类型应为有效的 Spark SQL 数据类型。 写入
customSchema (无) 用于从 JDBC 连接器读取数据的自定义模式。例如,"id DECIMAL(38, 0), name STRING"。您还可以指定部分字段,其他字段使用默认类型映射。例如,"id DECIMAL(38, 0)"。列名应与 JDBC 表的相应列名相同。用户可以指定 Spark SQL 的相应数据类型,而不是使用默认值。 读取
pushDownPredicate true 启用或禁用谓词下推到 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 将尽可能多地将过滤器下推到 JDBC 数据源。否则,如果设置为 false,则不会将任何过滤器下推到 JDBC 数据源,因此所有过滤器都将由 Spark 处理。当 Spark 比 JDBC 数据源执行谓词过滤更快时,通常会关闭谓词下推。 读取
pushDownAggregate true 启用或禁用 V2 JDBC 数据源中聚合下推的选项。默认值为 true,在这种情况下,Spark 将聚合下推到 JDBC 数据源。否则,如果设置为 false,则聚合不会下推到 JDBC 数据源。当 Spark 比 JDBC 数据源执行聚合更快时,通常会关闭聚合下推。请注意,当且仅当所有聚合函数和相关过滤器都可以下推时,才能下推聚合。如果 numPartitions 等于 1 或 group by 键与 partitionColumn 相同,Spark 将完全将聚合下推到数据源,并且不会对数据源输出应用最终聚合。否则,Spark 将对数据源输出应用最终聚合。 读取
pushDownLimit true 启用或禁用 LIMIT 下推到 V2 JDBC 数据源的选项。LIMIT 下推还包括 LIMIT + SORT,也称为 Top N 运算符。默认值为 true,在这种情况下,Spark 将 LIMIT 或带 SORT 的 LIMIT 下推到 JDBC 数据源。否则,如果设置为 false,则 LIMIT 或带 SORT 的 LIMIT 不会下推到 JDBC 数据源。如果 numPartitions 大于 1,即使 LIMIT 或带 SORT 的 LIMIT 被下推,Spark 仍然会对数据源结果应用 LIMIT 或带 SORT 的 LIMIT。否则,如果 LIMIT 或带 SORT 的 LIMIT 被下推并且 numPartitions 等于 1,Spark 将不会对数据源结果应用 LIMIT 或带 SORT 的 LIMIT。 读取
pushDownOffset true 启用或禁用 OFFSET 下推到 V2 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 将 OFFSET 下推到 JDBC 数据源。否则,如果设置为 false,Spark 将不会尝试将 OFFSET 下推到 JDBC 数据源。如果 pushDownOffset 为 true 并且 numPartitions 等于 1,则 OFFSET 将下推到 JDBC 数据源。否则,OFFSET 将不会下推,Spark 仍然会对数据源结果应用 OFFSET。 读取
pushDownTableSample true 启用或禁用 TABLESAMPLE 下推到 V2 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 将 TABLESAMPLE 下推到 JDBC 数据源。否则,如果设置为 false,TABLESAMPLE 将不会下推到 JDBC 数据源。 读取
keytab (无) Kerberos keytab 文件的位置(必须通过 spark-submit 的 --files 选项或手动预上传到所有节点)用于 JDBC 客户端。如果找到路径信息,则 Spark 认为 keytab 是手动分发的,否则假定为 --files。如果同时定义了 keytabprincipal,则 Spark 会尝试进行 kerberos 认证。 读/写
principal (无) 指定 JDBC 客户端的 Kerberos 主体名称。如果同时定义了 keytabprincipal,则 Spark 会尝试进行 Kerberos 认证。 读/写
refreshKrb5Config false 此选项控制 JDBC 客户端在建立新连接之前是否刷新 Kerberos 配置。如果您想刷新配置,请将其设置为 true,否则设置为 false。默认值为 false。请注意,如果您将此选项设置为 true 并尝试建立多个连接,则可能会发生竞态条件。一种可能的情况如下所示。
  1. refreshKrb5Config 标志设置为安全上下文 1
  2. 使用 JDBC 连接提供程序连接到相应的 DBMS
  3. krb5.conf 已修改,但 JVM 尚未意识到必须重新加载
  4. Spark 成功认证安全上下文 1
  5. JVM 从修改后的 krb5.conf 加载安全上下文 2
  6. Spark 恢复之前保存的安全上下文 1
  7. 修改后的 krb5.conf 内容丢失
读/写
connectionProvider (无) 用于连接到此 URL 的 JDBC 连接提供程序的名称,例如 db2mssql。必须是使用 JDBC 数据源加载的提供程序之一。用于在多个提供程序可以处理指定的驱动程序和选项时消除歧义。选定的提供程序不得被 spark.sql.sources.disabledJdbcConnProviderList 禁用。 读/写
preferTimestampNTZ false 当选项设置为 true 时,TIMESTAMP WITHOUT TIME ZONE 类型被推断为 Spark 的 TimestampNTZ 类型。否则,它被解释为 Spark 的 Timestamp 类型(相当于 TIMESTAMP WITH LOCAL TIME ZONE)。此设置仅影响 TIMESTAMP WITHOUT TIME ZONE 数据类型的推断。TIMESTAMP WITH LOCAL TIME ZONE 和 TIMESTAMP WITH TIME ZONE 数据类型无论此设置如何,都被一致地解释为 Spark 的 Timestamp 类型。 读取
hint (无) 此选项用于指定读取的提示。支持的提示格式是 C 风格注释的变体:它需要以 /*+ ` 开头,以 */ 结尾。目前,此选项仅支持 MySQLDialect、OracleDialect 和 DatabricksDialect。 读取

请注意,JDBC 驱动程序并非总是支持使用 keytab 进行 Kerberos 认证。
在使用 keytabprincipal 配置选项之前,请确保满足以下要求:

以下数据库有内置的连接提供程序:

如果要求不满足,请考虑使用 JdbcConnectionProvider 开发者 API 处理自定义认证。

# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \
    .load()

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
在 Spark 仓库中查找完整示例代码,路径为 "examples/src/main/python/sql/datasource.py"。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
在 Spark 仓库中查找完整示例代码,路径为 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Specifying create table column data types on write
jdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
在 Spark 仓库中查找完整示例代码,路径为 "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"。
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")

# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
在 Spark 仓库中查找完整示例代码,路径为 "examples/src/main/r/RSparkSQLExample.R"。
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename",
  user 'username',
  password 'password'
)

INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable

数据类型映射

MySQL 到 Spark SQL 数据类型映射

下表描述了当使用内置 jdbc 数据源和 MySQL Connector/J 作为激活的 JDBC 驱动程序从 MySQL 表读取数据时,MySQL 数据类型到 Spark SQL 数据类型的转换。请注意,不同的 JDBC 驱动程序(例如也可用于连接 MySQL 的 Maria Connector/J)可能具有不同的映射规则。

MySQL 数据类型 Spark SQL 数据类型 备注
BIT(1) BooleanType
BIT( >1 ) BinaryType (默认)
BIT( >1 ) LongType spark.sql.legacy.mysql.bitArrayMapping.enabled=true
TINYINT(1) BooleanType
TINYINT(1) ByteType tinyInt1isBit=false
BOOLEAN BooleanType
BOOLEAN ByteType tinyInt1isBit=false
TINYINT( >1 ) ByteType
TINYINT( any ) UNSIGNED ShortType
SMALLINT ShortType
SMALLINT UNSIGNED IntegerType
MEDIUMINT [UNSIGNED] IntegerType
INT IntegerType
INT UNSIGNED LongType
BIGINT LongType
BIGINT UNSIGNED DecimalType(20,0)
FLOAT FloatType
FLOAT UNSIGNED DoubleType
DOUBLE [UNSIGNED] DoubleType
DECIMAL(p,s) [UNSIGNED] DecimalType(min(38, p),(min(18,s))) 列类型限制为 DecimalType(38, 18),如果 'p>38',则小数部分将被截断。如果此列的任何值的实际精度大于 38,则会因 NUMERIC_VALUE_OUT_OF_RANGE.WITHOUT_SUGGESTION 错误而失败。
DATE DateType
DATETIME TimestampType (默认)preferTimestampNTZ=false 或 spark.sql.timestampType=TIMESTAMP_LTZ
DATETIME TimestampNTZType preferTimestampNTZ=true 或 spark.sql.timestampType=TIMESTAMP_NTZ
TIMESTAMP TimestampType
TIME TimestampType (默认)preferTimestampNTZ=false 或 spark.sql.timestampType=TIMESTAMP_LTZ
TIME TimestampNTZType preferTimestampNTZ=true 或 spark.sql.timestampType=TIMESTAMP_NTZ
YEAR DateType yearIsDateType=true
YEAR IntegerType yearIsDateType=false
CHAR(n) CharType(n)
VARCHAR(n) VarcharType(n)
BINARY(n) BinaryType
VARBINARY(n) BinaryType
CHAR(n) BINARY BinaryType
VARCHAR(n) BINARY BinaryType
BLOB BinaryType
TINYBLOB BinaryType
MEDIUMBLOB BinaryType
LONGBLOB BinaryType
TEXT StringType
TINYTEXT StringType
MEDIUMTEXT StringType
LONGTEXT StringType
JSON StringType
GEOMETRY BinaryType
ENUM CharType(n)
SET CharType(n)

Spark SQL 到 MySQL 数据类型映射

下表描述了当使用内置 jdbc 数据源和 MySQL Connector/J 作为激活的 JDBC 驱动程序创建、修改或写入数据到 MySQL 表时,Spark SQL 数据类型到 MySQL 数据类型的转换。

请注意,不同的 JDBC 驱动程序(例如也可用于连接 MySQL 的 Maria Connector/J)可能具有不同的映射规则。

Spark SQL 数据类型 MySQL 数据类型 备注
BooleanType BIT(1)
ByteType TINYINT
ShortType SMALLINT 对于 Spark 3.5 及之前版本,它映射到 INTEGER
IntegerType INTEGER
LongType BIGINT
FloatType FLOAT
DoubleType DOUBLE PRECISION
DecimalType(p, s) DECIMAL(p,s)
DateType DATE
TimestampType TIMESTAMP
TimestampNTZType DATETIME
StringType LONGTEXT
BinaryType BLOB
CharType(n) CHAR(n)
VarcharType(n) VARCHAR(n)

以下 Spark Catalyst 数据类型不支持对应的 MYSQL 类型。

PostgreSQL 到 Spark SQL 数据类型映射

下表描述了当使用内置 jdbc 数据源和 PostgreSQL JDBC Driver 作为激活的 JDBC 驱动程序从 Postgres 表读取数据时,PostgreSQL 数据类型到 Spark SQL 数据类型的转换。请注意,不同的 JDBC 驱动程序或不同的版本可能导致略有不同。

PostgreSQL 数据类型 Spark SQL 数据类型 备注
boolean BooleanType
smallint, smallserial ShortType
integer, serial IntegerType
bigint, bigserial LongType
float, float(p), real FloatType 1 ≤ p ≤ 24
float(p) DoubleType 25 ≤ p ≤ 53
double precision DoubleType
numeric, decimal DecimalType
  • 自 PostgreSQL 15 以来,'s' 可以是负数。如果 's<0',它将被调整为 DecimalType(min(p-s, 38), 0);否则为 DecimalType(p, s)。
  • 如果 'p>38',则小数部分将被截断。如果此列的任何值的实际精度大于 38,则会因 NUMERIC_VALUE_OUT_OF_RANGE.WITHOUT_SUGGESTION 错误而失败。
  • 不支持特殊数值,'NaN'、'infinity' 和 '-infinity'
character varying(n), varchar(n) VarcharType(n)
character(n), char(n), bpchar(n) CharType(n)
bpchar StringType
text StringType
bytea BinaryType
date DateType
timestamp [ (p) ] [ without time zone ] TimestampType (默认)preferTimestampNTZ=false 或 spark.sql.timestampType=TIMESTAMP_LTZ
timestamp [ (p) ] [ without time zone ] TimestampNTZType preferTimestampNTZ=true 或 spark.sql.timestampType=TIMESTAMP_NTZ
timestamp [ (p) ] with time zone TimestampType
time [ (p) ] [ without time zone ] TimestampType (默认)preferTimestampNTZ=false 或 spark.sql.timestampType=TIMESTAMP_LTZ
time [ (p) ] [ without time zone ] TimestampNTZType preferTimestampNTZ=true 或 spark.sql.timestampType=TIMESTAMP_NTZ
time [ (p) ] with time zone TimestampType
interval [ fields ] [ (p) ] StringType
ENUM StringType
money StringType 货币类型
inet, cidr, macaddr, macaddr8 StringType 网络地址类型
point, line, lseg, box, path, polygon, circle StringType 几何类型
pg_lsn StringType 日志序列号
bit, bit(1) BooleanType
bit( >1 ) BinaryType
bit varying( any ) BinaryType
tsvector, tsquery StringType 文本搜索类型
uuid StringType 通用唯一标识符类型
xml StringType XML 类型
json, jsonb StringType JSON 类型
array ArrayType
复合类型 StringType 通过 CREATE TYPE 语法创建的类型。
int4range, int8range, numrange, tsrange, tstzrange, daterange, etc StringType 范围类型
域类型 (由底层类型决定)
oid DecimalType(20, 0) 对象标识符类型
regxxx StringType 对象标识符类型
void NullType void 是 Postgres 伪类型,其他伪类型尚未验证

Spark SQL 到 PostgreSQL 数据类型映射

下表描述了当使用内置 jdbc 数据源和 PostgreSQL JDBC Driver 作为激活的 JDBC 驱动程序创建、修改或写入数据到 PostgreSQL 表时,Spark SQL 数据类型到 PostgreSQL 数据类型的转换。

Spark SQL 数据类型 PostgreSQL 数据类型 备注
BooleanType boolean
ByteType smallint
ShortType smallint
IntegerType integer
LongType bigint
FloatType float4
DoubleType float8
DecimalType(p, s) numeric(p,s)
DateType date
TimestampType timestamp with time zone 在 Spark 4.0 之前,它被映射为 timestamp。请参阅迁移指南以获取更多信息
TimestampNTZType timestamp
StringType text
BinaryType bytea
CharType(n) CHAR(n)
VarcharType(n) VARCHAR(n)
ArrayType
元素类型 PG 数组
BooleanType boolean[]
ByteType smallint[]
ShortType smallint[]
IntegerType integer[]
LongType bigint[]
FloatType float4[]
DoubleType float8[]
DecimalType(p, s) numeric(p,s)[]
DateType date[]
TimestampType timestamp[]
TimestampNTZType timestamp[]
StringType text[]
BinaryType bytea[]
CharType(n) char(n)[]
VarcharType(n) varchar(n)[]
如果元素类型是 ArrayType,它将转换为 Postgres 多维数组。
例如,
ArrayType(ArrayType(StringType)) 转换为 text[][]
ArrayType(ArrayType(ArrayType(LongType))) 转换为 bigint[][][]

以下 Spark Catalyst 数据类型不支持对应的 PostgreSQL 类型。

Oracle 到 Spark SQL 数据类型映射

下表描述了当使用内置 jdbc 数据源和 Oracle JDBC 作为激活的 JDBC 驱动程序从 Oracle 表读取数据时,Oracle 数据类型到 Spark SQL 数据类型的转换。

Oracle 数据类型 Spark SQL 数据类型 备注
BOOLEAN BooleanType 自 Oracle Release 23c 引入
NUMBER[(p[,s])] DecimalType(p,s) Oracle 中 's' 可以是负数。如果 's<0',它将被调整为 DecimalType(min(p-s, 38), 0);否则为 DecimalType(p, s)。如果 'p>38',则小数部分将被截断。如果此列的任何值的实际精度大于 38,则会因 NUMERIC_VALUE_OUT_OF_RANGE.WITHOUT_SUGGESTION 错误而失败。
FLOAT[(p)] DecimalType(38, 10)
BINARY_FLOAT FloatType
BINARY_DOUBLE DoubleType
LONG BinaryType
RAW(size) BinaryType
LONG RAW BinaryType
DATE TimestampType 当 oracle.jdbc.mapDateToTimestamp=true 时,它遵循下面的 TIMESTAMP 行为
DATE DateType 当 oracle.jdbc.mapDateToTimestamp=false 时,它映射到 DateType
TIMESTAMP TimestampType (默认)preferTimestampNTZ=false 或 spark.sql.timestampType=TIMESTAMP_LTZ
TIMESTAMP TimestampNTZType preferTimestampNTZ=true 或 spark.sql.timestampType=TIMESTAMP_NTZ
TIMESTAMP WITH TIME ZONE TimestampType
TIMESTAMP WITH LOCAL TIME ZONE TimestampType
INTERVAL YEAR TO MONTH YearMonthIntervalType
INTERVAL DAY TO SECOND DayTimeIntervalType
CHAR[(size [BYTE | CHAR])] CharType(size)
NCHAR[(size)] StringType
VARCHAR2(size [BYTE | CHAR]) VarcharType(size)
NVARCHAR2 StringType
ROWID/UROWID StringType
CLOB StringType
NCLOB StringType
BLOB BinaryType
BFILE 引发 UNRECOGNIZED_SQL_TYPE 错误

Spark SQL 到 Oracle 数据类型映射

下表描述了当使用内置 jdbc 数据源和 Oracle JDBC 作为激活的 JDBC 驱动程序创建、修改或写入数据到 Oracle 表时,Spark SQL 数据类型到 Oracle 数据类型的转换。

Spark SQL 数据类型 Oracle 数据类型 备注
BooleanType NUMBER(1, 0) BooleanType 映射到 NUMBER(1, 0),因为 BOOLEAN 是自 Oracle Release 23c 引入的
ByteType NUMBER(3)
ShortType NUMBER(5)
IntegerType NUMBER(10)
LongType NUMBER(19)
FloatType NUMBER(19, 4)
DoubleType NUMBER(19, 4)
DecimalType(p, s) NUMBER(p,s)
DateType DATE
TimestampType TIMESTAMP WITH LOCAL TIME ZONE
TimestampNTZType TIMESTAMP
StringType VARCHAR2(255) 由于历史原因,字符串值最大长度为 255 个字符
BinaryType BLOB
CharType(n) CHAR(n)
VarcharType(n) VARCHAR2(n)

以下 Spark Catalyst 数据类型不支持对应的 Oracle 类型。

Microsoft SQL Server 到 Spark SQL 数据类型映射

下表描述了当使用内置 jdbc 数据源和 mssql-jdbc 作为激活的 JDBC 驱动程序从 Microsoft SQL Server 表读取数据时,Microsoft SQL Server 数据类型到 Spark SQL 数据类型的转换。

SQL Server 数据类型 Spark SQL 数据类型 备注
bit BooleanType
tinyint ShortType
smallint ShortType
int IntegerType
bigint LongType
float(p), real FloatType 1 ≤ p ≤ 24
float[(p)] DoubleType 25 ≤ p ≤ 53
double precision DoubleType
smallmoney DecimalType(10, 4)
money DecimalType(19, 4)
decimal[(p[, s])], numeric[(p[, s])] DecimalType(p, s)
date DateType
datetime TimestampType (默认)preferTimestampNTZ=false 或 spark.sql.timestampType=TIMESTAMP_LTZ
datetime TimestampNTZType preferTimestampNTZ=true 或 spark.sql.timestampType=TIMESTAMP_NTZ
datetime2 [ (fractional seconds precision) ] TimestampType (默认)preferTimestampNTZ=false 或 spark.sql.timestampType=TIMESTAMP_LTZ
datetime2 [ (fractional seconds precision) ] TimestampNTZType preferTimestampNTZ=true 或 spark.sql.timestampType=TIMESTAMP_NTZ
datetimeoffset [ (fractional seconds precision) ] TimestampType
smalldatetime TimestampType (默认)preferTimestampNTZ=false 或 spark.sql.timestampType=TIMESTAMP_LTZ
smalldatetime TimestampNTZType preferTimestampNTZ=true 或 spark.sql.timestampType=TIMESTAMP_NTZ
time [ (fractional second scale) ] TimestampType (默认)preferTimestampNTZ=false 或 spark.sql.timestampType=TIMESTAMP_LTZ
time [ (fractional second scale) ] TimestampNTZType preferTimestampNTZ=true 或 spark.sql.timestampType=TIMESTAMP_NTZ
binary [ ( n ) ] BinaryType
varbinary [ ( n | max ) ] BinaryType
char [ ( n ) ] CharType(n)
varchar [ ( n | max ) ] VarcharType(n)
nchar [ ( n ) ] StringType
nvarchar [ ( n | max ) ] StringType
text StringType
ntext StringType
image StringType
geography BinaryType
geometry BinaryType
rowversion BinaryType
sql_variant 引发 UNRECOGNIZED_SQL_TYPE 错误

Spark SQL 到 Microsoft SQL Server 数据类型映射

下表描述了当使用内置 jdbc 数据源和 mssql-jdbc 作为激活的 JDBC 驱动程序创建、修改或写入数据到 Microsoft SQL Server 表时,Spark SQL 数据类型到 Microsoft SQL Server 数据类型的转换。

Spark SQL 数据类型 SQL Server 数据类型 备注
BooleanType bit
ByteType smallint 自 Spark 4.0.0 起支持,之前版本会抛出错误
ShortType smallint
IntegerType int
LongType bigint
FloatType real
DoubleType double precision
DecimalType(p, s) number(p,s)
DateType date
TimestampType datetime
TimestampNTZType datetime
StringType nvarchar(max)
BinaryType varbinary(max)
CharType(n) char(n)
VarcharType(n) varchar(n)

以下 Spark Catalyst 数据类型不支持对应的 SQL Server 类型。

DB2 到 Spark SQL 数据类型映射

下表描述了当使用内置 jdbc 数据源和 IBM Data Server Driver For JDBC and SQLJ 作为激活的 JDBC 驱动程序从 DB2 表读取数据时,DB2 数据类型到 Spark SQL 数据类型的转换。

DB2 数据类型 Spark SQL 数据类型 备注
BOOLEAN BinaryType
SMALLINT ShortType
INTEGER IntegerType
BIGINT LongType
REAL FloatType
DOUBLE, FLOAT DoubleType FLOAT 在 db2 中是双精度浮点数
DECIMAL, NUMERIC, DECFLOAT DecimalType
DATE DateType
TIMESTAMP, TIMESTAMP WITHOUT TIME ZONE TimestampType (默认)preferTimestampNTZ=false 或 spark.sql.timestampType=TIMESTAMP_LTZ
TIMESTAMP, TIMESTAMP WITHOUT TIME ZONE TimestampNTZType preferTimestampNTZ=true 或 spark.sql.timestampType=TIMESTAMP_NTZ
TIMESTAMP WITH TIME ZONE TimestampType
TIME TimestampType (默认)preferTimestampNTZ=false 或 spark.sql.timestampType=TIMESTAMP_LTZ
TIME TimestampNTZType preferTimestampNTZ=true 或 spark.sql.timestampType=TIMESTAMP_NTZ
CHAR(n) CharType(n)
VARCHAR(n) VarcharType(n)
CHAR(n) FOR BIT DATA BinaryType
VARCHAR(n) FOR BIT DATA BinaryType
BINARY(n) BinaryType
VARBINARY(n) BinaryType
CLOB(n) StringType
DBCLOB(n) StringType
BLOB(n) BinaryType
GRAPHIC(n) StringType
VARGRAPHIC(n) StringType
XML StringType
ROWID StringType

Spark SQL 到 DB2 数据类型映射

下表描述了当使用内置 jdbc 数据源和 IBM Data Server Driver For JDBC and SQLJ 作为激活的 JDBC 驱动程序创建、修改或写入数据到 DB2 表时,Spark SQL 数据类型到 DB2 数据类型的转换。

Spark SQL 数据类型 DB2 数据类型 备注
BooleanType BOOLEAN
ByteType SMALLINT
ShortType SMALLINT
IntegerType INTEGER
LongType BIGINT
FloatType REAL
DoubleType DOUBLE PRECISION
DecimalType(p, s) DECIMAL(p,s) DB2 中 'p' 的最大值为 31,而 Spark 中为 38。当存储 DecimalType(p>=32, s) 到 DB2 时可能会失败
DateType DATE
TimestampType TIMESTAMP
TimestampNTZType TIMESTAMP
StringType CLOB
BinaryType BLOB
CharType(n) CHAR(n) DB2 中 'n' 的最大值为 255,而 Spark 中没有限制。
VarcharType(n) VARCHAR(n) DB2 中 'n' 的最大值为 255,而 Spark 中没有限制。

以下 Spark Catalyst 数据类型不支持对应的 DB2 类型。

Teradata 到 Spark SQL 数据类型映射

下表描述了当使用内置 jdbc 数据源和 Teradata JDBC Driver 作为激活的 JDBC 驱动程序从 Teradata 表读取数据时,Teradata 数据类型到 Spark SQL 数据类型的转换。

Teradata 数据类型 Spark SQL 数据类型 备注
BYTEINT ByteType
SMALLINT ShortType
INTEGER, INT IntegerType
BIGINT LongType
REAL, DOUBLE PRECISION, FLOAT DoubleType
DECIMAL, NUMERIC, NUMBER DecimalType
DATE DateType
TIMESTAMP, TIMESTAMP WITH TIME ZONE TimestampType (默认)preferTimestampNTZ=false 或 spark.sql.timestampType=TIMESTAMP_LTZ
TIMESTAMP, TIMESTAMP WITH TIME ZONE TimestampNTZType preferTimestampNTZ=true 或 spark.sql.timestampType=TIMESTAMP_NTZ
TIME, TIME WITH TIME ZONE TimestampType (默认)preferTimestampNTZ=false 或 spark.sql.timestampType=TIMESTAMP_LTZ
TIME, TIME WITH TIME ZONE TimestampNTZType preferTimestampNTZ=true 或 spark.sql.timestampType=TIMESTAMP_NTZ
CHARACTER(n), CHAR(n), GRAPHIC(n) CharType(n)
VARCHAR(n), VARGRAPHIC(n) VarcharType(n)
BYTE(n), VARBYTE(n) BinaryType
CLOB StringType
BLOB BinaryType
INTERVAL 数据类型 - INTERVAL 数据类型尚不明确
周期数据类型、ARRAY、UDT - 不支持

Spark SQL 到 Teradata 数据类型映射

下表描述了当使用内置 jdbc 数据源和 Teradata JDBC Driver 作为激活的 JDBC 驱动程序创建、修改或写入数据到 Teradata 表时,Spark SQL 数据类型到 Teradata 数据类型的转换。

Spark SQL 数据类型 Teradata 数据类型 备注
BooleanType CHAR(1)
ByteType BYTEINT
ShortType SMALLINT
IntegerType INTEGER
LongType BIGINT
FloatType REAL
DoubleType DOUBLE PRECISION
DecimalType(p, s) DECIMAL(p,s)
DateType DATE
TimestampType TIMESTAMP
TimestampNTZType TIMESTAMP
StringType VARCHAR(255)
BinaryType BLOB
CharType(n) CHAR(n)
VarcharType(n) VARCHAR(n)

以下 Spark Catalyst 数据类型不支持对应的 Teradata 类型。