转换和应用函数

有许多 API 允许用户对 pandas-on-Spark DataFrame 应用函数,例如 DataFrame.transform(), DataFrame.apply(), DataFrame.pandas_on_spark.transform_batch(), DataFrame.pandas_on_spark.apply_batch(), Series.pandas_on_spark.transform_batch() 等。每个 API 都有不同的用途,并且内部工作方式不同。本节描述了用户经常混淆的它们之间的差异。

transformapply

DataFrame.transform()DataFrame.apply() 之间的主要区别在于,前者要求返回与输入长度相同的输出,而后者则没有此要求。 请参见下面的示例

>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
...     return pser + 1  # should always return the same length as input.
...
>>> psdf.transform(pandas_plus)
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[5,6,7]})
>>> def pandas_plus(pser):
...     return pser[pser % 2 == 1]  # allows an arbitrary length
...
>>> psdf.apply(pandas_plus)

在这种情况下,每个函数都接受一个 pandas Series,并且 Spark 上的 pandas API 以分布式方式计算这些函数,如下所示。

transform and apply

在“column”轴的情况下,该函数将每一行作为 pandas Series。

>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
...     return sum(pser)  # allows an arbitrary length
...
>>> psdf.apply(pandas_plus, axis='columns')

上面的示例计算了作为 pandas Series 的每一行的总和。 见下文

apply axis

在上面的示例中,为简单起见,没有使用类型提示,但建议使用它们以避免性能损失。 请参阅 API 文档。

pandas_on_spark.transform_batchpandas_on_spark.apply_batch

DataFrame.pandas_on_spark.transform_batch(), DataFrame.pandas_on_spark.apply_batch(), Series.pandas_on_spark.transform_batch() 等中,batch 后缀表示 pandas-on-Spark DataFrame 或 Series 中的每个块。 这些 API 对 pandas-on-Spark DataFrame 或 Series 进行切片,然后将给定的函数与 pandas DataFrame 或 Series 作为输入和输出应用。 参见下面的例子

>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pdf):
...     return pdf + 1  # should always return the same length as input.
...
>>> psdf.pandas_on_spark.transform_batch(pandas_plus)
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pdf):
...     return pdf[pdf.a > 1]  # allow arbitrary length
...
>>> psdf.pandas_on_spark.apply_batch(pandas_plus)

这两个示例中的函数都将 pandas DataFrame 作为 pandas-on-Spark DataFrame 的一个块,并输出一个 pandas DataFrame。 Spark 上的 Pandas API 将 pandas DataFrames 组合成一个 pandas-on-Spark DataFrame。

请注意,DataFrame.pandas_on_spark.transform_batch() 有长度限制 - 输入和输出的长度应相同 - 而 DataFrame.pandas_on_spark.apply_batch() 则没有。 但是,重要的是要知道,当 DataFrame.pandas_on_spark.transform_batch() 返回 Series 时,该输出属于同一 DataFrame,并且可以通过不同 DataFrames 之间的操作来避免 shuffle。 对于 DataFrame.pandas_on_spark.apply_batch(),其输出始终被视为属于一个新的不同的 DataFrame。 另请参见 不同 DataFrame 上的操作 以获取更多详细信息。

pandas_on_spark.transform_batch and pandas_on_spark.apply_batch in Frame

对于 Series.pandas_on_spark.transform_batch(),它也与 DataFrame.pandas_on_spark.transform_batch() 类似; 但是,它将 pandas Series 作为 pandas-on-Spark Series 的一个块。

>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
...     return pser + 1  # should always return the same length as input.
...
>>> psdf.a.pandas_on_spark.transform_batch(pandas_plus)

在底层,每个 pandas-on-Spark Series 批次都被拆分为多个 pandas Series,并且每个函数都在其上进行计算,如下所示

pandas_on_spark.transform_batch in Series

还有更多详细信息,例如类型推断和避免其性能损失。 请参阅 API 文档。