Python 包管理#

当您想在 YARN、Kubernetes 等集群上运行 PySpark 应用程序时,您需要确保您的代码和所有使用的库在执行器上可用。

例如,假设您想运行 Pandas UDF 示例。由于它使用 pyarrow 作为底层实现,我们需要确保在集群的每个执行器上都安装了 pyarrow。否则,您可能会收到诸如 ModuleNotFoundError: No module named 'pyarrow' 的错误。

这是上一个示例中将在集群上执行的脚本 app.py

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession

def main(spark):
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    @pandas_udf("double")
    def mean_udf(v: pd.Series) -> float:
        return v.mean()

    print(df.groupby("id").agg(mean_udf(df['v'])).collect())


if __name__ == "__main__":
    main(SparkSession.builder.getOrCreate())

在集群中管理 Python 依赖项有多种方法

  • 使用 PySpark 原生功能

  • 使用 Conda

  • 使用 Virtualenv

  • 使用 PEX

使用 PySpark 原生功能#

PySpark 允许通过以下方式之一将 Python 文件 (.py)、压缩的 Python 包 (.zip) 和 Egg 文件 (.egg) 上传到执行器:

这是一种将额外的自定义 Python 代码传输到集群的直接方法。您可以只添加单个文件或压缩整个包并上传它们。使用 pyspark.SparkContext.addPyFile() 甚至允许您在作业启动后上传代码。

但是,它不允许添加构建为 Wheels 的包,因此不允许包含带有原生代码的依赖项。

使用 Conda#

Conda 是最广泛使用的 Python 包管理系统之一。PySpark 用户可以利用 conda-pack(一个创建可重定位 Conda 环境的命令行工具)直接使用 Conda 环境来传输他们的第三方 Python 包。

下面的示例创建了一个 Conda 环境供驱动程序和执行器使用,并将其打包成一个存档文件。这个存档文件捕获了 Python 的 Conda 环境,并存储了 Python 解释器及其所有相关依赖项。

conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack
conda activate pyspark_conda_env
conda pack -f -o pyspark_conda_env.tar.gz

之后,您可以通过使用 --archives 选项或 spark.archives 配置(在 YARN 中是 spark.yarn.dist.archives)将其与脚本一起传输或在代码中传输。它会自动在执行器上解压存档。

spark-submit 脚本的情况下,您可以如下使用它

export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_conda_env.tar.gz#environment app.py

请注意,在 YARN 或 Kubernetes 的集群模式下,不应设置上述 PYSPARK_DRIVER_PYTHON

如果您在常规 Python shell 或 notebook 中,您可以按如下所示尝试

import os
from pyspark.sql import SparkSession
from app import main

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
    "spark.archives",  # 'spark.yarn.dist.archives' in YARN.
    "pyspark_conda_env.tar.gz#environment").getOrCreate()
main(spark)

对于 pyspark shell

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_conda_env.tar.gz#environment

使用 Virtualenv#

Virtualenv 是一个创建隔离 Python 环境的 Python 工具。自 Python 3.3 以来,它的一部分功能已作为标准库集成到 Python 中,位于 venv 模块下。PySpark 用户可以以类似于 conda-pack 的方式使用 venv-pack 来管理其集群中的 Python 依赖项。

可以按如下所示创建一个供驱动程序和执行器使用的虚拟环境。它将当前虚拟环境打包成一个存档文件,并且包含 Python 解释器和依赖项。但是,它要求集群中的所有节点都安装相同的 Python 解释器,因为 venv-pack 将 Python 解释器打包为符号链接

python -m venv pyspark_venv
source pyspark_venv/bin/activate
pip install pyarrow pandas venv-pack
venv-pack -o pyspark_venv.tar.gz

您可以通过利用 --archives 选项或 spark.archives 配置(在 YARN 中是 spark.yarn.dist.archives)直接传递/解压存档文件并在执行器上启用环境。

对于 spark-submit,您可以按如下方式运行命令使用它。此外,请注意在 Kubernetes 或 YARN 集群模式下必须取消设置 PYSPARK_DRIVER_PYTHON

export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_venv.tar.gz#environment app.py

对于常规 Python shell 或 notebook

import os
from pyspark.sql import SparkSession
from app import main

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
    "spark.archives",  # 'spark.yarn.dist.archives' in YARN.
    "pyspark_venv.tar.gz#environment").getOrCreate()
main(spark)

在 pyspark shell 的情况下

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_venv.tar.gz#environment

使用 PEX#

PySpark 也可以使用 PEX 来一起传输 Python 包。PEX 是一个创建自包含 Python 环境的工具。这类似于 Conda 或 virtualenv,但是 .pex 文件本身是可执行的。

以下示例为驱动程序和执行器创建了一个 .pex 文件。该文件包含使用 pex 命令指定的 Python 依赖项。

pip install pyarrow pandas pex
pex pyspark pyarrow pandas -o pyspark_pex_env.pex

此文件与常规 Python 解释器的行为类似。

./pyspark_pex_env.pex -c "import pandas; print(pandas.__version__)"
1.1.5

但是,.pex 文件本身不包含 Python 解释器,因此集群中的所有节点都应该安装相同的 Python 解释器。

为了在集群中传输和使用 .pex 文件,您应该通过 spark.files 配置(在 YARN 中是 spark.yarn.dist.files)或 --files 选项传输,因为它们是常规文件而不是目录或存档文件。

对于应用程序提交,您可以按如下所示运行命令。请注意,在 YARN 或 Kubernetes 的集群模式下,不应设置 PYSPARK_DRIVER_PYTHON

export PYSPARK_DRIVER_PYTHON=python  # Do not set in cluster modes.
export PYSPARK_PYTHON=./pyspark_pex_env.pex
spark-submit --files pyspark_pex_env.pex app.py

对于常规 Python shell 或 notebook

import os
from pyspark.sql import SparkSession
from app import main

os.environ['PYSPARK_PYTHON'] = "./pyspark_pex_env.pex"
spark = SparkSession.builder.config(
    "spark.files",  # 'spark.yarn.dist.files' in YARN.
    "pyspark_pex_env.pex").getOrCreate()
main(spark)

对于交互式 pyspark shell,命令几乎相同

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./pyspark_pex_env.pex
pyspark --files pyspark_pex_env.pex

一个部署带有 SparkSession.builder 和 PEX 的独立 PySpark 的端到端 Docker 示例可以在 这里 找到——它使用了 cluster-pack,这是一个在 PEX 之上的库,可以自动化手动创建和上传 PEX 的中间步骤。