本页介绍 Spark 上的 pandas API (“pandas on Spark”) 的优势,以及何时应该使用它来代替 pandas(或与 pandas 结合使用)。
Spark 上的 pandas 比 pandas 快得多,并且提供 pandas 用户熟悉的语法。它提供了 Spark 的强大功能以及 pandas 的熟悉性。
以下是 Spark 上的 pandas 的主要优势
pandas 有很多限制
让我们看一些简单的例子,以便更好地理解 Spark 上的 pandas 如何克服 pandas 的限制。我们还将研究 Spark 上的 pandas 的局限性。
在本页结尾,您将看到如何结合使用 pandas 和 Spark 上的 pandas。这不是一个非此即彼的决定 - 在许多情况下,两者都使用是一个不错的选择。
本节演示了 Spark 上的 pandas 如何在 localhost 上对单个文件运行查询,速度比 pandas 快。 Spark 上的 pandas 对于所有查询不一定都更快,但此示例显示了何时它可以提供很好的加速。
假设您有一个 Parquet 文件,其中包含 9 列和 10 亿行数据。以下是文件的前三行
+-------+-------+--------------+-------+-------+--------+------+------+---------+
| id1 | id2 | id3 | id4 | id5 | id6 | v1 | v2 | v3 |
|-------+-------+--------------+-------+-------+--------+------+------+---------|
| id016 | id046 | id0000109363 | 88 | 13 | 146094 | 4 | 6 | 18.8377 |
| id039 | id087 | id0000466766 | 14 | 30 | 111330 | 4 | 14 | 46.7973 |
| id047 | id098 | id0000307804 | 85 | 23 | 187639 | 3 | 5 | 47.5773 |
+-------+-------+--------------+-------+-------+--------+------+------+---------+
以下是如何使用 Spark 上的 pandas 读取文件并运行 group by 聚合。
import pyspark.pandas as ps
df = ps.read_parquet("G1_1e9_1e2_0_0.parquet")[
["id1", "id2", "v3"]
]
df.query("id1 > 'id098'").groupby("id2").sum().head(3)
此查询在配备 64 GB RAM 的 2020 M1 Macbook 上以 Spark 3.5.0 运行需要 62 秒。
让我们将其与未优化的 pandas 代码进行比较。
import pandas as pd
df = pd.read_parquet("G1_1e9_1e2_0_0.parquet")[
["id1", "id2", "v3"]
]
df.query("id1 > 'id098'").groupby("id2").sum().head(3)
此查询出错,因为具有 64GB RAM 的机器没有足够的空间来将 10 亿行数据存储在内存中。
让我们手动添加一些 pandas 优化来使查询运行
df = pd.read_parquet(
"G1_1e9_1e2_0_0.parquet",
columns=["id1", "id2", "v3"],
filters=[("id1", ">", "id098")],
engine="pyarrow",
)
df.query("id1 > 'id098'").groupby("id2").sum().head(3)
此查询在 pandas 2.2.0 中运行需要 275 秒。
使用 pandas 手动编写这些优化可能会给出错误的结果。 这是一个 group by 查询的示例,该查询是正确的,但行组过滤谓词是错误的
df = pd.read_parquet(
"G1_1e9_1e2_0_0.parquet",
columns=["id1", "id2", "v3"],
filters=[("id1", "==", "id001")],
engine="pyarrow",
)
df.query("id1 > 'id098'").groupby("id2").sum().head(3)
即使 group by 聚合逻辑是正确的,这也会返回错误的结果!
使用 pandas,您需要在读取 Parquet 文件时手动应用列剪枝和行组过滤。 使用 Spark 上的 pandas,Spark 优化器会自动应用这些查询增强功能,因此您无需手动键入它们。
让我们更详细地研究 Spark 上的 pandas 的优势。
让我们回顾一下 Spark 上的 pandas 的优势
更快的查询执行
Spark 上的 pandas 可以比 pandas 更快地执行查询,因为它使用所有可用的核心来并行化计算,并在运行之前优化查询,以实现高效执行。
pandas 计算仅在单个核心上运行。
可扩展到大于内存的数据集
pandas 在运行查询之前将数据加载到内存中,因此它只能查询适合内存的数据集。
Spark 可以通过流式传输数据和增量运行计算来查询大于内存的数据集。
当数据集大小增加时,pandas 会出现错误,而 Spark 没有此限制。
可在多台机器的集群上运行
Spark 可以在单台机器上运行,也可以分发到集群中的多台机器上。
当 Spark 在单台机器上运行时,计算将在所有可用的核心上运行。 这通常比仅在单个核心上运行计算的 pandas 快。
在多台机器上扩展计算非常适合想要对更大的数据集运行计算,或者只是访问更多的 RAM/核心,以便查询运行得更快。
pandas 用户熟悉的语法
Spark 上的 pandas 旨在为 pandas 用户提供熟悉的语法。
熟悉的语法是关键 - Spark 上的 pandas 提供了 Spark 的强大功能,并具有 pandas 用户习惯使用的相同语法。
提供对 Spark 经过实战检验的查询优化器的访问
Spark 上的 pandas 计算在执行之前由 Spark 的 Catalyst 优化器进行优化。
这些优化简化了查询并添加了优化。
在本帖的前面,我们看到了 Spark 在读取 Parquet 文件时如何自动添加列剪枝/行组过滤优化。 pandas 没有查询优化器,因此您需要自己添加这些优化。 手动添加优化既繁琐又容易出错。 如果不手动应用正确的优化,您的查询将返回错误的结果。
Spark 上的 pandas 不支持 pandas 支持的所有 API,原因有两个
某些功能尚未添加到 Spark 上的 pandas
某些 pandas 功能在 Spark 的分布式并行执行模型中没有意义
Spark 将 DataFrames 分解为多个块,以便可以并行处理它们,因此某些 pandas 操作无法很好地过渡到 Spark 的执行模型。
通常使用 Spark 上的 pandas 和 pandas 来获得两全其美。
假设您有一个大型数据集,您将其清理并聚合到一个较小的数据集中,该数据集传递到 scikit-learn 机器学习模型中。
您可以使用 Spark 上的 pandas 来清理和聚合数据集,以利用快速查询时间和并行执行。 处理完数据集后,您可以使用 to_pandas()
将其转换为 pandas DataFrame,然后使用 scikit-learn 运行机器学习模型。 如果数据集可以减少到足以放入 pandas DataFrame 中,则此方法效果很好。
Spark 上的 pandas 执行查询的方式与 pandas 完全不同。
Spark 上的 pandas 使用延迟评估。 它将查询转换为未解析的逻辑计划,使用 Spark 对其进行优化,并且仅在请求结果时才运行计算。
pandas 使用立即评估。 它将所有数据加载到内存中,并在调用时立即执行操作。 pandas 不应用查询优化,并且必须在执行查询之前将所有数据加载到内存中。
比较 Spark 上的 pandas 和 pandas 时,必须小心考虑将数据加载到内存中所需的时间以及运行查询所需的时间。 许多数据集需要很长时间才能加载到 pandas 中。
您也可以使用 Spark 上的 pandas 将数据加载到内存中,但这通常被认为是一种反模式。 如果存储中的数据发生更改(通过追加、合并或删除),则加载到内存中的数据集将不会更新。 在某些情况下,持久化 Spark DataFrame 是明智的,可以加快查询时间,但必须谨慎使用,因为它会导致不正确的查询结果。
Spark 上的 pandas 和 PySpark 都采用查询,将其转换为未解析的逻辑计划,然后使用 Spark 执行它们。
PySpark 和 Spark 上的 pandas 都具有相似的查询执行模型。 将查询转换为未解析的逻辑计划相对较快。 优化查询并执行它需要更多的时间。 因此,PySpark 和 Spark 上的 pandas 应该具有相似的性能。
Spark 上的 pandas 和 PySpark 之间的主要区别仅在于语法。
对于希望更快地运行查询并希望利用 Spark 的优化器而不是编写自己的优化的 pandas 用户来说,Spark 上的 pandas 是一个很好的替代方案。
Spark 上的 pandas 使用 pandas 用户熟悉的语法,因此很容易学习。
Spark 上的 pandas 也是一种与 pandas 结合使用的绝佳技术。 您可以使用 Spark 上的 pandas 的大数据和高性能处理能力来处理数据集,然后再将其转换为与其他技术兼容的 pandas DataFrames。
查看文档,以了解有关如何使用 Spark 上的 pandas 的更多信息。