Spark 独立模式

除了在 YARN 集群管理器上运行外,Spark 还提供了一种简单的独立部署模式。你可以手动启动 Master 和 Worker 来启动一个独立集群,或者使用我们提供的启动脚本。也可以在单机上运行这些守护进程进行测试。

安全

默认情况下,未启用身份验证等安全功能。在部署面向互联网或不可信网络的集群时,务必保护集群访问安全,以防止未经授权的应用程序在集群上运行。在运行 Spark 之前,请参阅Spark 安全以及本文档中的特定安全部分。

在集群中安装 Spark 独立模式

要安装 Spark 独立模式,只需将 Spark 的编译版本放置在集群中的每个节点上。你可以获取每个版本预构建的 Spark,或者自己构建

手动启动集群

你可以通过执行以下命令启动一个独立的 Master 服务器:

./sbin/start-master.sh

启动后,Master 将打印出其自身的 spark://HOST:PORT URL,你可以使用它将 Worker 连接到 Master,或者将其作为“master”参数传递给 SparkContext。你也可以在 Master 的 Web UI 上找到此 URL,默认情况下为 http://localhost:8080

同样,你可以启动一个或多个 Worker 并通过以下方式将它们连接到 Master:

./sbin/start-worker.sh <master-spark-URL>

启动 Worker 后,查看 Master 的 Web UI(默认情况下为 http://localhost:8080)。你应该会在其中看到新节点,以及它的 CPU 数量和内存(减去留给操作系统的 1GB)。

最后,以下配置选项可以传递给 Master 和 Worker

参数含义
-h HOST, --host HOST 监听的主机名
-p PORT, --port PORT 服务监听端口(默认:Master 为 7077,Worker 为随机)
--webui-port PORT Web UI 端口(默认:Master 为 8080,Worker 为 8081)
-c CORES, --cores CORES 允许 Spark 应用程序在该机器上使用的 CPU 总核心数(默认:所有可用核心);仅限 Worker
-m MEM, --memory MEM 允许 Spark 应用程序在该机器上使用的总内存量,格式如 1000M 或 2G(默认:机器总内存减去 1 GiB);仅限 Worker
-d DIR, --work-dir DIR 用于临时空间和作业输出日志的目录(默认:SPARK_HOME/work);仅限 Worker
--properties-file FILE 要加载的自定义 Spark 属性文件的路径(默认:conf/spark-defaults.conf)

集群启动脚本

要使用启动脚本启动 Spark 独立集群,你需要在 Spark 目录中创建一个名为 conf/workers 的文件,该文件必须包含你打算启动 Spark Worker 的所有机器的主机名,每行一个。如果 conf/workers 不存在,启动脚本将默认为单机(localhost),这对于测试很有用。注意,Master 机器通过 SSH 访问每个 Worker 机器。默认情况下,SSH 并行运行并要求设置免密码(使用私钥)访问。如果你没有设置免密码访问,可以设置环境变量 SPARK_SSH_FOREGROUND 并串行地为每个 Worker 提供密码。

设置好此文件后,你可以使用以下 shell 脚本启动或停止集群,这些脚本基于 Hadoop 的部署脚本,并位于 SPARK_HOME/sbin

请注意,这些脚本必须在你希望运行 Spark Master 的机器上执行,而不是你的本地机器。

你可以通过在 conf/spark-env.sh 中设置环境变量来进一步配置集群。通过 conf/spark-env.sh.template 文件创建此文件,并将其复制到所有 Worker 机器上以使设置生效。以下设置可用:

环境变量含义
SPARK_MASTER_HOST 将 Master 绑定到特定的主机名或 IP 地址,例如公共地址。
SPARK_MASTER_PORT 在不同的端口启动 Master(默认:7077)。
SPARK_MASTER_WEBUI_PORT Master Web UI 的端口(默认:8080)。
SPARK_MASTER_OPTS 仅适用于 Master 的配置属性,形式为“-Dx=y”(默认:无)。请参阅下文以获取可能的选项列表。
SPARK_LOCAL_DIRS Spark 中用于“临时”空间的目录,包括映射输出文件和存储在磁盘上的 RDD。这应该位于系统中的快速本地磁盘上。它也可以是不同磁盘上多个目录的逗号分隔列表。
SPARK_LOG_DIR 日志文件的存储位置。(默认:SPARK_HOME/logs)。
SPARK_LOG_MAX_FILES 最大日志文件数(默认:5)。
SPARK_PID_DIR PID 文件的存储位置。(默认:/tmp)。
SPARK_WORKER_CORES 允许 Spark 应用程序在该机器上使用的总核心数(默认:所有可用核心)。
SPARK_WORKER_MEMORY 允许 Spark 应用程序在该机器上使用的总内存量,例如 1000m2g(默认:总内存减去 1 GiB);请注意,每个应用程序的独立内存是使用其 spark.executor.memory 属性配置的。
SPARK_WORKER_PORT 在特定端口启动 Spark Worker(默认:随机)。
SPARK_WORKER_WEBUI_PORT Worker Web UI 的端口(默认:8081)。
SPARK_WORKER_DIR 运行应用程序的目录,将包括日志和临时空间(默认:SPARK_HOME/work)。
SPARK_WORKER_OPTS 仅适用于 Worker 的配置属性,形式为“-Dx=y”(默认:无)。请参阅下文以获取可能的选项列表。
SPARK_DAEMON_MEMORY 分配给 Spark Master 和 Worker 守护进程自身的内存(默认:1g)。
SPARK_DAEMON_JAVA_OPTS Spark Master 和 Worker 守护进程自身的 JVM 选项,形式为“-Dx=y”(默认:无)。
SPARK_DAEMON_CLASSPATH Spark Master 和 Worker 守护进程自身的类路径(默认:无)。
SPARK_PUBLIC_DNS Spark Master 和 Worker 的公共 DNS 名称(默认:无)。

注意:启动脚本目前不支持 Windows。要在 Windows 上运行 Spark 集群,请手动启动 Master 和 Worker。

SPARK_MASTER_OPTS 支持以下系统属性

属性名称默认值含义起始版本
spark.master.ui.port 8080 指定 Master Web UI 端点的端口号。 1.1.0
spark.master.ui.title (无) 指定 Master UI 页面的标题。如果未设置,则默认使用 Spark Master at 'master url' 4.0.0
spark.master.ui.decommission.allow.mode LOCAL 指定 Master Web UI 的 /workers/kill 端点的行为。可能的选择是:LOCAL 表示允许来自运行 Master 的机器本地 IP 的访问,DENY 表示完全禁用此端点,ALLOW 表示允许从任何 IP 调用此端点。 3.1.0
spark.master.ui.historyServerUrl (无) Spark 历史服务器运行的 URL。请注意,这假定所有 Spark 作业共享历史服务器访问的相同事件日志位置。 4.0.0
spark.master.rest.enabled false 是否使用 Master REST API 端点。 1.3.0
spark.master.rest.host (无) 指定 Master REST API 端点的主机。 4.0.0
spark.master.rest.port 6066 指定 Master REST API 端点的端口号。 1.3.0
spark.master.rest.filters (无) 应用于 Master REST API 的过滤器类名称的逗号分隔列表。 4.0.0
spark.master.useAppNameAsAppId.enabled false (实验性) 如果为 true,Spark Master 将使用用户提供的 appName 作为 appId。 4.0.0
spark.deploy.retainedApplications 200 要显示的最大已完成应用程序数。较旧的应用程序将从 UI 中删除以保持此限制。
0.8.0
spark.deploy.retainedDrivers 200 要显示的最大已完成驱动程序数。较旧的驱动程序将从 UI 中删除以保持此限制。
1.1.0
spark.deploy.spreadOutDrivers true 独立集群管理器是应该将驱动程序分散到各个节点上,还是尽量将它们整合到尽可能少的节点上。分散通常有利于 HDFS 中的数据本地性,但整合对于计算密集型工作负载更有效。 4.0.0
spark.deploy.spreadOutApps true 独立集群管理器是应该将应用程序分散到各个节点上,还是尽量将它们整合到尽可能少的节点上。分散通常有利于 HDFS 中的数据本地性,但整合对于计算密集型工作负载更有效。
0.6.1
spark.deploy.defaultCores Int.MaxValue 如果 Spark 独立模式下的应用程序未设置 spark.cores.max,则分配给应用程序的默认核心数。如果未设置,应用程序将始终获得所有可用核心,除非它们自己配置 spark.cores.max。在共享集群上将此值设置得更低,以防止用户默认占用整个集群。
0.9.0
spark.deploy.maxExecutorRetries 10 独立集群管理器在移除故障应用程序之前,允许连续发生的 Executor 故障的最大次数限制。如果应用程序有任何正在运行的 Executor,它将永远不会被移除。如果应用程序连续发生超过 spark.deploy.maxExecutorRetries 次故障,并且在这些故障之间没有 Executor 成功启动运行,并且应用程序没有正在运行的 Executor,则独立集群管理器将移除该应用程序并将其标记为失败。要禁用此自动移除功能,请将 spark.deploy.maxExecutorRetries 设置为 -1
1.6.3
spark.deploy.maxDrivers Int.MaxValue 正在运行的驱动程序的最大数量。 4.0.0
spark.deploy.appNumberModulo (无) 应用程序编号的模数。默认情况下,app-yyyyMMddHHmmss-9999 的下一个是 app-yyyyMMddHHmmss-10000。如果我们设置模数为 10000,它将是 app-yyyyMMddHHmmss-0000。在大多数情况下,前缀 app-yyyyMMddHHmmss 在创建 10000 个应用程序时已经增加。 4.0.0
spark.deploy.driverIdPattern driver-%s-%04d 基于 Java String.format 方法的驱动程序 ID 生成模式。默认值为 driver-%s-%04d,它表示现有的驱动程序 ID 字符串,例如 driver-20231031224459-0019。请注意生成唯一 ID。 4.0.0
spark.deploy.appIdPattern app-%s-%04d 基于 Java String.format 方法的应用程序 ID 生成模式。默认值为 app-%s-%04d,它表示现有的应用程序 ID 字符串,例如 app-20231031224509-0008。请注意生成唯一 ID。 4.0.0
spark.worker.timeout 60 如果独立部署 Master 在多少秒内未收到心跳,则认为 Worker 已丢失。 0.6.2
spark.dead.worker.persistence 15 在 UI 中保留死亡 Worker 信息的迭代次数。默认情况下,死亡 Worker 从其最后一次心跳起可见 (15 + 1) * spark.worker.timeout 0.8.0
spark.worker.resource.{name}.amount (无) 在 Worker 上使用的特定资源数量。 3.0.0
spark.worker.resource.{name}.discoveryScript (无) 资源发现脚本的路径,用于在 Worker 启动时查找特定资源。脚本的输出应格式化为 ResourceInformation 类。 3.0.0
spark.worker.resourcesFile (无) 资源文件的路径,用于在 Worker 启动时查找各种资源。资源文件的内容应格式化为 [{"id":{"componentName": "spark.worker", "resourceName":"gpu"}, "addresses":["0","1","2"]}]。如果在资源文件中未找到特定资源,则将使用发现脚本查找该资源。如果发现脚本也未找到资源,则 Worker 将无法启动。 3.0.0

SPARK_WORKER_OPTS 支持以下系统属性

属性名称默认值含义起始版本
spark.worker.initialRegistrationRetries 6 在短间隔(5 到 15 秒之间)内重新连接的重试次数。 4.0.0
spark.worker.maxRegistrationRetries 16 重新连接的最大重试次数。在 spark.worker.initialRegistrationRetries 次尝试后,间隔为 30 到 90 秒。 4.0.0
spark.worker.cleanup.enabled true 启用 Worker/应用程序目录的定期清理。请注意,这仅影响独立模式,因为 YARN 的工作方式不同。仅停止的应用程序的目录会被清理。如果 spark.shuffle.service.db.enabled 为 "true",则应启用此项。 1.0.0
spark.worker.cleanup.interval 1800 (30 分钟) 控制 Worker 在本地机器上清理旧应用程序工作目录的间隔(秒)。 1.0.0
spark.worker.cleanup.appDataTtl 604800 (7 天, 7 * 24 * 3600) 每个 Worker 上保留应用程序工作目录的秒数。这是一个生存时间(TTL),应取决于可用的磁盘空间量。应用程序日志和 Jar 包会下载到每个应用程序工作目录。随着时间的推移,工作目录可能会迅速占满磁盘空间,特别是当你非常频繁地运行作业时。 1.0.0
spark.shuffle.service.db.enabled true 将外部 Shuffle 服务的状态存储在本地磁盘上,以便当外部 Shuffle 服务重新启动时,它会自动重新加载当前 Executor 的信息。这仅影响独立模式(YARN 始终启用此行为)。你还应该启用 spark.worker.cleanup.enabled,以确保状态最终被清理。此配置将来可能会被移除。 3.0.0
spark.shuffle.service.db.backend ROCKSDB spark.shuffle.service.db.enabled 为 true 时,用户可以使用此选项指定 Shuffle 服务状态存储中使用的基于磁盘的存储类型。目前支持 ROCKSDBLEVELDB(已弃用),默认值为 ROCKSDB。目前,RocksDB/LevelDB 中的原始数据存储不会自动转换为另一种存储类型。 3.4.0
spark.storage.cleanupFilesAfterExecutorExit true 启用在 Executor 退出后清理 Worker 目录中非 Shuffle 文件(例如临时 Shuffle 块、缓存的 RDD/广播块、溢出文件等)。请注意,这与 spark.worker.cleanup.enabled 没有重叠,因为此选项启用清理死亡 Executor 本地目录中的非 Shuffle 文件,而 spark.worker.cleanup.enabled 启用清理已停止和超时的应用程序的所有文件/子目录。这仅影响独立模式,将来可能会添加对其他集群管理器的支持。 2.4.0
spark.worker.ui.compressedLogFileLengthCacheSize 100 对于压缩的日志文件,解压后的文件大小只能通过解压文件来计算。Spark 会缓存压缩日志文件的解压后文件大小。此属性控制缓存大小。 2.0.2
spark.worker.idPattern worker-%s-%s-%d 基于 Java String.format 方法的 Worker ID 生成模式。默认值为 worker-%s-%s-%d,它表示现有的 Worker ID 字符串,例如 worker-20231109183042-[fe80::1%lo0]-39729。请注意生成唯一 ID。 4.0.0

资源分配和配置概述

请确保已阅读配置页面上的“自定义资源调度和配置概述”部分。本节仅讨论 Spark 独立模式中资源调度的特定方面。

Spark 独立模式包含两个部分,第一部分是配置 Worker 的资源,第二部分是为特定应用程序分配资源。

用户必须配置 Worker 以使其拥有一组可用资源,以便它可以将这些资源分配给 Executor。spark.worker.resource.{resourceName}.amount 用于控制 Worker 已分配的每种资源的数量。用户还必须指定 spark.worker.resourcesFilespark.worker.resource.{resourceName}.discoveryScript 来指定 Worker 如何发现分配给它的资源。请参阅上面对每个选项的描述,以了解哪种方法最适合你的设置。

第二部分是在 Spark 独立模式下运行应用程序。标准 Spark 资源配置中唯一的特殊情况是在客户端模式下运行 Driver。对于客户端模式下的 Driver,用户可以通过 spark.driver.resourcesFilespark.driver.resource.{resourceName}.discoveryScript 指定其使用的资源。如果 Driver 与其他 Driver 在同一主机上运行,请确保资源文件或发现脚本只返回与同一节点上运行的其他 Driver 不冲突的资源。

请注意,用户在提交应用程序时无需指定发现脚本,因为 Worker 将使用分配给每个 Executor 的资源来启动它们。

将应用程序连接到集群

要在 Spark 集群上运行应用程序,只需将 Master 的 spark://IP:PORT URL 传递给 SparkContext 构造函数

要针对集群运行交互式 Spark Shell,请运行以下命令:

./bin/spark-shell --master spark://IP:PORT

你还可以传递选项 --total-executor-cores <numCores> 来控制 spark-shell 在集群上使用的核心数量。

客户端属性

Spark 应用程序支持以下独立模式特定的配置属性

属性名称默认值含义起始版本
spark.standalone.submit.waitAppCompletion false 在独立集群模式下,控制客户端是否等待应用程序完成再退出。如果设置为 true,客户端进程将保持活动状态并轮询驱动程序的状况。否则,客户端进程将在提交后退出。 3.1.0

启动 Spark 应用程序

Spark 协议

通过 spark-submit 脚本可以将编译好的 Spark 应用程序提交到集群,这是最直接的方式。对于独立集群,Spark 目前支持两种部署模式。在 client 模式下,驱动程序在提交应用程序的客户端的同一进程中启动。而在 cluster 模式下,驱动程序从集群内部的一个 Worker 进程中启动,并且客户端进程在完成提交应用程序的职责后立即退出,而无需等待应用程序完成。

如果你的应用程序是通过 Spark submit 启动的,那么应用程序 Jar 包会自动分发到所有 Worker 节点。对于你的应用程序所依赖的任何额外 Jar 包,你应该使用逗号作为分隔符通过 --jars 标志指定它们(例如 --jars jar1,jar2)。要控制应用程序的配置或执行环境,请参阅Spark 配置

此外,独立 cluster 模式支持在应用程序以非零退出代码退出时自动重启。要使用此功能,你可以在启动应用程序时将 --supervise 标志传递给 spark-submit。然后,如果你希望终止一个反复失败的应用程序,可以通过以下方式进行:

./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>

你可以通过独立 Master Web UI http://<master url>:8080 找到驱动程序 ID。

REST API

如果 spark.master.rest.enabled 已启用,Spark Master 将通过 http://[host:port]/[version]/submissions/[action] 提供额外的 REST API,其中 host 是 Master 主机,portspark.master.rest.port 指定的端口号(默认:6066),version 是协议版本,目前为 v1action 是以下支持的操作之一。

命令HTTP 方法描述起始版本
create POST 通过 cluster 模式创建 Spark 驱动程序。自 4.0.0 版本起,Spark Master 支持对 Spark 属性和环境变量的值进行服务器端变量替换。 1.3.0
kill POST 终止单个 Spark 驱动程序。 1.3.0
killall POST 终止所有正在运行的 Spark 驱动程序。 4.0.0
status GET 检查 Spark 作业的状态。 1.3.0
clear POST 清除已完成的驱动程序和应用程序。 4.0.0

以下是使用 pi.py 和 REST API 的 curl CLI 命令示例。

$ curl -XPOST http://IP:PORT/v1/submissions/create \
--header "Content-Type:application/json;charset=UTF-8" \
--data '{
  "appResource": "",
  "sparkProperties": {
    "spark.master": "spark://master:7077",
    "spark.app.name": "Spark Pi",
    "spark.driver.memory": "1g",
    "spark.driver.cores": "1",
    "spark.jars": ""
  },
  "clientSparkVersion": "",
  "mainClass": "org.apache.spark.deploy.SparkSubmit",
  "environmentVariables": { },
  "action": "CreateSubmissionRequest",
  "appArgs": [ "/opt/spark/examples/src/main/python/pi.py", "10" ]
}'

以下是上述 create 请求的 REST API 响应。

{
  "action" : "CreateSubmissionResponse",
  "message" : "Driver successfully submitted as driver-20231124153531-0000",
  "serverSparkVersion" : "4.0.0",
  "submissionId" : "driver-20231124153531-0000",
  "success" : true
}

当 Spark Master 通过 spark.master.rest.filters=org.apache.spark.ui.JWSFilterspark.org.apache.spark.ui.JWSFilter.param.secretKey=BASE64URL-ENCODED-KEY 配置需要 HTTP Authorization 头时,curl CLI 命令可以提供所需的头,如下所示。

$ curl -XPOST http://IP:PORT/v1/submissions/create \
--header "Authorization: Bearer USER-PROVIDED-WEB-TOEN-SIGNED-BY-THE-SAME-SHARED-KEY"
...

对于 sparkPropertiesenvironmentVariables,用户可以使用服务器端环境变量的占位符,如下所示。


...
  "sparkProperties": {
    "spark.hadoop.fs.s3a.endpoint": "{{AWS_ENDPOINT_URL}}",
    "spark.hadoop.fs.s3a.endpoint.region": "{{AWS_REGION}}"
  },
  "environmentVariables": {
    "AWS_CA_BUNDLE": "{{AWS_CA_BUNDLE}}"
  },
...

资源调度

独立集群模式目前仅支持跨应用程序的简单 FIFO 调度器。然而,为了允许多个并发用户,你可以控制每个应用程序将使用的最大资源数量。默认情况下,它将获取集群中的所有核心,这仅在你一次只运行一个应用程序时才有意义。你可以通过在 SparkConf 中设置 spark.cores.max 来限制核心数量。例如:

val conf = new SparkConf()
  .setMaster(...)
  .setAppName(...)
  .set("spark.cores.max", "10")
val sc = new SparkContext(conf)

此外,你可以在集群 Master 进程上配置 spark.deploy.defaultCores,以更改未将 spark.cores.max 设置为非无限值的应用程序的默认值。通过将以下内容添加到 conf/spark-env.sh 来实现:

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"

这在共享集群中很有用,因为用户可能没有单独配置最大核心数。

Executor 调度

分配给每个 Executor 的核心数是可配置的。当明确设置 spark.executor.cores 时,如果 Worker 有足够的内核和内存,来自同一应用程序的多个 Executor 可能会在同一个 Worker 上启动。否则,每个 Executor 默认会占用 Worker 上所有可用的核心,在这种情况下,在一次调度迭代中,每个 Worker 上可能只启动一个应用程序的一个 Executor。

阶段级别调度概述

独立模式支持阶段级别调度

注意事项

正如动态资源分配中所述,如果在启用动态分配时未明确指定每个 Executor 的核心数,Spark 可能会获取比预期多得多的 Executor。因此,在使用阶段级别调度时,建议你为每个资源配置文件明确设置 Executor 核心数。

监控与日志记录

Spark 的独立模式提供了一个基于 Web 的用户界面来监控集群。Master 和每个 Worker 都有自己的 Web UI,显示集群和作业统计信息。默认情况下,你可以通过 8080 端口访问 Master 的 Web UI。该端口可以在配置文件中或通过命令行选项更改。

此外,每个作业的详细日志输出也会写入每个 Worker 节点的工作目录(默认情况下为 SPARK_HOME/work)。每个作业将看到两个文件,stdoutstderr,其中包含其写入控制台的所有输出。

与 Hadoop 并行运行

你可以通过在相同的机器上将其作为单独的服务启动,从而使 Spark 与现有 Hadoop 集群并行运行。要从 Spark 访问 Hadoop 数据,只需使用 hdfs:// URL(通常是 hdfs://<namenode>:9000/path,但你可以在 Hadoop Namenode 的 Web UI 上找到正确的 URL)。另外,你也可以为 Spark 设置一个单独的集群,并仍然让它通过网络访问 HDFS;这会比磁盘本地访问慢,但如果你仍在同一局域网中运行(例如,你在拥有 Hadoop 的每个机架上放置几台 Spark 机器),这可能不是问题。

配置网络安全端口

一般来说,Spark 集群及其服务不会部署在公共互联网上。它们通常是私有服务,并且只能在部署 Spark 的组织网络内部访问。对 Spark 服务使用的主机和端口的访问应仅限于需要访问这些服务的源主机。

这对于使用独立资源管理器的集群尤为重要,因为它们不像其他资源管理器那样支持细粒度访问控制。

有关要配置的端口的完整列表,请参阅安全页面

高可用性

默认情况下,独立调度集群对 Worker 故障具有弹性(Spark 本身通过将工作转移到其他 Worker 来抵抗工作丢失)。然而,调度器使用 Master 来进行调度决策,这(默认情况下)创建了一个单点故障:如果 Master 崩溃,则无法创建新的应用程序。为了规避这一点,我们有两种高可用性方案,详述如下。

使用 ZooKeeper 实现备用 Master

概述

利用 ZooKeeper 提供领导者选举和一些状态存储,你可以在集群中启动连接到同一 ZooKeeper 实例的多个 Master。其中一个将被选举为“领导者”,其他 Master 将保持备用模式。如果当前领导者宕机,将选举另一个 Master,恢复旧 Master 的状态,然后恢复调度。整个恢复过程(从第一个领导者宕机开始)应在 1 到 2 分钟之间。请注意,此延迟仅影响调度应用程序——在 Master 故障转移期间已经运行的应用程序不受影响。

此处了解更多关于 ZooKeeper 入门的信息。

配置

为了启用此恢复模式,你可以通过配置 spark.deploy.recoveryMode 和相关的 spark.deploy.zookeeper.* 配置,在 spark-env 中设置 SPARK_DAEMON_JAVA_OPTS

可能的陷阱:如果你的集群中有多个 Master 但未能正确配置 Master 使用 ZooKeeper,则 Master 将无法相互发现,并认为它们都是领导者。这不会导致健康的集群状态(因为所有 Master 将独立调度)。

详情

设置好 ZooKeeper 集群后,启用高可用性非常简单。只需在不同节点上启动多个 Master 进程,并使用相同的 ZooKeeper 配置(ZooKeeper URL 和目录)。Master 可以随时添加和移除。

为了调度新的应用程序或向集群添加 Worker,它们需要知道当前领导者的 IP 地址。这可以通过简单地传入 Master 列表来实现,而以前你只传入单个 Master。例如,你可能会将 SparkContext 指向 spark://host1:port1,host2:port2。这将导致你的 SparkContext 尝试向两个 Master 注册——如果 host1 宕机,此配置仍然正确,因为我们将找到新的领导者 host2

“向 Master 注册”和“正常操作”之间存在一个重要的区别。启动时,应用程序或 Worker 需要能够找到并向当前领导 Master 注册。一旦成功注册,它就“进入系统”(即存储在 ZooKeeper 中)。如果发生故障转移,新的领导者将联系所有先前注册的应用程序和 Worker,告知它们领导权变更,因此它们在启动时甚至无需知道新 Master 的存在。

