转换和应用函数¶
有许多 API 允许用户对 pandas-on-Spark DataFrame 应用函数,例如 DataFrame.transform()
, DataFrame.apply()
, DataFrame.pandas_on_spark.transform_batch()
, DataFrame.pandas_on_spark.apply_batch()
, Series.pandas_on_spark.transform_batch()
等。每个 API 都有不同的用途,并且内部工作方式不同。本节描述了用户经常混淆的它们之间的差异。
transform
和 apply
¶
DataFrame.transform()
和 DataFrame.apply()
之间的主要区别在于,前者要求返回与输入长度相同的输出,而后者则没有此要求。 请参见下面的示例
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
... return pser + 1 # should always return the same length as input.
...
>>> psdf.transform(pandas_plus)
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[5,6,7]})
>>> def pandas_plus(pser):
... return pser[pser % 2 == 1] # allows an arbitrary length
...
>>> psdf.apply(pandas_plus)
在这种情况下,每个函数都接受一个 pandas Series,并且 Spark 上的 pandas API 以分布式方式计算这些函数,如下所示。

在“column”轴的情况下,该函数将每一行作为 pandas Series。
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
... return sum(pser) # allows an arbitrary length
...
>>> psdf.apply(pandas_plus, axis='columns')
上面的示例计算了作为 pandas Series 的每一行的总和。 见下文

在上面的示例中,为简单起见,没有使用类型提示,但建议使用它们以避免性能损失。 请参阅 API 文档。
pandas_on_spark.transform_batch
和 pandas_on_spark.apply_batch
¶
在 DataFrame.pandas_on_spark.transform_batch()
, DataFrame.pandas_on_spark.apply_batch()
, Series.pandas_on_spark.transform_batch()
等中,batch
后缀表示 pandas-on-Spark DataFrame 或 Series 中的每个块。 这些 API 对 pandas-on-Spark DataFrame 或 Series 进行切片,然后将给定的函数与 pandas DataFrame 或 Series 作为输入和输出应用。 参见下面的例子
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pdf):
... return pdf + 1 # should always return the same length as input.
...
>>> psdf.pandas_on_spark.transform_batch(pandas_plus)
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pdf):
... return pdf[pdf.a > 1] # allow arbitrary length
...
>>> psdf.pandas_on_spark.apply_batch(pandas_plus)
这两个示例中的函数都将 pandas DataFrame 作为 pandas-on-Spark DataFrame 的一个块,并输出一个 pandas DataFrame。 Spark 上的 Pandas API 将 pandas DataFrames 组合成一个 pandas-on-Spark DataFrame。
请注意,DataFrame.pandas_on_spark.transform_batch()
有长度限制 - 输入和输出的长度应相同 - 而 DataFrame.pandas_on_spark.apply_batch()
则没有。 但是,重要的是要知道,当 DataFrame.pandas_on_spark.transform_batch()
返回 Series 时,该输出属于同一 DataFrame,并且可以通过不同 DataFrames 之间的操作来避免 shuffle。 对于 DataFrame.pandas_on_spark.apply_batch()
,其输出始终被视为属于一个新的不同的 DataFrame。 另请参见 不同 DataFrame 上的操作 以获取更多详细信息。

对于 Series.pandas_on_spark.transform_batch()
,它也与 DataFrame.pandas_on_spark.transform_batch()
类似; 但是,它将 pandas Series 作为 pandas-on-Spark Series 的一个块。
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
... return pser + 1 # should always return the same length as input.
...
>>> psdf.a.pandas_on_spark.transform_batch(pandas_plus)
在底层,每个 pandas-on-Spark Series 批次都被拆分为多个 pandas Series,并且每个函数都在其上进行计算,如下所示

还有更多详细信息,例如类型推断和避免其性能损失。 请参阅 API 文档。