Spark 独立模式

除了在 Mesos 或 YARN 集群管理器上运行之外,Spark 还提供了一个简单的独立部署模式。您可以手动启动独立集群,通过手动启动主节点和工作节点,或者使用我们提供的 启动脚本。也可以在一台机器上运行这些守护进程进行测试。

安全

默认情况下,不会启用身份验证等安全功能。当部署一个对互联网或不受信任的网络开放的集群时,重要的是要保护对集群的访问,以防止未经授权的应用程序在集群上运行。在运行 Spark 之前,请参阅 Spark 安全 以及本文档中的特定安全部分。

将 Spark 独立模式安装到集群

要安装 Spark 独立模式,您只需将 Spark 的编译版本放在集群中的每个节点上。您可以从每个版本中获取预构建的 Spark 版本,或者 自己构建

手动启动集群

您可以通过执行以下命令启动一个独立的主服务器

./sbin/start-master.sh

启动后,主节点将打印出自己的 spark://HOST:PORT URL,您可以使用它将工作节点连接到主节点,或者将其作为“master”参数传递给 SparkContext。您也可以在主节点的 Web UI 上找到此 URL,默认情况下为 https://127.0.0.1:8080

类似地,您可以启动一个或多个工作节点,并通过以下命令将它们连接到主节点

./sbin/start-worker.sh <master-spark-URL>

