Spark 上的 pandas API

本页描述了 Spark 上的 pandas API(“Spark 上的 pandas”)的优势,以及何时应使用它而不是 pandas(或与 pandas 结合使用)。

Spark 上的 pandas 比 pandas 快得多,并且提供了 pandas 用户熟悉的语法。它提供了 Spark 的强大功能和 pandas 的熟悉感。

以下是 Spark 上的 pandas 的主要优势

  • 在单机工作负载上更快的查询执行(因为 Spark 上的 pandas 使用所有可用核心,并行处理查询并优化查询)
  • Spark 上的 pandas 可扩展到集群中的多台机器并可处理大数据
  • Spark 上的 pandas 允许在超出内存限制的数据集上运行查询
  • Spark 上的 pandas 为 pandas 用户提供了熟悉的语法

pandas 有许多限制

  • pandas 必须在运行查询之前将所有数据加载到内存中,这可能很慢
  • pandas 无法处理大于单机可用内存的数据集(因为所有数据都必须加载到内存中)
  • pandas 计算在单个核心上运行,并且不利用单机的所有可用核心
  • pandas 计算无法扩展到多台机器
  • pandas 没有查询优化器,因此用户必须手动编写优化代码,否则会遇到慢代码

让我们看一些简单的例子,以便更好地理解 Spark 上的 pandas 如何克服 pandas 的局限性。我们还将探讨 Spark 上的 pandas 的局限性。

在本页末尾,您将看到如何结合使用 pandas 和 Spark 上的 pandas。这不是非此即彼的决定——在许多情况下,两者结合使用是一个很好的选择。

Spark 上的 pandas 示例

本节演示了 Spark 上的 pandas 如何在 localhost 上的单个文件上比 pandas 更快地运行查询。Spark 上的 pandas 并非对所有查询都更快,但此示例展示了它何时能提供不错的加速。

假设您有一个 Parquet 文件,其中包含 9 列和 10 亿行数据。以下是文件的前三行

+-------+-------+--------------+-------+-------+--------+------+------+---------+
| id1   | id2   | id3          |   id4 |   id5 |    id6 |   v1 |   v2 |      v3 |
|-------+-------+--------------+-------+-------+--------+------+------+---------|
| id016 | id046 | id0000109363 |    88 |    13 | 146094 |    4 |    6 | 18.8377 |
| id039 | id087 | id0000466766 |    14 |    30 | 111330 |    4 |   14 | 46.7973 |
| id047 | id098 | id0000307804 |    85 |    23 | 187639 |    3 |    5 | 47.5773 |
+-------+-------+--------------+-------+-------+--------+------+------+---------+

以下是您如何使用 Spark 上的 pandas 读取文件并运行分组聚合。

import pyspark.pandas as ps

df = ps.read_parquet("G1_1e9_1e2_0_0.parquet")[
    ["id1", "id2", "v3"]
]
df.query("id1 > 'id098'").groupby("id2").sum().head(3)

此查询在具有 64 GB RAM 的 2020 M1 Macbook 上,使用 Spark 3.5.0 执行时,耗时 62 秒。

让我们将其与未优化的 pandas 代码进行比较。

import pandas as pd

df = pd.read_parquet("G1_1e9_1e2_0_0.parquet")[
    ["id1", "id2", "v3"]
]
df.query("id1 > 'id098'").groupby("id2").sum().head(3)

此查询出错,因为具有 64GB RAM 的机器没有足够的空间在内存中存储 10 亿行数据。

让我们手动添加一些 pandas 优化以使查询运行

df = pd.read_parquet(
    "G1_1e9_1e2_0_0.parquet",
    columns=["id1", "id2", "v3"],
    filters=[("id1", ">", "id098")],
    engine="pyarrow",
)
df.query("id1 > 'id098'").groupby("id2").sum().head(3)

此查询在 pandas 2.2.0 中运行耗时 275 秒。

手动使用 pandas 编写这些优化代码可能会给出错误的结果。这是一个分组查询的示例,它是正确的,但行组过滤谓词是错误的

df = pd.read_parquet(
    "G1_1e9_1e2_0_0.parquet",
    columns=["id1", "id2", "v3"],
    filters=[("id1", "==", "id001")],
    engine="pyarrow",
)
df.query("id1 > 'id098'").groupby("id2").sum().head(3)

这会返回错误的结果,尽管分组聚合逻辑是正确的!

使用 pandas,您需要在读取 Parquet 文件时手动应用列修剪和行组过滤。使用 Spark 上的 pandas,Spark 优化器会自动应用这些查询增强,因此您无需手动输入它们。

让我们更详细地探讨 Spark 上的 pandas 的优势。

Spark 上的 pandas 的优势

让我们回顾一下 Spark 上的 pandas 的优势

更快的查询执行

