与 pandas 和 PySpark DataFrames 之间的转换

pandas 和/或 PySpark 用户在使用 Spark 上的 Pandas API 时,有时会遇到 API 兼容性问题。由于 Spark 上的 Pandas API 并非旨在实现 pandas 和 PySpark 的 100% 兼容,因此用户需要进行一些变通方法来移植他们的 pandas 和/或 PySpark 代码,或者熟悉这种情况下的 Spark 上的 Pandas API。本页旨在描述这一点。

pandas

pandas 用户可以通过调用 DataFrame.to_pandas() 来访问完整的 pandas API。Spark 上的 Pandas DataFrame 和 pandas DataFrame 很相似。但是,前者是分布式的,后者位于单个机器上。当相互转换时,数据在多台机器和单个客户端机器之间传输。

例如,如果您需要调用 pandas DataFrame 的 pandas_df.values,您可以这样做:

>>> import pyspark.pandas as ps
>>>
>>> psdf = ps.range(10)
>>> pdf = psdf.to_pandas()
>>> pdf.values
array([[0],
       [1],
       [2],
       [3],
       [4],
       [5],
       [6],
       [7],
       [8],
       [9]])

pandas DataFrame 可以很容易地成为 Spark 上的 Pandas DataFrame,如下所示:

>>> ps.from_pandas(pdf)
   id
0   0
1   1
2   2
3   3
4   4
5   5
6   6
7   7
8   8
9   9

请注意,将 Spark 上的 Pandas DataFrame 转换为 pandas 需要将所有数据收集到客户端机器中;因此,如果可能,建议使用 Spark 上的 Pandas API 或 PySpark API 代替。

PySpark

PySpark 用户可以通过调用 DataFrame.to_spark() 来访问完整的 PySpark API。Spark 上的 Pandas DataFrame 和 Spark DataFrame 实际上是可以互换的。

例如,如果您需要调用 Spark DataFrame 的 spark_df.filter(...),您可以这样做:

>>> import pyspark.pandas as ps
>>>
>>> psdf = ps.range(10)
>>> sdf = psdf.to_spark().filter("id > 5")
>>> sdf.show()
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Spark DataFrame 可以很容易地成为 Spark 上的 Pandas DataFrame,如下所示:

>>> sdf.pandas_api()
   id
0   6
1   7
2   8
3   9

但是,请注意,从 Spark DataFrame 创建 Spark 上的 Pandas DataFrame 时,会创建一个新的默认索引。请参阅 默认索引类型。为了避免这种开销,请尽可能指定要用作索引的列。

>>> # Create a pandas-on-Spark DataFrame with an explicit index.
... psdf = ps.DataFrame({'id': range(10)}, index=range(10))
>>> # Keep the explicit index.
... sdf = psdf.to_spark(index_col='index')
>>> # Call Spark APIs
... sdf = sdf.filter("id > 5")
>>> # Uses the explicit index to avoid to create default index.
... sdf.pandas_api(index_col='index')
       id
index
6       6
7       7
8       8
9       9