Spark 独立模式

除了在 Mesos 或 YARN 集群管理器上运行外,Spark 还提供了一种简单的独立部署模式。 您可以手动启动主节点和工作节点来启动独立集群,也可以使用我们提供的 启动脚本。 也可以在一台机器上运行这些守护程序以进行测试。

安全

身份验证等安全功能默认情况下未启用。 部署开放给互联网或不受信任网络的集群时,务必保护对集群的访问,以防止未经授权的应用程序在集群上运行。 在运行 Spark 之前,请参阅Spark 安全和本文档中的特定安全部分。

在集群上安装 Spark 独立模式

要安装 Spark 独立模式,只需将 Spark 的编译版本放在集群上的每个节点上即可。 您可以通过每个版本获取 Spark 的预构建版本,也可以自行构建

手动启动集群

您可以通过执行以下命令来启动独立的 master 服务器

./sbin/start-master.sh

启动后,master 将打印出自身的 spark://HOST:PORT URL,您可以使用它将工作节点连接到 master,或者将其作为“master”参数传递给 SparkContext。 您还可以在 master 的 Web UI 上找到此 URL,默认情况下为 http://localhost:8080

同样,您可以启动一个或多个工作节点并通过以下命令将它们连接到 master

./sbin/start-worker.sh <master-spark-URL>

