最佳实践#
利用 PySpark API#
Spark 上的 Pandas API 底层使用 Spark;因此,许多特性和性能优化也适用于 Spark 上的 Pandas API。请利用这些前沿特性并将其与 Spark 上的 Pandas API 结合使用。
现有 Spark 上下文和 Spark 会话在 Spark 上的 Pandas API 中开箱即用。如果您已经配置并运行了自己的 Spark 上下文或会话,Spark 上的 Pandas API 将使用它们。
如果您的环境中没有 Spark 上下文或会话运行(例如,普通 Python 解释器),则可以为 SparkContext
和/或 SparkSession
设置此类配置。一旦创建了 Spark 上下文和/或会话,Spark 上的 Pandas API 就可以自动使用此上下文和/或会话。例如,如果您想配置 Spark 中的执行器内存,可以如下操作:
from pyspark import SparkConf, SparkContext
conf = SparkConf()
conf.set('spark.executor.memory', '2g')
# Pandas API on Spark automatically uses this Spark context with the configurations set.
SparkContext(conf=conf)
import pyspark.pandas as ps
...
另一个常见的配置可能是 PySpark 中的 Arrow 优化。对于 SQL 配置,可以如下在 Spark 会话中设置:
from pyspark.sql import SparkSession
builder = SparkSession.builder.appName("pandas-on-spark")
builder = builder.config("spark.sql.execution.arrow.pyspark.enabled", "true")
# Pandas API on Spark automatically uses this Spark session with the configurations set.
builder.getOrCreate()
import pyspark.pandas as ps
...
所有 Spark 功能,例如历史服务器、Web UI 和部署模式,都可以与 Spark 上的 Pandas API 一起使用。如果您对性能调优感兴趣,另请参阅 调优 Spark。
检查执行计划#
由于 Spark 上的 Pandas API 基于惰性执行,因此可以在实际计算之前,通过利用 PySpark API DataFrame.spark.explain() 来预测昂贵的操作。例如,请看下面。
>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf = psdf[psdf.id > 5]
>>> psdf.spark.explain()
== Physical Plan ==
*(1) Filter (id#1L > 5)
+- *(1) Scan ExistingRDD[__index_level_0__#0L,id#1L]
当您不确定此类情况时,可以检查实际执行计划并预见昂贵的情况。
尽管 Spark 上的 Pandas API 会尽力通过利用 Spark 优化器来优化和减少此类混洗操作,但在应用程序端尽可能避免混洗是最好的。
使用检查点#
在 Spark 上的 Pandas API 对象上执行一系列操作后,由于庞大而复杂的计划,底层 Spark 规划器可能会变慢。如果 Spark 计划变得庞大或规划时间过长,DataFrame.spark.checkpoint()
或 DataFrame.spark.local_checkpoint()
将会有所帮助。
>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf = psdf[psdf.id > 5]
>>> psdf['id'] = psdf['id'] + (10 * psdf['id'] + psdf['id'])
>>> psdf = psdf.groupby('id').head(2)
>>> psdf.spark.explain()
== Physical Plan ==
*(3) Project [__index_level_0__#0L, id#31L]
+- *(3) Filter (isnotnull(__row_number__#44) AND (__row_number__#44 <= 2))
+- Window [row_number() windowspecdefinition(__groupkey_0__#36L, __natural_order__#16L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS __row_number__#44], [__groupkey_0__#36L], [__natural_order__#16L ASC NULLS FIRST]
+- *(2) Sort [__groupkey_0__#36L ASC NULLS FIRST, __natural_order__#16L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(__groupkey_0__#36L, 200), true, [id=#33]
+- *(1) Project [__index_level_0__#0L, (id#1L + ((id#1L * 10) + id#1L)) AS __groupkey_0__#36L, (id#1L + ((id#1L * 10) + id#1L)) AS id#31L, __natural_order__#16L]
+- *(1) Project [__index_level_0__#0L, id#1L, monotonically_increasing_id() AS __natural_order__#16L]
+- *(1) Filter (id#1L > 5)
+- *(1) Scan ExistingRDD[__index_level_0__#0L,id#1L]
>>> psdf = psdf.spark.local_checkpoint() # or psdf.spark.checkpoint()
>>> psdf.spark.explain()
== Physical Plan ==
*(1) Project [__index_level_0__#0L, id#31L]
+- *(1) Scan ExistingRDD[__index_level_0__#0L,id#31L,__natural_order__#59L]
如您所见,之前的 Spark 计划被丢弃,并从一个简单的计划开始。调用 DataFrame.spark.checkpoint()
时,前一个 DataFrame 的结果存储在配置的文件系统中;调用 DataFrame.spark.local_checkpoint()
时,结果存储在执行器中。
避免混洗#
某些操作,例如 sort_values
,在并行或分布式环境中比在单机内存中更难执行,因为它需要将数据发送到其他节点,并通过网络在多个节点之间交换数据。请看下面的例子。
>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)}).sort_values(by="id")
>>> psdf.spark.explain()
== Physical Plan ==
*(2) Sort [id#9L ASC NULLS LAST], true, 0
+- Exchange rangepartitioning(id#9L ASC NULLS LAST, 200), true, [id=#18]
+- *(1) Scan ExistingRDD[__index_level_0__#8L,id#9L]
如您所见,它需要 Exchange
,这需要混洗,并且很可能开销很大。
避免在单个分区上进行计算#
另一个常见情况是在单个分区上进行计算。目前,某些 API(如 DataFrame.rank)使用 PySpark 的 Window 而不指定分区规范。这会将所有数据移动到单个机器的单个分区中,并可能导致严重的性能下降。对于非常大的数据集,应避免使用此类 API。
>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf.rank().spark.explain()
== Physical Plan ==
*(4) Project [__index_level_0__#16L, id#24]
+- Window [avg(cast(_w0#26 as bigint)) windowspecdefinition(id#17L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS id#24], [id#17L]
+- *(3) Project [__index_level_0__#16L, _w0#26, id#17L]
+- Window [row_number() windowspecdefinition(id#17L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#26], [id#17L ASC NULLS FIRST]
+- *(2) Sort [id#17L ASC NULLS FIRST], false, 0
+- Exchange SinglePartition, true, [id=#48]
+- *(1) Scan ExistingRDD[__index_level_0__#16L,id#17L]
相反,使用 GroupBy.rank,因为它开销较小,因为数据可以分布并为每个组进行计算。
避免保留的列名#
在 Spark 上的 Pandas API 中,以 __
开头和以 __
结尾的列是保留的。为了处理内部行为,例如索引,Spark 上的 Pandas API 使用一些内部列。因此,不鼓励使用此类列名,并且不保证它们能正常工作。
不要使用重复的列名#
不允许使用重复的列名,因为 Spark SQL 通常不允许这样做。Spark 上的 Pandas API 继承了此行为。例如,请看下面
>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'b':[3, 4]})
>>> psdf.columns = ["a", "a"]
...
Reference 'a' is ambiguous, could be: a, a.;
此外,强烈不鼓励使用区分大小写的列名。Spark 上的 Pandas API 默认不允许这样做。
>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
...
Reference 'a' is ambiguous, could be: a, a.;
但是,您可以在 Spark 配置中开启 spark.sql.caseSensitive
以启用它,风险自负。
>>> from pyspark.sql import SparkSession
>>> builder = SparkSession.builder.appName("pandas-on-spark")
>>> builder = builder.config("spark.sql.caseSensitive", "true")
>>> builder.getOrCreate()
>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
>>> psdf
a A
0 1 3
1 2 4
将 Spark DataFrame 转换为 pandas-on-Spark DataFrame 时指定索引列#
当 pandas-on-Spark DataFrame 从 Spark DataFrame 转换而来时,它会丢失索引信息,这导致在 Spark 上的 Pandas API DataFrame 中使用默认索引。与显式指定索引列相比,默认索引通常效率低下。在可能的情况下,请指定索引列。
请参阅 使用 PySpark
使用 distributed
或 distributed-sequence
默认索引#
pandas-on-Spark 用户面临的一个常见问题是由于默认索引导致的性能缓慢。当索引未知时,例如 Spark DataFrame 直接转换为 pandas-on-Spark DataFrame 时,Spark 上的 Pandas API 会附加一个默认索引。
请注意,sequence
需要在单个分区上进行计算,这是不鼓励的。如果您计划在生产环境中处理大型数据,请将默认索引配置为 distributed
或 distributed-sequence
以使其分布式。
有关配置默认索引的更多详细信息,请参阅 默认索引类型。
处理 distributed-sequence
造成的索引错位#
虽然 distributed-sequence
确保全局序列索引,但它不保证在不同操作中保持相同的行到索引映射。诸如 apply()
、groupby()
或 transform()
等操作可能会导致索引重新生成,从而导致行与计算值之间错位。
apply()
的问题示例#
在以下示例中,我们加载一个数据集,其中 record_id
作为唯一标识符,我们使用 apply()
函数计算持续时间(工作日数)。然而,由于 apply()
期间 distributed-sequence
索引的重新生成,结果可能会分配给不正确的行。
import pyspark.pandas as ps
import numpy as np
ps.set_option('compute.default_index_type', 'distributed-sequence')
df = ps.DataFrame({
'record_id': ["RECORD_1001", "RECORD_1002"],
'start_date': ps.to_datetime(["2024-01-01", "2024-01-02"]),
'end_date': ps.to_datetime(["2024-01-01", "2024-01-03"])
})
df['duration'] = df.apply(lambda x: np.busday_count(x['start_date'].date(), x['end_date'].date()), axis=1)
预期输出
record_id start_date end_date duration
RECORD_1001 2024-01-01 2024-01-01 0
RECORD_1002 2024-01-02 2024-01-03 1
然而,由于 apply()
期间 distributed-sequence
索引被重新生成,结果 DataFrame 可能看起来像这样
record_id start_date end_date duration
RECORD_1002 2024-01-02 2024-01-03 0 # Wrong mapping!
RECORD_1001 2024-01-01 2024-01-01 1 # Wrong mapping!
防止索引错位的最佳实践#
为确保行到索引的映射保持一致,请考虑以下方法
在应用函数之前显式设置索引列
df = df.set_index("record_id") # Ensure the index is explicitly set df['duration'] = df.apply(lambda x: np.busday_count(x['start_date'].date(), x['end_date'].date()), axis=1)
在应用函数之前持久化 DataFrame 以保持行顺序
df = df.spark.persist() df['duration'] = df.apply(lambda x: np.busday_count(x['start_date'].date(), x['end_date'].date()), axis=1)
改用序列索引类型(请注意潜在的性能权衡)
ps.set_option('compute.default_index_type', 'sequence')
如果您的应用程序需要严格的行到索引映射,请考虑使用上述方法之一,而不是依赖默认的 distributed-sequence
索引。
欲了解更多信息,请参阅 默认索引类型
减少不同 DataFrame/Series 上的操作#
Spark 上的 Pandas API 默认不允许在不同 DataFrame(或 Series)上进行操作,以防止昂贵的操作。它在内部执行连接操作,这通常是昂贵的,因此不鼓励这样做。在可能的情况下,应避免此操作。
有关更多详细信息,请参阅 不同 DataFrames 上的操作。
尽可能直接使用 Spark 上的 Pandas API#
尽管 Spark 上的 Pandas API 拥有大部分与 pandas 等效的 API,但仍有几个 API 尚未实现或明确不受支持。
例如,Spark 上的 Pandas API 没有实现 __iter__()
,以防止用户将所有数据从整个集群收集到客户端(驱动程序)端。不幸的是,许多外部 API(例如 Python 内置函数,如 min、max、sum 等)要求给定参数是可迭代的。在 pandas 中,它开箱即用,如下所示:
>>> import pandas as pd
>>> max(pd.Series([1, 2, 3]))
3
>>> min(pd.Series([1, 2, 3]))
1
>>> sum(pd.Series([1, 2, 3]))
6
pandas 数据集存在于单个机器中,并且自然地可以在同一机器中局部迭代。然而,pandas-on-Spark 数据集跨多台机器存在,并且以分布式方式进行计算。它很难局部迭代,而且用户很可能在不知情的情况下将整个数据收集到客户端。因此,最好坚持使用 pandas-on-Spark API。上面的示例可以转换为如下所示:
>>> import pyspark.pandas as ps
>>> ps.Series([1, 2, 3]).max()
3
>>> ps.Series([1, 2, 3]).min()
1
>>> ps.Series([1, 2, 3]).sum()
6
pandas 用户另一个常见的模式可能是依赖列表推导或生成器表达式。然而,它也假设数据集在底层是局部可迭代的。因此,它在 pandas 中无缝工作,如下所示:
>>> import pandas as pd
>>> data = []
>>> countries = ['London', 'New York', 'Helsinki']
>>> pser = pd.Series([20., 21., 12.], index=countries)
>>> for temperature in pser:
... assert temperature > 0
... if temperature > 1000:
... temperature = None
... data.append(temperature ** 2)
...
>>> pd.Series(data, index=countries)
London 400.0
New York 441.0
Helsinki 144.0
dtype: float64
然而,对于 Spark 上的 Pandas API,由于上述相同的原因,它不起作用。上面的示例也可以更改为直接使用 pandas-on-Spark API,如下所示:
>>> import pyspark.pandas as ps
>>> import numpy as np
>>> countries = ['London', 'New York', 'Helsinki']
>>> psser = ps.Series([20., 21., 12.], index=countries)
>>> def square(temperature) -> np.float64:
... assert temperature > 0
... if temperature > 1000:
... temperature = None
... return temperature ** 2
...
>>> psser.apply(square)
London 400.0
New York 441.0
Helsinki 144.0
dtype: float64