启动工作节点后,查看主节点的 Web UI(默认情况下为 https://127.0.0.1:8080)。您应该看到新节点列在那里,以及它的 CPU 和内存数量(减去留给操作系统的 1 GB)。

最后,以下配置选项可以传递给主节点和工作节点

参数含义
-h HOST, --host HOST 要监听的主机名
-i HOST, --ip HOST 要监听的主机名(已弃用,请使用 -h 或 --host)
-p PORT, --port PORT 服务要监听的端口(默认:主节点为 7077,工作节点为随机端口)
--webui-port PORT Web UI 的端口(默认:主节点为 8080,工作节点为 8081)
-c CORES, --cores CORES 允许 Spark 应用程序在机器上使用的总 CPU 内核数(默认:所有可用内核);仅在工作节点上
-m MEM, --memory MEM 允许 Spark 应用程序在机器上使用的总内存量,格式为 1000M 或 2G(默认:机器的总 RAM 减去 1 GiB);仅在工作节点上
-d DIR, --work-dir DIR 用于临时空间和作业输出日志的目录(默认:SPARK_HOME/work);仅在工作节点上
--properties-file FILE 要加载的自定义 Spark 属性文件的路径(默认:conf/spark-defaults.conf)

集群启动脚本

要使用启动脚本启动 Spark 独立集群,您应该在 Spark 目录中创建一个名为 conf/workers 的文件,该文件必须包含您打算启动 Spark 工作节点的所有机器的主机名,每行一个。如果 conf/workers 不存在,启动脚本默认使用一台机器(localhost),这对于测试很有用。请注意,主节点通过 ssh 访问每个工作节点。默认情况下,ssh 是并行运行的,需要设置无密码(使用私钥)访问。如果您没有设置无密码访问,您可以设置环境变量 SPARK_SSH_FOREGROUND 并为每个工作节点依次提供密码。

设置完此文件后,您可以使用以下 shell 脚本启动或停止集群,这些脚本基于 Hadoop 的部署脚本,位于 SPARK_HOME/sbin

请注意,这些脚本必须在您要运行 Spark 主节点的机器上执行,而不是您的本地机器上。

您可以通过在 conf/spark-env.sh 中设置环境变量来进一步配置集群。通过从 conf/spark-env.sh.template 开始创建此文件,并将它复制到所有工作节点,以便设置生效。以下设置可用

环境变量含义
SPARK_MASTER_HOST 将主节点绑定到特定主机名或 IP 地址,例如公共地址。
SPARK_MASTER_PORT 在不同的端口启动主节点(默认:7077)。
SPARK_MASTER_WEBUI_PORT 主节点 Web UI 的端口(默认:8080)。
SPARK_MASTER_OPTS 仅适用于主节点的配置属性,格式为“-Dx=y”(默认:无)。有关可能的选项列表,请参见下文。
SPARK_LOCAL_DIRS 用于 Spark 中“临时”空间的目录,包括映射输出文件和存储在磁盘上的 RDD。这应该在您的系统中的快速本地磁盘上。它也可以是不同磁盘上多个目录的逗号分隔列表。
SPARK_WORKER_CORES 允许 Spark 应用程序在机器上使用的总内核数(默认:所有可用内核)。
SPARK_WORKER_MEMORY 允许 Spark 应用程序在机器上使用的总内存量,例如 1000m2g(默认:总内存减去 1 GiB);请注意,每个应用程序的单独内存是使用其 spark.executor.memory 属性配置的。
SPARK_WORKER_PORT 在特定端口启动 Spark 工作节点(默认:随机端口)。
SPARK_WORKER_WEBUI_PORT 工作节点 Web UI 的端口(默认:8081)。
SPARK_WORKER_DIR 在其中运行应用程序的目录,其中将包含日志和临时空间(默认:SPARK_HOME/work)。
SPARK_WORKER_OPTS 仅适用于工作节点的配置属性,格式为“-Dx=y”(默认:无)。有关可能的选项列表,请参见下文。
SPARK_DAEMON_MEMORY 分配给 Spark 主节点和工作节点守护进程本身的内存(默认:1g)。
SPARK_DAEMON_JAVA_OPTS Spark 主节点和工作节点守护进程本身的 JVM 选项,格式为“-Dx=y”(默认:无)。
SPARK_DAEMON_CLASSPATH Spark 主节点和工作节点守护进程本身的类路径(默认:无)。
SPARK_PUBLIC_DNS Spark 主节点和工作节点的公共 DNS 名称(默认:无)。

注意:启动脚本目前不支持 Windows。要在 Windows 上运行 Spark 集群,请手动启动主节点和工作节点。

SPARK_MASTER_OPTS 支持以下系统属性

属性名称默认值含义自版本
spark.master.ui.port 8080 指定主节点 Web UI 端点的端口号。 1.1.0
spark.master.ui.decommission.allow.mode LOCAL 指定主节点 Web UI 的 /workers/kill 端点的行为。可能的选项是:LOCAL 表示允许来自运行主节点的机器的本地 IP 的此端点,DENY 表示完全禁用此端点,ALLOW 表示允许从任何 IP 调用此端点。 3.1.0
spark.master.rest.enabled false 是否使用主节点 REST API 端点。 1.3.0
spark.master.rest.port 6066 指定主节点 REST API 端点的端口号。 1.3.0
spark.deploy.retainedApplications 200 要显示的已完成应用程序的最大数量。较旧的应用程序将从 UI 中删除以保持此限制。
0.8.0
spark.deploy.retainedDrivers 200 要显示的已完成驱动程序的最大数量。较旧的驱动程序将从 UI 中删除以保持此限制。
1.1.0
spark.deploy.spreadOut true 独立集群管理器是否应该将应用程序分散到各个节点上,还是尝试将它们合并到尽可能少的节点上。分散通常对 HDFS 中的数据局部性更好,但合并对计算密集型工作负载更有效。
0.6.1
spark.deploy.defaultCores (无限) 如果应用程序没有设置 spark.cores.max,则在 Spark 独立模式下为应用程序提供的默认内核数。如果未设置,应用程序将始终获得所有可用内核,除非它们自己配置 spark.cores.max。在共享集群上将此值设置得更低,以防止用户默认情况下占用整个集群。
0.9.0
spark.deploy.maxExecutorRetries 10 在独立集群管理器移除有故障的应用程序之前,可以发生的连续执行器故障的最大数量限制。如果应用程序有任何正在运行的执行器,则该应用程序永远不会被移除。如果应用程序在连续的故障中遇到超过 spark.deploy.maxExecutorRetries 次故障,并且在这些故障之间没有执行器成功启动运行,并且应用程序没有正在运行的执行器,那么独立集群管理器将移除该应用程序并将其标记为失败。要禁用此自动移除,请将 spark.deploy.maxExecutorRetries 设置为 -1
1.6.3
spark.worker.timeout 60 如果独立部署主节点在指定时间内未收到心跳,则它将认为工作节点已丢失的时间(秒)。 0.6.2
spark.worker.resource.{name}.amount (无) 在工作节点上使用的特定资源的数量。 3.0.0
spark.worker.resource.{name}.discoveryScript (无) 资源发现脚本的路径,用于在工作节点启动时查找特定资源。脚本的输出应格式化为 ResourceInformation 类。 3.0.0
spark.worker.resourcesFile (无) 资源文件的路径,用于在工作节点启动时查找各种资源。资源文件的内容应格式化为 [{"id":{"componentName": "spark.worker", "resourceName":"gpu"}, "addresses":["0","1","2"]}]。如果在资源文件中找不到特定资源,则将使用发现脚本查找该资源。如果发现脚本也找不到资源,则工作节点将无法启动。 3.0.0

SPARK_WORKER_OPTS 支持以下系统属性

属性名称默认值含义自版本
spark.worker.cleanup.enabled false 启用定期清理工作节点/应用程序目录。请注意,这仅影响独立模式,因为 YARN 的工作方式不同。仅清理已停止应用程序的目录。如果 spark.shuffle.service.db.enabled 为“true”,则应启用此选项。 1.0.0
spark.worker.cleanup.interval 1800(30 分钟) 控制工作节点清理本地机器上旧应用程序工作目录的间隔(秒)。 1.0.0
spark.worker.cleanup.appDataTtl 604800(7 天,7 * 24 * 3600) 在每个工作节点上保留应用程序工作目录的秒数。这是一个生存时间,应取决于您拥有的可用磁盘空间量。应用程序日志和 jar 包将下载到每个应用程序工作目录。随着时间的推移,工作目录会很快填满磁盘空间,尤其是在您频繁运行作业的情况下。 1.0.0
spark.shuffle.service.db.enabled true 将外部 Shuffle 服务状态存储在本地磁盘上,以便在外部 Shuffle 服务重新启动时,它将自动重新加载有关当前执行器的信息。这仅影响独立模式(yarn 始终启用此行为)。您还应启用 spark.worker.cleanup.enabled,以确保状态最终被清理。此配置将来可能会被移除。 3.0.0
spark.shuffle.service.db.backend LEVELDB spark.shuffle.service.db.enabled 为 true 时,用户可以使用此选项指定 Shuffle 服务状态存储中使用的磁盘存储类型。目前支持 `LEVELDB` 和 `ROCKSDB`,默认值为 `LEVELDB`。原始的 `LevelDB/RocksDB` 数据存储目前不会自动转换为其他类型的存储。 3.4.0
spark.storage.cleanupFilesAfterExecutorExit true 启用清理工作节点目录中执行器退出后的非 Shuffle 文件(如临时 Shuffle 块、缓存的 RDD/广播块、溢出文件等)。请注意,这与 spark.worker.cleanup.enabled 不重叠,因为这启用了清理已死执行器本地目录中的非 Shuffle 文件,而 spark.worker.cleanup.enabled 启用了清理已停止和超时应用程序的所有文件/子目录。这仅影响独立模式,将来可能会添加对其他集群管理器的支持。 2.4.0
spark.worker.ui.compressedLogFileLengthCacheSize 100 对于压缩日志文件,只能通过解压缩文件来计算未压缩文件。Spark 会缓存压缩日志文件的未压缩文件大小。此属性控制缓存大小。 2.0.2

资源分配和配置概述

请确保已阅读 配置页面 上的自定义资源调度和配置概述部分。本节仅讨论 Spark Standalone 资源调度的特定方面。

Spark Standalone 包含两个部分,第一部分是为工作节点配置资源,第二部分是为特定应用程序分配资源。

用户必须配置工作节点以提供一组可用的资源,以便它可以将这些资源分配给执行器。spark.worker.resource.{resourceName}.amount 用于控制工作节点分配的每个资源的数量。用户还必须指定 spark.worker.resourcesFilespark.worker.resource.{resourceName}.discoveryScript 来指定工作节点如何发现其分配的资源。请参阅上面对每个选项的描述,以了解哪种方法最适合您的设置。

第二部分是在 Spark Standalone 上运行应用程序。标准 Spark 资源配置中唯一的特殊情况是您在客户端模式下运行驱动程序时。对于客户端模式下的驱动程序,用户可以通过 spark.driver.resourcesFilespark.driver.resource.{resourceName}.discoveryScript 指定其使用的资源。如果驱动程序在与其他驱动程序相同的主机上运行,请确保资源文件或发现脚本仅返回与同一节点上运行的其他驱动程序不冲突的资源。

请注意,用户在提交应用程序时不需要指定发现脚本,因为工作节点将使用其分配的资源启动每个执行器。

将应用程序连接到集群

要在 Spark 集群上运行应用程序,只需将主节点的 spark://IP:PORT URL 传递给 SparkContext 构造函数

要在集群上运行交互式 Spark shell,请运行以下命令

./bin/spark-shell --master spark://IP:PORT

您还可以传递一个选项 --total-executor-cores <numCores> 来控制 spark-shell 在集群上使用的核心数。

客户端属性

Spark 应用程序支持以下特定于独立模式的配置属性

属性名称默认值含义自版本
spark.standalone.submit.waitAppCompletion false 在独立集群模式下,控制客户端是否等待应用程序完成才退出。如果设置为 true,则客户端进程将保持活动状态并轮询驱动程序的状态。否则,客户端进程将在提交后退出。 3.1.0

启动 Spark 应用程序

Spark 协议

spark-submit 脚本 提供了将已编译的 Spark 应用程序提交到集群的最直接方法。对于独立集群,Spark 目前支持两种部署模式。在 client 模式下,驱动程序在与提交应用程序的客户端相同的进程中启动。但是,在 cluster 模式下,驱动程序是从集群内的某个工作节点进程启动的,客户端进程在完成提交应用程序的职责后立即退出,而不会等待应用程序完成。

如果您的应用程序是通过 Spark submit 启动的,那么应用程序 jar 包将自动分发到所有工作节点。对于您的应用程序依赖的任何其他 jar 包,您应该通过 --jars 标志使用逗号作为分隔符来指定它们(例如 --jars jar1,jar2)。要控制应用程序的配置或执行环境,请参阅 Spark 配置

此外,独立 cluster 模式支持在应用程序以非零退出代码退出时自动重启应用程序。要使用此功能,您可以在启动应用程序时将 --supervise 标志传递给 spark-submit。然后,如果您希望杀死反复失败的应用程序,可以通过以下方式进行

./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>

您可以在独立主节点 Web UI 的 http://<master url>:8080 中找到驱动程序 ID。

REST API

如果启用了 spark.master.rest.enabled,Spark 主节点将通过 http://[host:port]/[version]/submissions/[action] 提供额外的 REST API,其中 host 是主节点主机,port 是由 spark.master.rest.port 指定的端口号(默认:6066),version 是协议版本,今天为 v1action 是以下支持的操作之一。

命令描述HTTP 方法自版本
create 通过 cluster 模式创建 Spark 驱动程序。 POST 1.3.0
kill 杀死单个 Spark 驱动程序。 POST 1.3.0
status 检查 Spark 作业的状态。 GET 1.3.0

以下是使用 pi.py 和 REST API 的 curl CLI 命令示例。

$ curl -XPOST http://IP:PORT/v1/submissions/create \
--header "Content-Type:application/json;charset=UTF-8" \
--data '{
  "appResource": "",
  "sparkProperties": {
    "spark.master": "spark://master:7077",
    "spark.app.name": "Spark Pi",
    "spark.driver.memory": "1g",
    "spark.driver.cores": "1",
    "spark.jars": ""
  },
  "clientSparkVersion": "",
  "mainClass": "org.apache.spark.deploy.SparkSubmit",
  "environmentVariables": { },
  "action": "CreateSubmissionRequest",
  "appArgs": [ "/opt/spark/examples/src/main/python/pi.py", "10" ]
}'

