从/到 pandas 和 PySpark DataFrame#
pandas 和/或 PySpark 用户在使用 Spark 上的 Pandas API 时,有时会面临 API 兼容性问题。由于 Spark 上的 Pandas API 不以实现 pandas 和 PySpark 的 100% 兼容为目标,用户在这种情况下需要进行一些变通,以移植他们的 pandas 和/或 PySpark 代码,或者熟悉 Spark 上的 Pandas API。本页旨在描述这一点。
pandas#
pandas 用户可以通过调用 DataFrame.to_pandas()
来访问完整的 pandas API。Spark 上的 Pandas DataFrame 和 pandas DataFrame 相似。然而,前者是分布式的,而后者是在单机上的。当它们互相转换时,数据在多台机器和单个客户端机器之间传输。
例如,如果您需要调用 pandas DataFrame 的 pandas_df.values
,您可以按如下方式操作
>>> import pyspark.pandas as ps
>>>
>>> psdf = ps.range(10)
>>> pdf = psdf.to_pandas()
>>> pdf.values
array([[0],
[1],
[2],
[3],
[4],
[5],
[6],
[7],
[8],
[9]])
pandas DataFrame 可以轻松地转换为 Spark 上的 Pandas DataFrame,如下所示
>>> ps.from_pandas(pdf)
id
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
请注意,将 Spark 上的 Pandas DataFrame 转换为 pandas 需要将所有数据收集到客户端机器;因此,如果可能,建议使用 Spark 上的 Pandas API 或 PySpark API。
PySpark#
PySpark 用户可以通过调用 DataFrame.to_spark()
来访问完整的 PySpark API。Spark 上的 Pandas DataFrame 和 Spark DataFrame 实际上可以互换。
例如,如果您需要调用 Spark DataFrame 的 spark_df.filter(...)
,您可以按如下方式操作
>>> import pyspark.pandas as ps
>>>
>>> psdf = ps.range(10)
>>> sdf = psdf.to_spark().filter("id > 5")
>>> sdf.show()
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Spark DataFrame 可以轻松地转换为 Spark 上的 Pandas DataFrame,如下所示
>>> sdf.pandas_api()
id
0 6
1 7
2 8
3 9
然而,请注意,当从 Spark DataFrame 创建 Spark 上的 Pandas DataFrame 时,会创建一个新的默认索引。请参阅 默认索引类型。为了避免这种开销,尽可能指定用作索引的列。
>>> # Create a pandas-on-Spark DataFrame with an explicit index.
... psdf = ps.DataFrame({'id': range(10)}, index=range(10))
>>> # Keep the explicit index.
... sdf = psdf.to_spark(index_col='index')
>>> # Call Spark APIs
... sdf = sdf.filter("id > 5")
>>> # Uses the explicit index to avoid to create default index.
... sdf.pandas_api(index_col='index')
id
index
6 6
7 7
8 8
9 9