作业调度
概述
Spark 提供了几种机制来调度计算之间的资源。首先,回想一下,如 集群模式概述 中所述,每个 Spark 应用程序(SparkContext 的实例)运行一组独立的执行器进程。Spark 运行的集群管理器提供了 跨应用程序调度 的功能。其次,在每个 Spark 应用程序中,如果多个“作业”(Spark 操作)是由不同的线程提交的,那么它们可能同时运行。如果您的应用程序通过网络提供服务,这很常见。Spark 包含一个 公平调度器 来调度每个 SparkContext 中的资源。
跨应用程序调度
在集群上运行时,每个 Spark 应用程序都会获得一组独立的执行器 JVM,这些 JVM 仅运行任务并存储该应用程序的数据。如果多个用户需要共享您的集群,则根据集群管理器,有不同的选项来管理分配。
最简单的选项,在所有集群管理器上都可用,是资源的静态分区。使用这种方法,每个应用程序都会获得一个它可以使用的最大资源量,并在整个持续时间内保留这些资源。这是 Spark 的 独立 和 YARN 模式以及 粗粒度 Mesos 模式 中使用的方法。资源分配可以根据集群类型进行配置,如下所示
- 独立模式:默认情况下,提交到独立模式集群的应用程序将按 FIFO(先进先出)顺序运行,并且每个应用程序都将尝试使用所有可用节点。您可以通过在应用程序中设置
spark.cores.max
配置属性来限制应用程序使用的节点数,或者通过spark.deploy.defaultCores
更改未设置此设置的应用程序的默认值。最后,除了控制核心外,每个应用程序的spark.executor.memory
设置控制其内存使用。 - Mesos:要在 Mesos 上使用静态分区,请将
spark.mesos.coarse
配置属性设置为true
,并可选地设置spark.cores.max
以限制每个应用程序的资源份额,如独立模式一样。您还应该设置spark.executor.memory
来控制执行器内存。 - YARN:Spark YARN 客户端的
--num-executors
选项控制它将在集群上分配多少个执行器(作为配置属性的spark.executor.instances
),而--executor-memory
(spark.executor.memory
配置属性)和--executor-cores
(spark.executor.cores
配置属性)控制每个执行器的资源。有关更多信息,请参阅 YARN Spark 属性。
Mesos 上可用的第二个选项是 CPU 核心的动态共享。在这种模式下,每个 Spark 应用程序仍然具有固定且独立的内存分配(由 spark.executor.memory
设置),但当应用程序未在机器上运行任务时,其他应用程序可能会在这些核心上运行任务。当您预期大量不太活跃的应用程序时,这种模式很有用,例如来自不同用户的 shell 会话。但是,它存在延迟不可预测的风险,因为应用程序可能需要一段时间才能在它有工作要做时重新获得一个节点上的核心。要使用此模式,只需使用 mesos://
URL 并将 spark.mesos.coarse
设置为 false。
请注意,目前没有模式提供跨应用程序的内存共享。如果您想以这种方式共享数据,我们建议您运行一个可以查询相同 RDD 来处理多个请求的单一服务器应用程序。
动态资源分配
Spark 提供了一种机制来动态调整应用程序占用的资源,具体取决于工作负载。这意味着您的应用程序可能会在不再使用时将资源归还给集群,并在以后有需求时再次请求它们。如果多个应用程序在您的 Spark 集群中共享资源,此功能特别有用。
此功能默认情况下处于禁用状态,并且在所有粗粒度集群管理器上都可用,即 独立模式、YARN 模式、Mesos 粗粒度模式 和 K8s 模式。
注意事项
- 在 独立模式 中,如果没有显式设置
spark.executor.cores
,每个执行器将获得工作程序的所有可用核心。在这种情况下,当启用动态分配时,spark 可能比预期获得更多执行器。当您想要在 独立模式 中使用动态分配时,建议您在问题 SPARK-30299 得到解决之前为每个执行器显式设置核心。
配置和设置
使用此功能有几种方法。无论您选择哪种方法,您的应用程序都必须首先将 spark.dynamicAllocation.enabled
设置为 true
,此外,
- 您的应用程序必须在您在同一集群中的每个工作程序节点上设置外部 shuffle 服务后将
spark.shuffle.service.enabled
设置为true
,或者 - 您的应用程序必须将
spark.dynamicAllocation.shuffleTracking.enabled
设置为true
,或者 - 您的应用程序必须将
spark.decommission.enabled
和spark.storage.decommission.shuffleBlocks.enabled
都设置为true
,或者 - 您的应用程序必须配置
spark.shuffle.sort.io.plugin.class
以使用自定义的ShuffleDataIO
,其ShuffleDriverComponents
支持可靠存储。
外部 shuffle 服务或 shuffle 跟踪或 ShuffleDriverComponents
支持可靠存储的目的是允许在不删除它们编写的 shuffle 文件的情况下移除执行器(更多详细信息在 下面 描述)。虽然启用 shuffle 跟踪很简单,但设置外部 shuffle 服务的方式在不同的集群管理器之间有所不同
在独立模式下,只需将 spark.shuffle.service.enabled
设置为 true
来启动您的工作程序。
在 Mesos 粗粒度模式下,在所有工作程序节点上运行 $SPARK_HOME/sbin/start-mesos-shuffle-service.sh
,并将 spark.shuffle.service.enabled
设置为 true
。例如,您可以通过 Marathon 来执行此操作。
在 YARN 模式下,请按照 此处 的说明进行操作。
所有其他相关配置都是可选的,并且位于 spark.dynamicAllocation.*
和 spark.shuffle.service.*
命名空间下。有关更多详细信息,请参阅 配置页面。
资源分配策略
从高层次上讲,Spark 应该在执行器不再使用时释放它们,并在需要时获取执行器。由于没有明确的方法来预测即将被移除的执行器是否会在不久的将来运行任务,或者即将被添加的新执行器是否实际上处于空闲状态,因此我们需要一组启发式方法来确定何时移除和请求执行器。
请求策略
启用动态分配的 Spark 应用程序会在有待调度挂起的任务时请求额外的执行器。这种情况必然意味着现有的执行器集不足以同时饱和所有已提交但尚未完成的任务。
Spark 按轮次请求执行器。实际请求是在有挂起任务等待调度 spark.dynamicAllocation.schedulerBacklogTimeout
秒时触发的,然后如果挂起任务队列持续存在,则每隔 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
秒再次触发。此外,每轮请求的执行器数量从上一轮开始呈指数增长。例如,应用程序将在第一轮添加 1 个执行器,然后在后续轮次中添加 2、4、8 个执行器,依此类推。
指数增长策略的动机有两个。首先,应用程序应该在开始时谨慎地请求执行器,以防事实证明只需要几个额外的执行器就足够了。这呼应了 TCP 慢启动的理由。其次,如果事实证明实际上需要许多执行器,应用程序应该能够及时地提高其资源使用率。
移除策略
移除执行器的策略要简单得多。当 Spark 应用程序中的执行器空闲时间超过 spark.dynamicAllocation.executorIdleTimeout
秒时,该应用程序会移除该执行器。请注意,在大多数情况下,此条件与请求条件互斥,因为如果仍有待调度的任务,则执行器不应该处于空闲状态。
执行器优雅下线
在动态分配之前,如果 Spark 执行器在关联的应用程序也退出时退出,则与该执行器关联的所有状态将不再需要,可以安全地丢弃。但是,使用动态分配时,应用程序在显式移除执行器时仍在运行。如果应用程序尝试访问存储在执行器中或由执行器写入的状态,则它将必须重新计算该状态。因此,Spark 需要一种机制来通过在移除执行器之前保留其状态来优雅地停用执行器。
此要求对于混洗尤其重要。在混洗期间,Spark 执行器首先将其自己的映射输出本地写入磁盘,然后在其他执行器尝试获取这些文件时充当这些文件的服务器。在出现落后者(运行时间远长于其同行的任务)的情况下,动态分配可能会在混洗完成之前移除执行器,在这种情况下,该执行器写入的混洗文件必须不必要地重新计算。
保留混洗文件的解决方案是使用外部混洗服务,该服务也是在 Spark 1.2 中引入的。此服务指的是一个长期运行的进程,它独立于您的 Spark 应用程序及其执行器在集群的每个节点上运行。如果启用了该服务,Spark 执行器将从该服务而不是彼此获取混洗文件。这意味着由执行器写入的任何混洗状态都可以在执行器生命周期结束后继续提供服务。
除了写入混洗文件外,执行器还会在磁盘或内存中缓存数据。但是,当移除执行器时,所有缓存的数据将不再可访问。为了缓解这种情况,默认情况下,包含缓存数据的执行器永远不会被移除。您可以使用 spark.dynamicAllocation.cachedExecutorIdleTimeout
配置此行为。当 spark.shuffle.service.fetch.rdd.enabled
设置为 true
时,Spark 可以使用 ExternalShuffleService 来获取磁盘持久化的 RDD 块。在动态分配的情况下,如果启用了此功能,则只有磁盘持久化块的执行器将在 spark.dynamicAllocation.executorIdleTimeout
后被视为空闲,并将相应地释放。在将来的版本中,缓存的数据可能会通过类似于混洗文件通过外部混洗服务保留的方式通过堆外存储保留。
应用程序内调度
在给定的 Spark 应用程序(SparkContext 实例)中,如果多个并行作业是从不同的线程提交的,则它们可以同时运行。在本节中,“作业”是指 Spark 操作(例如 save
、collect
)以及评估该操作需要运行的任何任务。Spark 的调度程序是完全线程安全的,支持此用例,以使应用程序能够为多个请求(例如,多个用户的查询)提供服务。
默认情况下,Spark 的调度程序以 FIFO 方式运行作业。每个作业被划分为“阶段”(例如,映射和归约阶段),第一个作业在所有可用资源上优先级最高,直到其阶段有任务要启动,然后第二个作业优先级最高,依此类推。如果队列首部的作业不需要使用整个集群,则后面的作业可以立即开始运行,但如果队列首部的作业很大,则后面的作业可能会被显著延迟。
从 Spark 0.8 开始,还可以配置作业之间的公平共享。在公平共享下,Spark 以“循环”方式在作业之间分配任务,以便所有作业获得大致相等的集群资源份额。这意味着在长时间运行的作业运行时提交的短作业可以立即开始接收资源,并且仍然可以获得良好的响应时间,而无需等待长时间运行的作业完成。此模式最适合多用户设置。
此功能默认情况下处于禁用状态,并且在所有粗粒度集群管理器上可用,即 独立模式、YARN 模式、K8s 模式 和 Mesos 粗粒度模式。要启用公平调度程序,只需在配置 SparkContext 时将 spark.scheduler.mode
属性设置为 FAIR
。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
公平调度池
公平调度程序还支持将作业分组到池中,并为每个池设置不同的调度选项(例如,权重)。这对于创建用于更重要作业的“高优先级”池很有用,例如,或者将每个用户的作业分组在一起,并为用户提供相等的份额(无论他们有多少并发作业),而不是为作业提供相等的份额。这种方法借鉴了 Hadoop 公平调度程序。
在没有任何干预的情况下,新提交的作业将进入默认池,但可以通过将 spark.scheduler.pool
“本地属性”添加到提交作业的线程中的 SparkContext 来设置作业的池。操作如下
// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")
设置此本地属性后,所有在此线程中提交的作业(通过此线程中的对 RDD.save
、count
、collect
等的调用)将使用此池名称。该设置是针对每个线程的,以便于让一个线程代表同一个用户运行多个作业。如果您想清除与线程关联的池,只需调用
sc.setLocalProperty("spark.scheduler.pool", null)
池的默认行为
默认情况下,每个池获得集群的相等份额(也与默认池中的每个作业的份额相等),但在每个池内,作业以 FIFO 顺序运行。例如,如果您为每个用户创建一个池,这意味着每个用户将获得集群的相等份额,并且每个用户的查询将按顺序运行,而不是后面的查询从该用户的前面查询中获取资源。
配置池属性
还可以通过配置文件修改特定池的属性。每个池支持三个属性
schedulingMode
:这可以是 FIFO 或 FAIR,用于控制池内的作业是排队等待彼此(默认值)还是公平地共享池的资源。weight
:这控制着池相对于其他池的集群份额。默认情况下,所有池的权重都为 1。例如,如果您为特定池赋予权重 2,那么它将获得其他活动池的 2 倍资源。设置高权重(例如 1000)也可以实现池之间的优先级——本质上,权重为 1000 的池将始终能够在有作业活动时优先启动任务。minShare
:除了总体权重外,还可以为每个池指定一个最小份额(以 CPU 核心数表示),管理员希望它拥有该份额。公平调度程序始终尝试在根据权重重新分配额外资源之前满足所有活动池的最小份额。因此,minShare
属性可以是另一种确保池始终能够快速获得一定数量的资源(例如 10 个核心)的方法,而不会为集群的其余部分赋予它高优先级。默认情况下,每个池的minShare
为 0。
可以通过创建类似于 conf/fairscheduler.xml.template
的 XML 文件来设置池属性,并将名为 fairscheduler.xml
的文件放在类路径上,或者在您的 SparkConf 中设置 spark.scheduler.allocation.file
属性。文件路径遵循 hadoop 配置,可以是本地文件路径或 HDFS 文件路径。
// scheduler file at local
conf.set("spark.scheduler.allocation.file", "file:///path/to/file")
// scheduler file at hdfs
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")
XML 文件的格式很简单,每个池都有一个 <pool>
元素,其中包含用于各种设置的不同元素。例如
<?xml version="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
完整的示例也位于 conf/fairscheduler.xml.template
中。请注意,在 XML 文件中未配置的任何池将简单地获得所有设置的默认值(调度模式 FIFO、权重 1 和 minShare 0)。
使用 JDBC 连接进行调度
要为 JDBC 客户端会话设置 公平调度程序 池,用户可以设置 spark.sql.thriftserver.scheduler.pool
变量
SET spark.sql.thriftserver.scheduler.pool=accounting;
PySpark 中的并发作业
默认情况下,PySpark 不支持将 PVM 线程与 JVM 线程同步,并且在多个 PVM 线程中启动多个作业不能保证在每个相应的 JVM 线程中启动每个作业。由于此限制,无法在单独的 PVM 线程中通过 sc.setJobGroup
设置不同的作业组,这也禁止稍后通过 sc.cancelJobGroup
取消作业。
pyspark.InheritableThread
建议与 PVM 线程一起使用,以继承 JVM 线程中的可继承属性(例如本地属性)。