pandas API on Spark

本页面介绍了 pandas API on Spark(“pandas on Spark”)的优势,以及何时应该使用它来代替 pandas(或与 pandas 结合使用)。

pandas on Spark 的运行速度通常远快于 pandas,并提供了 pandas 用户熟悉的语法。它既具备 Spark 的强大功能,又拥有 pandas 的易用性。

以下是 pandas on Spark 的主要优势:

  • 在单机工作负载上更快的查询执行速度(因为 pandas on Spark 使用所有可用核心、并行处理查询并优化查询)
  • pandas on Spark 可扩展至集群中的多台机器,能够处理大数据
  • pandas on Spark 允许对超出内存大小的数据集运行查询
  • pandas on Spark 为 pandas 用户提供了熟悉的语法

pandas 存在许多局限性

  • pandas 在运行查询前必须将所有数据加载到内存中,这可能会很慢
  • pandas 无法处理大于单机可用内存的数据集(因为所有数据必须加载到内存中)
  • pandas 计算在单核上运行,无法利用单机的所有可用核心
  • pandas 计算无法扩展到多台机器
  • pandas 没有查询优化器,因此用户必须手动编写优化代码,否则会导致代码运行缓慢

让我们通过一些简单的示例来更好地理解 pandas on Spark 如何克服 pandas 的局限性。我们还将探讨 pandas on Spark 的局限性。

在本页面末尾,你将看到如何结合使用 pandas 和 pandas on Spark。这并不是非此即彼的选择——在许多情况下,两者结合使用是非常棒的选择。

pandas on Spark 示例

本节演示了 pandas on Spark 如何在本地主机上比 pandas 更快地对单个文件运行查询。pandas on Spark 并不一定在所有查询中都更快,但此示例展示了它何时能提供显著的速度提升。

假设你有一个包含 9 列和 10 亿行数据的 Parquet 文件。以下是该文件的前三行:

+-------+-------+--------------+-------+-------+--------+------+------+---------+
| id1   | id2   | id3          |   id4 |   id5 |    id6 |   v1 |   v2 |      v3 |
|-------+-------+--------------+-------+-------+--------+------+------+---------|
| id016 | id046 | id0000109363 |    88 |    13 | 146094 |    4 |    6 | 18.8377 |
| id039 | id087 | id0000466766 |    14 |    30 | 111330 |    4 |   14 | 46.7973 |
| id047 | id098 | id0000307804 |    85 |    23 | 187639 |    3 |    5 | 47.5773 |
+-------+-------+--------------+-------+-------+--------+------+------+---------+

以下是如何使用 pandas on Spark 读取该文件并运行分组聚合的方法:

import pyspark.pandas as ps

df = ps.read_parquet("G1_1e9_1e2_0_0.parquet")[
    ["id1", "id2", "v3"]
]
df.query("id1 > 'id098'").groupby("id2").sum().head(3)

此查询在配备 64GB RAM 的 2020 M1 Macbook 上,使用 Spark 3.5.0 执行时,耗时 62 秒。

让我们将其与未经优化的 pandas 代码进行比较。

import pandas as pd

df = pd.read_parquet("G1_1e9_1e2_0_0.parquet")[
    ["id1", "id2", "v3"]
]
df.query("id1 > 'id098'").groupby("id2").sum().head(3)

此查询会报错,因为配备 64GB RAM 的机器没有足够的空间将 10 亿行数据存入内存。

让我们手动添加一些 pandas 优化来使查询运行:

df = pd.read_parquet(
    "G1_1e9_1e2_0_0.parquet",
    columns=["id1", "id2", "v3"],
    filters=[("id1", ">", "id098")],
    engine="pyarrow",
)
df.query("id1 > 'id098'").groupby("id2").sum().head(3)

此查询在 pandas 2.2.0 下运行耗时 275 秒。

使用 pandas 手动编写这些优化可能会导致结果错误。这是一个分组查询的示例,逻辑正确,但行组过滤谓词是错误的:

df = pd.read_parquet(
    "G1_1e9_1e2_0_0.parquet",
    columns=["id1", "id2", "v3"],
    filters=[("id1", "==", "id001")],
    engine="pyarrow",
)
df.query("id1 > 'id098'").groupby("id2").sum().head(3)

这返回了错误的结果,即使分组聚合逻辑本身是正确的!

使用 pandas 时,读取 Parquet 文件时需要手动应用列裁剪和行组过滤。而在 pandas on Spark 中,Spark 优化器会自动应用这些查询增强功能,因此你无需手动输入它们。

让我们更详细地探讨 pandas on Spark 的优势。

pandas on Spark 的优势

让我们回顾一下 pandas on Spark 的优势:

更快的查询执行速度

