作业调度

概述

Spark 提供了多种资源调度机制以进行计算间的资源分配。首先,回想一下,如集群模式概述中所述,每个 Spark 应用程序(SparkContext 实例)运行一组独立的执行器进程。Spark 运行的集群管理器提供了跨应用程序调度的功能。其次,每个 Spark 应用程序内部,如果多个“作业”(Spark actions)是由不同的线程提交的,它们可以同时运行。如果你的应用程序通过网络提供请求服务,这很常见。Spark 包含一个公平调度器来调度每个 SparkContext 中的资源。

跨应用程序调度

在集群上运行时,每个 Spark 应用程序都会获得一组独立的执行器 JVM,这些 JVM 只为该应用程序运行任务和存储数据。如果多个用户需要共享你的集群,则根据集群管理器的不同,有不同的选项来管理资源分配。

最简单的选项是静态资源分区,适用于所有集群管理器。通过这种方法,每个应用程序都分配有最大可用资源量,并在其整个运行期间一直持有这些资源。Spark 的StandaloneYARN模式以及K8s模式都采用了这种方法。资源分配可以根据集群类型进行如下配置:

请注意,目前所有模式都不支持跨应用程序的内存共享。如果你想以这种方式共享数据,我们建议运行一个独立的服务器应用程序,通过查询相同的 RDD 来处理多个请求。

动态资源分配

Spark 提供了一种机制,可以根据工作负载动态调整应用程序占用的资源。这意味着如果资源不再使用,你的应用程序可能会将其返还给集群,并在有需求时再次请求它们。如果多个应用程序在你的 Spark 集群中共享资源,此功能特别有用。

此功能默认禁用,适用于所有粗粒度集群管理器,即Standalone 模式YARN 模式K8s 模式

配置和设置

使用此功能有几种方法。无论你选择哪种方法,你的应用程序都必须首先将spark.dynamicAllocation.enabled设置为true,此外,

外部 Shuffle 服务、Shuffle 跟踪或支持可靠存储的ShuffleDriverComponents的目的是允许移除执行器而不会删除它们写入的 Shuffle 文件(更多细节请参阅下文)。虽然启用 Shuffle 跟踪很简单,但设置外部 Shuffle 服务的方式因集群管理器而异:

在 Standalone 模式下,只需将工作节点启动时设置spark.shuffle.service.enabledtrue即可。

在 YARN 模式下,请遵循此处的说明。

所有其他相关配置都是可选的,并位于spark.dynamicAllocation.*spark.shuffle.service.*命名空间下。更多详细信息,请参阅配置页面

注意事项

资源分配策略

从宏观层面看,Spark 应该在执行器不再使用时释放它们,并在需要时获取执行器。由于没有明确的方法可以预测即将被移除的执行器是否会在不久的将来运行任务,或者即将添加的新执行器是否会实际空闲,因此我们需要一套启发式方法来决定何时移除和请求执行器。

请求策略

启用动态分配的 Spark 应用程序会在有待调度任务时请求额外的执行器。这种情况必然意味着现有执行器集不足以同时满足所有已提交但尚未完成的任务。

Spark 按轮次请求执行器。当存在待调度任务持续spark.dynamicAllocation.schedulerBacklogTimeout秒后,将触发实际请求;如果待调度任务队列仍然存在,则此后每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒再次触发。此外,每轮请求的执行器数量会从上一轮开始呈指数级增长。例如,一个应用程序在第一轮将添加 1 个执行器,然后在后续轮次中分别添加 2、4、8 等执行器。

指数增长策略的动机有两方面。首先,应用程序在开始时应谨慎请求执行器,以防最终发现只需要少量额外执行器就足够了。这与 TCP 慢启动的理由相呼应。其次,如果最终发现确实需要大量执行器,应用程序应该能够及时地提高其资源使用率。

移除策略

移除执行器的策略要简单得多。当一个 Spark 执行器空闲超过spark.dynamicAllocation.executorIdleTimeout秒后,Spark 应用程序会将其移除。请注意,在大多数情况下,此条件与请求条件互斥,因为如果仍有待调度任务,执行器就不应处于空闲状态。

执行器优雅停机

在动态分配之前,如果 Spark 执行器在关联应用程序也已退出时退出,那么与该执行器相关的所有状态都不再需要,可以安全地丢弃。然而,启用动态分配后,当执行器被显式移除时,应用程序仍在运行。如果应用程序尝试访问存储在执行器中或由执行器写入的状态,则必须重新计算该状态。因此,Spark 需要一种机制,在移除执行器之前通过保留其状态来优雅地取消其运行。

此要求对于 Shuffle 尤其重要。在 Shuffle 过程中,Spark 执行器首先将其自身的 Map 输出本地写入磁盘,然后当其他执行器尝试获取这些文件时,充当这些文件的服务器。如果出现慢任务(即运行时间远超同伴的任务),动态分配可能会在 Shuffle 完成之前移除执行器,在这种情况下,该执行器写入的 Shuffle 文件必须不必要地重新计算。

保留 Shuffle 文件的解决方案是使用外部 Shuffle 服务,该服务也于 Spark 1.2 中引入。此服务是指在集群的每个节点上运行的长时间运行进程,独立于你的 Spark 应用程序及其执行器。如果服务已启用,Spark 执行器将从该服务而不是彼此之间获取 Shuffle 文件。这意味着执行器写入的任何 Shuffle 状态都可以在执行器生命周期结束后继续提供服务。

