提交应用程序

Spark 的 bin 目录中的 spark-submit 脚本用于在集群上启动应用程序。 它可以使用 Spark 支持的所有 集群管理器,通过统一的界面,您无需专门为每个管理器配置您的应用程序。

打包应用程序的依赖项

如果你的代码依赖于其他项目,你需要将它们与你的应用程序一起打包,以便将代码分发到 Spark 集群。 为此,创建一个包含你的代码及其依赖项的程序集 jar(或“uber”jar)。 sbtMaven 都有程序集插件。 创建程序集 jar 时,将 Spark 和 Hadoop 列为 provided 依赖项; 这些不需要捆绑,因为它们由集群管理器在运行时提供。 获得程序集 jar 后,你可以调用此处显示的 bin/spark-submit 脚本,同时传递你的 jar。

对于 Python,你可以使用 spark-submit--py-files 参数来添加 .py.zip.egg 文件,以便与你的应用程序一起分发。 如果你依赖于多个 Python 文件,我们建议将它们打包成 .zip.egg。 对于第三方 Python 依赖项,请参阅 Python 包管理

使用 spark-submit 启动应用程序

一旦用户应用程序被打包,就可以使用 bin/spark-submit 脚本启动它。 此脚本负责使用 Spark 及其依赖项设置类路径,并且可以支持 Spark 支持的不同集群管理器和部署模式

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

一些常用的选项是

一种常见的部署策略是从与你的 worker 机器物理上位于同一位置的网关机器提交你的应用程序(例如,独立 EC2 集群中的 Master 节点)。 在此设置中,client 模式是合适的。 在 client 模式下,驱动程序直接在 spark-submit 进程中启动,该进程充当集群的 *客户端*。 应用程序的输入和输出附加到控制台。 因此,此模式尤其适合涉及 REPL 的应用程序(例如 Spark shell)。

或者,如果你的应用程序是从远离 worker 机器的机器提交的(例如,在你的笔记本电脑本地),则通常使用 cluster 模式来最大限度地减少驱动程序和 executors 之间的网络延迟。 目前,standalone 模式不支持 Python 应用程序的 cluster 模式。

对于 Python 应用程序,只需传递一个 .py 文件来代替 <application-jar>,并将 Python .zip.egg.py 文件添加到 --py-files 的搜索路径中。

有一些选项可用于 集群管理器。 例如,对于具有 cluster 部署模式的 Spark standalone 集群,你还可以指定 --supervise,以确保驱动程序在以非零退出代码失败时自动重启。 要枚举 spark-submit 可用的所有此类选项,请使用 --help 运行它。 以下是一些常见选项的示例

# Run application locally on 8 cores
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8] \
  /path/to/examples.jar \
  100

# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a YARN cluster in cluster deploy mode
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000

# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
  --master spark://207.184.161.138:7077 \
  examples/src/main/python/pi.py \
  1000

# Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master mesos://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  http://path/to/examples.jar \
  1000

# Run on a Kubernetes cluster in cluster deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master k8s://xx.yy.zz.ww:443 \
  --deploy-mode cluster \
  --executor-memory 20G \
  --num-executors 50 \
  http://path/to/examples.jar \
  1000

Master URL

传递给 Spark 的 master URL 可以采用以下格式之一

Master URL含义
local 在本地运行 Spark,带有一个 worker 线程(即根本没有并行性)。
local[K] 在本地运行 Spark,带有 K 个 worker 线程(理想情况下,将其设置为你的机器上的内核数)。
local[K,F] 在本地运行 Spark,带有 K 个 worker 线程和 F 个 maxFailures(有关此变量的说明,请参阅 spark.task.maxFailures)。
local[*] 在本地运行 Spark,worker 线程数与你的机器上的逻辑内核数相同。
local[*,F] 在本地运行 Spark,worker 线程数与你的机器上的逻辑内核数相同,并且 F maxFailures。
local-cluster[N,C,M] Local-cluster 模式仅用于单元测试。 它在单个 JVM 中模拟一个分布式集群,其中包含 N 个 worker、每个 worker C 个内核和每个 worker M MiB 内存。
spark://HOST:PORT 连接到给定的 Spark standalone 集群 master。 端口必须是你的 master 配置为使用的端口,默认为 7077。
spark://HOST1:PORT1,HOST2:PORT2 连接到给定的 带有 Zookeeper 的备用 master 的 Spark standalone 集群。 该列表必须包含使用 Zookeeper 设置的高可用性集群中的所有 master 主机。 端口必须是每个 master 配置为使用的端口,默认为 7077。
mesos://HOST:PORT 连接到给定的 Mesos 集群。 端口必须是配置为使用的端口,默认为 5050。 或者,对于使用 ZooKeeper 的 Mesos 集群,请使用 mesos://zk://...。 要使用 --deploy-mode cluster 提交,HOST:PORT 应配置为连接到 MesosClusterDispatcher
yarn 根据 --deploy-mode 的值,以 clientcluster 模式连接到 YARN 集群。 集群位置将根据 HADOOP_CONF_DIRYARN_CONF_DIR 变量找到。
k8s://HOST:PORT 根据 --deploy-mode 的值,以 clientcluster 模式连接到 Kubernetes 集群。 HOSTPORT 指的是 Kubernetes API Server。 默认情况下,它使用 TLS 连接。 为了强制它使用不安全的连接,你可以使用 k8s://http://HOST:PORT

从文件加载配置

spark-submit 脚本可以从属性文件中加载默认的 Spark 配置值,并将它们传递给你的应用程序。 默认情况下,它将从 Spark 目录中的 conf/spark-defaults.conf 读取选项。 有关更多详细信息,请参阅有关加载默认配置的部分。

以这种方式加载默认的 Spark 配置可以避免对 spark-submit 的某些标志的需求。 例如,如果设置了 spark.master 属性,你可以安全地从 spark-submit 中省略 --master 标志。 通常,在 SparkConf 上显式设置的配置值具有最高优先级,然后是传递给 spark-submit 的标志,然后是默认文件中的值。

如果你不清楚配置选项的来源,你可以通过使用 --verbose 选项运行 spark-submit 来打印出细粒度的调试信息。

高级依赖项管理

当使用 spark-submit 时,应用程序 jar 以及通过 --jars 选项包含的任何 jar 都会自动传输到集群。 在 --jars 之后提供的 URL 必须以逗号分隔。 该列表包含在 driver 和 executor 的 classpath 中。 目录扩展不适用于 --jars

Spark 使用以下 URL 方案来允许不同的 jar 分发策略

请注意,JAR 和文件会被复制到 executor 节点上每个 SparkContext 的工作目录中。 这会随着时间的推移占用大量空间,并且需要清理。 使用 YARN 时,清理是自动处理的,而使用 Spark standalone 时,可以使用 spark.worker.cleanup.appDataTtl 属性配置自动清理。

用户还可以通过使用 --packages 提供以逗号分隔的 Maven 坐标列表来包含任何其他依赖项。 使用此命令时,将处理所有传递依赖项。 可以使用 --repositories 标志以逗号分隔的方式添加其他存储库(或 SBT 中的解析器)。 (请注意,密码保护存储库的凭据可以在某些情况下在存储库 URI 中提供,例如在 https://user:password@host/... 中。 以这种方式提供凭据时要小心。) 这些命令可以与 pyspark, spark-shell, 和 spark-submit 一起使用以包含 Spark Packages。

对于 Python,可以使用等效的 --py-files 选项将 .egg, .zip.py 库分发给 executors。

更多信息

部署应用程序后,集群模式概述介绍了分布式执行中涉及的组件,以及如何监视和调试应用程序。