提交应用程序
Spark 的 bin
目录中的 spark-submit
脚本用于在集群上启动应用程序。它可以通过统一的接口使用 Spark 支持的所有集群管理器,因此您无需为每个集群管理器专门配置您的应用程序。
捆绑应用程序的依赖项
如果您的代码依赖于其他项目,则需要将它们与您的应用程序一起打包,以便将代码分发到 Spark 集群。为此,请创建一个包含您的代码及其依赖项的 assembly jar(或“uber”jar)。sbt 和 Maven 都包含 assembly 插件。在创建 assembly jar 时,请将 Spark 和 Hadoop 列为 provided
依赖项;这些无需捆绑,因为它们在运行时由集群管理器提供。一旦您有了组装好的 jar,就可以像此处所示,在传递您的 jar 时调用 bin/spark-submit
脚本。
对于 Python,您可以使用 spark-submit
的 --py-files
参数来添加 .py
、.zip
或 .egg
文件,以便与您的应用程序一起分发。如果您依赖多个 Python 文件,我们建议将它们打包成 .zip
或 .egg
。对于第三方 Python 依赖项,请参阅Python 包管理。
使用 spark-submit 启动应用程序
用户应用程序捆绑后,可以使用 bin/spark-submit
脚本启动。此脚本负责设置 Spark 及其依赖项的 classpath,并且支持 Spark 支持的不同集群管理器和部署模式。
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
一些常用的选项是
--class
:您的应用程序的入口点(例如org.apache.spark.examples.SparkPi
)--master
:集群的主 URL(例如spark://23.195.26.187:7077
)--deploy-mode
:是否将您的驱动程序部署在工作节点上(cluster
)或作为外部客户端在本地部署(client
)(默认:client
) †--conf
:以 key=value 格式表示的任意 Spark 配置属性。对于包含空格的值,请用引号将“key=value”括起来(如所示)。多个配置应作为单独的参数传递。(例如--conf <key>=<value> --conf <key2>=<value2>
)application-jar
:包含您的应用程序和所有依赖项的捆绑 jar 的路径。URL 在您的集群内必须全局可见,例如,一个hdfs://
路径或一个存在于所有节点上的file://
路径。application-arguments
:传递给您的主类 main 方法的参数(如果有)
† 一种常见的部署策略是从与您的工作机器物理上位于同一位置的网关机器(例如独立 EC2 集群中的主节点)提交您的应用程序。在这种设置中,client
模式是合适的。在 client
模式下,驱动程序直接在充当集群客户端的 spark-submit
进程中启动。应用程序的输入和输出连接到控制台。因此,此模式特别适用于涉及 REPL 的应用程序(例如 Spark shell)。
或者,如果您的应用程序是从远离工作机器的机器(例如本地笔记本电脑)提交的,通常使用 cluster
模式来最小化驱动程序和执行器之间的网络延迟。目前,独立模式不支持 Python 应用程序的集群模式。
对于 Python 应用程序,只需将 .py
文件作为 <application-jar>
的替代,并通过 --py-files
将 Python .zip
、.egg
或 .py
文件添加到搜索路径中。
有一些选项是特定于所使用的集群管理器的。例如,对于具有 cluster
部署模式的Spark 独立集群,您还可以指定 --supervise
以确保如果驱动程序以非零退出代码失败,它会自动重新启动。要列出 spark-submit
可用的所有此类选项,请使用 --help
运行它。以下是几个常见选项的示例:
# Run application locally on 8 cores
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master "local[8]" \
/path/to/examples.jar \
100
# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a YARN cluster in cluster deploy mode
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000
# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
--master spark://207.184.161.138:7077 \
examples/src/main/python/pi.py \
1000
# Run on a Kubernetes cluster in cluster deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master k8s://xx.yy.zz.ww:443 \
--deploy-mode cluster \
--executor-memory 20G \
--num-executors 50 \
http://path/to/examples.jar \
1000
Master URL
传递给 Spark 的 master URL 可以采用以下格式之一:
Master URL | 含义 |
---|---|
local | 在本地运行 Spark,使用一个工作线程(即完全没有并行性)。 |
local[K] | 在本地运行 Spark,使用 K 个工作线程(理想情况下,将其设置为您机器上的核心数)。 |
local[K,F] | 在本地运行 Spark,使用 K 个工作线程和 F 个最大失败次数(关于此变量的解释,请参阅spark.task.maxFailures)。 |
local[*] | 在本地运行 Spark,使用与您机器上的逻辑核心数一样多的工作线程。 |
local[*,F] | 在本地运行 Spark,使用与您机器上的逻辑核心数一样多的工作线程和 F 个最大失败次数。 |
local-cluster[N,C,M] | 本地集群模式仅用于单元测试。它在一个 JVM 中模拟一个分布式集群,其中包含 N 个工作节点,每个工作节点 C 个核心和 M MiB 内存。 |
spark://HOST:PORT | 连接到给定的Spark 独立集群 master。端口必须是您 master 配置使用的端口,默认为 7077。 |
spark://HOST1:PORT1,HOST2:PORT2 | 连接到给定带 Zookeeper 备用 master 的 Spark 独立集群。列表必须包含通过 Zookeeper 设置的高可用集群中的所有 master 主机。端口必须是每个 master 配置使用的端口,默认为 7077。 |
yarn | 根据 --deploy-mode 的值,以 client 或 cluster 模式连接到 YARN 集群。集群位置将根据 HADOOP_CONF_DIR 或 YARN_CONF_DIR 变量查找。 |
k8s://HOST:PORT | 根据 --deploy-mode 的值,以 client 或 cluster 模式连接到 Kubernetes 集群。HOST 和 PORT 指的是 Kubernetes API Server。默认使用 TLS 连接。为了强制使用不安全的连接,可以使用 k8s://http://HOST:PORT 。 |
从文件中加载配置
spark-submit
脚本可以从属性文件中加载默认的Spark 配置值并将其传递给您的应用程序。文件可以通过 --properties-file
参数指定。如果未指定,Spark 默认会从 SPARK_HOME
目录中的 conf/spark-defaults.conf
读取选项。
可以额外使用 --load-spark-defaults
标志来告诉 Spark 即使通过 --properties-file
提供了属性文件,也要从 conf/spark-defaults.conf
加载配置。这在某些情况下很有用,例如,当用户希望将系统范围的默认设置放在前者中,而将用户/集群特定的设置放在后者中时。
以这种方式加载默认 Spark 配置可以省去 spark-submit
的某些标志。例如,如果设置了 spark.master
属性,您可以安全地从 spark-submit
中省略 --master
标志。通常,在 SparkConf
上显式设置的配置值具有最高优先级,其次是传递给 spark-submit
的标志,然后是默认文件中的值。
如果您不确定配置选项的来源,可以通过使用 --verbose
选项运行 spark-submit
来打印出详细的调试信息。
高级依赖项管理
使用 spark-submit
时,应用程序 jar 以及 --jars
选项中包含的任何 jar 将自动传输到集群。在 --jars
之后提供的 URL 必须用逗号分隔。该列表包含在驱动程序和执行器 classpath 中。目录扩展不适用于 --jars
。
Spark 使用以下 URL 方案来允许不同的 jar 分发策略:
- file: - 绝对路径和
file:/
URI 由驱动程序的 HTTP 文件服务器提供服务,每个执行器都从驱动程序 HTTP 服务器拉取文件。 - hdfs:, http:, https:, ftp: - 这些会像预期一样从 URI 下载文件和 JAR。
- local: - 以 local:/ 开头的 URI 预计作为本地文件存在于每个工作节点上。这意味着不会产生网络 IO,并且非常适用于推送到每个工作节点或通过 NFS、GlusterFS 等共享的大文件/JAR。
请注意,JAR 和文件会被复制到执行器节点上每个 SparkContext 的工作目录中。这会随着时间的推移占用大量空间,需要清理。对于 YARN,清理是自动处理的;对于 Spark standalone,可以通过 spark.worker.cleanup.appDataTtl
属性配置自动清理。
用户还可以通过 --packages
提供逗号分隔的 Maven 坐标列表来包含任何其他依赖项。使用此命令时,所有传递依赖项都将得到处理。可以使用 --repositories
标志以逗号分隔的方式添加额外的仓库(或 SBT 中的解析器)。(请注意,对于受密码保护的仓库,有时可以在仓库 URI 中提供凭据,例如 https://user:password@host/...
。以这种方式提供凭据时请务必小心。)这些命令可以与 pyspark
、spark-shell
和 spark-submit
一起使用以包含 Spark 包。
对于 Python,可以使用等效的 --py-files
选项将 .egg
、.zip
和 .py
库分发到执行器。
更多信息
部署应用程序后,集群模式概述描述了分布式执行中涉及的组件以及如何监控和调试应用程序。