由于这个特性,可以随时创建新的 Master,你唯一需要担心的是,应用程序和 Worker 能够在它成为领导者时找到它进行注册。一旦注册成功,你就不必担心了。

使用本地文件系统实现单节点恢复

概述

对于生产级别的高可用性,ZooKeeper 是最佳选择,但如果你只是想在 Master 宕机时能够重新启动它,FILESYSTEM 模式可以解决这个问题。当应用程序和 Worker 注册时,它们会将足够的状态写入提供的目录,以便在 Master 进程重启时可以恢复它们。

配置

为了启用此恢复模式,你可以使用此配置在 spark-env 中设置 SPARK_DAEMON_JAVA_OPTS

系统属性默认值含义起始版本
spark.deploy.recoveryMode NONE 恢复模式设置,用于在集群模式下已提交的 Spark 作业失败并重新启动时进行恢复。设置为 FILESYSTEM 以启用基于文件系统的单节点恢复模式,ROCKSDB 以启用基于 RocksDB 的单节点恢复模式,ZOOKEEPER 以使用基于 Zookeeper 的恢复模式,CUSTOM 则通过额外的 spark.deploy.recoveryMode.factory 配置提供自定义提供程序类。NONE 是默认值,它禁用此恢复模式。 0.8.1
spark.deploy.recoveryDirectory "" Spark 将存储恢复状态的目录,可从 Master 的角度访问。请注意,如果 spark.deploy.recoveryModespark.deploy.recoveryCompressionCodec 发生更改,则应手动清除该目录。 0.8.1
spark.deploy.recoveryCompressionCodec (无) 持久化引擎的压缩编解码器。none(默认)、lz4、lzf、snappy 和 zstd。目前,只有 FILESYSTEM 模式支持此配置。 4.0.0
spark.deploy.recoveryTimeout (无) 恢复过程的超时时间。默认值与 spark.worker.timeout 相同。 4.0.0
spark.deploy.recoveryMode.factory "" 实现 StandaloneRecoveryModeFactory 接口的类 1.2.0
spark.deploy.zookeeper.url spark.deploy.recoveryMode 设置为 ZOOKEEPER 时,此配置用于设置要连接的 Zookeeper URL。 0.8.1
spark.deploy.zookeeper.dir spark.deploy.recoveryMode 设置为 ZOOKEEPER 时,此配置用于设置存储恢复状态的 Zookeeper 目录。 0.8.1

详情