以下是上述 create 请求的 REST API 响应。

{
  "action" : "CreateSubmissionResponse",
  "message" : "Driver successfully submitted as driver-20231124153531-0000",
  "serverSparkVersion" : "3.5.1",
  "submissionId" : "driver-20231124153531-0000",
  "success" : true
}

资源调度

独立集群模式目前仅支持跨应用程序的简单 FIFO 调度程序。但是,为了允许多个并发用户,您可以控制每个应用程序将使用的最大资源数量。默认情况下,它将获取集群中的所有核心,这只有在您一次只运行一个应用程序时才有意义。您可以通过在 SparkConf 中设置 spark.cores.max 来限制核心数量。例如

val conf = new SparkConf()
  .setMaster(...)
  .setAppName(...)
  .set("spark.cores.max", "10")
val sc = new SparkContext(conf)

此外,您可以在集群主节点进程上配置 spark.deploy.defaultCores,以将未设置 spark.cores.max 的应用程序的默认值更改为小于无穷大的值。为此,请将以下内容添加到 conf/spark-env.sh

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"

这在共享集群中很有用,因为用户可能没有单独配置最大核心数量。

执行器调度

分配给每个执行器的核心数量是可配置的。当显式设置 spark.executor.cores 时,来自同一应用程序的多个执行器可能会在同一工作节点上启动,前提是工作节点有足够的核心和内存。否则,每个执行器默认情况下会获取工作节点上可用的所有核心,在这种情况下,在一次调度迭代中,每个应用程序可能只会在每个工作节点上启动一个执行器。

