迁移指南:SQL、数据集和 DataFrame

从 Spark SQL 3.4 升级到 3.5

从 Spark SQL 3.3 升级到 3.4

从 Spark SQL 3.2 升级到 3.3

从 Spark SQL 3.1 升级到 3.2

从 Spark SQL 3.0 升级到 3.1

从 Spark SQL 3.0.1 升级到 3.0.2

从 Spark SQL 3.0 升级到 3.0.1

从 Spark SQL 2.4 升级到 3.0

数据集/DataFrame API

DDL 语句

UDF 和内置函数

查询引擎

数据源

其他

从 Spark SQL 2.4.7 升级到 2.4.8

从 Spark SQL 2.4.5 升级到 2.4.6

从 Spark SQL 2.4.4 升级到 2.4.5

从 Spark SQL 2.4.3 升级到 2.4.4

从 Spark SQL 2.4 升级到 2.4.1

从 Spark SQL 2.3 升级到 2.4

从 Spark SQL 2.2 升级到 2.3

</thead> <tr> <td> NullType </td> <td>NullType</td> <td>IntegerType</td> <td>LongType</td> <td>DecimalType(38,0)</td> <td>DoubleType</td> <td>DateType</td> <td>TimestampType</td> <td>StringType</td> </tr> <tr> <td> IntegerType </td> <td>IntegerType</td> <td>IntegerType</td> <td>LongType</td> <td>DecimalType(38,0)</td> <td>DoubleType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> </tr> <tr> <td> LongType </td> <td>LongType</td> <td>LongType</td> <td>LongType</td> <td>DecimalType(38,0)</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> </tr> <tr> <td> DecimalType(38,0)* </td> <td>DecimalType(38,0)</td> <td>DecimalType(38,0)</td> <td>DecimalType(38,0)</td> <td>DecimalType(38,0)</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> </tr> <tr> <td> DoubleType </td> <td>DoubleType</td> <td>DoubleType</td> <td>StringType</td> <td>StringType</td> <td>DoubleType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> </tr> <tr> <td> DateType </td> <td>DateType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>DateType</td> <td>TimestampType</td> <td>StringType</td> </tr> <tr> <td> TimestampType </td> <td>TimestampType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>TimestampType</td> <td>TimestampType</td> <td>StringType</td> </tr> <tr> <td> StringType </td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> </tr> </table>

Note that, for <b>DecimalType(38,0)*</b>, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.

从 Spark SQL 2.1 升级到 2.2

从 Spark SQL 2.0 升级到 2.1

从 Spark SQL 1.6 升级到 2.0

从 Spark SQL 1.5 升级到 1.6

   ./sbin/start-thriftserver.sh \
     --conf spark.sql.hive.thriftServer.singleSession=true \
     ...
   

从 Spark SQL 1.4 升级到 1.5

从 Spark SQL 1.3 升级到 1.4

DataFrame 数据读取器/写入器接口

根据用户反馈,我们创建了一个新的、更流畅的 API,用于读取数据 (SQLContext.read) 和写出数据 (DataFrame.write),并弃用了旧的 API(例如,SQLContext.parquetFileSQLContext.jsonFile)。

有关更多信息,请参阅 SQLContext.readScalaJavaPython)和 DataFrame.writeScalaJavaPython)的 API 文档。

DataFrame.groupBy 保留分组列

根据用户反馈,我们更改了 DataFrame.groupBy().agg() 的默认行为,以在结果 DataFrame 中保留分组列。要保持 1.3 中的行为,请将 spark.sql.retainGroupColumns 设置为 false

import pyspark.sql.functions as func

# In 1.3.x, in order for the grouping column "department" to show up,
# it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense"))

# In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(func.max("age"), func.sum("expense"))

# Revert to 1.3.x behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"))

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(col("department"), max("age"), sum("expense"));

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"));

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false");

DataFrame.withColumn 的行为变化

在 1.4 之前,DataFrame.withColumn() 仅支持添加列。该列将始终作为新列添加到结果 DataFrame 中,并使用其指定的名称,即使可能存在任何同名列。从 1.4 开始,DataFrame.withColumn() 支持添加与所有现有列名称不同的列,或替换同名的现有列。

请注意,此更改仅适用于 Scala API,不适用于 PySpark 和 SparkR。

从 Spark SQL 1.0-1.2 升级到 1.3

在 Spark 1.3 中,我们删除了 Spark SQL 的“Alpha”标签,并在此过程中清理了可用的 API。从 Spark 1.3 开始,Spark SQL 将提供与 1.X 系列中其他版本的二进制兼容性。此兼容性保证不包括明确标记为不稳定(即 DeveloperAPI 或 Experimental)的 API。

将 SchemaRDD 重命名为 DataFrame

用户在升级到 Spark SQL 1.3 时会注意到的最大变化是 SchemaRDD 已重命名为 DataFrame。这主要是因为 DataFrame 不再直接继承自 RDD,而是通过自身的实现提供了 RDD 提供的大部分功能。仍然可以通过调用 .rdd 方法将 DataFrame 转换为 RDD。

在 Scala 中,有一个从 SchemaRDDDataFrame 的类型别名,以便为某些用例提供源代码兼容性。仍然建议用户更新其代码以使用 DataFrame

Java 和 Scala API 的统一

在 Spark 1.3 之前,有单独的 Java 兼容类(JavaSQLContextJavaSchemaRDD)来镜像 Scala API。在 Spark 1.3 中,Java API 和 Scala API 已经统一。两种语言的用户都应该使用 SQLContextDataFrame。通常,这些类尝试使用两种语言都可以使用的类型(例如,使用 Array 而不是特定于语言的集合)。在某些情况下,如果不存在通用类型(例如,用于传入闭包或映射),则改用函数重载。

此外,特定于 Java 的类型 API 已被删除。Scala 和 Java 用户都应该使用 org.apache.spark.sql.types 中的类以编程方式描述架构。

隔离隐式转换和删除 dsl 包(仅限 Scala)

在 Spark 1.3 之前,许多代码示例都以 import sqlContext._ 开头,这会将 sqlContext 中的所有函数都引入作用域。在 Spark 1.3 中,我们已将用于将 RDD 转换为 DataFrame 的隐式转换隔离到 SQLContext 内部的对象中。用户现在应该编写 import sqlContext.implicits._

此外,隐式转换现在只会在由 Product(即 case 类或元组)组成的 RDD 中添加一个方法 toDF,而不是自动应用。

当在 DSL(现在已替换为 DataFrame API)中使用函数时,用户过去常常导入 org.apache.spark.sql.catalyst.dsl。现在应该使用公共 dataframe 函数 API:import org.apache.spark.sql.functions._

删除 org.apache.spark.sql 中 DataType 的类型别名(仅限 Scala)

Spark 1.3 删除了基本 sql 包中 DataType 的类型别名。用户应该改为导入 org.apache.spark.sql.types 中的类

UDF 注册移至 sqlContext.udf(Java 和 Scala)

用于注册 UDF 的函数(无论是在 DataFrame DSL 还是 SQL 中使用)都已移至 SQLContext 中的 udf 对象中。

sqlContext.udf.register("strLen", (s: String) => s.length())
sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);

Python UDF 注册保持不变。

与 Apache Hive 的兼容性

Spark SQL 旨在与 Hive Metastore、SerDes 和 UDF 兼容。目前,Hive SerDes 和 UDF 基于内置 Hive,Spark SQL 可以连接到不同版本的 Hive Metastore(从 0.12.0 到 2.3.9 和 3.0.0 到 3.1.3。另请参阅与不同版本的 Hive Metastore 交互)。

在现有 Hive 数据仓库中部署

Spark SQL Thrift JDBC 服务器旨在与现有 Hive 安装“开箱即用”地兼容。您无需修改现有的 Hive Metastore 或更改表的放置或分区方式。

支持的 Hive 功能

Spark SQL 支持绝大多数 Hive 功能,例如

不支持的 Hive 功能

以下列出了我们尚不支持的 Hive 功能。这些功能大多数在 Hive 部署中很少使用。

深奥的 Hive 功能

Hive 输入/输出格式

Hive 优化

Spark 中尚未包含少数 Hive 优化。由于 Spark SQL 的内存计算模型,其中一些(例如索引)不太重要。其他一些计划用于未来版本的 Spark SQL。

Hive UDF/UDTF/UDAF

Spark SQL 不支持 Hive UDF/UDTF/UDAF 的所有 API。以下是未支持的 API

不兼容的 Hive UDF

以下是 Hive 和 Spark 生成不同结果的场景