Python 包管理¶
当你想在 YARN、Kubernetes、Mesos 等集群上运行你的 PySpark 应用程序时,你需要确保你的代码和所有使用的库在 executors 上可用。
举个例子,假设你想运行 Pandas UDF 示例。因为它使用 pyarrow 作为底层实现,我们需要确保在集群的每个 executor 上都安装了 pyarrow。否则,你可能会收到诸如 ModuleNotFoundError: No module named 'pyarrow'
之类的错误。
这是前一个示例中的脚本 app.py
,它将在集群上执行
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
def main(spark):
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
print(df.groupby("id").agg(mean_udf(df['v'])).collect())
if __name__ == "__main__":
main(SparkSession.builder.getOrCreate())
有多种方法可以管理集群中的 Python 依赖项
使用 PySpark 本地特性
使用 Conda
使用 Virtualenv
使用 PEX
使用 PySpark 本地特性¶
PySpark 允许通过以下方式之一将 Python 文件 (.py
)、压缩的 Python 包 (.zip
) 和 Egg 文件 (.egg
) 上传到 executors:
设置配置设置
spark.submit.pyFiles
在 Spark 脚本中设置
--py-files
选项在应用程序中直接调用
pyspark.SparkContext.addPyFile()
这是一种将额外的自定义 Python 代码传送到集群的简单方法。你可以只添加单个文件或压缩整个包并上传它们。 使用 pyspark.SparkContext.addPyFile()
允许你在启动作业后仍然上传代码。
但是,它不允许添加构建为 Wheels 的包,因此不允许包含带有本机代码的依赖项。
使用 Conda¶
Conda 是最广泛使用的 Python 包管理系统之一。 PySpark 用户可以通过利用 conda-pack 直接使用 Conda 环境来传送他们的第三方 Python 包,conda-pack 是一个创建可重定位 Conda 环境的命令行工具。
下面的示例创建了一个 Conda 环境,以便在 driver 和 executor 上使用,并将其打包到一个归档文件中。 这个归档文件捕获 Python 的 Conda 环境,并存储 Python 解释器及其所有相关依赖项。
conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack
conda activate pyspark_conda_env
conda pack -f -o pyspark_conda_env.tar.gz
之后,你可以通过使用 --archives
选项或 spark.archives
配置(在 YARN 中为 spark.yarn.dist.archives
)将它与脚本一起传送或在代码中传送。它会自动在 executors 上解压缩归档文件。
在 spark-submit
脚本的情况下,你可以如下使用它
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_conda_env.tar.gz#environment app.py
请注意,对于 YARN 或 Kubernetes 中的集群模式,不应设置上面的 PYSPARK_DRIVER_PYTHON
。
如果你在常规 Python shell 或 notebook 中,你可以尝试如下所示
import os
from pyspark.sql import SparkSession
from app import main
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
"spark.archives", # 'spark.yarn.dist.archives' in YARN.
"pyspark_conda_env.tar.gz#environment").getOrCreate()
main(spark)
对于 pyspark shell
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_conda_env.tar.gz#environment
使用 Virtualenv¶
Virtualenv 是一种用于创建隔离 Python 环境的 Python 工具。 从 Python 3.3 开始,它的部分功能已作为标准库集成到 Python 中,位于 venv 模块下。 PySpark 用户可以使用 virtualenv 通过使用 venv-pack 以与 conda-pack 类似的方式管理其集群中的 Python 依赖项。
可以创建一个在 driver 和 executor 上使用的虚拟环境,如下所示。 它将当前的虚拟环境打包到一个归档文件中,并且包含 Python 解释器和依赖项。 但是,它要求集群中的所有节点都安装了相同的 Python 解释器,因为 venv-pack 将 Python 解释器打包为符号链接。
python -m venv pyspark_venv
source pyspark_venv/bin/activate
pip install pyarrow pandas venv-pack
venv-pack -o pyspark_venv.tar.gz
你可以直接传递/解压缩归档文件,并通过利用 --archives
选项或 spark.archives
配置(在 YARN 中为 spark.yarn.dist.archives
)在 executors 上启用该环境。
对于 spark-submit
,你可以通过运行以下命令来使用它。 另外,请注意,在 Kubernetes 或 YARN 集群模式下,必须取消设置 PYSPARK_DRIVER_PYTHON
。
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_venv.tar.gz#environment app.py
对于常规 Python shell 或 notebook
import os
from pyspark.sql import SparkSession
from app import main
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
"spark.archives", # 'spark.yarn.dist.archives' in YARN.
"pyspark_venv.tar.gz#environment").getOrCreate()
main(spark)
在 pyspark shell 的情况下
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_venv.tar.gz#environment
使用 PEX¶
PySpark 还可以使用 PEX 来一起传送 Python 包。 PEX 是一种创建自包含 Python 环境的工具。 这类似于 Conda 或 virtualenv,但是 .pex
文件本身是可执行的。
以下示例创建了一个 .pex
文件,供 driver 和 executor 使用。 该文件包含使用 pex
命令指定的 Python 依赖项。
pip install pyarrow pandas pex
pex pyspark pyarrow pandas -o pyspark_pex_env.pex
此文件的行为类似于常规 Python 解释器。
./pyspark_pex_env.pex -c "import pandas; print(pandas.__version__)"
1.1.5
但是,.pex
文件本身不包含 Python 解释器,因此集群中的所有节点都应安装相同的 Python 解释器。
为了在集群中传输和使用 .pex
文件,你应该通过 spark.files
配置(在 YARN 中为 spark.yarn.dist.files
)或 --files
选项传送它,因为它们是常规文件,而不是目录或归档文件。
对于应用程序提交,你运行如下所示的命令。 请注意,对于 YARN 或 Kubernetes 中的集群模式,不应设置 PYSPARK_DRIVER_PYTHON
。
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./pyspark_pex_env.pex
spark-submit --files pyspark_pex_env.pex app.py
对于常规 Python shell 或 notebook
import os
from pyspark.sql import SparkSession
from app import main
os.environ['PYSPARK_PYTHON'] = "./pyspark_pex_env.pex"
spark = SparkSession.builder.config(
"spark.files", # 'spark.yarn.dist.files' in YARN.
"pyspark_pex_env.pex").getOrCreate()
main(spark)
对于交互式 pyspark shell,这些命令几乎相同
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./pyspark_pex_env.pex
pyspark --files pyspark_pex_env.pex
可以 在这里 找到一个用于部署独立的带有 SparkSession.builder
和 PEX 的 PySpark 的端到端 Docker 示例 - 它使用了 cluster-pack,一个位于 PEX 之上的库,可以自动执行必须手动创建和上传 PEX 的中间步骤。