pandas on Spark 的查询执行速度比 pandas 快,因为它使用所有可用核心进行并行计算,并在运行前对查询进行优化,从而实现高效执行。

pandas 计算仅在单核上运行。

可扩展至超大内存数据集

pandas 在运行查询前会将数据加载到内存中,因此只能查询适合内存的数据集。

Spark 可以通过流式传输数据并逐步运行计算,从而查询大于内存的数据集。

pandas 以在数据集增大时报错而著称,而 Spark 没有这个限制。

可在多机集群上运行

Spark 既可以在单机上运行,也可以分布在集群中的多台机器上。

当 Spark 在单机上运行时,计算在所有可用核心上运行。这通常比仅在单核上运行计算的 pandas 快得多。

当你想在更大的数据集上运行计算,或者仅仅是为了获取更多的 RAM/核心以加快查询速度时,在多台机器上扩展计算是非常好的选择。

为 pandas 用户提供熟悉的语法

pandas on Spark 旨在为 pandas 用户提供熟悉的语法。

熟悉的语法正是其核心所在——pandas on Spark 以 pandas 用户习惯的相同语法提供了 Spark 的强大功能。

提供对 Spark 经过实战检验的查询优化器的访问

pandas on Spark 的计算在执行前由 Spark 的 Catalyst 优化器进行优化。

这些优化简化了查询并增加了性能提升。

在本文前面,我们看到 Spark 如何自动添加诸如读取 Parquet 文件时的列裁剪/行组过滤优化。pandas 没有查询优化器,因此你需要自己添加这些优化。手动添加优化既繁琐又容易出错。如果你没有正确地手动应用优化,你的查询就会返回错误的结果。

pandas on Spark 的局限性

由于以下两个原因,pandas on Spark 并不支持 pandas 支持的所有 API:

  • 某些功能尚未添加到 pandas on Spark 中

  • 某些 pandas 功能在 Spark 的分布式并行执行模型中没有意义

Spark 将 DataFrame 分解为多个块以便并行处理,因此某些 pandas 操作无法很好地转换到 Spark 的执行模型中。

将 pandas on Spark 与常规 pandas 结合使用

将 pandas on Spark 和 pandas 结合使用通常能取长补短,发挥两者的最大优势。

假设你有一个大数据集,经过清洗和聚合后得到一个较小的数据集,并将其传入 scikit-learn 机器学习模型中。

你可以使用 pandas on Spark 清洗和聚合数据集,以利用其快速的查询时间和并行执行能力。数据集处理完成后,可以使用 to_pandas() 将其转换为 pandas DataFrame,然后使用 scikit-learn 运行机器学习模型。如果数据集可以被缩减到足以放入 pandas DataFrame,这种方法效果很好。

pandas on Spark 的查询执行模型不同

pandas on Spark 执行查询的方式与 pandas 完全不同。

pandas on Spark 使用惰性求值。它将查询转换为未解析的逻辑计划,通过 Spark 进行优化,仅在请求结果时才运行计算。

pandas 使用急切求值。它将所有数据加载到内存中,并在调用操作时立即执行。pandas 不应用查询优化,并且在执行查询前必须将所有数据加载到内存中。

在比较 pandas on Spark 和 pandas 时,必须仔细考虑将数据加载到内存所需的时间以及运行查询所需的时间。许多数据集加载到 pandas 中需要很长时间。

你也可以使用 pandas on Spark 将数据加载到内存中,但这通常被认为是一种反模式。如果存储中的数据发生更改(通过追加、合并或删除),加载到内存的数据集将不会更新。在某些情况下,持久化 Spark DataFrame 是明智的,可以加快查询速度,但必须谨慎使用,因为它可能导致查询结果不正确。

pandas on Spark 与 PySpark 的区别

pandas on Spark 和 PySpark 都接收查询,将其转换为未解析的逻辑计划,然后使用 Spark 执行它们。

PySpark 和 pandas on Spark 具有相似的查询执行模型。将查询转换为未解析的逻辑计划相对较快。优化查询并执行它需要花费更多时间。因此,PySpark 和 pandas on Spark 应该具有相似的性能。

pandas on Spark 和 PySpark 的主要区别仅在于语法。

总结

对于希望更快地运行查询并希望利用 Spark 的优化器而不是编写自己的优化代码的 pandas 用户来说,pandas on Spark 是一个很好的替代方案。

pandas on Spark 使用 pandas 用户熟悉的语法,因此非常容易学习。

pandas on Spark 也是一种与 pandas 结合使用的优秀技术。你可以利用 pandas on Spark 的大数据和高性能处理能力来处理数据集,然后再将其转换为与其他技术兼容的 pandas DataFrame。

查看 官方文档 以了解更多关于如何使用 pandas on Spark 的信息。