Spark 独立部署模式
- 安全性
- 在集群中安装 Spark 独立模式
- 手动启动集群
- 集群启动脚本
- 资源分配与配置概述
- 将应用程序连接到集群
- 客户端属性
- 启动 Spark 应用程序
- 资源调度
- Executor 调度
- 阶段级调度概述
- 监控与日志
- 与 Hadoop 并行运行
- 为网络安全配置端口
- 高可用性
除了在 YARN 集群管理器上运行外,Spark 还提供了一种简单的独立(Standalone)部署模式。你可以通过手动启动 Master 和 Worker 来启动独立集群,或者使用我们提供的启动脚本。在单台机器上运行这些守护进程进行测试也是可行的。
安全性
身份验证等安全功能默认未启用。在部署面向互联网或不受信任网络的集群时,务必保护对集群的访问,以防止未经授权的应用程序在集群上运行。在运行 Spark 之前,请参阅 Spark 安全性及本文档中的特定安全章节。
在集群中安装 Spark 独立模式
要安装 Spark 独立模式,只需将编译好的 Spark 放置在集群的每个节点上即可。你可以通过每个版本的预构建版本获取 Spark,或者自行构建。
手动启动集群
你可以通过执行以下命令启动独立 Master 服务器:
./sbin/start-master.sh
启动后,Master 将打印出一个 spark://HOST:PORT URL,你可以使用它将 Worker 连接到 Master,或者将其作为“master”参数传递给 SparkContext。你还可以在 Master 的 Web UI(默认地址为 https://:8080)上找到此 URL。
类似地,你可以通过以下命令启动一个或多个 Worker 并将其连接到 Master:
./sbin/start-worker.sh <master-spark-URL>
启动 Worker 后,请查看 Master 的 Web UI(默认地址为 https://:8080)。你应该能看到列出的新节点,以及它的 CPU 和内存数量(减去预留给操作系统的 1GB)。
最后,以下配置选项可以传递给 Master 和 Worker:
| 参数 | 含义 |
|---|---|
-h HOST, --host HOST |
监听的主机名 |
-p PORT, --port PORT |
服务监听的端口(默认:Master 为 7077,Worker 为随机) |
--webui-port PORT |
Web UI 端口(默认:Master 为 8080,Worker 为 8081) |
-c CORES, --cores CORES |
允许 Spark 应用程序在此机器上使用的 CPU 核心总数(默认:所有可用核心);仅限 Worker 使用 |
-m MEM, --memory MEM |
允许 Spark 应用程序在此机器上使用的内存总量,格式如 1000M 或 2G(默认:机器总内存减去 1 GiB);仅限 Worker 使用 |
-d DIR, --work-dir DIR |
用于暂存空间和作业输出日志的目录(默认:SPARK_HOME/work);仅限 Worker 使用 |
--properties-file FILE |
要加载的自定义 Spark 属性文件路径(默认:conf/spark-defaults.conf) |
集群启动脚本
要使用启动脚本启动 Spark 独立集群,你应该在 Spark 目录下创建一个名为 conf/workers 的文件,其中必须包含所有打算启动 Spark Worker 的机器的主机名,每行一个。如果 conf/workers 不存在,启动脚本默认指向单机(localhost),这在测试时很有用。注意,Master 机器通过 ssh 访问每个 Worker 机器。默认情况下,ssh 是并行运行的,需要设置无密码(通过私钥)访问。如果你没有设置无密码访问,可以设置环境变量 SPARK_SSH_FOREGROUND 并为每个 Worker 串行提供密码。
设置好此文件后,你可以使用以下 shell 脚本启动或停止集群,这些脚本基于 Hadoop 的部署脚本,位于 SPARK_HOME/sbin 中:
sbin/start-master.sh- 在执行脚本的机器上启动一个 Master 实例。sbin/start-workers.sh- 在conf/workers文件中指定的每台机器上启动一个 Worker 实例。sbin/start-worker.sh- 在执行脚本的机器上启动一个 Worker 实例。sbin/start-connect-server.sh- 在执行脚本的机器上启动一个 Spark Connect 服务器。sbin/start-all.sh- 启动 Master 和上述数量的 Worker。sbin/stop-master.sh- 停止通过sbin/start-master.sh脚本启动的 Master。sbin/stop-worker.sh- 停止执行脚本的机器上的所有 Worker 实例。sbin/stop-workers.sh- 停止conf/workers文件中指定的机器上的所有 Worker 实例。sbin/stop-connect-server.sh- 停止执行脚本的机器上的所有 Spark Connect 服务器实例。sbin/stop-all.sh- 停止 Master 和上述的 Worker。
注意,这些脚本必须在你想要运行 Spark Master 的机器上执行,而不是你的本地机器。
你可以通过在 conf/spark-env.sh 中设置环境变量来进一步配置集群。通过 conf/spark-env.sh.template 创建此文件,并将其复制到你所有的 Worker 机器上以使设置生效。以下是可用的设置:
| 环境变量 | 含义 |
|---|---|
SPARK_MASTER_HOST |
将 Master 绑定到特定的主机名或 IP 地址,例如公共地址。 |
SPARK_MASTER_PORT |
在不同的端口启动 Master(默认:7077)。 |
SPARK_MASTER_WEBUI_PORT |
Master Web UI 的端口(默认:8080)。 |
SPARK_MASTER_OPTS |
仅适用于 Master 的配置属性,形式为 "-Dx=y"(默认:无)。请参阅下文获取可能的选项列表。 |
SPARK_LOCAL_DIRS |
Spark 中用于“暂存”空间的目录,包括 Map 输出文件和存储在磁盘上的 RDD。这应该放在系统中快速的本地磁盘上。也可以是多个不同磁盘上的目录组成的逗号分隔列表。 |
SPARK_LOG_DIR |
日志文件的存储位置(默认:SPARK_HOME/logs)。 |
SPARK_LOG_MAX_FILES |
日志文件的最大数量(默认:5)。 |
SPARK_PID_DIR |
pid 文件的存储位置(默认:/tmp)。 |
SPARK_WORKER_CORES |
允许 Spark 应用程序在此机器上使用的总核心数(默认:所有可用核心)。 |
SPARK_WORKER_MEMORY |
允许 Spark 应用程序在此机器上使用的总内存总量,例如 1000m, 2g(默认:总内存减去 1 GiB);注意,每个应用程序的独立内存配置使用其 spark.executor.memory 属性。 |
SPARK_WORKER_PORT |
在特定端口启动 Spark Worker(默认:随机)。 |
SPARK_WORKER_WEBUI_PORT |
Worker Web UI 的端口(默认:8081)。 |
SPARK_WORKER_DIR |
运行应用程序的目录,将包含日志和暂存空间(默认:SPARK_HOME/work)。 |
SPARK_WORKER_OPTS |
仅适用于 Worker 的配置属性,形式为 "-Dx=y"(默认:无)。请参阅下文获取可能的选项列表。 |
SPARK_DAEMON_MEMORY |
分配给 Spark Master 和 Worker 守护进程本身的内存(默认:1g)。 |
SPARK_DAEMON_JAVA_OPTS |
Spark Master 和 Worker 守护进程本身的 JVM 选项,形式为 "-Dx=y"(默认:无)。 |
SPARK_DAEMON_CLASSPATH |
Spark Master 和 Worker 守护进程本身的类路径(默认:无)。 |
SPARK_PUBLIC_DNS |
Spark Master 和 Worker 的公共 DNS 名称(默认:无)。 |
注意:启动脚本目前不支持 Windows。要在 Windows 上运行 Spark 集群,请手动启动 Master 和 Worker。
SPARK_MASTER_OPTS 支持以下系统属性:
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.master.ui.port |
8080 |
指定 Master Web UI 端点的端口号。 | 1.1.0 |
spark.master.ui.title |
(无) | 指定 Master UI 页面的标题。如果未设置,则默认使用 Spark Master at 'master url'。 |
4.0.0 |
spark.master.ui.decommission.allow.mode |
LOCAL |
指定 Master Web UI 的 /workers/kill 端点的行为。可选值:LOCAL 表示仅允许运行 Master 的机器本地 IP 访问;DENY 表示完全禁用该端点;ALLOW 表示允许任何 IP 调用该端点。 |
3.1.0 |
spark.master.ui.historyServerUrl |
(无) | Spark 历史服务器运行的 URL。请注意,这假设所有 Spark 作业共享历史服务器访问的相同事件日志位置。 | 4.0.0 |
spark.master.rest.enabled |
true |
是否使用 Master REST API 端点。 | 1.3.0 |
spark.master.rest.host |
(无) | 指定 Master REST API 端点的主机。 | 4.0.0 |
spark.master.rest.port |
6066 |
指定 Master REST API 端点的端口号。 | 1.3.0 |
spark.master.rest.filters |
(无) | 应用于 Master REST API 的过滤器类名(逗号分隔)。 | 4.0.0 |
spark.master.useAppNameAsAppId.enabled |
false |
(实验性) 如果为 true,Spark Master 将使用用户提供的 appName 作为 appId。 | 4.0.0 |
spark.deploy.retainedApplications |
200 | 显示已完成应用程序的最大数量。旧的应用程序将被从 UI 中移除以保持此限制。 |
0.8.0 |
spark.deploy.retainedDrivers |
200 | 显示已完成驱动程序(Driver)的最大数量。旧的驱动程序将被从 UI 中移除以保持此限制。 |
1.1.0 |
spark.deploy.spreadOutDrivers |
true | 独立集群管理器是否应将驱动程序分散到各个节点,还是尽可能合并到少数节点上。分散分布通常对 HDFS 的数据局部性更好,但在计算密集型工作负载中,合并更有效。 | 4.0.0 |
spark.deploy.spreadOutApps |
true | 独立集群管理器是否应将应用程序分散到各个节点,还是尽可能合并到少数节点上。分散分布通常对 HDFS 的数据局部性更好,但在计算密集型工作负载中,合并更有效。 |
0.6.1 |
spark.deploy.defaultCores |
Int.MaxValue | 在 Spark 独立模式下,如果应用程序未设置 spark.cores.max,则分配给应用程序的默认核心数。如果未设置,除非应用程序自行配置 spark.cores.max,否则应用程序总是会获得所有可用核心。在共享集群上建议调小此值,以防止用户默认占用整个集群。 |
0.9.0 |
spark.deploy.maxExecutorRetries |
10 | 在独立集群管理器移除故障应用程序之前,连续发生 Executor 故障的最大次数限制。如果应用程序有任何正在运行的 Executor,它将永远不会被移除。如果应用程序连续发生超过 spark.deploy.maxExecutorRetries 次失败,且期间没有 Executor 成功启动,且该应用程序没有正在运行的 Executor,则独立集群管理器将移除该应用程序并将其标记为失败。要禁用此自动移除功能,请将 spark.deploy.maxExecutorRetries 设置为 -1。 |
1.6.3 |
spark.deploy.maxDrivers |
Int.MaxValue | 运行驱动程序的最大数量。 | 4.0.0 |
spark.deploy.appNumberModulo |
(无) | 应用程序编号的模数。默认情况下,app-yyyyMMddHHmmss-9999 的下一个编号是 app-yyyyMMddHHmmss-10000。如果我们设置 10000 为模数,它将变成 app-yyyyMMddHHmmss-0000。在大多数情况下,前缀 app-yyyyMMddHHmmss 在创建 10000 个应用程序时已经增加了。 |
4.0.0 |
spark.deploy.driverIdPattern |
driver-%s-%04d |
基于 Java String.format 方法生成的驱动程序 ID 模式。默认值是 driver-%s-%04d,代表现有的驱动程序 ID 字符串,例如 driver-20231031224459-0019。请谨慎操作以确保 ID 的唯一性。 |
4.0.0 |
spark.deploy.appIdPattern |
app-%s-%04d |
基于 Java String.format 方法生成的应用程序 ID 模式。默认值是 app-%s-%04d,代表现有的应用程序 ID 字符串,例如 app-20231031224509-0008。请谨慎操作以确保 ID 的唯一性。 |
4.0.0 |
spark.worker.timeout |
60 | 独立部署 Master 在未收到心跳后认为 Worker 已丢失的秒数。 | 0.6.2 |
spark.dead.worker.persistence |
15 | 在 UI 中保留死掉 Worker 信息迭代次数。默认情况下,死掉 Worker 从最后一次心跳起可见时长为 (15 + 1) * spark.worker.timeout。 |
0.8.0 |
spark.worker.resource.{name}.amount |
(无) | 在 Worker 上使用的特定资源数量。 | 3.0.0 |
spark.worker.resource.{name}.discoveryScript |
(无) | 资源发现脚本的路径,用于在 Worker 启动时查找特定资源。脚本的输出应格式化为 ResourceInformation 类。 |
3.0.0 |
spark.worker.resourcesFile |
(无) | 用于在 Worker 启动时查找各种资源的资源文件路径。资源文件的内容应格式化为 [{"id":{"componentName": "spark.worker", "resourceName":"gpu"}, "addresses":["0","1","2"]}]。如果资源文件中未找到特定资源,则会使用发现脚本来查找该资源。如果发现脚本也未找到资源,Worker 将无法启动。 |
3.0.0 |
SPARK_WORKER_OPTS 支持以下系统属性:
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.worker.initialRegistrationRetries |
6 | 在短间隔(5 到 15 秒之间)内重新连接的重试次数。 | 4.0.0 |
spark.worker.maxRegistrationRetries |
16 | 重新连接的最大重试次数。在尝试 spark.worker.initialRegistrationRetries 次后,间隔变为 30 到 90 秒。 |
4.0.0 |
spark.worker.cleanup.enabled |
true | 启用 Worker/应用程序目录的定期清理。注意这仅影响独立模式,因为 YARN 的工作方式不同。只有已停止的应用程序目录才会被清理。如果 spark.shuffle.service.db.enabled 为 "true",则应启用此功能。 |
1.0.0 |
spark.worker.cleanup.interval |
1800 (30 分钟) | 以秒为单位,控制 Worker 清理本地机器上旧应用程序工作目录的间隔。 | 1.0.0 |
spark.worker.cleanup.appDataTtl |
604800 (7 天,7 * 24 * 3600) | 在每个 Worker 上保留应用程序工作目录的秒数。这是一个生存时间(TTL),具体取决于你拥有的可用磁盘空间量。应用程序日志和 JAR 包会被下载到每个应用程序工作目录中。随着时间的推移,工作目录会迅速填满磁盘空间,特别是如果你非常频繁地运行作业。 | 1.0.0 |
spark.shuffle.service.db.enabled |
true | 将外部 Shuffle 服务状态存储在本地磁盘上,以便重启外部 Shuffle 服务时,它能自动重新加载当前 Executor 的信息。这仅影响独立模式(YARN 总是默认启用此行为)。你还应该启用 spark.worker.cleanup.enabled 以确保状态最终被清理。此配置将来可能会被删除。 |
3.0.0 |
spark.shuffle.service.db.backend |
ROCKSDB | 当 spark.shuffle.service.db.enabled 为 true 时,用户可以使用此配置指定 Shuffle 服务状态存储中使用的磁盘存储类型。目前支持 ROCKSDB 和 LEVELDB (已弃用),默认为 ROCKSDB。存储在 RocksDB/LevelDB 中的原始数据现在不会被自动转换。 |
3.4.0 |
spark.storage.cleanupFilesAfterExecutorExit |
true | 在 Executor 退出后,启用对 Worker 目录中非 Shuffle 文件(如临时 Shuffle 块、缓存的 RDD/广播块、溢出文件等)的清理。注意,这与 spark.worker.cleanup.enabled 不同,后者启用对已停止和超时的应用程序的所有文件/子目录的清理,而此配置启用对死掉的 Executor 的本地目录中非 Shuffle 文件的清理。这仅影响独立模式,将来可能会添加对其他集群管理器的支持。 |
2.4.0 |
spark.worker.ui.compressedLogFileLengthCacheSize |
100 | 对于压缩日志文件,解压后的文件大小只能通过解压文件来计算。Spark 会缓存压缩日志文件的解压后文件大小。此属性控制缓存大小。 | 2.0.2 |
spark.worker.idPattern |
worker-%s-%s-%d |
基于 Java String.format 方法生成的 Worker ID 模式。默认值为 worker-%s-%s-%d,代表现有的 Worker ID 字符串,例如 worker-20231109183042-[fe80::1%lo0]-39729。请谨慎操作以确保 ID 的唯一性。 |
4.0.0 |
资源分配与配置概述
请务必阅读配置页面上的“自定义资源调度和配置概述”部分。本节仅讨论 Spark 独立模式下的资源调度特定方面。
Spark 独立部署包括两个部分,第一是配置 Worker 的资源,第二是特定应用程序的资源分配。
用户必须配置 Worker 以使其拥有一组可用资源,从而分配给 Executor。spark.worker.resource.{resourceName}.amount 用于控制 Worker 分配的每种资源的数量。用户还必须指定 spark.worker.resourcesFile 或 spark.worker.resource.{resourceName}.discoveryScript,以指定 Worker 如何发现分配给它的资源。请参阅上述说明以确定哪种方法最适合你的设置。
第二部分是在 Spark 独立模式上运行应用程序。与标准 Spark 资源配置的唯一特殊情况是当你以客户端模式运行驱动程序时。对于客户端模式下的驱动程序,用户可以通过 spark.driver.resourcesFile 或 spark.driver.resource.{resourceName}.discoveryScript 指定其使用的资源。如果驱动程序与其它驱动程序运行在同一主机上,请确保资源文件或发现脚本仅返回不会与节点上运行的其他驱动程序冲突的资源。
注意,用户在提交应用程序时不需要指定发现脚本,因为 Worker 将使用分配给它的资源启动每个 Executor。
将应用程序连接到集群
要在 Spark 集群上运行应用程序,只需将 Master 的 spark://IP:PORT URL 传递给 SparkContext 构造函数。
要在集群上运行交互式 Spark shell,请执行以下命令:
./bin/spark-shell --master spark://IP:PORT
你还可以传递选项 --total-executor-cores <numCores> 来控制 spark-shell 在集群上使用的核心数。
客户端属性
Spark 应用程序支持以下独立模式专用的配置属性:
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.standalone.submit.waitAppCompletion |
false |
在独立集群模式下,控制客户端是否等待直到应用程序完成才退出。如果设置为 true,客户端进程将保持活动状态并轮询驱动程序的状态。否则,客户端进程将在提交后立即退出。 |
3.1.0 |
启动 Spark 应用程序
Spark 协议
spark-submit 脚本提供了提交已编译 Spark 应用程序到集群的最直接方式。对于独立集群,Spark 目前支持两种部署模式。在 client 模式下,驱动程序在提交应用程序的客户端的相同进程中启动。然而,在 cluster 模式下,驱动程序从集群内部的一个 Worker 进程中启动,客户端进程在完成提交应用程序的责任后立即退出,而不等待应用程序完成。
如果你的应用程序是通过 Spark submit 启动的,那么应用程序的 jar 包会自动分发到所有 Worker 节点。对于应用程序依赖的任何其他 jar 包,你应该使用 --jars 标志并用逗号分隔来指定它们(例如 --jars jar1,jar2)。要控制应用程序的配置或执行环境,请参阅 Spark 配置。
此外,独立 cluster 模式支持在应用程序因非零退出代码退出时自动重启。要使用此功能,你可以在启动应用程序时将 --supervise 标志传递给 spark-submit。之后,如果你想终止一个不断失败的应用程序,可以通过以下方式进行:
./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
你可以通过 http://<master url>:8080 处的独立 Master Web UI 找到驱动程序 ID。
REST API
如果启用了 spark.master.rest.enabled,Spark Master 提供额外的 REST API,地址为 http://[host:port]/[version]/submissions/[action],其中 host 是 Master 主机,port 是 spark.master.rest.port 指定的端口号(默认:6066),version 是协议版本(当前为 v1),action 是以下支持的操作之一。
| 命令 | HTTP 方法 | 描述 | 起始版本 |
|---|---|---|---|
create |
POST | 以 cluster 模式创建一个 Spark 驱动程序。从 4.0.0 版本开始,Spark Master 支持对 Spark 属性和环境变量的值进行服务端变量替换。 |
1.3.0 |
kill |
POST | 终止单个 Spark 驱动程序。 | 1.3.0 |
killall |
POST | 终止所有正在运行的 Spark 驱动程序。 | 4.0.0 |
status |
GET | 检查 Spark 作业的状态。 | 1.3.0 |
clear |
POST | 清理已完成的驱动程序和应用程序。 | 4.0.0 |
以下是使用 pi.py 和 REST API 的 curl CLI 命令示例。
$ curl -XPOST http://IP:PORT/v1/submissions/create \
--header "Content-Type:application/json;charset=UTF-8" \
--data '{
"appResource": "",
"sparkProperties": {
"spark.master": "spark://master:7077",
"spark.app.name": "Spark Pi",
"spark.driver.memory": "1g",
"spark.driver.cores": "1",
"spark.jars": ""
},
"clientSparkVersion": "",
"mainClass": "org.apache.spark.deploy.SparkSubmit",
"environmentVariables": { },
"action": "CreateSubmissionRequest",
"appArgs": [ "/opt/spark/examples/src/main/python/pi.py", "10" ]
}'
以下是上述 create 请求的 REST API 响应。
{
"action" : "CreateSubmissionResponse",
"message" : "Driver successfully submitted as driver-20231124153531-0000",
"serverSparkVersion" : "4.0.0",
"submissionId" : "driver-20231124153531-0000",
"success" : true
}
当 Spark Master 通过 spark.master.rest.filters=org.apache.spark.ui.JWSFilter 和 spark.org.apache.spark.ui.JWSFilter.param.secretKey=BASE64URL-ENCODED-KEY 配置要求 HTTP Authorization 标头时,curl CLI 命令可以像下面这样提供所需的标头。
$ curl -XPOST http://IP:PORT/v1/submissions/create \
--header "Authorization: Bearer USER-PROVIDED-WEB-TOEN-SIGNED-BY-THE-SAME-SHARED-KEY"
...
对于 sparkProperties 和 environmentVariables,用户可以使用服务端环境变量的占位符,如下所示。
...
"sparkProperties": {
"spark.hadoop.fs.s3a.endpoint": "{{AWS_ENDPOINT_URL}}",
"spark.hadoop.fs.s3a.endpoint.region": "{{AWS_REGION}}"
},
"environmentVariables": {
"AWS_CA_BUNDLE": "{{AWS_CA_BUNDLE}}"
},
...
资源调度
独立集群模式目前仅支持应用程序之间的简单 FIFO 调度。但是,为了允许多个并发用户,你可以控制每个应用程序使用的最大资源数量。默认情况下,它将获取集群中的所有核心,这只有在你一次只运行一个应用程序时才有意义。你可以通过在 SparkConf 中设置 spark.cores.max 来限制核心数。例如:
val conf = new SparkConf()
.setMaster(...)
.setAppName(...)
.set("spark.cores.max", "10")
val sc = new SparkContext(conf)此外,你可以在集群 Master 进程上配置 spark.deploy.defaultCores,为那些未设置 spark.cores.max 的应用程序修改默认值,使其不是无限的。通过在 conf/spark-env.sh 中添加以下内容来实现:
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"这对于用户可能未单独配置最大核心数的共享集群非常有用。
Executor 调度
分配给每个 Executor 的核心数是可以配置的。当明确设置了 spark.executor.cores 时,如果 Worker 有足够的核心和内存,则来自同一个应用程序的多个 Executor 可以在同一 Worker 上启动。否则,每个 Executor 默认会占用 Worker 上所有可用的核心,在这种情况下,在单次调度迭代期间,每个 Worker 上可能只能启动一个该应用程序的 Executor。
阶段级调度概述
独立模式支持阶段级调度。
- 当禁用动态分配时:它允许用户在阶段级别指定不同的任务资源需求,并使用启动时请求的相同 Executor。
- 当启用动态分配时:目前,当 Master 为一个应用程序分配 Executor 时,它将根据多个 ResourceProfile 的 ID 顺序进行调度。ID 较小的 ResourceProfile 将被优先调度。通常这无关紧要,因为 Spark 在开始下一个阶段之前会先完成当前阶段。唯一可能产生影响的情况是在作业服务器(Job Server)场景中,所以这一点需要留意。调度时,我们仅从内置 Executor 资源中提取 Executor 内存和核心数,而从 ResourceProfile 中提取所有其他自定义资源,其他内置 Executor 资源(如 offHeap 和 memoryOverhead)将不起作用。基础默认配置(Default Profile)将基于你提交应用程序时的 Spark 配置创建。来自基础默认配置的 Executor 内存和核心数可以传播到自定义 ResourceProfiles,但所有其他自定义资源不能传播。
注意事项
正如动态资源分配中所述,如果未在启用动态分配时显式指定每个 Executor 的核心数,Spark 可能会获取比预期多得多的 Executor。因此,在使用阶段级调度时,建议你明确为每个 ResourceProfile 设置 Executor 核心数。
监控与日志
Spark 的独立模式提供了一个基于 Web 的用户界面来监控集群。Master 和每个 Worker 都有自己的 Web UI,显示集群和作业统计信息。默认情况下,你可以通过 8080 端口访问 Master 的 Web UI。端口可以在配置文件或通过命令行选项中更改。
此外,每个作业的详细日志输出也会写入每个 Worker 节点的工作目录(默认:SPARK_HOME/work)。你将看到每个作业的两个文件,stdout 和 stderr,其中包含其控制台的所有输出。
与 Hadoop 并行运行
你可以通过在同一台机器上将其作为独立服务启动,从而使 Spark 与现有的 Hadoop 集群并行运行。要从 Spark 访问 Hadoop 数据,只需使用 hdfs:// URL(通常为 hdfs://<namenode>:9000/path,你可以在 Hadoop Namenode 的 Web UI 上找到正确的 URL)。或者,你可以为 Spark 设置一个单独的集群,并仍然通过网络访问 HDFS;这将比磁盘本地访问慢,但如果你仍然在同一个局域网内运行(例如,你在 Hadoop 所在的每个机架上放置几台 Spark 机器),这可能不是问题。
为网络安全配置端口
总的来说,Spark 集群及其服务通常不会部署在公共互联网上。它们通常是私有服务,并且只能在部署 Spark 的组织网络内访问。对 Spark 服务所使用主机和端口的访问应仅限于需要访问这些服务的源主机。
这对于使用独立资源管理器的集群尤为重要,因为它们不支持其他资源管理器所具备的细粒度访问控制。
有关要配置的端口的完整列表,请参阅安全页面。
高可用性
默认情况下,独立调度集群对 Worker 故障具有弹性(只要 Spark 本身可以通过将工作移动到其他 Worker 来从丢失工作中恢复)。然而,调度程序使用 Master 来做出调度决策,这(默认情况下)创建了一个单点故障:如果 Master 崩溃,则无法创建新的应用程序。为了规避这一点,我们有两种高可用性方案,详见下文。
使用 ZooKeeper 的备用 Master
概述
利用 ZooKeeper 提供领导者选举和一些状态存储,你可以在集群中启动连接到同一个 ZooKeeper 实例的多个 Master。其中一个将被选举为“领导者”(leader),其他将保持待机模式。如果当前的领导者死亡,另一个 Master 将被选举出来,恢复旧 Master 的状态,然后恢复调度。整个恢复过程(从第一个领导者宕机开始)应该在 1 到 2 分钟之间。注意,此延迟仅影响新应用程序的调度——在 Master 故障转移期间已经在运行的应用程序不受影响。
在此处了解更多关于开始使用 ZooKeeper 的信息:这里。
配置
为了启用此恢复模式,你可以通过配置 spark.deploy.recoveryMode 和相关的 spark.deploy.zookeeper.* 配置,在 spark-env 中设置 SPARK_DAEMON_JAVA_OPTS。
可能遇到的问题:如果你的集群中有多个 Master 但未能正确配置它们使用 ZooKeeper,Master 将无法发现彼此并认为自己都是领导者。这将不会导致健康的集群状态(因为所有 Master 将独立调度)。
详细信息
在你设置好 ZooKeeper 集群后,启用高可用性非常简单。只需在不同的节点上启动多个 Master 进程,并使用相同的 ZooKeeper 配置(ZooKeeper URL 和目录)。Master 可以随时添加和删除。
为了调度新应用程序或向集群添加 Worker,它们需要知道当前领导者的 IP 地址。这可以通过简单地传递一份 Master 列表来实现,而不是过去传递单个地址的方式。例如,你可以将 SparkContext 指向 spark://host1:port1,host2:port2。这将导致你的 SparkContext 尝试向两个 Master 注册——如果 host1 宕机,此配置仍然有效,因为我们会找到新的领导者 host2。
“向 Master 注册”与正常操作之间有一个重要的区别。在启动时,应用程序或 Worker 需要能够找到并向当前的领导 Master 注册。但是,一旦成功注册,它就“处于系统中”(即存储在 ZooKeeper 中)。如果发生故障转移,新的领导者将联系所有先前注册的应用程序和 Worker 以通知它们领导层的变更,因此它们甚至不需要在启动时知道新 Master 的存在。
由于这一特性,新的 Master 可以随时创建,你唯一需要担心的就是新的应用程序和 Worker 能够找到它以便在它成为领导者时进行注册。一旦注册,后续的事情就不用操心了。
使用本地文件系统的单节点恢复
概述
ZooKeeper 是生产级高可用性的最佳方案,但如果你只是想在 Master 宕机时能够重启它,FILESYSTEM 模式可以解决这个问题。当应用程序和 Worker 注册时,它们会将足够的状态写入提供的目录,以便在 Master 进程重启时进行恢复。
配置
为了启用此恢复模式,你可以使用此配置在 spark-env 中设置 SPARK_DAEMON_JAVA_OPTS:
| 系统属性 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.deploy.recoveryMode |
NONE | 恢复模式设置,用于在以集群模式提交的 Spark 作业失败并重新启动时进行恢复。设置为 FILESYSTEM 以启用基于文件系统的单节点恢复模式;ROCKSDB 以启用基于 RocksDB 的单节点恢复模式;ZOOKEEPER 以使用基于 ZooKeeper 的恢复模式;CUSTOM 以通过额外的 `spark.deploy.recoveryMode.factory` 配置提供自定义工厂类。NONE 是默认值,禁用了此恢复模式。 | 0.8.1 |
spark.deploy.recoveryDirectory |
"" | Spark 将存储恢复状态的目录,从 Master 的角度来看是可访问的。注意,如果更改了 spark.deploy.recoveryMode 或 spark.deploy.recoveryCompressionCodec,则应手动清除该目录。 |
0.8.1 |
spark.deploy.recoveryCompressionCodec |
(无) | 持久化引擎的压缩编解码器。none (默认), lz4, lzf, snappy, 和 zstd。目前,只有 FILESYSTEM 模式支持此配置。 | 4.0.0 |
spark.deploy.recoveryTimeout |
(无) | 恢复过程的超时时间。默认值与 spark.worker.timeout 相同。 |
4.0.0 |
spark.deploy.recoveryMode.factory |
"" | 实现 StandaloneRecoveryModeFactory 接口的类。 |
1.2.0 |
spark.deploy.zookeeper.url |
None | 当 spark.deploy.recoveryMode 设置为 ZOOKEEPER 时,此配置用于设置要连接的 ZooKeeper URL。 |
0.8.1 |
spark.deploy.zookeeper.dir |
None | 当 spark.deploy.recoveryMode 设置为 ZOOKEEPER 时,此配置用于设置存储恢复状态的 ZooKeeper 目录。 |
0.8.1 |
详细信息
- 此解决方案可以与像 monit 这样的进程监控器/管理器配合使用,或者仅用于通过重启启用手动恢复。
- 虽然文件系统恢复看起来显然优于不进行任何恢复,但这种模式对于某些开发或实验目的可能不是最优的。特别是,通过 stop-master.sh 杀死 Master 不会清除其恢复状态,因此每当你启动新的 Master 时,它都会进入恢复模式。如果它需要等待所有先前注册的 Worker/客户端超时,这可能会使启动时间增加多达 1 分钟。
- 虽然官方不支持,但你可以将 NFS 目录挂载为恢复目录。如果原始 Master 节点彻底死亡,你可以在不同的节点上启动一个 Master,它将正确恢复所有先前注册的 Worker/应用程序(等同于 ZooKeeper 恢复)。然而,未来的应用程序为了注册,必须能够找到新的 Master。