Spark 上的 pandas 可以比 pandas 更快地执行查询,因为它使用所有可用核心并行化计算,并在运行查询之前对其进行优化,以实现高效执行。

pandas 计算仅在单个核心上运行。

可扩展到大于内存的数据集

pandas 在运行查询之前将数据加载到内存中,因此它只能查询适合内存的数据集。

Spark 可以通过流式传输数据和增量运行计算来查询大于内存的数据集。

pandas 在数据集大小增长时经常出错,而 Spark 没有这个限制。

可在多台机器集群上运行

Spark 可以在单机上运行,也可以分发到集群中的多台机器上。

当 Spark 在单机上运行时,计算在所有可用核心上运行。这通常比仅在单个核心上运行计算的 pandas 更快。

在多台机器上扩展计算非常适合您希望对更大的数据集运行计算或只是访问更多 RAM/核心以使查询运行更快的情况。

pandas 用户熟悉的语法

Spark 上的 pandas 旨在为 pandas 用户提供熟悉的语法。

熟悉的语法是关键——Spark 上的 pandas 提供了 Spark 的强大功能,同时沿用了 pandas 用户习惯的语法。

提供了访问 Spark 久经考验的查询优化器的能力

Spark 上的 pandas 计算在执行之前会通过 Spark 的 Catalyst 优化器进行优化。

这些优化简化了查询并增加了优化。

在本文前面,我们看到了 Spark 如何在读取 Parquet 文件时自动添加列修剪/行组过滤优化。pandas 没有查询优化器,因此您需要自己添加这些优化。手动添加优化是繁琐且容易出错的。如果您不手动应用正确的优化,您的查询将返回错误的结果。

Spark 上的 pandas 的局限性

Spark 上的 pandas 不支持 pandas 支持的所有 API,原因有二

  • 某些功能尚未添加到 Spark 上的 pandas 中

  • 某些 pandas 功能在 Spark 的分布式、并行执行模型中没有意义

Spark 将 DataFrame 分成多个块以便并行处理,因此某些 pandas 操作无法很好地转换为 Spark 的执行模型。

结合使用 Spark 上的 pandas 和普通 pandas

结合使用 Spark 上的 pandas 和 pandas 通常很有用,可以两全其美。

假设您有一个大型数据集,您将其清理并聚合到一个较小的数据集中,然后将其传递给 scikit-learn 机器学习模型。

您可以使用 Spark 上的 pandas 清理和聚合数据集,以利用快速查询时间和并行执行。处理完数据集后,您可以使用 to_pandas() 将其转换为 pandas DataFrame,然后使用 scikit-learn 运行机器学习模型。如果数据集可以足够小以适应 pandas DataFrame,则此方法效果很好。

Spark 上的 pandas 查询执行模型不同

Spark 上的 pandas 执行查询的方式与 pandas 完全不同。

Spark 上的 pandas 使用惰性评估。它将查询转换为未解析的逻辑计划,使用 Spark 对其进行优化,并且仅在请求结果时才运行计算。

pandas 使用急切评估。它将所有数据加载到内存中,并在调用操作时立即执行。pandas 不应用查询优化,并且在执行查询之前必须将所有数据加载到内存中。

比较 Spark 上的 pandas 和 pandas 时,您必须注意考虑将数据加载到内存所需的时间以及运行查询所需的时间。许多数据集需要很长时间才能加载到 pandas 中。

您也可以使用 Spark 上的 pandas 将数据加载到内存中,但这通常被认为是一种反模式。如果存储中的数据发生更改(通过追加、合并或删除),则加载到内存中的数据集将不会更新。在某些情况下,持久化 Spark DataFrame 以加快查询时间是明智的,但必须谨慎使用,因为它可能导致不正确的查询结果。

Spark 上的 pandas 和 PySpark 的区别

Spark 上的 pandas 和 PySpark 都接收查询,将其转换为未解析的逻辑计划,然后使用 Spark 执行它们。

PySpark 和 Spark 上的 pandas 都具有相似的查询执行模型。将查询转换为未解析的逻辑计划相对较快。优化查询和执行查询需要更多时间。因此,PySpark 和 Spark 上的 pandas 应该具有相似的性能。

Spark 上的 pandas 和 PySpark 之间的主要区别仅在于语法。

结论

Spark 上的 pandas 是一个很好的替代方案,适用于希望更快地运行查询并希望利用 Spark 优化器而不是编写自己的优化代码的 pandas 用户。

Spark 上的 pandas 使用 pandas 用户熟悉的语法,因此易于学习。

Spark 上的 pandas 也是一种可以与 pandas 结合使用的出色技术。您可以利用 Spark 上的 pandas 的大数据和高性能处理能力来处理数据集,然后再将其转换为与其他技术兼容的 pandas DataFrame。

查看文档,了解有关如何使用 Spark 上的 pandas 的更多信息。

最新消息

归档