除了写入 Shuffle 文件外,执行器还会在磁盘或内存中缓存数据。然而,当执行器被移除时,所有缓存的数据将不再可访问。为了缓解这个问题,默认情况下,包含缓存数据的执行器永远不会被移除。你可以使用spark.dynamicAllocation.cachedExecutorIdleTimeout配置此行为。当将spark.shuffle.service.fetch.rdd.enabled设置为true时,Spark 可以使用 ExternalShuffleService 来获取磁盘持久化的 RDD 块。在动态分配的情况下,如果启用此功能,则仅包含磁盘持久化块的执行器在spark.dynamicAllocation.executorIdleTimeout后被视为空闲,并将相应地被释放。在未来的版本中,缓存数据可能会通过堆外存储来保留,其理念与通过外部 Shuffle 服务保留 Shuffle 文件类似。

应用程序内调度

在给定的 Spark 应用程序(SparkContext 实例)内部,如果多个并行作业是从单独的线程提交的,它们可以同时运行。在本节中,“作业”指的是一个 Spark action(例如savecollect)以及为评估该 action 所需运行的任何任务。Spark 的调度器是完全线程安全的,并支持这种用例,以使应用程序能够处理多个请求(例如,为多个用户查询)。

默认情况下,Spark 的调度器以 FIFO 方式运行作业。每个作业被划分为“阶段”(例如 Map 和 Reduce 阶段),第一个作业在其阶段有任务要启动时获得所有可用资源的优先权,然后第二个作业获得优先权,依此类推。如果队列头部的作业不需要使用整个集群,后面的作业可以立即开始运行;但如果队列头部的作业很大,那么后面的作业可能会被显著延迟。

从 Spark 0.8 开始,也可以配置作业之间的公平共享。在公平共享模式下,Spark 以“轮询”(round robin)方式在作业之间分配任务,从而使所有作业获得大致相等的集群资源份额。这意味着当一个长时间运行的作业正在运行时,提交的短期作业可以立即开始接收资源,并且仍然获得良好的响应时间,而无需等待长时间作业完成。此模式最适合多用户环境。

此功能默认禁用,适用于所有粗粒度集群管理器,即Standalone 模式YARN 模式K8s 模式。要启用公平调度器,只需在配置 SparkContext 时将spark.scheduler.mode属性设置为FAIR

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

公平调度池

公平调度器还支持将作业分组到中,并为每个池设置不同的调度选项(例如权重)。这对于例如为更重要的作业创建“高优先级”池,或者将每个用户的作业分组在一起,并让用户获得相等的份额(无论他们有多少并发作业),而不是让作业获得相等的份额,都很有用。此方法是仿照Hadoop 公平调度器设计的。

在没有任何干预的情况下,新提交的作业将进入一个默认池中,但可以通过在提交作业的线程中向 SparkContext 添加spark.scheduler.pool“局部属性”来设置作业的池。操作方法如下:

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

设置此局部属性后,在此线程内提交的所有作业(通过此线程对RDD.savecountcollect等方法的调用)都将使用此池名称。此设置是按线程进行的,以便于一个线程代表同一用户运行多个作业。如果你想清除线程关联的池,只需调用:

sc.setLocalProperty("spark.scheduler.pool", null)

池的默认行为

默认情况下,每个池获得集群的相等份额(也与默认池中的每个作业份额相等),但在每个池内部,作业按 FIFO 顺序运行。例如,如果你为每个用户创建一个池,这意味着每个用户将获得集群的相等份额,并且每个用户的查询将按顺序运行,而不是由后续查询占用该用户之前的资源。

配置池属性

特定池的属性也可以通过配置文件进行修改。每个池支持三个属性:

池属性可以通过创建 XML 文件进行设置,该文件类似于conf/fairscheduler.xml.template,然后将名为fairscheduler.xml的文件放到 classpath 上,或者在你的SparkConf中设置spark.scheduler.allocation.file属性。文件路径遵循 Hadoop 配置,可以是本地文件路径或 HDFS 文件路径。

// scheduler file at local
conf.set("spark.scheduler.allocation.file", "file:///path/to/file")
// scheduler file at hdfs
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")

XML 文件的格式非常简单,每个池都只有一个<pool>元素,其中包含用于各种设置的不同子元素。例如:

<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

完整的示例也可在conf/fairscheduler.xml.template中找到。请注意,任何未在 XML 文件中配置的池将简单地获得所有设置的默认值(调度模式 FIFO、权重 1 和最小份额 0)。

使用 JDBC 连接进行调度

要为 JDBC 客户端会话设置公平调度器池,用户可以设置spark.sql.thriftserver.scheduler.pool变量。

SET spark.sql.thriftserver.scheduler.pool=accounting;

PySpark 中的并发作业

PySpark 默认不支持 PVM 线程与 JVM 线程同步,并且在多个 PVM 线程中启动多个作业不能保证在每个相应的 JVM 线程中启动每个作业。由于此限制,无法在单独的 PVM 线程中通过sc.setJobGroup设置不同的作业组,这还导致以后无法通过sc.cancelJobGroup取消作业。

建议将pyspark.InheritableThread与 PVM 线程一起使用,以便 PVM 线程继承 JVM 线程中的可继承属性,例如局部属性。