启动工作节点后,查看 master 的 Web UI(默认情况下为 http://localhost:8080)。 您应该在那里看到新节点,以及它的 CPU 数量和内存(减去为操作系统保留的 1 GB)。

最后,可以将以下配置选项传递给 master 和 worker

参数含义
-h HOST, --host HOST 要监听的主机名
-i HOST, --ip HOST 要监听的主机名(已弃用,请使用 -h 或 --host)
-p PORT, --port PORT 服务要监听的端口(master 默认为 7077,worker 默认为随机端口)
--webui-port PORT Web UI 的端口(master 默认为 8080,worker 默认为 8081)
-c CORES, --cores CORES 允许 Spark 应用程序在该机器上使用的总 CPU 核心数(默认值:所有可用核心);仅在 worker 上
-m MEM, --memory MEM 允许 Spark 应用程序在该机器上使用的总内存量,格式如 1000M 或 2G(默认值:您机器的总 RAM 减去 1 GiB);仅在 worker 上
-d DIR, --work-dir DIR 用于临时空间和作业输出日志的目录(默认值:SPARK_HOME/work);仅在 worker 上
--properties-file FILE 要加载的自定义 Spark 属性文件的路径(默认值:conf/spark-defaults.conf)

集群启动脚本

要使用启动脚本启动 Spark 独立集群,您应该在 Spark 目录中创建一个名为 conf/workers 的文件,该文件必须包含您要启动 Spark worker 的所有机器的主机名,每行一个。 如果 conf/workers 不存在,则启动脚本默认为单台机器 (localhost),这对于测试很有用。 请注意,master 机器通过 ssh 访问每个 worker 机器。 默认情况下,ssh 并行运行,需要设置无密码(使用私钥)访问。 如果您没有无密码设置,您可以设置环境变量 SPARK_SSH_FOREGROUND 并为每个 worker 串行提供密码。

设置好此文件后,您可以使用以下 shell 脚本启动或停止集群,这些脚本基于 Hadoop 的部署脚本,位于 SPARK_HOME/sbin

请注意,这些脚本必须在您要运行 Spark master 的机器上执行,而不是在您的本地机器上执行。

您可以通过在 conf/spark-env.sh 中设置环境变量来选择性地进一步配置集群。 通过从 conf/spark-env.sh.template 开始创建此文件,然后将其复制到所有 worker 机器,以使设置生效。 以下设置可用

环境变量含义
SPARK_MASTER_HOST 将 master 绑定到特定的主机名或 IP 地址,例如公共地址。
SPARK_MASTER_PORT 在不同的端口上启动 master(默认值:7077)。
SPARK_MASTER_WEBUI_PORT Master Web UI 的端口(默认值:8080)。
SPARK_MASTER_OPTS 仅适用于 master 的配置属性,格式为 "-Dx=y"(默认值:无)。 有关可能的选项列表,请参见下文。
SPARK_LOCAL_DIRS 用于 Spark 中“scratch”空间的目录,包括 map 输出文件和存储在磁盘上的 RDD。 这应该位于系统上的快速本地磁盘上。 它也可以是不同磁盘上多个目录的逗号分隔列表。
SPARK_WORKER_CORES 允许 Spark 应用程序在该机器上使用的总核心数(默认值:所有可用核心)。
SPARK_WORKER_MEMORY 允许 Spark 应用程序在该机器上使用的总内存量,例如 1000m2g(默认值:总内存减去 1 GiB);请注意,每个应用程序的单个内存都使用其 spark.executor.memory 属性进行配置。
SPARK_WORKER_PORT 在特定端口上启动 Spark worker(默认值:随机)。
SPARK_WORKER_WEBUI_PORT Worker Web UI 的端口(默认值:8081)。
SPARK_WORKER_DIR 用于运行应用程序的目录,其中将包含日志和临时空间(默认值:SPARK_HOME/work)。
SPARK_WORKER_OPTS 仅适用于 worker 的配置属性,格式为 "-Dx=y"(默认值:无)。 有关可能的选项列表,请参见下文。
SPARK_DAEMON_MEMORY 要分配给 Spark master 和 worker 守护进程本身的内存(默认值:1g)。
SPARK_DAEMON_JAVA_OPTS Spark master 和 worker 守护进程本身的 JVM 选项,格式为 "-Dx=y"(默认值:无)。
SPARK_DAEMON_CLASSPATH Spark master 和 worker 守护进程本身的类路径(默认值:无)。
SPARK_PUBLIC_DNS Spark master 和 worker 的公共 DNS 名称(默认值:无)。

注意: 启动脚本目前不支持 Windows。 要在 Windows 上运行 Spark 集群,请手动启动 master 和 worker。

SPARK_MASTER_OPTS 支持以下系统属性

属性名称默认值含义起始版本
spark.master.ui.port 8080 指定 Master Web UI 端点的端口号。 1.1.0
spark.master.ui.decommission.allow.mode LOCAL 指定 Master Web UI 的 /workers/kill 端点的行为。 可能的选择包括:LOCAL 表示允许来自运行 Master 的机器本地 IP 的此端点,DENY 表示完全禁用此端点,ALLOW 表示允许从任何 IP 调用此端点。 3.1.0
spark.master.rest.enabled false 是否使用 Master REST API 端点。 1.3.0
spark.master.rest.port 6066 指定 Master REST API 端点的端口号。 1.3.0
spark.deploy.retainedApplications 200 要显示的最大已完成应用程序数。 较旧的应用程序将从 UI 中删除,以保持此限制。
0.8.0
spark.deploy.retainedDrivers 200 要显示的最大已完成驱动程序数。 较旧的驱动程序将从 UI 中删除,以保持此限制。
1.1.0
spark.deploy.spreadOut true 独立集群管理器应该将应用程序分散到各个节点,还是尽可能地将它们整合到少数节点上。分散通常更有利于 HDFS 中的数据局部性,但整合对于计算密集型工作负载更有效率。
0.6.1
spark.deploy.defaultCores (无限) 如果在 Spark 独立模式下,应用程序未设置 spark.cores.max 时,默认分配给应用程序的内核数。如果未设置,应用程序总是会获得所有可用的内核,除非它们自行配置 spark.cores.max。在共享集群上,应将此值设置得较低,以防止用户默认情况下占用整个集群。
0.9.0
spark.deploy.maxExecutorRetries 10 在独立集群管理器移除故障应用程序之前,允许连续发生的最大 Executor 失败次数限制。如果应用程序有任何正在运行的 Executor,则永远不会被移除。如果一个应用程序连续出现超过 spark.deploy.maxExecutorRetries 次失败,且在此期间没有 Executor 成功启动运行,并且该应用程序没有正在运行的 Executor,则独立集群管理器将移除该应用程序并将其标记为失败。要禁用此自动移除,请将 spark.deploy.maxExecutorRetries 设置为 -1
1.6.3
spark.worker.timeout 60 独立部署 Master 在没有收到 Worker 心跳后,认为该 Worker 丢失的秒数。 0.6.2
spark.worker.resource.{name}.amount (无) 在 Worker 上要使用的特定资源数量。 3.0.0
spark.worker.resource.{name}.discoveryScript (无) 资源发现脚本的路径,用于在 Worker 启动时查找特定资源。脚本的输出应采用 ResourceInformation 类的格式。 3.0.0
spark.worker.resourcesFile (无) 资源文件的路径,用于在 Worker 启动时查找各种资源。资源文件的内容应采用以下格式:[{"id":{"componentName": "spark.worker", "resourceName":"gpu"}, "addresses":["0","1","2"]}]。如果在资源文件中找不到特定资源,将使用资源发现脚本来查找该资源。如果资源发现脚本也找不到资源,则 Worker 将无法启动。 3.0.0

SPARK_WORKER_OPTS 支持以下系统属性

属性名称默认值含义起始版本
spark.worker.cleanup.enabled false 启用定期清理 Worker/应用程序目录。请注意,这仅影响独立模式,因为 YARN 的工作方式不同。仅清理已停止应用程序的目录。如果 spark.shuffle.service.db.enabled 设置为 "true",则应启用此项。 1.0.0
spark.worker.cleanup.interval 1800 (30 分钟) 控制 Worker 在本地计算机上清理旧应用程序工作目录的间隔(以秒为单位)。 1.0.0
spark.worker.cleanup.appDataTtl 604800 (7 天,7 * 24 * 3600) 在每个 Worker 上保留应用程序工作目录的秒数。这是一个生存时间 (TTL),应取决于您拥有的可用磁盘空间量。应用程序日志和 jar 会下载到每个应用程序工作目录。随着时间的推移,工作目录会快速填满磁盘空间,尤其是在您非常频繁地运行作业时。 1.0.0
spark.shuffle.service.db.enabled true 将外部 Shuffle 服务的状态存储在本地磁盘上,以便在重新启动外部 Shuffle 服务时,它将自动重新加载有关当前 Executor 的信息。这仅影响独立模式(yarn 始终启用此行为)。您还应启用 spark.worker.cleanup.enabled,以确保最终清理状态。此配置将来可能会被删除。 3.0.0
spark.shuffle.service.db.backend LEVELDB spark.shuffle.service.db.enabled 为 true 时,用户可以使用此选项指定 shuffle 服务状态存储中使用的基于磁盘的存储类型。 现在支持 `LEVELDB` 和 `ROCKSDB`,默认值为 `LEVELDB`。`LevelDB/RocksDB` 中的原始数据存储现在不会自动转换为另一种存储类型。 3.4.0
spark.storage.cleanupFilesAfterExecutorExit true 启用在 Executor 退出后清理工作目录下的非 Shuffle 文件(例如临时 Shuffle 块、缓存的 RDD/广播块、溢出文件等)。请注意,这与 `spark.worker.cleanup.enabled` 不重叠,因为它启用了清理已死 Executor 的本地目录中的非 Shuffle 文件,而 `spark.worker.cleanup.enabled` 启用了清理已停止且超时的应用程序的所有文件/子目录。 这仅影响独立模式,将来可以添加对其他集群管理器的支持。 2.4.0
spark.worker.ui.compressedLogFileLengthCacheSize 100 对于压缩日志文件,未压缩的文件只能通过解压缩文件来计算。 Spark 缓存压缩日志文件的未压缩文件大小。 此属性控制缓存大小。 2.0.2

资源分配和配置概述

请务必阅读配置页面上的“自定义资源调度和配置概述”部分。本节仅讨论资源调度的 Spark Standalone 特定方面。

Spark Standalone 有两个部分,第一部分是配置 Worker 的资源,第二部分是为特定应用程序分配资源。

用户必须配置 Worker 以拥有一组可用资源,以便它可以将它们分配给 Executor。 spark.worker.resource.{resourceName}.amount 用于控制 Worker 分配的每个资源的数量。用户还必须指定 spark.worker.resourcesFilespark.worker.resource.{resourceName}.discoveryScript,以指定 Worker 如何发现其分配的资源。请参阅上面每个资源的描述,以了解哪种方法最适合您的设置。

第二部分是在 Spark Standalone 上运行应用程序。标准 Spark 资源配置的唯一特殊情况是您在客户端模式下运行 Driver。对于客户端模式下的 Driver,用户可以通过 spark.driver.resourcesFilespark.driver.resource.{resourceName}.discoveryScript 指定其使用的资源。如果 Driver 与其他 Driver 在同一主机上运行,请确保资源文件或发现脚本仅返回与其他在同一节点上运行的 Driver 不冲突的资源。

请注意,用户在提交应用程序时无需指定发现脚本,因为 Worker 将使用分配给它的资源启动每个 Executor。

将应用程序连接到集群

要在 Spark 集群上运行应用程序,只需将 Master 的 spark://IP:PORT URL 作为参数传递给 SparkContext 构造函数

要针对集群运行交互式 Spark Shell,请运行以下命令

./bin/spark-shell --master spark://IP:PORT

您还可以传递选项 --total-executor-cores <numCores> 来控制 spark-shell 在集群上使用的内核数量。

客户端属性

Spark 应用程序支持以下特定于独立模式的配置属性

属性名称默认值含义起始版本
spark.standalone.submit.waitAppCompletion false 在独立集群模式下,控制客户端是否等待应用程序完成后退出。如果设置为 true,客户端进程将保持活动状态,轮询 Driver 的状态。否则,客户端进程将在提交后退出。 3.1.0

启动 Spark 应用程序

Spark 协议

spark-submit 脚本提供了一种最直接的方式将编译后的 Spark 应用程序提交到集群。对于独立集群,Spark 目前支持两种部署模式。在 client 模式下,Driver 与提交应用程序的客户端在同一进程中启动。然而,在 cluster 模式下,Driver 从集群内部的某个 Worker 进程启动,并且客户端进程在完成提交应用程序的职责后立即退出,而无需等待应用程序完成。

如果您的应用程序是通过 Spark submit 启动的,那么应用程序 jar 会自动分发到所有 worker 节点。 对于您的应用程序依赖的任何其他 jar,您应该通过 --jars 标志使用逗号作为分隔符指定它们(例如 --jars jar1,jar2)。 要控制应用程序的配置或执行环境,请参阅 Spark 配置

此外,standalone cluster 模式支持在应用程序以非零退出代码退出时自动重新启动您的应用程序。 要使用此功能,您可以在启动应用程序时将 --supervise 标志传递给 spark-submit。 然后,如果您想杀死一个重复失败的应用程序,您可以通过以下方式进行操作

./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>

您可以在独立 Master Web UI(位于 http://<master url>:8080)中找到 Driver ID。

REST API

如果启用了 spark.master.rest.enabled,Spark Master 会通过 http://[host:port]/[version]/submissions/[action] 提供额外的 REST API,其中 host 是 Master 主机,portspark.master.rest.port 指定的端口号(默认值:6066),version 是协议版本,当前为 v1action 是以下支持的操作之一。

命令描述HTTP 方法起始版本
create 通过 cluster 模式创建一个 Spark Driver。 POST 1.3.0
kill 杀死一个 Spark Driver。 POST 1.3.0
status 检查 Spark 作业的状态。 GET 1.3.0

以下是使用 pi.py 和 REST API 的 curl CLI 命令示例。

$ curl -XPOST http://IP:PORT/v1/submissions/create \
--header "Content-Type:application/json;charset=UTF-8" \
--data '{
  "appResource": "",
  "sparkProperties": {
    "spark.master": "spark://master:7077",
    "spark.app.name": "Spark Pi",
    "spark.driver.memory": "1g",
    "spark.driver.cores": "1",
    "spark.jars": ""
  },
  "clientSparkVersion": "",
  "mainClass": "org.apache.spark.deploy.SparkSubmit",
  "environmentVariables": { },
  "action": "CreateSubmissionRequest",
  "appArgs": [ "/opt/spark/examples/src/main/python/pi.py", "10" ]
}'

以下是上述 create 请求的 REST API 的响应。

{
  "action" : "CreateSubmissionResponse",
  "message" : "Driver successfully submitted as driver-20231124153531-0000",
  "serverSparkVersion" : "3.5.1",
  "submissionId" : "driver-20231124153531-0000",
  "success" : true
}

资源调度

独立集群模式目前仅支持跨应用程序的简单 FIFO 调度程序。但是,为了允许多个并发用户,您可以控制每个应用程序将使用的最大资源数量。 默认情况下,它将获取集群中的所有核心,这仅在您一次只运行一个应用程序时才有意义。 您可以通过在 SparkConf 中设置 spark.cores.max 来限制核心数。 例如

val conf = new SparkConf()
  .setMaster(...)
  .setAppName(...)
  .set("spark.cores.max", "10")
val sc = new SparkContext(conf)

此外,您可以在集群 Master 进程上配置 spark.deploy.defaultCores,以将未设置 spark.cores.max 的应用程序的默认值更改为小于无限的值。 通过将以下内容添加到 conf/spark-env.sh 来实现

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"

这在共享集群上很有用,在这些集群上,用户可能没有单独配置最大核心数。

Executor 调度

分配给每个 Executor 的核心数量是可配置的。 当显式设置 spark.executor.cores 时,如果 Worker 具有足够的内核和内存,则可以在同一 Worker 上启动来自同一应用程序的多个 Executor。 否则,默认情况下,每个 Executor 都会获取 Worker 上所有可用的内核,在这种情况下,在单个调度迭代期间,每个应用程序只能在每个 Worker 上启动一个 Executor。

Stage 级别调度概述

Standalone 支持 Stage 级别的调度

注意事项

正如动态资源分配中所述,如果启用了动态分配但未明确指定每个 Executor 的内核数,则 Spark 可能会获取比预期更多的 Executor。 因此,建议在使用 Stage 级别调度时,为每个 ResourceProfile 显式设置 Executor 内核。

监控和日志记录

Spark 的独立模式提供了一个基于 Web 的用户界面来监控集群。Master 节点和每个 Worker 节点都有自己的 Web UI,显示集群和作业统计信息。默认情况下,您可以通过端口 8080 访问 Master 节点的 Web UI。该端口可以在配置文件中更改,也可以通过命令行选项更改。

此外,每个作业的详细日志输出也会写入每个 Worker 节点的工作目录(默认情况下是 SPARK_HOME/work)。对于每个作业,您将看到两个文件:stdoutstderr,其中包含其写入控制台的所有输出。

与 Hadoop 并行运行

您可以将 Spark 与现有的 Hadoop 集群一起运行,只需在相同的机器上将其作为单独的服务启动即可。要从 Spark 访问 Hadoop 数据,只需使用 hdfs:// URL(通常是 hdfs://<namenode>:9000/path,但您可以在 Hadoop Namenode 的 Web UI 上找到正确的 URL)。或者,您可以为 Spark 设置一个单独的集群,并仍然通过网络访问 HDFS;这比磁盘本地访问慢,但如果您仍然在同一个局域网中运行(例如,您在每个 Hadoop 机架上放置一些 Spark 机器),这可能不是问题。

为网络安全配置端口

一般来说,Spark 集群及其服务不会部署在公共互联网上。它们通常是私有服务,应仅限于需要访问这些服务的组织网络内访问。对 Spark 服务使用的主机和端口的访问应仅限于需要访问这些服务的源主机。

对于使用独立资源管理器的集群来说,这一点尤其重要,因为它们不像其他资源管理器那样支持细粒度的访问控制。

有关要配置的完整端口列表,请参阅安全页面

高可用性

默认情况下,独立调度集群可以容忍 Worker 节点故障(因为 Spark 本身可以通过将工作移动到其他 Worker 节点来容忍工作丢失)。但是,调度器使用 Master 节点来做出调度决策,这(默认情况下)会创建一个单点故障:如果 Master 节点崩溃,则无法创建新的应用程序。为了规避这个问题,我们有两种高可用性方案,详见下文。

使用 ZooKeeper 的备用 Master

概述

利用 ZooKeeper 提供领导者选举和一些状态存储,您可以在集群中启动多个连接到同一 ZooKeeper 实例的 Master 节点。其中一个将被选为“领导者”,其余的将保持在备用模式。如果当前的领导者死亡,则将选举另一个 Master 节点,恢复旧 Master 节点的状态,然后恢复调度。整个恢复过程(从第一个领导者关闭时开始)应在 1 到 2 分钟之间。请注意,此延迟只会影响调度应用程序 - 在 Master 节点故障转移期间已经运行的应用程序不受影响。

在此处了解有关 ZooKeeper 入门的更多信息 这里

配置

为了启用此恢复模式,您可以通过配置 spark.deploy.recoveryMode 和相关的 spark.deploy.zookeeper.* 配置在 spark-env 中设置 SPARK_DAEMON_JAVA_OPTS。 有关这些配置的更多信息,请参阅 配置文档

可能遇到的问题:如果您的集群中有多个 Master 节点,但未能正确配置 Master 节点以使用 ZooKeeper,则 Master 节点将无法相互发现,并认为它们都是领导者。这不会导致健康的集群状态(因为所有 Master 节点都会独立调度)。

详细信息

设置好 ZooKeeper 集群后,启用高可用性非常简单。只需使用相同的 ZooKeeper 配置(ZooKeeper URL 和目录)在不同的节点上启动多个 Master 节点进程。可以随时添加和删除 Master 节点。

为了调度新的应用程序或将 Worker 节点添加到集群,它们需要知道当前领导者的 IP 地址。这可以通过传入一个 Master 节点列表来实现,而您过去只传入一个 Master 节点。例如,您可以将您的 SparkContext 指向 spark://host1:port1,host2:port2。这将导致您的 SparkContext 尝试向两个 Master 节点注册 – 如果 host1 发生故障,此配置仍然正确,因为我们会找到新的领导者 host2

“向 Master 节点注册”和正常操作之间存在一个重要的区别。启动时,应用程序或 Worker 节点需要能够找到并注册当前的领导 Master 节点。但是,一旦成功注册,它就“进入系统”(即存储在 ZooKeeper 中)。如果发生故障转移,新的领导者将联系所有先前注册的应用程序和 Worker 节点,通知他们领导地位的变化,因此他们甚至不需要知道新 Master 节点在启动时存在。

由于此属性,可以随时创建新的 Master 节点,您唯一需要担心的是应用程序和 Worker 节点可以找到它以进行注册,以防它成为领导者。一旦注册,您就不用担心了。

使用本地文件系统的单节点恢复

概述

ZooKeeper 是生产级高可用性的最佳方式,但如果您只是想能够在 Master 节点关闭时重新启动它,FILESYSTEM 模式可以处理它。当应用程序和 Worker 节点注册时,它们会将足够的状态写入提供的目录,以便在 Master 节点进程重新启动时可以恢复它们。

配置

为了启用此恢复模式,您可以使用此配置在 spark-env 中设置 SPARK_DAEMON_JAVA_OPTS

系统属性含义起始版本
spark.deploy.recoveryMode 设置为 FILESYSTEM 以启用单节点恢复模式(默认:NONE)。 0.8.1
spark.deploy.recoveryDirectory Spark 将在其中存储恢复状态的目录,可以从 Master 节点的角度访问。 0.8.1

详细信息