阶段级调度概述

Standalone 支持阶段级调度

注意事项

动态资源分配 中所述,如果在启用动态分配的情况下没有显式指定每个执行器的核心,则 spark 可能获取的执行器数量远远超过预期。因此,建议您在使用阶段级调度时为每个资源配置文件显式设置执行器核心。

监控和日志记录

Spark 的独立模式提供了一个基于 Web 的用户界面来监控集群。主节点和每个工作节点都有自己的 Web UI,显示集群和作业统计信息。默认情况下,您可以在端口 8080 上访问主节点的 Web UI。可以通过配置文件或命令行选项更改端口。

此外,每个作业的详细日志输出也会写入每个工作节点的工作目录(默认情况下为 SPARK_HOME/work)。您将看到每个作业的两个文件,stdoutstderr,其中包含它写入控制台的所有输出。

与 Hadoop 一起运行

您只需在同一台机器上将 Spark 作为单独的服务启动,就可以将其与现有的 Hadoop 集群一起运行。要从 Spark 访问 Hadoop 数据,只需使用 hdfs:// URL(通常为 hdfs://<namenode>:9000/path,但您可以在 Hadoop NameNode 的 Web UI 上找到正确的 URL)。或者,您可以为 Spark 设置一个单独的集群,并且仍然可以通过网络访问 HDFS;这将比磁盘本地访问慢,但如果您仍然在同一个本地网络中运行(例如,您在每个安装了 Hadoop 的机架上放置几台 Spark 机器),则可能不是问题。

