迁移指南:SQL、数据集和 DataFrame

从 Spark SQL 3.5 升级到 4.0

从 Spark SQL 3.5.3 升级到 3.5.4

从 Spark SQL 3.5.1 升级到 3.5.2

从 Spark SQL 3.5.0 升级到 3.5.1

从 Spark SQL 3.4 升级到 3.5

从 Spark SQL 3.3 升级到 3.4

从 Spark SQL 3.2 升级到 3.3

从 Spark SQL 3.1 升级到 3.2

从 Spark SQL 3.0 升级到 3.1

从 Spark SQL 3.0.1 升级到 3.0.2

从 Spark SQL 3.0 升级到 3.0.1

从 Spark SQL 2.4 升级到 3.0

Dataset/DataFrame API

DDL 语句

UDF 和内置函数

查询引擎

数据源

其他

从 Spark SQL 2.4.7 升级到 2.4.8

从 Spark SQL 2.4.5 升级到 2.4.6

从 Spark SQL 2.4.4 升级到 2.4.5

从 Spark SQL 2.4.3 升级到 2.4.4

从 Spark SQL 2.4 升级到 2.4.1

从 Spark SQL 2.3 升级到 2.4

从 Spark SQL 2.2 升级到 2.3

从 Spark SQL 2.1 升级到 2.2

从 Spark SQL 2.0 升级到 2.1

从 Spark SQL 1.6 升级到 2.0

从 Spark SQL 1.5 升级到 1.6

   ./sbin/start-thriftserver.sh \
     --conf spark.sql.hive.thriftServer.singleSession=true \
     ...
   

从 Spark SQL 1.4 升级到 1.5

从 Spark SQL 1.3 升级到 1.4

DataFrame 数据读写器接口

根据用户反馈,我们创建了一个新的、更流畅的 API,用于读取数据 (SQLContext.read) 和写入数据 (DataFrame.write),并弃用了旧的 API(例如,SQLContext.parquetFileSQLContext.jsonFile)。

有关更多信息,请参阅 SQLContext.readScalaJavaPython )和 DataFrame.writeScalaJavaPython )的 API 文档。

DataFrame.groupBy 保留分组列

根据用户反馈,我们更改了 DataFrame.groupBy().agg() 的默认行为,以在结果 DataFrame 中保留分组列。为了保持 1.3 中的行为,请将 spark.sql.retainGroupColumns 设置为 false

import pyspark.sql.functions as func

# In 1.3.x, in order for the grouping column "department" to show up,
# it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense"))

# In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(func.max("age"), func.sum("expense"))

# Revert to 1.3.x behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"))

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(col("department"), max("age"), sum("expense"));

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"));

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false");

DataFrame.withColumn 的行为变化

在 1.4 之前,DataFrame.withColumn() 仅支持添加列。即使存在任何同名列,该列也将始终以其指定名称作为新列添加到结果 DataFrame 中。从 1.4 开始,DataFrame.withColumn() 支持添加名称与所有现有列名称不同的列,或替换同名现有列。

请注意,此更改仅适用于 Scala API,不适用于 PySpark 和 SparkR。

从 Spark SQL 1.0-1.2 升级到 1.3

在 Spark 1.3 中,我们从 Spark SQL 中移除了“Alpha”标签,并以此为契机清理了可用的 API。从 Spark 1.3 开始,Spark SQL 将与 1.X 系列中的其他版本提供二进制兼容性。此兼容性保证不包括明确标记为不稳定的 API(即 DeveloperAPI 或 Experimental)。

将 SchemaRDD 重命名为 DataFrame

用户升级到 Spark SQL 1.3 时会注意到的最大变化是 SchemaRDD 已被重命名为 DataFrame。这主要是因为 DataFrame 不再直接继承自 RDD,而是通过自己的实现提供 RDD 提供的大部分功能。DataFrame 仍然可以通过调用 .rdd 方法转换为 RDD。

在 Scala 中,存在从 SchemaRDDDataFrame 的类型别名,以便为某些用例提供源兼容性。仍然建议用户更新其代码以使用 DataFrame。Java 和 Python 用户需要更新其代码。

Java 和 Scala API 的统一

在 Spark 1.3 之前,存在独立的 Java 兼容类(JavaSQLContextJavaSchemaRDD),它们镜像了 Scala API。在 Spark 1.3 中,Java API 和 Scala API 已统一。两种语言的用户都应使用 SQLContextDataFrame。通常,这些类尝试使用两种语言都可用的类型(即 Array 而不是特定于语言的集合)。在某些不存在通用类型的情况下(例如,用于传递闭包或 Map),则使用函数重载代替。

此外,Java 特定类型 API 已被移除。Scala 和 Java 用户都应使用 org.apache.spark.sql.types 中存在的类来以编程方式描述模式。

隐式转换的隔离和 dsl 包的移除(仅限 Scala)

在 Spark 1.3 之前,许多代码示例都以 import sqlContext._ 开头,这会将 sqlContext 中的所有函数引入作用域。在 Spark 1.3 中,我们已将用于将 RDD 转换为 DataFrame 的隐式转换隔离到 SQLContext 内的一个对象中。用户现在应该编写 import sqlContext.implicits._

此外,隐式转换现在只为由 Product(即 case 类或元组)组成的 RDD 增加一个 toDF 方法,而不是自动应用。

在 DSL(现在已被 DataFrame API 取代)内部使用函数时,用户以前会导入 org.apache.spark.sql.catalyst.dsl。现在应该使用公共 DataFrame 函数 API:import org.apache.spark.sql.functions._

移除 org.apache.spark.sql 中 DataType 的类型别名(仅限 Scala)

Spark 1.3 移除了基本 sql 包中 DataType 的类型别名。用户应改为导入 org.apache.spark.sql.types 中的类。

UDF 注册已移至 sqlContext.udf(Java 和 Scala)

用于注册 UDF(无论是在 DataFrame DSL 还是 SQL 中使用)的函数已移至 SQLContext 中的 udf 对象。

sqlContext.udf.register("strLen", (s: String) => s.length())
sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);

Python UDF 注册保持不变。

与 Apache Hive 的兼容性

Spark SQL 旨在与 Hive Metastore、SerDes 和 UDF 兼容。目前,Hive SerDes 和 UDF 基于内置 Hive,并且 Spark SQL 可以连接到不同版本的 Hive Metastore(从 2.0.0 到 2.3.10 以及 3.0.0 到 3.1.3。另请参阅 与不同版本的 Hive Metastore 交互)。

部署到现有 Hive 数据仓库

Spark SQL Thrift JDBC 服务器旨在“开箱即用”地与现有 Hive 安装兼容。您无需修改现有的 Hive Metastore,也无需更改表的数据放置或分区。

支持的 Hive 功能

Spark SQL 支持绝大多数 Hive 功能,例如:

不支持的 Hive 功能

以下是我们尚不支持的 Hive 功能列表。这些功能中的大多数在 Hive 部署中很少使用。

不常用的 Hive 功能

Hive 输入/输出格式

Hive 优化

少数 Hive 优化尚未包含在 Spark 中。其中一些(例如索引)由于 Spark SQL 的内存计算模型而变得不那么重要。其他则计划在 Spark SQL 的未来版本中实现。

Hive UDF/UDTF/UDAF

并非所有 Hive UDF/UDTF/UDAF 的 API 都受 Spark SQL 支持。以下是不受支持的 API:

不兼容的 Hive UDF

以下是 Hive 和 Spark 生成不同结果的场景: