Spark 上的 pandas API

本页面介绍了 Spark 上的 pandas API(“Spark 上的 pandas”)的优势,以及何时应该使用它而不是 pandas(或与 pandas 结合使用)。

Spark 上的 pandas 比 pandas 快得多,并提供 pandas 用户熟悉的语法。它提供了 Spark 的强大功能和 pandas 的熟悉性。

以下是 Spark 上的 pandas 的主要优势

  • 在单机工作负载上更快的查询执行(因为 Spark 上的 pandas 使用所有可用的核心,并行处理查询并优化查询)
  • Spark 上的 pandas 可扩展到集群中的多台机器,可以处理大数据
  • Spark 上的 pandas 允许对大于内存的数据集运行查询
  • Spark 上的 pandas 为 pandas 用户提供了熟悉的语法

pandas 有很多限制

  • pandas 必须在运行查询之前将所有数据加载到内存中,这可能很慢
  • pandas 无法处理大于单台机器可用内存的数据集(因为所有数据都必须加载到内存中)
  • pandas 计算在单个核心上运行,并且不利用单台机器的所有可用核心
  • pandas 计算无法扩展到多台机器
  • pandas 没有查询优化器,因此用户必须手动编写优化代码或忍受缓慢的代码

让我们看一些简单的示例,以更好地了解 Spark 上的 pandas 如何克服 pandas 的局限性。我们还将研究 Spark 上的 pandas 的局限性。

在本页的末尾,您将看到如何将 pandas 和 Spark 上的 pandas 结合使用。这不是一个非此即彼的决定 - 在许多情况下,使用两者都是一个不错的选择。

Spark 上的 pandas 示例

本节演示了 Spark 上的 pandas 如何比 pandas 更快地在 localhost 上的单个文件上运行查询。Spark 上的 pandas 不一定对所有查询都更快,但此示例显示了它何时提供不错的加速。

假设您有一个包含 9 列和 10 亿行数据的 Parquet 文件。以下是该文件的前三行

+-------+-------+--------------+-------+-------+--------+------+------+---------+
| id1   | id2   | id3          |   id4 |   id5 |    id6 |   v1 |   v2 |      v3 |
|-------+-------+--------------+-------+-------+--------+------+------+---------|
| id016 | id046 | id0000109363 |    88 |    13 | 146094 |    4 |    6 | 18.8377 |
| id039 | id087 | id0000466766 |    14 |    30 | 111330 |    4 |   14 | 46.7973 |
| id047 | id098 | id0000307804 |    85 |    23 | 187639 |    3 |    5 | 47.5773 |
+-------+-------+--------------+-------+-------+--------+------+------+---------+

以下是如何使用 Spark 上的 pandas 读取文件并运行分组聚合。

import pyspark.pandas as ps

df = ps.read_parquet("G1_1e9_1e2_0_0.parquet")[
    ["id1", "id2", "v3"]
]
df.query("id1 > 'id098'").groupby("id2").sum().head(3)

此查询在配备 64 GB RAM 的 2020 M1 Macbook 上使用 Spark 3.5.0 执行时,运行时间为 62 秒。

让我们将其与未优化的 pandas 代码进行比较。

import pandas as pd

df = pd.read_parquet("G1_1e9_1e2_0_0.parquet")[
    ["id1", "id2", "v3"]
]
df.query("id1 > 'id098'").groupby("id2").sum().head(3)

此查询出错,因为具有 64 GB RAM 的机器没有足够的内存空间来存储 10 亿行数据。

让我们手动添加一些 pandas 优化以使查询运行

df = pd.read_parquet(
    "G1_1e9_1e2_0_0.parquet",
    columns=["id1", "id2", "v3"],
    filters=[("id1", ">", "id098")],
    engine="pyarrow",
)
df.query("id1 > 'id098'").groupby("id2").sum().head(3)

此查询使用 pandas 2.2.0 运行时间为 275 秒。

使用 pandas 手动编写这些优化可能会导致错误的结果。以下是一个分组查询的示例,该查询是正确的,但行组过滤谓词是错误的

df = pd.read_parquet(
    "G1_1e9_1e2_0_0.parquet",
    columns=["id1", "id2", "v3"],
    filters=[("id1", "==", "id001")],
    engine="pyarrow",
)
df.query("id1 > 'id098'").groupby("id2").sum().head(3)

这返回了错误的结果,即使分组聚合逻辑是正确的!

使用 pandas,您需要在读取 Parquet 文件时手动应用列修剪和行组过滤。使用 Spark 上的 pandas,Spark 优化器会自动应用这些查询增强功能,因此您无需手动键入它们。

让我们更详细地研究 Spark 上的 pandas 的优势。

Spark 上的 pandas 的优势

让我们回顾一下 Spark 上的 pandas 的优势

更快的查询执行

Spark 上的 pandas 可以比 pandas 更快地执行查询,因为它使用所有可用的核心来并行化计算,并在运行查询之前优化查询,以实现高效的执行。

pandas 计算仅在单个核心上运行。

可扩展到大于内存的数据集

pandas 在运行查询之前将数据加载到内存中,因此它只能查询适合内存的数据集。

Spark 可以通过流式传输数据并增量运行计算来查询大于内存的数据集。

pandas 因数据集大小增长而臭名昭著,而 Spark 没有这种限制。

可在多个机器的集群上运行

Spark 可以运行在单台机器上,也可以分布到集群中的多台机器上。

当 Spark 运行在单台机器上时,计算将在所有可用的核心上运行。这通常比仅在单个核心上运行计算的 pandas 更快。

在多台机器上扩展计算非常适合您想要对更大的数据集运行计算或只是访问更多 RAM/核心以使查询运行得更快的情况。

为 pandas 用户提供熟悉的语法

Spark 上的 pandas 旨在为 pandas 用户提供熟悉的语法。

熟悉的语法是重点 - Spark 上的 pandas 提供了 Spark 的强大功能,并具有 pandas 用户习惯的相同语法。

提供对 Spark 的经过实战检验的查询优化器的访问

Spark 上的 pandas 计算在执行之前由 Spark 的 Catalyst 优化器优化。

这些优化简化了查询并添加了优化。

在本帖的前面,我们看到了 Spark 如何在读取 Parquet 文件时自动添加列修剪/行组过滤优化。pandas 没有查询优化器,因此您需要自己添加这些优化。手动添加优化很繁琐且容易出错。如果您没有手动应用正确的优化,您的查询将返回错误的结果。

Spark 上的 pandas 的局限性

Spark 上的 pandas 不支持 pandas 支持的所有 API,原因有两个

  • 某些功能尚未添加到 Spark 上的 pandas 中

  • 某些 pandas 功能与 Spark 的分布式并行执行模型不符

Spark 将 DataFrame 分成多个块,以便可以并行处理它们,因此某些 pandas 操作无法很好地过渡到 Spark 的执行模型。

将 Spark 上的 pandas 与常规 pandas 结合使用

将 Spark 上的 pandas 和 pandas 结合使用通常很有用,以获得两全其美。

假设您有一个大型数据集,您将其清理并聚合到一个较小的数据集,然后将其传递到 scikit-learn 机器学习模型中。

您可以使用 Spark 上的 pandas 来清理和聚合数据集,以利用快速查询时间和并行执行。处理完数据集后,您可以使用 to_pandas() 将其转换为 pandas DataFrame,然后使用 scikit-learn 运行机器学习模型。如果数据集可以缩减到足以放入 pandas DataFrame 中,这种方法效果很好。

Spark 上的 pandas 查询执行模型不同

Spark 上的 pandas 以与 pandas 完全不同的方式执行查询。

Spark 上的 pandas 使用延迟评估。它将查询转换为未解析的逻辑计划,使用 Spark 优化它,并且仅在请求结果时运行计算。

pandas 使用急切评估。它将所有数据加载到内存中,并在调用操作时立即执行操作。pandas 不应用查询优化,并且所有数据都必须加载到内存中才能执行查询。

在比较 Spark 上的 pandas 和 pandas 时,您必须注意将数据加载到内存中需要多长时间以及运行查询需要多长时间。许多数据集需要很长时间才能加载到 pandas 中。

您也可以使用 Spark 上的 pandas 将数据加载到内存中,但这通常被认为是一种反模式。如果存储中的数据发生变化(通过追加、合并或删除),加载到内存中的数据集将不会更新。在某些情况下,持久化 Spark DataFrame 是明智的,可以加快查询速度,但必须谨慎使用,因为它会导致查询结果不正确。

Spark 上的 pandas 和 PySpark 的区别

Spark 上的 pandas 和 PySpark 都接受查询,将其转换为未解析的逻辑计划,然后使用 Spark 执行它们。

PySpark 和 Spark 上的 pandas 都具有类似的查询执行模型。将查询转换为未解析的逻辑计划相对较快。优化查询并执行它需要更多时间。因此,PySpark 和 Spark 上的 pandas 应该具有类似的性能。

Spark 上的 pandas 和 PySpark 之间的主要区别仅在于语法。

结论

对于希望更快地运行查询并希望利用 Spark 的优化器而不是编写自己的优化的 pandas 用户来说,Spark 上的 pandas 是一个很好的替代方案。

Spark 上的 pandas 使用 pandas 用户熟悉的语法,因此易于学习。

Spark 上的 pandas 也是与 pandas 结合使用的绝佳技术。您可以使用 Spark 上的 pandas 的大数据和高性能处理功能来处理数据集,然后再将其转换为与其他技术兼容的 pandas DataFrame。

查看 文档,详细了解如何使用 Spark 上的 pandas。

最新消息

存档