为网络安全配置端口

一般来说,Spark 集群及其服务不会部署在公共互联网上。它们通常是私有服务,并且应该只能在部署 Spark 的组织的网络内访问。对 Spark 服务使用的主机和端口的访问应限制为需要访问这些服务的源主机。

这对于使用独立资源管理器的集群尤其重要,因为它们不像其他资源管理器那样支持细粒度的访问控制。

有关要配置的端口的完整列表,请参阅 安全页面

高可用性

默认情况下,独立调度集群对工作节点故障具有弹性(就 Spark 本身而言,它对通过将工作转移到其他工作节点而丢失工作具有弹性)。但是,调度程序使用主节点来做出调度决策,这(默认情况下)会创建一个单点故障:如果主节点崩溃,将无法创建任何新应用程序。为了规避这种情况,我们有两个高可用性方案,如下所述。

使用 ZooKeeper 的备用主节点

概述

利用 ZooKeeper 提供领导者选举和一些状态存储,您可以在集群中启动多个主节点,这些主节点连接到同一个 ZooKeeper 实例。其中一个将被选为“领导者”,而其他节点将保持待机模式。如果当前领导者死亡,另一个主节点将被选出,恢复旧主节点的状态,然后恢复调度。整个恢复过程(从第一个领导者宕机开始)应该在 1 到 2 分钟内完成。请注意,此延迟只会影响调度应用程序 - 在主节点故障转移期间已经运行的应用程序不受影响。

