最佳实践

利用 PySpark APIs

Spark 上的 Pandas API 在底层使用 Spark;因此,许多特性和性能优化在 Spark 上的 pandas API 中也可用。利用这些最前沿的特性并将其与 Spark 上的 pandas API 相结合。

现有的 Spark 上下文和 Spark 会话可以直接在 Spark 上的 pandas API 中使用。如果您已经有自己配置的 Spark 上下文或会话在运行,Spark 上的 pandas API 将使用它们。

如果您的环境中没有运行 Spark 上下文或会话(例如,普通的 Python 解释器),可以将此类配置设置为 SparkContext 和/或 SparkSession。一旦创建了 Spark 上下文和/或会话,Spark 上的 pandas API 就可以自动使用此上下文和/或会话。例如,如果您想配置 Spark 中的 executor 内存,您可以这样做:

from pyspark import SparkConf, SparkContext
conf = SparkConf()
conf.set('spark.executor.memory', '2g')
# Pandas API on Spark automatically uses this Spark context with the configurations set.
SparkContext(conf=conf)

import pyspark.pandas as ps
...

另一个常见的配置可能是 PySpark 中的 Arrow 优化。对于 SQL 配置,可以将其设置为 Spark 会话,如下所示:

from pyspark.sql import SparkSession
builder = SparkSession.builder.appName("pandas-on-spark")
builder = builder.config("spark.sql.execution.arrow.pyspark.enabled", "true")
# Pandas API on Spark automatically uses this Spark session with the configurations set.
builder.getOrCreate()

import pyspark.pandas as ps
...

所有 Spark 特性,例如历史服务器、Web UI 和部署模式,都可以与 Spark 上的 pandas API 一起使用。如果您对性能调优感兴趣,另请参阅 Tuning Spark

检查执行计划

