迁移指南:SQL、Datasets 和 DataFrame

从 Spark SQL 4.0 升级到 4.1

从 Spark SQL 3.5 升级到 4.0

从 Spark SQL 3.5.3 升级到 3.5.4

从 Spark SQL 3.5.1 升级到 3.5.2

从 Spark SQL 3.5.0 升级到 3.5.1

从 Spark SQL 3.4 升级到 3.5

从 Spark SQL 3.3 升级到 3.4

从 Spark SQL 3.2 升级到 3.3

从 Spark SQL 3.1 升级到 3.2

从 Spark SQL 3.0 升级到 3.1

从 Spark SQL 3.0.1 升级到 3.0.2

从 Spark SQL 3.0 升级到 3.0.1

从 Spark SQL 2.4 升级到 3.0

Dataset/DataFrame API

DDL 语句

UDF 和内置函数

查询引擎

数据源

其他

从 Spark SQL 2.4.7 升级到 2.4.8

从 Spark SQL 2.4.5 升级到 2.4.6

从 Spark SQL 2.4.4 升级到 2.4.5

从 Spark SQL 2.4.3 升级到 2.4.4

从 Spark SQL 2.4 升级到 2.4.1

从 Spark SQL 2.3 升级到 2.4

从 Spark SQL 2.2 升级到 2.3

从 Spark SQL 2.1 升级到 2.2

从 Spark SQL 2.0 升级到 2.1

从 Spark SQL 1.6 升级到 2.0

从 Spark SQL 1.5 升级到 1.6

   ./sbin/start-thriftserver.sh \
     --conf spark.sql.hive.thriftServer.singleSession=true \
     ...
   

从 Spark SQL 1.4 升级到 1.5

从 Spark SQL 1.3 升级到 1.4

DataFrame 数据读取器/写入器接口

根据用户反馈,我们创建了一个新的、更流畅的 API 用于读取数据 (SQLContext.read) 和写入数据 (DataFrame.write),并弃用了旧的 API(例如 SQLContext.parquetFile, SQLContext.jsonFile)。

有关更多信息,请参阅 SQLContext.read ( Scala, Java, Python ) 和 DataFrame.write ( Scala, Java, Python ) 的 API 文档。

DataFrame.groupBy 保留分组列

根据用户反馈,我们将 DataFrame.groupBy().agg() 的默认行为更改为在生成的 DataFrame 中保留分组列。要保持 1.3 版本中的行为,请将 spark.sql.retainGroupColumns 设置为 false

import pyspark.sql.functions as func

# In 1.3.x, in order for the grouping column "department" to show up,
# it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense"))

# In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(func.max("age"), func.sum("expense"))

# Revert to 1.3.x behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"))

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(col("department"), max("age"), sum("expense"));

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"));

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false");

DataFrame.withColumn 的行为更改

在 1.4 版本之前,DataFrame.withColumn() 仅支持添加一列。即使结果 DataFrame 中可能存在同名列,该列也将始终以其指定的名称作为新列添加。从 1.4 版本开始,DataFrame.withColumn() 支持添加与所有现有列名称不同的列,或替换同名的现有列。

请注意,此更改仅适用于 Scala API,不适用于 PySpark 和 SparkR。

从 Spark SQL 1.0-1.2 升级到 1.3

在 Spark 1.3 中,我们从 Spark SQL 中删除了“Alpha”标签,并作为此过程的一部分,清理了可用的 API。从 Spark 1.3 开始,Spark SQL 将与 1.X 系列中的其他版本保持二进制兼容。此兼容性保证不包括明确标记为不稳定的 API(即 DeveloperAPI 或 Experimental)。

SchemaRDD 重命名为 DataFrame

用户在升级到 Spark SQL 1.3 时注意到的最大变化是 SchemaRDD 已重命名为 DataFrame。这主要是因为 DataFrames 不再直接继承自 RDD,而是通过其自己的实现提供了 RDD 所提供的大部分功能。DataFrames 仍然可以通过调用 .rdd 方法转换为 RDD。

在 Scala 中,存在从 SchemaRDDDataFrame 的类型别名,以便为某些用例提供源代码兼容性。仍然建议用户更新代码以改用 DataFrame。Java 和 Python 用户需要更新他们的代码。

Java 和 Scala API 的统一

在 Spark 1.3 之前,存在独立的 Java 兼容类(JavaSQLContextJavaSchemaRDD),它们镜像了 Scala API。在 Spark 1.3 中,Java API 和 Scala API 已统一。任何一种语言的用户都应该使用 SQLContextDataFrame。通常,这些类尝试使用两种语言都可以使用的类型(即 Array 而不是特定于语言的集合)。在某些没有通用类型的情况下(例如,用于传入闭包或映射),则改为使用函数重载。

此外,Java 特定的类型 API 已被删除。Scala 和 Java 的用户都应使用 org.apache.spark.sql.types 中存在的类来以编程方式描述模式。

隐式转换的隔离和 dsl 包的移除(仅限 Scala)

许多 Spark 1.3 之前的代码示例以 import sqlContext._ 开头,这会将 sqlContext 中的所有函数带入作用域。在 Spark 1.3 中,我们将用于将 RDD 转换为 DataFrame 的隐式转换隔离到了 SQLContext 内部的一个对象中。用户现在应该编写 import sqlContext.implicits._

此外,隐式转换现在仅增强由 Product(即样例类或元组)组成的 RDD,并带有 toDF 方法,而不是自动应用。

当在 DSL(现在已替换为 DataFrame API)中使用函数时,用户过去常常导入 org.apache.spark.sql.catalyst.dsl。相反,应使用公共 dataframe 函数 API:import org.apache.spark.sql.functions._

移除 org.apache.spark.sql 中 DataType 的类型别名(仅限 Scala)

Spark 1.3 移除了基础 sql 包中为 DataType 提供的类型别名。用户应改用 org.apache.spark.sql.types 中的类。

UDF 注册移至 sqlContext.udf (Java & Scala)

用于注册 UDF(无论是用于 DataFrame DSL 还是 SQL)的函数已移至 SQLContext 中的 udf 对象中。

sqlContext.udf.register("strLen", (s: String) => s.length())
sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);

Python UDF 注册保持不变。

与 Apache Hive 的兼容性

Spark SQL 设计为与 Hive 元存储、SerDes 和 UDF 兼容。目前,Hive SerDes 和 UDF 基于内置的 Hive,Spark SQL 可以连接到不同版本的 Hive 元存储(从 2.0.0 到 2.3.10 以及 3.0.0 到 4.1.0。另请参阅 与不同版本的 Hive 元存储进行交互)。

在现有 Hive 数据仓库中部署

Spark SQL Thrift JDBC 服务器旨在与现有的 Hive 安装“开箱即用”地兼容。您无需修改现有的 Hive 元存储或更改表的放置位置或分区。

支持的 Hive 功能

Spark SQL 支持绝大多数 Hive 功能,例如

不支持的 Hive 功能

以下是我们尚不支持的 Hive 功能列表。这些功能中的大多数在 Hive 部署中很少使用。

深奥的 Hive 功能

Hive 输入/输出格式

Hive 优化

少数 Hive 优化尚未包含在 Spark 中。其中一些(例如索引)由于 Spark SQL 的内存计算模型而不太重要。其他则计划在 Spark SQL 的未来版本中实现。

Hive UDF/UDTF/UDAF

并非 Hive UDF/UDTF/UDAF 的所有 API 都受 Spark SQL 支持。以下是不支持的 API:

不兼容的 Hive UDF

以下是 Hive 和 Spark 生成不同结果的情况: