转换和应用函数¶
有许多 API 允许用户对 pandas-on-Spark DataFrame 应用函数,例如 DataFrame.transform(), DataFrame.apply(), DataFrame.pandas_on_spark.transform_batch(), DataFrame.pandas_on_spark.apply_batch(), Series.pandas_on_spark.transform_batch() 等。每个 API 都有不同的用途,并且内部工作方式不同。本节描述了用户经常混淆的它们之间的差异。
transform 和 apply¶
DataFrame.transform() 和 DataFrame.apply() 之间的主要区别在于,前者要求返回与输入长度相同的输出,而后者则没有此要求。 请参见下面的示例
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
... return pser + 1 # should always return the same length as input.
...
>>> psdf.transform(pandas_plus)
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[5,6,7]})
>>> def pandas_plus(pser):
... return pser[pser % 2 == 1] # allows an arbitrary length
...
>>> psdf.apply(pandas_plus)
在这种情况下,每个函数都接受一个 pandas Series,并且 Spark 上的 pandas API 以分布式方式计算这些函数,如下所示。
在“column”轴的情况下,该函数将每一行作为 pandas Series。
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
... return sum(pser) # allows an arbitrary length
...
>>> psdf.apply(pandas_plus, axis='columns')
上面的示例计算了作为 pandas Series 的每一行的总和。 见下文
在上面的示例中,为简单起见,没有使用类型提示,但建议使用它们以避免性能损失。 请参阅 API 文档。
pandas_on_spark.transform_batch 和 pandas_on_spark.apply_batch¶
在 DataFrame.pandas_on_spark.transform_batch(), DataFrame.pandas_on_spark.apply_batch(), Series.pandas_on_spark.transform_batch() 等中,batch 后缀表示 pandas-on-Spark DataFrame 或 Series 中的每个块。 这些 API 对 pandas-on-Spark DataFrame 或 Series 进行切片,然后将给定的函数与 pandas DataFrame 或 Series 作为输入和输出应用。 参见下面的例子
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pdf):
... return pdf + 1 # should always return the same length as input.
...
>>> psdf.pandas_on_spark.transform_batch(pandas_plus)
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pdf):
... return pdf[pdf.a > 1] # allow arbitrary length
...
>>> psdf.pandas_on_spark.apply_batch(pandas_plus)
这两个示例中的函数都将 pandas DataFrame 作为 pandas-on-Spark DataFrame 的一个块,并输出一个 pandas DataFrame。 Spark 上的 Pandas API 将 pandas DataFrames 组合成一个 pandas-on-Spark DataFrame。
请注意,DataFrame.pandas_on_spark.transform_batch() 有长度限制 - 输入和输出的长度应相同 - 而 DataFrame.pandas_on_spark.apply_batch() 则没有。 但是,重要的是要知道,当 DataFrame.pandas_on_spark.transform_batch() 返回 Series 时,该输出属于同一 DataFrame,并且可以通过不同 DataFrames 之间的操作来避免 shuffle。 对于 DataFrame.pandas_on_spark.apply_batch(),其输出始终被视为属于一个新的不同的 DataFrame。 另请参见 不同 DataFrame 上的操作 以获取更多详细信息。
对于 Series.pandas_on_spark.transform_batch(),它也与 DataFrame.pandas_on_spark.transform_batch() 类似; 但是,它将 pandas Series 作为 pandas-on-Spark Series 的一个块。
>>> psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
>>> def pandas_plus(pser):
... return pser + 1 # should always return the same length as input.
...
>>> psdf.a.pandas_on_spark.transform_batch(pandas_plus)
在底层,每个 pandas-on-Spark Series 批次都被拆分为多个 pandas Series,并且每个函数都在其上进行计算,如下所示
还有更多详细信息,例如类型推断和避免其性能损失。 请参阅 API 文档。