由于 Spark 上的 pandas API 基于延迟执行,因此可以通过利用 PySpark API DataFrame.spark.explain() 在实际计算之前预测昂贵的操作。例如,请参见下文。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf = psdf[psdf.id > 5]
>>> psdf.spark.explain()
== Physical Plan ==
*(1) Filter (id#1L > 5)
+- *(1) Scan ExistingRDD[__index_level_0__#0L,id#1L]

无论何时您对此类情况不确定,您都可以检查实际的执行计划并预见昂贵的情况。

即使 Spark 上的 pandas API 尽最大努力通过利用 Spark 优化器来优化和减少此类洗牌操作,最好还是尽可能避免在应用程序端进行洗牌。

使用检查点

在对 Spark 上的 pandas API 对象进行大量操作之后,由于庞大而复杂的计划,底层 Spark 规划器可能会变慢。如果 Spark 计划变得庞大或规划花费很长时间,DataFrame.spark.checkpoint()DataFrame.spark.local_checkpoint() 可能会有所帮助。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf = psdf[psdf.id > 5]
>>> psdf['id'] = psdf['id'] + (10 * psdf['id'] + psdf['id'])
>>> psdf = psdf.groupby('id').head(2)
>>> psdf.spark.explain()
== Physical Plan ==
*(3) Project [__index_level_0__#0L, id#31L]
+- *(3) Filter (isnotnull(__row_number__#44) AND (__row_number__#44 <= 2))
   +- Window [row_number() windowspecdefinition(__groupkey_0__#36L, __natural_order__#16L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS __row_number__#44], [__groupkey_0__#36L], [__natural_order__#16L ASC NULLS FIRST]
      +- *(2) Sort [__groupkey_0__#36L ASC NULLS FIRST, __natural_order__#16L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(__groupkey_0__#36L, 200), true, [id=#33]
            +- *(1) Project [__index_level_0__#0L, (id#1L + ((id#1L * 10) + id#1L)) AS __groupkey_0__#36L, (id#1L + ((id#1L * 10) + id#1L)) AS id#31L, __natural_order__#16L]
               +- *(1) Project [__index_level_0__#0L, id#1L, monotonically_increasing_id() AS __natural_order__#16L]
                  +- *(1) Filter (id#1L > 5)
                     +- *(1) Scan ExistingRDD[__index_level_0__#0L,id#1L]

>>> psdf = psdf.spark.local_checkpoint()  # or psdf.spark.checkpoint()
>>> psdf.spark.explain()
== Physical Plan ==
*(1) Project [__index_level_0__#0L, id#31L]
+- *(1) Scan ExistingRDD[__index_level_0__#0L,id#31L,__natural_order__#59L]

如您所见,之前的 Spark 计划被删除,并以一个简单的计划开始。之前的 DataFrame 的结果在调用 DataFrame.spark.checkpoint() 时存储在配置的文件系统中,或在调用 DataFrame.spark.local_checkpoint() 时存储在 executor 中。

避免洗牌

某些操作(例如 sort_values)在并行或分布式环境中比在单台机器上的内存中更难完成,因为它需要将数据发送到其他节点,并通过网络在多个节点之间交换数据。请参见以下示例。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)}).sort_values(by="id")
>>> psdf.spark.explain()
== Physical Plan ==
*(2) Sort [id#9L ASC NULLS LAST], true, 0
+- Exchange rangepartitioning(id#9L ASC NULLS LAST, 200), true, [id=#18]
   +- *(1) Scan ExistingRDD[__index_level_0__#8L,id#9L]

如您所见,它需要 Exchange,这需要一个洗牌,并且可能非常昂贵。

避免在单个分区上进行计算

另一种常见的情况是在单个分区上进行计算。目前,某些 API(例如 DataFrame.rank)使用 PySpark 的 Window 而不指定分区规范。这会将所有数据移动到单个机器上的单个分区中,并可能导致严重的性能下降。对于非常大的数据集,应避免使用此类 API。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf.rank().spark.explain()
== Physical Plan ==
*(4) Project [__index_level_0__#16L, id#24]
+- Window [avg(cast(_w0#26 as bigint)) windowspecdefinition(id#17L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS id#24], [id#17L]
   +- *(3) Project [__index_level_0__#16L, _w0#26, id#17L]
      +- Window [row_number() windowspecdefinition(id#17L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#26], [id#17L ASC NULLS FIRST]
         +- *(2) Sort [id#17L ASC NULLS FIRST], false, 0
            +- Exchange SinglePartition, true, [id=#48]
               +- *(1) Scan ExistingRDD[__index_level_0__#16L,id#17L]

而是使用 GroupBy.rank,因为它成本较低,因为数据可以分布并在每个组中进行计算。

避免保留列名

在 Spark 上的 pandas API 中,以 __ 开头和以 __ 结尾的列是保留的。为了处理内部行为,例如索引,Spark 上的 pandas API 使用一些内部列。因此,不鼓励使用此类列名,并且不能保证它们能正常工作。

不要使用重复的列名

不允许使用重复的列名,因为 Spark SQL 通常不允许这样做。Spark 上的 pandas API 继承了此行为。例如,请参见下文

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'b':[3, 4]})
>>> psdf.columns = ["a", "a"]
...
Reference 'a' is ambiguous, could be: a, a.;

此外,强烈建议不要使用区分大小写的列名。默认情况下,Spark 上的 pandas API 不允许这样做。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
...
Reference 'a' is ambiguous, could be: a, a.;

但是,您可以在 Spark 配置中打开 spark.sql.caseSensitive 以启用它,但风险自负。

>>> from pyspark.sql import SparkSession
>>> builder = SparkSession.builder.appName("pandas-on-spark")
>>> builder = builder.config("spark.sql.caseSensitive", "true")
>>> builder.getOrCreate()

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
>>> psdf
   a  A
0  1  3
1  2  4

在从 Spark DataFrame 转换为 pandas-on-Spark DataFrame 时指定索引列

当 pandas-on-Spark Dataframe 从 Spark DataFrame 转换时,它会丢失索引信息,这会导致在 pandas API on Spark DataFrame 中使用默认索引。通常,与显式指定索引列相比,默认索引效率低下。尽可能指定索引列。

请参阅 使用 PySpark

使用 distributeddistributed-sequence 默认索引

pandas-on-Spark 用户面临的一个常见问题是由于默认索引导致的性能缓慢。当索引未知时,例如,Spark DataFrame 直接转换为 pandas-on-Spark DataFrame 时,Spark 上的 Pandas API 会附加一个默认索引。

请注意,sequence 需要在单个分区上进行计算,这是不鼓励的。如果您计划在生产环境中处理大型数据,请通过将默认索引配置为 distributeddistributed-sequence 使其分布化。

有关配置默认索引的更多详细信息,请参阅 默认索引类型

减少对不同 DataFrame/Series 的操作

默认情况下,Spark 上的 Pandas API 不允许对不同的 DataFrames(或 Series)进行操作,以防止昂贵的操作。它在内部执行 join 操作,这通常可能很昂贵,因此不鼓励这样做。只要有可能,就应该避免这种操作。

有关更多详细信息,请参阅 对不同 DataFrames 的操作

尽可能直接使用 Spark 上的 pandas API

尽管 Spark 上的 pandas API 具有大多数 pandas 等效的 API,但仍有几个 API 尚未实现或明确不支持。

例如,Spark 上的 pandas API 不实现 __iter__(),以防止用户从整个集群中将所有数据收集到客户端(驱动程序)端。不幸的是,许多外部 API(例如 Python 内置函数,例如 min、max、sum 等)要求给定的参数是可迭代的。在 pandas 的情况下,它可以正常工作,如下所示

>>> import pandas as pd
>>> max(pd.Series([1, 2, 3]))
3
>>> min(pd.Series([1, 2, 3]))
1
>>> sum(pd.Series([1, 2, 3]))
6

pandas 数据集位于单台机器中,并且在同一机器中自然是本地可迭代的。但是,pandas-on-Spark 数据集分布在多台机器中,并且它们以分布式方式进行计算。很难在本地可迭代,并且很可能用户在不知情的情况下将整个数据收集到客户端。因此,最好坚持使用 pandas-on-Spark API。上面的示例可以转换为如下

>>> import pyspark.pandas as ps
>>> ps.Series([1, 2, 3]).max()
3
>>> ps.Series([1, 2, 3]).min()
1
>>> ps.Series([1, 2, 3]).sum()
6

pandas 用户另一个常见的模式可能是依赖于列表推导式或生成器表达式。但是,它也假设数据集在底层是本地可迭代的。因此,它在 pandas 中可以无缝工作,如下所示

>>> import pandas as pd
>>> data = []
>>> countries = ['London', 'New York', 'Helsinki']
>>> pser = pd.Series([20., 21., 12.], index=countries)
>>> for temperature in pser:
...     assert temperature > 0
...     if temperature > 1000:
...         temperature = None
...     data.append(temperature ** 2)
...
>>> pd.Series(data, index=countries)
London      400.0
New York    441.0
Helsinki    144.0
dtype: float64

然而,对于 Spark 上的 pandas API 来说,由于上述相同的原因,它不起作用。上面的例子也可以改成直接使用 pandas-on-Spark API,如下所示

>>> import pyspark.pandas as ps
>>> import numpy as np
>>> countries = ['London', 'New York', 'Helsinki']
>>> psser = ps.Series([20., 21., 12.], index=countries)
>>> def square(temperature) -> np.float64:
...     assert temperature > 0
...     if temperature > 1000:
...         temperature = None
...     return temperature ** 2
...
>>> psser.apply(square)
London      400.0
New York    441.0
Helsinki    144.0
dtype: float64