作业调度
概述
Spark 提供了多种资源调度机制以进行计算间的资源分配。首先,回想一下,如集群模式概述中所述,每个 Spark 应用程序(SparkContext 实例)运行一组独立的执行器进程。Spark 运行的集群管理器提供了跨应用程序调度的功能。其次,在每个 Spark 应用程序内部,如果多个“作业”(Spark actions)是由不同的线程提交的,它们可以同时运行。如果你的应用程序通过网络提供请求服务,这很常见。Spark 包含一个公平调度器来调度每个 SparkContext 中的资源。
跨应用程序调度
在集群上运行时,每个 Spark 应用程序都会获得一组独立的执行器 JVM,这些 JVM 只为该应用程序运行任务和存储数据。如果多个用户需要共享你的集群,则根据集群管理器的不同,有不同的选项来管理资源分配。
最简单的选项是静态资源分区,适用于所有集群管理器。通过这种方法,每个应用程序都分配有最大可用资源量,并在其整个运行期间一直持有这些资源。Spark 的Standalone和YARN模式以及K8s模式都采用了这种方法。资源分配可以根据集群类型进行如下配置:
- Standalone 模式:默认情况下,提交到 Standalone 模式集群的应用程序将按 FIFO(先进先出)顺序运行,并且每个应用程序都将尝试使用所有可用节点。你可以通过在其配置中设置
spark.cores.max
属性来限制应用程序使用的节点数量,或者通过spark.deploy.defaultCores
更改未设置此设置的应用程序的默认值。最后,除了控制核心数外,每个应用程序的spark.executor.memory
设置还控制其内存使用。 - YARN:Spark YARN 客户端的
--num-executors
选项控制它将在集群上分配多少个执行器(spark.executor.instances
作为配置属性),而--executor-memory
(spark.executor.memory
配置属性)和--executor-cores
(spark.executor.cores
配置属性)控制每个执行器的资源。更多信息,请参阅YARN Spark 属性。 - K8s:与 YARN 的情况相同,请参阅上述 YARN 的描述。此外,Spark on K8s 提供了比
spark.executor.cores
更高优先级的spark.kubernetes.executor.limit.cores
和spark.kubernetes.executor.request.cores
版本。更多信息,请参阅K8s Spark 属性。
请注意,目前所有模式都不支持跨应用程序的内存共享。如果你想以这种方式共享数据,我们建议运行一个独立的服务器应用程序,通过查询相同的 RDD 来处理多个请求。
动态资源分配
Spark 提供了一种机制,可以根据工作负载动态调整应用程序占用的资源。这意味着如果资源不再使用,你的应用程序可能会将其返还给集群,并在有需求时再次请求它们。如果多个应用程序在你的 Spark 集群中共享资源,此功能特别有用。
此功能默认禁用,适用于所有粗粒度集群管理器,即Standalone 模式、YARN 模式和K8s 模式。
配置和设置
使用此功能有几种方法。无论你选择哪种方法,你的应用程序都必须首先将spark.dynamicAllocation.enabled
设置为true
,此外,
- 在同一集群的每个工作节点上设置外部 Shuffle 服务后,你的应用程序必须将
spark.shuffle.service.enabled
设置为true
,或者 - 你的应用程序必须将
spark.dynamicAllocation.shuffleTracking.enabled
设置为true
,或者 - 你的应用程序必须将
spark.decommission.enabled
和spark.storage.decommission.shuffleBlocks.enabled
都设置为true
,或者 - 你的应用程序必须配置
spark.shuffle.sort.io.plugin.class
以使用自定义的ShuffleDataIO
,其ShuffleDriverComponents
支持可靠存储。
外部 Shuffle 服务、Shuffle 跟踪或支持可靠存储的ShuffleDriverComponents
的目的是允许移除执行器而不会删除它们写入的 Shuffle 文件(更多细节请参阅下文)。虽然启用 Shuffle 跟踪很简单,但设置外部 Shuffle 服务的方式因集群管理器而异:
在 Standalone 模式下,只需将工作节点启动时设置spark.shuffle.service.enabled
为true
即可。
在 YARN 模式下,请遵循此处的说明。
所有其他相关配置都是可选的,并位于spark.dynamicAllocation.*
和spark.shuffle.service.*
命名空间下。更多详细信息,请参阅配置页面。
注意事项
- 在Standalone 模式下,如果不显式设置
spark.executor.cores
,每个执行器将获得一个工作节点的所有可用核心。在这种情况下,当启用动态分配时,Spark 可能会获取比预期更多的执行器。当你想在Standalone 模式下使用动态分配时,建议你在问题 SPARK-30299 修复之前,为每个执行器显式设置核心数。 - 在K8s 模式下,由于 Spark on K8s 尚不支持外部 Shuffle 服务,因此我们无法通过将
spark.shuffle.service.enabled
设置为true
来使用此功能。
资源分配策略
从宏观层面看,Spark 应该在执行器不再使用时释放它们,并在需要时获取执行器。由于没有明确的方法可以预测即将被移除的执行器是否会在不久的将来运行任务,或者即将添加的新执行器是否会实际空闲,因此我们需要一套启发式方法来决定何时移除和请求执行器。
请求策略
启用动态分配的 Spark 应用程序会在有待调度任务时请求额外的执行器。这种情况必然意味着现有执行器集不足以同时满足所有已提交但尚未完成的任务。
Spark 按轮次请求执行器。当存在待调度任务持续spark.dynamicAllocation.schedulerBacklogTimeout
秒后,将触发实际请求;如果待调度任务队列仍然存在,则此后每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
秒再次触发。此外,每轮请求的执行器数量会从上一轮开始呈指数级增长。例如,一个应用程序在第一轮将添加 1 个执行器,然后在后续轮次中分别添加 2、4、8 等执行器。
指数增长策略的动机有两方面。首先,应用程序在开始时应谨慎请求执行器,以防最终发现只需要少量额外执行器就足够了。这与 TCP 慢启动的理由相呼应。其次,如果最终发现确实需要大量执行器,应用程序应该能够及时地提高其资源使用率。
移除策略
移除执行器的策略要简单得多。当一个 Spark 执行器空闲超过spark.dynamicAllocation.executorIdleTimeout
秒后,Spark 应用程序会将其移除。请注意,在大多数情况下,此条件与请求条件互斥,因为如果仍有待调度任务,执行器就不应处于空闲状态。
执行器优雅停机
在动态分配之前,如果 Spark 执行器在关联应用程序也已退出时退出,那么与该执行器相关的所有状态都不再需要,可以安全地丢弃。然而,启用动态分配后,当执行器被显式移除时,应用程序仍在运行。如果应用程序尝试访问存储在执行器中或由执行器写入的状态,则必须重新计算该状态。因此,Spark 需要一种机制,在移除执行器之前通过保留其状态来优雅地取消其运行。
此要求对于 Shuffle 尤其重要。在 Shuffle 过程中,Spark 执行器首先将其自身的 Map 输出本地写入磁盘,然后当其他执行器尝试获取这些文件时,充当这些文件的服务器。如果出现慢任务(即运行时间远超同伴的任务),动态分配可能会在 Shuffle 完成之前移除执行器,在这种情况下,该执行器写入的 Shuffle 文件必须不必要地重新计算。
保留 Shuffle 文件的解决方案是使用外部 Shuffle 服务,该服务也于 Spark 1.2 中引入。此服务是指在集群的每个节点上运行的长时间运行进程,独立于你的 Spark 应用程序及其执行器。如果服务已启用,Spark 执行器将从该服务而不是彼此之间获取 Shuffle 文件。这意味着执行器写入的任何 Shuffle 状态都可以在执行器生命周期结束后继续提供服务。
除了写入 Shuffle 文件外,执行器还会在磁盘或内存中缓存数据。然而,当执行器被移除时,所有缓存的数据将不再可访问。为了缓解这个问题,默认情况下,包含缓存数据的执行器永远不会被移除。你可以使用spark.dynamicAllocation.cachedExecutorIdleTimeout
配置此行为。当将spark.shuffle.service.fetch.rdd.enabled
设置为true
时,Spark 可以使用 ExternalShuffleService 来获取磁盘持久化的 RDD 块。在动态分配的情况下,如果启用此功能,则仅包含磁盘持久化块的执行器在spark.dynamicAllocation.executorIdleTimeout
后被视为空闲,并将相应地被释放。在未来的版本中,缓存数据可能会通过堆外存储来保留,其理念与通过外部 Shuffle 服务保留 Shuffle 文件类似。
应用程序内调度
在给定的 Spark 应用程序(SparkContext 实例)内部,如果多个并行作业是从单独的线程提交的,它们可以同时运行。在本节中,“作业”指的是一个 Spark action(例如save
、collect
)以及为评估该 action 所需运行的任何任务。Spark 的调度器是完全线程安全的,并支持这种用例,以使应用程序能够处理多个请求(例如,为多个用户查询)。
默认情况下,Spark 的调度器以 FIFO 方式运行作业。每个作业被划分为“阶段”(例如 Map 和 Reduce 阶段),第一个作业在其阶段有任务要启动时获得所有可用资源的优先权,然后第二个作业获得优先权,依此类推。如果队列头部的作业不需要使用整个集群,后面的作业可以立即开始运行;但如果队列头部的作业很大,那么后面的作业可能会被显著延迟。
从 Spark 0.8 开始,也可以配置作业之间的公平共享。在公平共享模式下,Spark 以“轮询”(round robin)方式在作业之间分配任务,从而使所有作业获得大致相等的集群资源份额。这意味着当一个长时间运行的作业正在运行时,提交的短期作业可以立即开始接收资源,并且仍然获得良好的响应时间,而无需等待长时间作业完成。此模式最适合多用户环境。
此功能默认禁用,适用于所有粗粒度集群管理器,即Standalone 模式、YARN 模式和K8s 模式。要启用公平调度器,只需在配置 SparkContext 时将spark.scheduler.mode
属性设置为FAIR
。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
公平调度池
公平调度器还支持将作业分组到池中,并为每个池设置不同的调度选项(例如权重)。这对于例如为更重要的作业创建“高优先级”池,或者将每个用户的作业分组在一起,并让用户获得相等的份额(无论他们有多少并发作业),而不是让作业获得相等的份额,都很有用。此方法是仿照Hadoop 公平调度器设计的。
在没有任何干预的情况下,新提交的作业将进入一个默认池中,但可以通过在提交作业的线程中向 SparkContext 添加spark.scheduler.pool
“局部属性”来设置作业的池。操作方法如下:
// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")
设置此局部属性后,在此线程内提交的所有作业(通过此线程对RDD.save
、count
、collect
等方法的调用)都将使用此池名称。此设置是按线程进行的,以便于一个线程代表同一用户运行多个作业。如果你想清除线程关联的池,只需调用:
sc.setLocalProperty("spark.scheduler.pool", null)
池的默认行为
默认情况下,每个池获得集群的相等份额(也与默认池中的每个作业份额相等),但在每个池内部,作业按 FIFO 顺序运行。例如,如果你为每个用户创建一个池,这意味着每个用户将获得集群的相等份额,并且每个用户的查询将按顺序运行,而不是由后续查询占用该用户之前的资源。
配置池属性
特定池的属性也可以通过配置文件进行修改。每个池支持三个属性:
schedulingMode
:此属性可以是 FIFO 或 FAIR,用于控制池内作业是相互排队(默认)还是公平共享池的资源。weight
:此属性控制池相对于其他池在集群中的份额。默认情况下,所有池的权重均为 1。例如,如果你给特定池设置权重为 2,它将获得其他活跃池 2 倍的资源。设置较高的权重(例如 1000)也使得在池之间实现优先级成为可能——本质上,权重为 1000 的池只要有活跃作业,就总能优先启动任务。minShare
:除了总权重外,每个池还可以获得管理员希望其拥有的最小份额(以 CPU 核心数计)。公平调度器总是尝试满足所有活跃池的最小份额,然后根据权重重新分配额外资源。minShare
属性因此可以成为另一种方式,确保池总能快速获得一定数量的资源(例如 10 个核心),而无需为其在集群的其余部分设置高优先级。默认情况下,每个池的minShare
为 0。
池属性可以通过创建 XML 文件进行设置,该文件类似于conf/fairscheduler.xml.template
,然后将名为fairscheduler.xml
的文件放到 classpath 上,或者在你的SparkConf中设置spark.scheduler.allocation.file
属性。文件路径遵循 Hadoop 配置,可以是本地文件路径或 HDFS 文件路径。
// scheduler file at local
conf.set("spark.scheduler.allocation.file", "file:///path/to/file")
// scheduler file at hdfs
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")
XML 文件的格式非常简单,每个池都只有一个<pool>
元素,其中包含用于各种设置的不同子元素。例如:
<?xml version="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
完整的示例也可在conf/fairscheduler.xml.template
中找到。请注意,任何未在 XML 文件中配置的池将简单地获得所有设置的默认值(调度模式 FIFO、权重 1 和最小份额 0)。
使用 JDBC 连接进行调度
要为 JDBC 客户端会话设置公平调度器池,用户可以设置spark.sql.thriftserver.scheduler.pool
变量。
SET spark.sql.thriftserver.scheduler.pool=accounting;
PySpark 中的并发作业
PySpark 默认不支持 PVM 线程与 JVM 线程同步,并且在多个 PVM 线程中启动多个作业不能保证在每个相应的 JVM 线程中启动每个作业。由于此限制,无法在单独的 PVM 线程中通过sc.setJobGroup
设置不同的作业组,这还导致以后无法通过sc.cancelJobGroup
取消作业。
建议将pyspark.InheritableThread
与 PVM 线程一起使用,以便 PVM 线程继承 JVM 线程中的可继承属性,例如局部属性。