Python 包管理#
当您想在 YARN、Kubernetes 等集群上运行 PySpark 应用程序时,您需要确保您的代码和所有使用的库在执行器上可用。
例如,假设您想运行 Pandas UDF 示例。由于它使用 pyarrow 作为底层实现,我们需要确保在集群的每个执行器上都安装了 pyarrow。否则,您可能会收到诸如 ModuleNotFoundError: No module named 'pyarrow'
的错误。
这是上一个示例中将在集群上执行的脚本 app.py
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
def main(spark):
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
print(df.groupby("id").agg(mean_udf(df['v'])).collect())
if __name__ == "__main__":
main(SparkSession.builder.getOrCreate())
在集群中管理 Python 依赖项有多种方法
使用 PySpark 原生功能
使用 Conda
使用 Virtualenv
使用 PEX
使用 PySpark 原生功能#
PySpark 允许通过以下方式之一将 Python 文件 (.py
)、压缩的 Python 包 (.zip
) 和 Egg 文件 (.egg
) 上传到执行器:
设置配置项
spark.submit.pyFiles
在 Spark 脚本中设置
--py-files
选项在应用程序中直接调用
pyspark.SparkContext.addPyFile()
这是一种将额外的自定义 Python 代码传输到集群的直接方法。您可以只添加单个文件或压缩整个包并上传它们。使用 pyspark.SparkContext.addPyFile()
甚至允许您在作业启动后上传代码。
但是,它不允许添加构建为 Wheels 的包,因此不允许包含带有原生代码的依赖项。
使用 Conda#
Conda 是最广泛使用的 Python 包管理系统之一。PySpark 用户可以利用 conda-pack(一个创建可重定位 Conda 环境的命令行工具)直接使用 Conda 环境来传输他们的第三方 Python 包。
下面的示例创建了一个 Conda 环境供驱动程序和执行器使用,并将其打包成一个存档文件。这个存档文件捕获了 Python 的 Conda 环境,并存储了 Python 解释器及其所有相关依赖项。
conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack
conda activate pyspark_conda_env
conda pack -f -o pyspark_conda_env.tar.gz
之后,您可以通过使用 --archives
选项或 spark.archives
配置(在 YARN 中是 spark.yarn.dist.archives
)将其与脚本一起传输或在代码中传输。它会自动在执行器上解压存档。
在 spark-submit
脚本的情况下,您可以如下使用它
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_conda_env.tar.gz#environment app.py
请注意,在 YARN 或 Kubernetes 的集群模式下,不应设置上述 PYSPARK_DRIVER_PYTHON
。
如果您在常规 Python shell 或 notebook 中,您可以按如下所示尝试
import os
from pyspark.sql import SparkSession
from app import main
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
"spark.archives", # 'spark.yarn.dist.archives' in YARN.
"pyspark_conda_env.tar.gz#environment").getOrCreate()
main(spark)
对于 pyspark shell
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_conda_env.tar.gz#environment
使用 Virtualenv#
Virtualenv 是一个创建隔离 Python 环境的 Python 工具。自 Python 3.3 以来,它的一部分功能已作为标准库集成到 Python 中,位于 venv 模块下。PySpark 用户可以以类似于 conda-pack 的方式使用 venv-pack 来管理其集群中的 Python 依赖项。
可以按如下所示创建一个供驱动程序和执行器使用的虚拟环境。它将当前虚拟环境打包成一个存档文件,并且包含 Python 解释器和依赖项。但是,它要求集群中的所有节点都安装相同的 Python 解释器,因为 venv-pack 将 Python 解释器打包为符号链接。
python -m venv pyspark_venv
source pyspark_venv/bin/activate
pip install pyarrow pandas venv-pack
venv-pack -o pyspark_venv.tar.gz
您可以通过利用 --archives
选项或 spark.archives
配置(在 YARN 中是 spark.yarn.dist.archives
)直接传递/解压存档文件并在执行器上启用环境。
对于 spark-submit
,您可以按如下方式运行命令使用它。此外,请注意在 Kubernetes 或 YARN 集群模式下必须取消设置 PYSPARK_DRIVER_PYTHON
。
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_venv.tar.gz#environment app.py
对于常规 Python shell 或 notebook
import os
from pyspark.sql import SparkSession
from app import main
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
"spark.archives", # 'spark.yarn.dist.archives' in YARN.
"pyspark_venv.tar.gz#environment").getOrCreate()
main(spark)
在 pyspark shell 的情况下
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_venv.tar.gz#environment
使用 PEX#
PySpark 也可以使用 PEX 来一起传输 Python 包。PEX 是一个创建自包含 Python 环境的工具。这类似于 Conda 或 virtualenv,但是 .pex
文件本身是可执行的。
以下示例为驱动程序和执行器创建了一个 .pex
文件。该文件包含使用 pex
命令指定的 Python 依赖项。
pip install pyarrow pandas pex
pex pyspark pyarrow pandas -o pyspark_pex_env.pex
此文件与常规 Python 解释器的行为类似。
./pyspark_pex_env.pex -c "import pandas; print(pandas.__version__)"
1.1.5
但是,.pex
文件本身不包含 Python 解释器,因此集群中的所有节点都应该安装相同的 Python 解释器。
为了在集群中传输和使用 .pex
文件,您应该通过 spark.files
配置(在 YARN 中是 spark.yarn.dist.files
)或 --files
选项传输,因为它们是常规文件而不是目录或存档文件。
对于应用程序提交,您可以按如下所示运行命令。请注意,在 YARN 或 Kubernetes 的集群模式下,不应设置 PYSPARK_DRIVER_PYTHON
。
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./pyspark_pex_env.pex
spark-submit --files pyspark_pex_env.pex app.py
对于常规 Python shell 或 notebook
import os
from pyspark.sql import SparkSession
from app import main
os.environ['PYSPARK_PYTHON'] = "./pyspark_pex_env.pex"
spark = SparkSession.builder.config(
"spark.files", # 'spark.yarn.dist.files' in YARN.
"pyspark_pex_env.pex").getOrCreate()
main(spark)
对于交互式 pyspark shell,命令几乎相同
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./pyspark_pex_env.pex
pyspark --files pyspark_pex_env.pex
一个部署带有 SparkSession.builder
和 PEX 的独立 PySpark 的端到端 Docker 示例可以在 这里 找到——它使用了 cluster-pack,这是一个在 PEX 之上的库,可以自动化手动创建和上传 PEX 的中间步骤。