迁移指南:SQL、Datasets 和 DataFrame

从 Spark SQL 3.5.3 升级到 3.5.4

从 Spark SQL 3.5.1 升级到 3.5.2

从 Spark SQL 3.5.0 升级到 3.5.1

从 Spark SQL 3.4 升级到 3.5

从 Spark SQL 3.3 升级到 3.4

从 Spark SQL 3.2 升级到 3.3

从 Spark SQL 3.1 升级到 3.2

从 Spark SQL 3.0 升级到 3.1

从 Spark SQL 3.0.1 升级到 3.0.2

从 Spark SQL 3.0 升级到 3.0.1

从 Spark SQL 2.4 升级到 3.0

Dataset/DataFrame APIs

DDL 语句

UDFs 和内置函数

查询引擎

数据源

其他

从 Spark SQL 2.4.7 升级到 2.4.8

从 Spark SQL 2.4.5 升级到 2.4.6

从 Spark SQL 2.4.4 升级到 2.4.5

从 Spark SQL 2.4.3 升级到 2.4.4

从 Spark SQL 2.4 升级到 2.4.1

从 Spark SQL 2.3 升级到 2.4

从 Spark SQL 2.2 升级到 2.3

从 Spark SQL 2.1 升级到 2.2

从 Spark SQL 2.0 升级到 2.1

从 Spark SQL 1.6 升级到 2.0

从 Spark SQL 1.5 升级到 1.6

   ./sbin/start-thriftserver.sh \
     --conf spark.sql.hive.thriftServer.singleSession=true \
     ...
   

从 Spark SQL 1.4 升级到 1.5

从 Spark SQL 1.3 升级到 1.4

DataFrame 数据读取器/写入器接口

基于用户反馈,我们创建了一个新的、更流畅的 API,用于读取数据 (SQLContext.read) 和写出数据 (DataFrame.write),并弃用了旧的 API (例如,SQLContext.parquetFile, SQLContext.jsonFile)。

有关更多信息,请参阅 SQLContext.read 的 API 文档(ScalaJavaPython)和 DataFrame.write 的 API 文档(ScalaJavaPython)。

DataFrame.groupBy 保留分组列

根据用户反馈,我们更改了 DataFrame.groupBy().agg() 的默认行为,以在结果 DataFrame 中保留分组列。要保持 1.3 中的行为,请将 spark.sql.retainGroupColumns 设置为 false

import pyspark.sql.functions as func

# In 1.3.x, in order for the grouping column "department" to show up,
# it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense"))

# In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(func.max("age"), func.sum("expense"))

# Revert to 1.3.x behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"))

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(col("department"), max("age"), sum("expense"));

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"));

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false");

DataFrame.withColumn 行为更改

在 1.4 之前,DataFrame.withColumn() 仅支持添加列。即使可能存在具有相同名称的任何现有列,该列也将始终作为具有其指定名称的新列添加到结果 DataFrame 中。从 1.4 开始,DataFrame.withColumn() 支持添加与所有现有列的名称不同的列,或替换具有相同名称的现有列。

请注意,此更改仅适用于 Scala API,不适用于 PySpark 和 SparkR。

从 Spark SQL 1.0-1.2 升级到 1.3

在 Spark 1.3 中,我们移除了 Spark SQL 的“Alpha”标签,并以此为契机清理了可用的 API。从 Spark 1.3 开始,Spark SQL 将提供与 1.X 系列中其他版本的二进制兼容性。此兼容性保证不包括显式标记为不稳定的 API(即,DeveloperAPI 或 Experimental)。

将 SchemaRDD 重命名为 DataFrame

升级到 Spark SQL 1.3 时用户会注意到的最大变化是 SchemaRDD 已重命名为 DataFrame。这主要是因为 DataFrames 不再直接从 RDD 继承,而是通过它们自己的实现提供 RDD 提供的大部分功能。DataFrames 仍然可以通过调用 .rdd 方法转换为 RDD。

在 Scala 中,存在从 SchemaRDDDataFrame 的类型别名,以便为某些用例提供源代码兼容性。仍然建议用户更新其代码以使用 DataFrame 代替。Java 和 Python 用户需要更新他们的代码。

Java 和 Scala API 的统一

在 Spark 1.3 之前,存在单独的 Java 兼容类 (JavaSQLContextJavaSchemaRDD),它们镜像了 Scala API。在 Spark 1.3 中,Java API 和 Scala API 已统一。任一语言的用户都应使用 SQLContextDataFrame。通常,这些类尝试使用可从两种语言使用的类型(即,Array 而不是特定于语言的集合)。在某些情况下,如果不存在通用类型(例如,用于传入闭包或 Map),则会使用函数重载。

此外,Java 特定的类型 API 已被删除。Scala 和 Java 的用户都应使用 org.apache.spark.sql.types 中存在的类来以编程方式描述 schema。

隐式转换的隔离和 dsl 包的删除(仅限 Scala)

Spark 1.3 之前的许多代码示例都以 import sqlContext._ 开头,这将 sqlContext 中的所有函数都引入了作用域。在 Spark 1.3 中,我们已将用于将 RDD 转换为 DataFrame 的隐式转换隔离到 SQLContext 中的一个对象内部。用户现在应该编写 import sqlContext.implicits._

此外,隐式转换现在仅使用方法 toDF 来扩充由 Product(即,case 类或元组)组成的 RDD,而不是自动应用。

当使用 DSL 中的函数(现在已替换为 DataFrame API)时,用户过去常常导入 org.apache.spark.sql.catalyst.dsl。相反,应该使用公共 dataframe 函数 API:import org.apache.spark.sql.functions._

删除 org.apache.spark.sql 中 DataType 的类型别名(仅限 Scala)

Spark 1.3 删除了基本 sql 包中存在的 DataType 的类型别名。用户应改为导入 org.apache.spark.sql.types 中的类

UDF 注册已移至 sqlContext.udf (Java & Scala)

用于注册 UDF 的函数(无论是用于 DataFrame DSL 还是 SQL)都已移动到 SQLContext 中的 udf 对象中。

sqlContext.udf.register("strLen", (s: String) => s.length())
sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);

Python UDF 注册未更改。

与 Apache Hive 的兼容性

Spark SQL 旨在与 Hive Metastore、SerDes 和 UDF 兼容。目前,Hive SerDes 和 UDF 基于内置的 Hive,Spark SQL 可以连接到不同版本的 Hive Metastore(从 0.12.0 到 2.3.9 和 3.0.0 到 3.1.3。另请参阅 与不同版本的 Hive Metastore 交互)。

在现有 Hive 仓库中部署

Spark SQL Thrift JDBC 服务器旨在与现有 Hive 安装“开箱即用”兼容。您无需修改现有的 Hive Metastore 或更改表的data placement 或分区。

支持的 Hive 功能

Spark SQL 支持绝大多数 Hive 功能,例如

不支持的 Hive 功能

以下是我们尚未支持的 Hive 功能列表。这些功能中的大多数在 Hive 部署中很少使用。

深奥的 Hive 功能

Hive 输入/输出格式

Hive 优化

Spark 中尚未包含一些 Hive 优化。由于 Spark SQL 的内存计算模型,其中一些(例如索引)不太重要。其他一些已安排在 Spark SQL 的未来版本中。

Hive UDF/UDTF/UDAF

并非所有 Hive UDF/UDTF/UDAF 的 API 都受 Spark SQL 支持。以下是不支持的 API:

不兼容的 Hive UDF

以下是 Hive 和 Spark 生成不同结果的场景: