升级 PySpark

从 PySpark 3.3 升级到 3.4

  • 在 Spark 3.4 中,数组列的 schema 是通过合并数组中所有元素的 schema 来推断的。要恢复之前的行为(仅从第一个元素推断 schema),您可以将 spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled 设置为 true

  • 在 Spark 3.4 中,如果 Spark API 上的 Pandas Groupby.applyfunc 参数返回类型未指定,并且 compute.shortcut_limit 设置为 0,则抽样行数将设置为 2(确保抽样行数始终 >= 2)以确保推断 schema 的准确性。

  • 在 Spark 3.4 中,如果 Spark API 上的 Pandas Index.insert 超出范围,将会抛出带有 index {} is out of bounds for axis 0 with size {} 的 IndexError,以遵循 pandas 1.4 的行为。

  • 在 Spark 3.4 中,序列名称将在 Spark API 上的 Pandas Series.mode 中保留,以遵循 pandas 1.4 的行为。

  • 在 Spark API 上的 Pandas Index.__setitem__ 中,Spark 3.4 将首先检查 value 类型是否为 Column 类型,以避免在 is_list_like 中引发意外的 ValueError,例如 Cannot convert column into bool: please use ‘&’ for ‘and’, ‘|’ for ‘or’, ‘~’ for ‘not’ when building DataFrame boolean expressions.

  • 在 Spark 3.4 中,Spark API 上的 Pandas astype('category') 也会根据原始数据 dtype 刷新 categories.dtype 以遵循 pandas 1.4 的行为。

  • 在 Spark 3.4 中,Spark API 上的 Pandas 支持在 GroupBy.headGroupBy.tail 中进行 groupby 位置索引,以遵循 pandas 1.4。现在,负参数可以正确工作,并产生相对于每个组的结尾和开始的范围。以前,负参数返回空帧。

  • 在 Spark 3.4 中,Spark 上的 Pandas 中 groupby.apply 的推断 schema 过程将首先推断 pandas 类型,以尽可能确保 pandas dtype 的准确性。

  • 在 Spark 3.4 中,Series.concat sort 参数将被尊重,以遵循 pandas 1.4 的行为。

  • 在 Spark 3.4 中,DataFrame.__setitem__ 将进行复制并替换预先存在的数组,这不会被覆盖,以遵循 pandas 1.4 的行为。

  • 在 Spark 3.4 中,SparkSession.sql 和 Spark API 上的 Pandas sql 获取了新的参数 args,该参数提供将命名参数绑定到其 SQL 字面值。

  • 在 Spark 3.4 中,Spark 上的 Pandas API 遵循 pandas 2.0,并且根据 pandas 2.0 中所做的更改,某些 API 在 Spark 3.4 中已弃用或删除。 有关更多详细信息,请参阅 [pandas 的发行说明](https://pandas.ac.cn/docs/dev/whatsnew/)。

  • 在 Spark 3.4 中,删除了 collections.namedtuple 的自定义 monkey-patch,并且默认使用 cloudpickle。 要恢复以前的行为以解决 collections.namedtuple 的任何相关的 pickle 问题,请将 PYSPARK_ENABLE_NAMEDTUPLE_PATCH 环境变量设置为 1

从 PySpark 3.2 升级到 3.3

  • 在 Spark 3.3 中,pyspark.pandas.sql 方法遵循 [标准 Python 字符串格式化程序](https://docs.pythonlang.cn/3/library/string.html#format-string-syntax)。 要恢复以前的行为,请将 PYSPARK_PANDAS_SQL_LEGACY 环境变量设置为 1

  • 在 Spark 3.3 中,Spark DataFrame 上的 pandas API 的 drop 方法支持按 index 删除行,并默认设置为按索引而不是按列删除。

  • 在 Spark 3.3 中,PySpark 升级了 Pandas 版本,新的最低必需版本从 0.23.2 更改为 1.0.5。

  • 在 Spark 3.3 中,SQL DataTypes 的 repr 返回值已更改为在传递给 eval 时产生具有相同值的对象。

从 PySpark 3.1 升级到 3.2

  • 在 Spark 3.2 中,sql、ml、spark_on_pandas 模块中的 PySpark 方法在应用于类型不合适的参数时,会引发 TypeError 而不是 ValueError

  • 在 Spark 3.2 中,默认情况下,Python UDF、pandas UDF 和 pandas 函数 API 的回溯被简化,不包含来自内部 Python worker 的回溯。 在 Spark 3.1 或更早版本中,会打印出来自 Python worker 的回溯。 要恢复 Spark 3.2 之前的行为,您可以将 spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled 设置为 false

  • 在 Spark 3.2 中,默认情况下启用固定线程模式,以将每个 Python 线程映射到相应的 JVM 线程。 以前,一个 JVM 线程可以被多个 Python 线程重用,导致一个 JVM 线程本地变量被多个 Python 线程共享。 此外,请注意,现在建议将 pyspark.InheritableThreadpyspark.inheritable_thread_target 一起使用,以使 Python 线程正确继承可继承的属性(例如 JVM 线程中的本地属性),并避免潜在的资源泄漏问题。 要恢复 Spark 3.2 之前的行为,您可以将 PYSPARK_PIN_THREAD 环境变量设置为 false

从 PySpark 2.4 升级到 3.0

  • 在 Spark 3.0 中,PySpark 需要 pandas 版本为 0.23.2 或更高版本才能使用 pandas 相关功能,例如 toPandas、从 pandas DataFrame 创建 createDataFrame 等。

  • 在 Spark 3.0 中,PySpark 需要 PyArrow 版本为 0.12.1 或更高版本才能使用 PyArrow 相关功能,例如 pandas_udftoPandascreateDataFrame 与“spark.sql.execution.arrow.enabled=true”等。

  • 在 PySpark 中,当使用 SparkSession.builder.getOrCreate() 创建 SparkSession 时,如果存在现有的 SparkContext,则构建器尝试使用为构建器指定的配置更新现有 SparkContextSparkConf,但是 SparkContext 由所有 SparkSession s 共享,因此我们不应更新它们。 在 3.0 中,构建器不会更新配置。 这与 2.3 及更高版本中的 Java/Scala API 行为相同。 如果要更新它们,则需要在创建 SparkSession 之前更新它们。

  • 在 PySpark 中,当启用 Arrow 优化时,如果 Arrow 版本高于 0.11.0,则 Arrow 可以在序列化期间将 pandas.Series 转换为 Arrow 数组时执行安全类型转换。 当检测到不安全的类型转换(如溢出)时,Arrow 会引发错误。 您可以通过将 spark.sql.execution.pandas.convertToArrowArraySafely 设置为 true 来启用它。 默认设置为 false。 下表说明了 Arrow 版本的 PySpark 行为

    PyArrow 版本

    整数溢出

    浮点数截断

    0.11.0 及以下

    抛出错误

    静默允许

    > 0.11.0, arrowSafeTypeConversion=false

    静默溢出

    静默允许

    > 0.11.0, arrowSafeTypeConversion=true

    抛出错误

    抛出错误

  • 在 Spark 3.0 中,createDataFrame(..., verifySchema=True) 也会在 PySpark 中验证 LongType。 之前,LongType 未被验证,如果值溢出,则结果为 None。 要恢复此行为,可以将 verifySchema 设置为 False 以禁用验证。

  • 从 Spark 3.0 开始,当使用 Python 3.6 及以上版本的命名参数构造 Row 时,字段名称不再按字母顺序排序,并且字段的顺序将与输入的顺序匹配。 要默认启用排序字段(如 Spark 2.4 中那样),请将环境变量 PYSPARK_ROW_FIELD_SORTING_ENABLED 设置为 true 以用于 executors 和 driver - 此环境变量在所有 executors 和 driver 上必须保持一致;否则,可能会导致故障或不正确的答案。 对于低于 3.6 的 Python 版本,字段名称将按字母顺序排序,这是唯一的选项。

  • 在 Spark 3.0 中,pyspark.ml.param.shared.Has* mixin 不再提供任何 set*(self, value) setter 方法,请改用相应的 self.set(self.*, value)。 有关详细信息,请参见 SPARK-29093

从 PySpark 2.3 升级到 2.4

  • 在 PySpark 中,当启用 Arrow 优化时,之前当 Arrow 优化无法使用时,toPandas 只是失败,而来自 Pandas DataFrame 的 createDataFrame 允许回退到非优化。 现在,来自 Pandas DataFrame 的 toPandascreateDataFrame 默认都允许回退,这可以通过 spark.sql.execution.arrow.fallback.enabled 关闭。

从 PySpark 2.3.0 升级到 2.3.1 及以上

  • 从 2.3.1 版本开始,包括 pandas_udftoPandas()/createDataFrame() (并将 spark.sql.execution.arrow.enabled 设置为 True)在内的 Arrow 功能已被标记为实验性的。 这些仍在发展中,目前不建议在生产中使用。

从 PySpark 2.2 升级到 2.3

  • 在 PySpark 中,如果您想使用 Pandas 相关功能,例如来自 Pandas DataFrame 的 toPandas, createDataFrame 等,现在需要 Pandas 0.19.2 或更高版本。

  • 在 PySpark 中,Pandas 相关功能的时间戳值的行为已更改为遵守会话时区。 如果您想使用旧的行为,则需要将配置 spark.sql.execution.pandas.respectSessionTimeZone 设置为 False。 有关详细信息,请参见 SPARK-22395

  • 在 PySpark 中,na.fill()fillna 也接受布尔值,并将空值替换为布尔值。 在之前的 Spark 版本中,PySpark 只是忽略它并返回原始的 Dataset/DataFrame。

  • 在 PySpark 中,当 to_replace 不是字典时,df.replace 不允许省略 value。 以前,在其他情况下可以省略 value,并且默认值为 None,这与直觉相反且容易出错。

从 PySpark 1.4 升级到 1.5

  • 现在,Python 中将字符串解析为列的操作支持使用点 (.) 来限定列或访问嵌套值。 例如 df['table.column.nestedField']。 但是,这意味着如果您的列名包含任何点,您现在必须使用反引号对其进行转义(例如,table.`column.with.dots`.nested)。

  • PySpark 中的 DataFrame.withColumn 方法支持添加新列或替换同名的现有列。

从 PySpark 1.0-1.2 升级到 1.3

  • 在 Python 中使用 DataTypes 时,您需要构造它们 (即 StringType()),而不是引用单例。