Spark 独立模式
- 安全
- 在集群中安装 Spark 独立模式
- 手动启动集群
- 集群启动脚本
- 资源分配和配置概述
- 将应用程序连接到集群
- 客户端属性
- 启动 Spark 应用程序
- 资源调度
- Executor 调度
- 阶段级别调度概述
- 监控与日志记录
- 与 Hadoop 并行运行
- 配置网络安全端口
- 高可用性
除了在 YARN 集群管理器上运行外,Spark 还提供了一种简单的独立部署模式。你可以手动启动 Master 和 Worker 来启动一个独立集群,或者使用我们提供的启动脚本。也可以在单机上运行这些守护进程进行测试。
安全
默认情况下,未启用身份验证等安全功能。在部署面向互联网或不可信网络的集群时,务必保护集群访问安全,以防止未经授权的应用程序在集群上运行。在运行 Spark 之前,请参阅Spark 安全以及本文档中的特定安全部分。
在集群中安装 Spark 独立模式
要安装 Spark 独立模式,只需将 Spark 的编译版本放置在集群中的每个节点上。你可以获取每个版本预构建的 Spark,或者自己构建。
手动启动集群
你可以通过执行以下命令启动一个独立的 Master 服务器:
./sbin/start-master.sh
启动后,Master 将打印出其自身的 spark://HOST:PORT
URL,你可以使用它将 Worker 连接到 Master,或者将其作为“master”参数传递给 SparkContext
。你也可以在 Master 的 Web UI 上找到此 URL,默认情况下为 http://localhost:8080。
同样,你可以启动一个或多个 Worker 并通过以下方式将它们连接到 Master:
./sbin/start-worker.sh <master-spark-URL>
启动 Worker 后,查看 Master 的 Web UI(默认情况下为 http://localhost:8080)。你应该会在其中看到新节点,以及它的 CPU 数量和内存(减去留给操作系统的 1GB)。
最后,以下配置选项可以传递给 Master 和 Worker
参数 | 含义 |
---|---|
-h HOST , --host HOST |
监听的主机名 |
-p PORT , --port PORT |
服务监听端口(默认:Master 为 7077,Worker 为随机) |
|
Web UI 端口(默认:Master 为 8080,Worker 为 8081) |
-c CORES , --cores CORES |
允许 Spark 应用程序在该机器上使用的 CPU 总核心数(默认:所有可用核心);仅限 Worker |
-m MEM , --memory MEM |
允许 Spark 应用程序在该机器上使用的总内存量,格式如 1000M 或 2G(默认:机器总内存减去 1 GiB);仅限 Worker |
-d DIR , --work-dir DIR |
用于临时空间和作业输出日志的目录(默认:SPARK_HOME/work);仅限 Worker |
|
要加载的自定义 Spark 属性文件的路径(默认:conf/spark-defaults.conf) |
集群启动脚本
要使用启动脚本启动 Spark 独立集群,你需要在 Spark 目录中创建一个名为 conf/workers
的文件,该文件必须包含你打算启动 Spark Worker 的所有机器的主机名,每行一个。如果 conf/workers
不存在,启动脚本将默认为单机(localhost),这对于测试很有用。注意,Master 机器通过 SSH 访问每个 Worker 机器。默认情况下,SSH 并行运行并要求设置免密码(使用私钥)访问。如果你没有设置免密码访问,可以设置环境变量 SPARK_SSH_FOREGROUND
并串行地为每个 Worker 提供密码。
设置好此文件后,你可以使用以下 shell 脚本启动或停止集群,这些脚本基于 Hadoop 的部署脚本,并位于 SPARK_HOME/sbin
中
sbin/start-master.sh
- 在执行此脚本的机器上启动一个 Master 实例。sbin/start-workers.sh
- 在conf/workers
文件中指定的每台机器上启动一个 Worker 实例。sbin/start-worker.sh
- 在执行此脚本的机器上启动一个 Worker 实例。sbin/start-connect-server.sh
- 在执行此脚本的机器上启动一个 Spark Connect 服务器。sbin/start-all.sh
- 启动 Master 和上述数量的 Worker。sbin/stop-master.sh
- 停止通过sbin/start-master.sh
脚本启动的 Master。sbin/stop-worker.sh
- 停止执行此脚本的机器上的所有 Worker 实例。sbin/stop-workers.sh
- 停止conf/workers
文件中指定的机器上的所有 Worker 实例。sbin/stop-connect-server.sh
- 停止执行此脚本的机器上的所有 Spark Connect 服务器实例。sbin/stop-all.sh
- 停止 Master 和上述 Worker。
请注意,这些脚本必须在你希望运行 Spark Master 的机器上执行,而不是你的本地机器。
你可以通过在 conf/spark-env.sh
中设置环境变量来进一步配置集群。通过 conf/spark-env.sh.template
文件创建此文件,并将其复制到所有 Worker 机器上以使设置生效。以下设置可用:
环境变量 | 含义 |
---|---|
SPARK_MASTER_HOST |
将 Master 绑定到特定的主机名或 IP 地址,例如公共地址。 |
SPARK_MASTER_PORT |
在不同的端口启动 Master(默认:7077)。 |
SPARK_MASTER_WEBUI_PORT |
Master Web UI 的端口(默认:8080)。 |
SPARK_MASTER_OPTS |
仅适用于 Master 的配置属性,形式为“-Dx=y”(默认:无)。请参阅下文以获取可能的选项列表。 |
SPARK_LOCAL_DIRS |
Spark 中用于“临时”空间的目录,包括映射输出文件和存储在磁盘上的 RDD。这应该位于系统中的快速本地磁盘上。它也可以是不同磁盘上多个目录的逗号分隔列表。 |
SPARK_LOG_DIR |
日志文件的存储位置。(默认:SPARK_HOME/logs)。 |
SPARK_LOG_MAX_FILES |
最大日志文件数(默认:5)。 |
SPARK_PID_DIR |
PID 文件的存储位置。(默认:/tmp)。 |
SPARK_WORKER_CORES |
允许 Spark 应用程序在该机器上使用的总核心数(默认:所有可用核心)。 |
SPARK_WORKER_MEMORY |
允许 Spark 应用程序在该机器上使用的总内存量,例如 1000m 、2g (默认:总内存减去 1 GiB);请注意,每个应用程序的独立内存是使用其 spark.executor.memory 属性配置的。 |
SPARK_WORKER_PORT |
在特定端口启动 Spark Worker(默认:随机)。 |
SPARK_WORKER_WEBUI_PORT |
Worker Web UI 的端口(默认:8081)。 |
SPARK_WORKER_DIR |
运行应用程序的目录,将包括日志和临时空间(默认:SPARK_HOME/work)。 |
SPARK_WORKER_OPTS |
仅适用于 Worker 的配置属性,形式为“-Dx=y”(默认:无)。请参阅下文以获取可能的选项列表。 |
SPARK_DAEMON_MEMORY |
分配给 Spark Master 和 Worker 守护进程自身的内存(默认:1g)。 |
SPARK_DAEMON_JAVA_OPTS |
Spark Master 和 Worker 守护进程自身的 JVM 选项,形式为“-Dx=y”(默认:无)。 |
SPARK_DAEMON_CLASSPATH |
Spark Master 和 Worker 守护进程自身的类路径(默认:无)。 |
SPARK_PUBLIC_DNS |
Spark Master 和 Worker 的公共 DNS 名称(默认:无)。 |
注意:启动脚本目前不支持 Windows。要在 Windows 上运行 Spark 集群,请手动启动 Master 和 Worker。
SPARK_MASTER_OPTS 支持以下系统属性
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.master.ui.port |
8080 |
指定 Master Web UI 端点的端口号。 | 1.1.0 |
spark.master.ui.title |
(无) | 指定 Master UI 页面的标题。如果未设置,则默认使用 Spark Master at 'master url' 。 |
4.0.0 |
spark.master.ui.decommission.allow.mode |
LOCAL |
指定 Master Web UI 的 /workers/kill 端点的行为。可能的选择是:LOCAL 表示允许来自运行 Master 的机器本地 IP 的访问,DENY 表示完全禁用此端点,ALLOW 表示允许从任何 IP 调用此端点。 |
3.1.0 |
spark.master.ui.historyServerUrl |
(无) | Spark 历史服务器运行的 URL。请注意,这假定所有 Spark 作业共享历史服务器访问的相同事件日志位置。 | 4.0.0 |
spark.master.rest.enabled |
false |
是否使用 Master REST API 端点。 | 1.3.0 |
spark.master.rest.host |
(无) | 指定 Master REST API 端点的主机。 | 4.0.0 |
spark.master.rest.port |
6066 |
指定 Master REST API 端点的端口号。 | 1.3.0 |
spark.master.rest.filters |
(无) | 应用于 Master REST API 的过滤器类名称的逗号分隔列表。 | 4.0.0 |
spark.master.useAppNameAsAppId.enabled |
false |
(实验性) 如果为 true,Spark Master 将使用用户提供的 appName 作为 appId。 | 4.0.0 |
spark.deploy.retainedApplications |
200 | 要显示的最大已完成应用程序数。较旧的应用程序将从 UI 中删除以保持此限制。 |
0.8.0 |
spark.deploy.retainedDrivers |
200 | 要显示的最大已完成驱动程序数。较旧的驱动程序将从 UI 中删除以保持此限制。 |
1.1.0 |
spark.deploy.spreadOutDrivers |
true | 独立集群管理器是应该将驱动程序分散到各个节点上,还是尽量将它们整合到尽可能少的节点上。分散通常有利于 HDFS 中的数据本地性,但整合对于计算密集型工作负载更有效。 | 4.0.0 |
spark.deploy.spreadOutApps |
true | 独立集群管理器是应该将应用程序分散到各个节点上,还是尽量将它们整合到尽可能少的节点上。分散通常有利于 HDFS 中的数据本地性,但整合对于计算密集型工作负载更有效。 |
0.6.1 |
spark.deploy.defaultCores |
Int.MaxValue | 如果 Spark 独立模式下的应用程序未设置 spark.cores.max ,则分配给应用程序的默认核心数。如果未设置,应用程序将始终获得所有可用核心,除非它们自己配置 spark.cores.max 。在共享集群上将此值设置得更低,以防止用户默认占用整个集群。 |
0.9.0 |
spark.deploy.maxExecutorRetries |
10 | 独立集群管理器在移除故障应用程序之前,允许连续发生的 Executor 故障的最大次数限制。如果应用程序有任何正在运行的 Executor,它将永远不会被移除。如果应用程序连续发生超过 spark.deploy.maxExecutorRetries 次故障,并且在这些故障之间没有 Executor 成功启动运行,并且应用程序没有正在运行的 Executor,则独立集群管理器将移除该应用程序并将其标记为失败。要禁用此自动移除功能,请将 spark.deploy.maxExecutorRetries 设置为 -1 。 |
1.6.3 |
spark.deploy.maxDrivers |
Int.MaxValue | 正在运行的驱动程序的最大数量。 | 4.0.0 |
spark.deploy.appNumberModulo |
(无) | 应用程序编号的模数。默认情况下,app-yyyyMMddHHmmss-9999 的下一个是 app-yyyyMMddHHmmss-10000 。如果我们设置模数为 10000,它将是 app-yyyyMMddHHmmss-0000 。在大多数情况下,前缀 app-yyyyMMddHHmmss 在创建 10000 个应用程序时已经增加。 |
4.0.0 |
spark.deploy.driverIdPattern |
driver-%s-%04d |
基于 Java String.format 方法的驱动程序 ID 生成模式。默认值为 driver-%s-%04d ,它表示现有的驱动程序 ID 字符串,例如 driver-20231031224459-0019 。请注意生成唯一 ID。 |
4.0.0 |
spark.deploy.appIdPattern |
app-%s-%04d |
基于 Java String.format 方法的应用程序 ID 生成模式。默认值为 app-%s-%04d ,它表示现有的应用程序 ID 字符串,例如 app-20231031224509-0008 。请注意生成唯一 ID。 |
4.0.0 |
spark.worker.timeout |
60 | 如果独立部署 Master 在多少秒内未收到心跳,则认为 Worker 已丢失。 | 0.6.2 |
spark.dead.worker.persistence |
15 | 在 UI 中保留死亡 Worker 信息的迭代次数。默认情况下,死亡 Worker 从其最后一次心跳起可见 (15 + 1) * spark.worker.timeout 。 |
0.8.0 |
spark.worker.resource.{name}.amount |
(无) | 在 Worker 上使用的特定资源数量。 | 3.0.0 |
spark.worker.resource.{name}.discoveryScript |
(无) | 资源发现脚本的路径,用于在 Worker 启动时查找特定资源。脚本的输出应格式化为 ResourceInformation 类。 |
3.0.0 |
spark.worker.resourcesFile |
(无) | 资源文件的路径,用于在 Worker 启动时查找各种资源。资源文件的内容应格式化为 [{"id":{"componentName": "spark.worker", "resourceName":"gpu"}, "addresses":["0","1","2"]}] 。如果在资源文件中未找到特定资源,则将使用发现脚本查找该资源。如果发现脚本也未找到资源,则 Worker 将无法启动。 |
3.0.0 |
SPARK_WORKER_OPTS 支持以下系统属性
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.worker.initialRegistrationRetries |
6 | 在短间隔(5 到 15 秒之间)内重新连接的重试次数。 | 4.0.0 |
spark.worker.maxRegistrationRetries |
16 | 重新连接的最大重试次数。在 spark.worker.initialRegistrationRetries 次尝试后,间隔为 30 到 90 秒。 |
4.0.0 |
spark.worker.cleanup.enabled |
true | 启用 Worker/应用程序目录的定期清理。请注意,这仅影响独立模式,因为 YARN 的工作方式不同。仅停止的应用程序的目录会被清理。如果 spark.shuffle.service.db.enabled 为 "true",则应启用此项。 |
1.0.0 |
spark.worker.cleanup.interval |
1800 (30 分钟) | 控制 Worker 在本地机器上清理旧应用程序工作目录的间隔(秒)。 | 1.0.0 |
spark.worker.cleanup.appDataTtl |
604800 (7 天, 7 * 24 * 3600) | 每个 Worker 上保留应用程序工作目录的秒数。这是一个生存时间(TTL),应取决于可用的磁盘空间量。应用程序日志和 Jar 包会下载到每个应用程序工作目录。随着时间的推移,工作目录可能会迅速占满磁盘空间,特别是当你非常频繁地运行作业时。 | 1.0.0 |
spark.shuffle.service.db.enabled |
true | 将外部 Shuffle 服务的状态存储在本地磁盘上,以便当外部 Shuffle 服务重新启动时,它会自动重新加载当前 Executor 的信息。这仅影响独立模式(YARN 始终启用此行为)。你还应该启用 spark.worker.cleanup.enabled ,以确保状态最终被清理。此配置将来可能会被移除。 |
3.0.0 |
spark.shuffle.service.db.backend |
ROCKSDB | 当 spark.shuffle.service.db.enabled 为 true 时,用户可以使用此选项指定 Shuffle 服务状态存储中使用的基于磁盘的存储类型。目前支持 ROCKSDB 和 LEVELDB (已弃用),默认值为 ROCKSDB 。目前,RocksDB/LevelDB 中的原始数据存储不会自动转换为另一种存储类型。 |
3.4.0 |
spark.storage.cleanupFilesAfterExecutorExit |
true | 启用在 Executor 退出后清理 Worker 目录中非 Shuffle 文件(例如临时 Shuffle 块、缓存的 RDD/广播块、溢出文件等)。请注意,这与 spark.worker.cleanup.enabled 没有重叠,因为此选项启用清理死亡 Executor 本地目录中的非 Shuffle 文件,而 spark.worker.cleanup.enabled 启用清理已停止和超时的应用程序的所有文件/子目录。这仅影响独立模式,将来可能会添加对其他集群管理器的支持。 |
2.4.0 |
spark.worker.ui.compressedLogFileLengthCacheSize |
100 | 对于压缩的日志文件,解压后的文件大小只能通过解压文件来计算。Spark 会缓存压缩日志文件的解压后文件大小。此属性控制缓存大小。 | 2.0.2 |
spark.worker.idPattern |
worker-%s-%s-%d |
基于 Java String.format 方法的 Worker ID 生成模式。默认值为 worker-%s-%s-%d ,它表示现有的 Worker ID 字符串,例如 worker-20231109183042-[fe80::1%lo0]-39729 。请注意生成唯一 ID。 |
4.0.0 |
资源分配和配置概述
请确保已阅读配置页面上的“自定义资源调度和配置概述”部分。本节仅讨论 Spark 独立模式中资源调度的特定方面。
Spark 独立模式包含两个部分,第一部分是配置 Worker 的资源,第二部分是为特定应用程序分配资源。
用户必须配置 Worker 以使其拥有一组可用资源,以便它可以将这些资源分配给 Executor。spark.worker.resource.{resourceName}.amount
用于控制 Worker 已分配的每种资源的数量。用户还必须指定 spark.worker.resourcesFile
或 spark.worker.resource.{resourceName}.discoveryScript
来指定 Worker 如何发现分配给它的资源。请参阅上面对每个选项的描述,以了解哪种方法最适合你的设置。
第二部分是在 Spark 独立模式下运行应用程序。标准 Spark 资源配置中唯一的特殊情况是在客户端模式下运行 Driver。对于客户端模式下的 Driver,用户可以通过 spark.driver.resourcesFile
或 spark.driver.resource.{resourceName}.discoveryScript
指定其使用的资源。如果 Driver 与其他 Driver 在同一主机上运行,请确保资源文件或发现脚本只返回与同一节点上运行的其他 Driver 不冲突的资源。
请注意,用户在提交应用程序时无需指定发现脚本,因为 Worker 将使用分配给每个 Executor 的资源来启动它们。
将应用程序连接到集群
要在 Spark 集群上运行应用程序,只需将 Master 的 spark://IP:PORT
URL 传递给 SparkContext
构造函数。
要针对集群运行交互式 Spark Shell,请运行以下命令:
./bin/spark-shell --master spark://IP:PORT
你还可以传递选项 --total-executor-cores <numCores>
来控制 spark-shell 在集群上使用的核心数量。
客户端属性
Spark 应用程序支持以下独立模式特定的配置属性
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.standalone.submit.waitAppCompletion |
false |
在独立集群模式下,控制客户端是否等待应用程序完成再退出。如果设置为 true ,客户端进程将保持活动状态并轮询驱动程序的状况。否则,客户端进程将在提交后退出。 |
3.1.0 |
启动 Spark 应用程序
Spark 协议
通过 spark-submit
脚本可以将编译好的 Spark 应用程序提交到集群,这是最直接的方式。对于独立集群,Spark 目前支持两种部署模式。在 client
模式下,驱动程序在提交应用程序的客户端的同一进程中启动。而在 cluster
模式下,驱动程序从集群内部的一个 Worker 进程中启动,并且客户端进程在完成提交应用程序的职责后立即退出,而无需等待应用程序完成。
如果你的应用程序是通过 Spark submit 启动的,那么应用程序 Jar 包会自动分发到所有 Worker 节点。对于你的应用程序所依赖的任何额外 Jar 包,你应该使用逗号作为分隔符通过 --jars
标志指定它们(例如 --jars jar1,jar2
)。要控制应用程序的配置或执行环境,请参阅Spark 配置。
此外,独立 cluster
模式支持在应用程序以非零退出代码退出时自动重启。要使用此功能,你可以在启动应用程序时将 --supervise
标志传递给 spark-submit
。然后,如果你希望终止一个反复失败的应用程序,可以通过以下方式进行:
./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
你可以通过独立 Master Web UI http://<master url>:8080
找到驱动程序 ID。
REST API
如果 spark.master.rest.enabled
已启用,Spark Master 将通过 http://[host:port]/[version]/submissions/[action]
提供额外的 REST API,其中 host
是 Master 主机,port
是 spark.master.rest.port
指定的端口号(默认:6066),version
是协议版本,目前为 v1
,action
是以下支持的操作之一。
命令 | HTTP 方法 | 描述 | 起始版本 |
---|---|---|---|
create |
POST | 通过 cluster 模式创建 Spark 驱动程序。自 4.0.0 版本起,Spark Master 支持对 Spark 属性和环境变量的值进行服务器端变量替换。 |
1.3.0 |
kill |
POST | 终止单个 Spark 驱动程序。 | 1.3.0 |
killall |
POST | 终止所有正在运行的 Spark 驱动程序。 | 4.0.0 |
status |
GET | 检查 Spark 作业的状态。 | 1.3.0 |
clear |
POST | 清除已完成的驱动程序和应用程序。 | 4.0.0 |
以下是使用 pi.py
和 REST API 的 curl
CLI 命令示例。
$ curl -XPOST http://IP:PORT/v1/submissions/create \
--header "Content-Type:application/json;charset=UTF-8" \
--data '{
"appResource": "",
"sparkProperties": {
"spark.master": "spark://master:7077",
"spark.app.name": "Spark Pi",
"spark.driver.memory": "1g",
"spark.driver.cores": "1",
"spark.jars": ""
},
"clientSparkVersion": "",
"mainClass": "org.apache.spark.deploy.SparkSubmit",
"environmentVariables": { },
"action": "CreateSubmissionRequest",
"appArgs": [ "/opt/spark/examples/src/main/python/pi.py", "10" ]
}'
以下是上述 create
请求的 REST API 响应。
{
"action" : "CreateSubmissionResponse",
"message" : "Driver successfully submitted as driver-20231124153531-0000",
"serverSparkVersion" : "4.0.0",
"submissionId" : "driver-20231124153531-0000",
"success" : true
}
当 Spark Master 通过 spark.master.rest.filters=org.apache.spark.ui.JWSFilter
和 spark.org.apache.spark.ui.JWSFilter.param.secretKey=BASE64URL-ENCODED-KEY
配置需要 HTTP Authorization
头时,curl
CLI 命令可以提供所需的头,如下所示。
$ curl -XPOST http://IP:PORT/v1/submissions/create \
--header "Authorization: Bearer USER-PROVIDED-WEB-TOEN-SIGNED-BY-THE-SAME-SHARED-KEY"
...
对于 sparkProperties
和 environmentVariables
,用户可以使用服务器端环境变量的占位符,如下所示。
...
"sparkProperties": {
"spark.hadoop.fs.s3a.endpoint": "{{AWS_ENDPOINT_URL}}",
"spark.hadoop.fs.s3a.endpoint.region": "{{AWS_REGION}}"
},
"environmentVariables": {
"AWS_CA_BUNDLE": "{{AWS_CA_BUNDLE}}"
},
...
资源调度
独立集群模式目前仅支持跨应用程序的简单 FIFO 调度器。然而,为了允许多个并发用户,你可以控制每个应用程序将使用的最大资源数量。默认情况下,它将获取集群中的所有核心,这仅在你一次只运行一个应用程序时才有意义。你可以通过在 SparkConf 中设置 spark.cores.max
来限制核心数量。例如:
val conf = new SparkConf()
.setMaster(...)
.setAppName(...)
.set("spark.cores.max", "10")
val sc = new SparkContext(conf)
此外,你可以在集群 Master 进程上配置 spark.deploy.defaultCores
,以更改未将 spark.cores.max
设置为非无限值的应用程序的默认值。通过将以下内容添加到 conf/spark-env.sh
来实现:
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"
这在共享集群中很有用,因为用户可能没有单独配置最大核心数。
Executor 调度
分配给每个 Executor 的核心数是可配置的。当明确设置 spark.executor.cores
时,如果 Worker 有足够的内核和内存,来自同一应用程序的多个 Executor 可能会在同一个 Worker 上启动。否则,每个 Executor 默认会占用 Worker 上所有可用的核心,在这种情况下,在一次调度迭代中,每个 Worker 上可能只启动一个应用程序的一个 Executor。
阶段级别调度概述
独立模式支持阶段级别调度
- 当禁用动态分配时:它允许用户在阶段级别指定不同的任务资源要求,并将使用启动时请求的相同 Executor。
- 当启用动态分配时:目前,当 Master 为一个应用程序分配 Executor 时,它将根据多个 ResourceProfile 的 ResourceProfile ID 的顺序进行调度。ID 较小的 ResourceProfile 将首先被调度。通常这无关紧要,因为 Spark 在启动另一个阶段之前会完成一个阶段,唯一可能产生影响的情况是在作业服务器类型的场景中,所以这一点需要记住。对于调度,我们只会从内置 Executor 资源中获取 Executor 内存和 Executor 核心,并从 ResourceProfile 中获取所有其他自定义资源,其他内置 Executor 资源(如 offHeap 和 memoryOverhead)将不起作用。当你提交应用程序时,基础默认配置文件将根据 Spark 配置创建。来自基础默认配置文件的 Executor 内存和 Executor 核心可以传播到自定义 ResourceProfiles,但所有其他自定义资源无法传播。
注意事项
正如动态资源分配中所述,如果在启用动态分配时未明确指定每个 Executor 的核心数,Spark 可能会获取比预期多得多的 Executor。因此,在使用阶段级别调度时,建议你为每个资源配置文件明确设置 Executor 核心数。
监控与日志记录
Spark 的独立模式提供了一个基于 Web 的用户界面来监控集群。Master 和每个 Worker 都有自己的 Web UI,显示集群和作业统计信息。默认情况下,你可以通过 8080 端口访问 Master 的 Web UI。该端口可以在配置文件中或通过命令行选项更改。
此外,每个作业的详细日志输出也会写入每个 Worker 节点的工作目录(默认情况下为 SPARK_HOME/work
)。每个作业将看到两个文件,stdout
和 stderr
,其中包含其写入控制台的所有输出。
与 Hadoop 并行运行
你可以通过在相同的机器上将其作为单独的服务启动,从而使 Spark 与现有 Hadoop 集群并行运行。要从 Spark 访问 Hadoop 数据,只需使用 hdfs:// URL(通常是 hdfs://<namenode>:9000/path
,但你可以在 Hadoop Namenode 的 Web UI 上找到正确的 URL)。另外,你也可以为 Spark 设置一个单独的集群,并仍然让它通过网络访问 HDFS;这会比磁盘本地访问慢,但如果你仍在同一局域网中运行(例如,你在拥有 Hadoop 的每个机架上放置几台 Spark 机器),这可能不是问题。
配置网络安全端口
一般来说,Spark 集群及其服务不会部署在公共互联网上。它们通常是私有服务,并且只能在部署 Spark 的组织网络内部访问。对 Spark 服务使用的主机和端口的访问应仅限于需要访问这些服务的源主机。
这对于使用独立资源管理器的集群尤为重要,因为它们不像其他资源管理器那样支持细粒度访问控制。
有关要配置的端口的完整列表,请参阅安全页面。
高可用性
默认情况下,独立调度集群对 Worker 故障具有弹性(Spark 本身通过将工作转移到其他 Worker 来抵抗工作丢失)。然而,调度器使用 Master 来进行调度决策,这(默认情况下)创建了一个单点故障:如果 Master 崩溃,则无法创建新的应用程序。为了规避这一点,我们有两种高可用性方案,详述如下。
使用 ZooKeeper 实现备用 Master
概述
利用 ZooKeeper 提供领导者选举和一些状态存储,你可以在集群中启动连接到同一 ZooKeeper 实例的多个 Master。其中一个将被选举为“领导者”,其他 Master 将保持备用模式。如果当前领导者宕机,将选举另一个 Master,恢复旧 Master 的状态,然后恢复调度。整个恢复过程(从第一个领导者宕机开始)应在 1 到 2 分钟之间。请注意,此延迟仅影响调度新应用程序——在 Master 故障转移期间已经运行的应用程序不受影响。
在此处了解更多关于 ZooKeeper 入门的信息。
配置
为了启用此恢复模式,你可以通过配置 spark.deploy.recoveryMode
和相关的 spark.deploy.zookeeper.*
配置,在 spark-env 中设置 SPARK_DAEMON_JAVA_OPTS
。
可能的陷阱:如果你的集群中有多个 Master 但未能正确配置 Master 使用 ZooKeeper,则 Master 将无法相互发现,并认为它们都是领导者。这不会导致健康的集群状态(因为所有 Master 将独立调度)。
详情
设置好 ZooKeeper 集群后,启用高可用性非常简单。只需在不同节点上启动多个 Master 进程,并使用相同的 ZooKeeper 配置(ZooKeeper URL 和目录)。Master 可以随时添加和移除。
为了调度新的应用程序或向集群添加 Worker,它们需要知道当前领导者的 IP 地址。这可以通过简单地传入 Master 列表来实现,而以前你只传入单个 Master。例如,你可能会将 SparkContext 指向 spark://host1:port1,host2:port2
。这将导致你的 SparkContext 尝试向两个 Master 注册——如果 host1
宕机,此配置仍然正确,因为我们将找到新的领导者 host2
。
“向 Master 注册”和“正常操作”之间存在一个重要的区别。启动时,应用程序或 Worker 需要能够找到并向当前领导 Master 注册。一旦成功注册,它就“进入系统”(即存储在 ZooKeeper 中)。如果发生故障转移,新的领导者将联系所有先前注册的应用程序和 Worker,告知它们领导权变更,因此它们在启动时甚至无需知道新 Master 的存在。
由于这个特性,可以随时创建新的 Master,你唯一需要担心的是,新应用程序和 Worker 能够在它成为领导者时找到它进行注册。一旦注册成功,你就不必担心了。
使用本地文件系统实现单节点恢复
概述
对于生产级别的高可用性,ZooKeeper 是最佳选择,但如果你只是想在 Master 宕机时能够重新启动它,FILESYSTEM 模式可以解决这个问题。当应用程序和 Worker 注册时,它们会将足够的状态写入提供的目录,以便在 Master 进程重启时可以恢复它们。
配置
为了启用此恢复模式,你可以使用此配置在 spark-env 中设置 SPARK_DAEMON_JAVA_OPTS
系统属性 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.deploy.recoveryMode |
NONE | 恢复模式设置,用于在集群模式下已提交的 Spark 作业失败并重新启动时进行恢复。设置为 FILESYSTEM 以启用基于文件系统的单节点恢复模式,ROCKSDB 以启用基于 RocksDB 的单节点恢复模式,ZOOKEEPER 以使用基于 Zookeeper 的恢复模式,CUSTOM 则通过额外的 spark.deploy.recoveryMode.factory 配置提供自定义提供程序类。NONE 是默认值,它禁用此恢复模式。 |
0.8.1 |
spark.deploy.recoveryDirectory |
"" | Spark 将存储恢复状态的目录,可从 Master 的角度访问。请注意,如果 spark.deploy.recoveryMode 或 spark.deploy.recoveryCompressionCodec 发生更改,则应手动清除该目录。 |
0.8.1 |
spark.deploy.recoveryCompressionCodec |
(无) | 持久化引擎的压缩编解码器。none(默认)、lz4、lzf、snappy 和 zstd。目前,只有 FILESYSTEM 模式支持此配置。 | 4.0.0 |
spark.deploy.recoveryTimeout |
(无) | 恢复过程的超时时间。默认值与 spark.worker.timeout 相同。 |
4.0.0 |
spark.deploy.recoveryMode.factory |
"" | 实现 StandaloneRecoveryModeFactory 接口的类 |
1.2.0 |
spark.deploy.zookeeper.url |
无 | 当 spark.deploy.recoveryMode 设置为 ZOOKEEPER 时,此配置用于设置要连接的 Zookeeper URL。 |
0.8.1 |
spark.deploy.zookeeper.dir |
无 | 当 spark.deploy.recoveryMode 设置为 ZOOKEEPER 时,此配置用于设置存储恢复状态的 Zookeeper 目录。 |
0.8.1 |
详情
- 此解决方案可以与 monit 等进程监控/管理器结合使用,或者仅用于通过重启实现手动恢复。
- 虽然文件系统恢复似乎比不进行任何恢复要直接得多,但这种模式对于某些开发或实验目的可能不是最佳选择。特别是,通过 stop-master.sh 杀死 Master 不会清除其恢复状态,因此每当你启动新的 Master 时,它都会进入恢复模式。如果需要等待所有先前注册的 Worker/客户端超时,这可能会使启动时间增加多达 1 分钟。
- 尽管官方不支持,但你可以将 NFS 目录挂载为恢复目录。如果原始 Master 节点完全宕机,你可以在不同的节点上启动一个 Master,它将正确恢复所有先前注册的 Worker/应用程序(等同于 ZooKeeper 恢复)。然而,未来的应用程序必须能够找到新的 Master 才能注册。