在此处了解有关 ZooKeeper 入门的更多信息 此处

配置

为了启用此恢复模式,您可以在 spark-env 中设置 SPARK_DAEMON_JAVA_OPTS,方法是配置 spark.deploy.recoveryMode 和相关的 spark.deploy.zookeeper.* 配置。有关这些配置的更多信息,请参阅 配置文档

可能的陷阱:如果您在集群中有多个主节点,但未能正确配置主节点以使用 ZooKeeper,则主节点将无法互相发现并认为它们都是领导者。这将不会导致健康的集群状态(因为所有主节点都将独立调度)。

细节

设置好 ZooKeeper 集群后,启用高可用性非常简单。只需在不同的节点上启动多个主节点进程,并使用相同的 ZooKeeper 配置(ZooKeeper URL 和目录)。主节点可以随时添加和删除。

为了调度新应用程序或将工作节点添加到集群,它们需要知道当前领导者的 IP 地址。这可以通过简单地传递一个主节点列表来完成,您以前是在其中传递单个主节点。例如,您可能会启动指向 spark://host1:port1,host2:port2 的 SparkContext。这将导致您的 SparkContext 尝试向两个主节点注册 - 如果 host1 宕机,此配置仍然正确,因为我们会找到新的领导者,host2

在“向主节点注册”和正常操作之间存在一个重要的区别。启动时,应用程序或工作节点需要能够找到并向当前领导主节点注册。但是,一旦成功注册,它就“在系统中”(即存储在 ZooKeeper 中)。如果发生故障转移,新的领导者将联系所有先前注册的应用程序和工作节点,通知他们领导者变更,因此他们甚至不需要在启动时知道新主节点的存在。

由于此属性,可以随时创建新的主节点,您唯一需要担心的是应用程序和工作节点能够找到它以注册,以防它成为领导者。一旦注册,您就不用担心了。

使用本地文件系统的单节点恢复

概述

ZooKeeper 是生产级高可用性的最佳选择,但如果您只想在主节点宕机时能够重新启动它,FILESYSTEM 模式可以解决这个问题。当应用程序和工作节点注册时,它们会在提供的目录中写入足够的状态,以便在重新启动主节点进程时可以恢复它们。

配置

为了启用此恢复模式,您可以在 spark-env 中使用此配置设置 SPARK_DAEMON_JAVA_OPTS

系统属性含义自版本
spark.deploy.recoveryMode 设置为 FILESYSTEM 以启用单节点恢复模式(默认值:NONE)。 0.8.1
spark.deploy.recoveryDirectory Spark 将存储恢复状态的目录,该目录可从主节点的角度访问。 0.8.1

细节