作业调度

概述

Spark 提供了多种在计算之间调度资源的机制。首先,回顾一下集群模式概述中的描述,每个 Spark 应用程序(SparkContext 的实例)都运行着一组独立的执行器(executor)进程。Spark 所运行的集群管理器提供了跨应用程序调度的工具。其次,在每个 Spark 应用程序*内部*,如果由不同的线程提交,多个“作业”(Spark 操作)可以并发运行。如果您的应用程序通过网络处理请求,这种情况很常见。Spark 包含一个公平调度器(fair scheduler),用于在每个 SparkContext 内调度资源。

跨应用程序调度

在集群上运行时,每个 Spark 应用程序都会获得一组独立的执行器 JVM,这些 JVM 仅运行该应用程序的任务并存储其数据。如果多个用户需要共享您的集群,根据集群管理器的不同,有多种管理分配的选项。

所有集群管理器上最简单的选项是资源的*静态分区*。采用这种方法,每个应用程序都会被分配其可以使用的最大资源量,并在整个运行期间占用它们。这是 Spark 的独立模式(standalone)YARN 模式以及 K8s 模式所使用的方法。资源分配可以根据集群类型进行如下配置:

请注意,目前没有任何模式提供跨应用程序的内存共享。如果您希望以这种方式共享数据,我们建议运行一个单一的服务器应用程序,通过查询相同的 RDD 来处理多个请求。

动态资源分配

Spark 提供了一种根据工作负载动态调整应用程序所占资源的机制。这意味着如果资源不再被使用,您的应用程序可以将资源归还给集群,并在稍后有需求时再次请求它们。如果多个应用程序共享您的 Spark 集群中的资源,此功能特别有用。

此功能默认禁用,适用于所有粗粒度集群管理器,即独立模式YARN 模式K8s 模式

配置与设置

使用此功能有多种方法。无论选择哪种方法,您的应用程序必须首先将 spark.dynamicAllocation.enabled 设置为 true,此外:

外部 Shuffle 服务、shuffle 跟踪或支持可靠存储的 ShuffleDriverComponents 的目的是允许在不删除由执行器写入的 shuffle 文件的情况下移除执行器(更多细节详见下文)。虽然启用 shuffle 跟踪很简单,但设置外部 Shuffle 服务的方法在不同集群管理器之间有所不同。

在独立模式下,只需在启动 worker 时将 spark.shuffle.service.enabled 设置为 true 即可。

在 YARN 模式下,请遵循此处的说明。

所有其他相关配置均为可选,位于 spark.dynamicAllocation.*spark.shuffle.service.* 命名空间下。有关更多详细信息,请参阅配置页面

注意事项

资源分配策略

从高层面来看,Spark 应该在执行器不再被使用时释放它们,并在需要时获取执行器。由于没有确定的方法来预测即将被删除的执行器在不久的将来是否会运行任务,或者即将添加的新执行器是否真的会处于空闲状态,我们需要一套启发式方法来决定何时删除和请求执行器。

请求策略

启用了动态分配的 Spark 应用程序在有挂起的任务等待调度时,会请求额外的执行器。这种情况必然意味着现有的一组执行器不足以同时满足所有已提交但尚未完成的任务。

Spark 分轮次请求执行器。当存在挂起任务的时间超过 spark.dynamicAllocation.schedulerBacklogTimeout 秒时,会触发实际请求;此后,如果挂起任务队列仍然存在,则每隔 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 秒再次触发。此外,每一轮请求的执行器数量会呈指数级增加。例如,应用程序第一轮会增加 1 个执行器,后续轮次会分别增加 2、4、8 个,依此类推。

采用指数增加策略的动机有二。首先,应用程序在开始时应谨慎请求执行器,以防最终证明只需要少量额外执行器即可满足需求。这与 TCP 慢启动的逻辑相呼应。其次,如果最终确实需要大量执行器,应用程序应该能够及时增加其资源使用量。

移除策略

移除执行器的策略则简单得多。当执行器空闲时间超过 spark.dynamicAllocation.executorIdleTimeout 秒时,Spark 应用程序会将其移除。请注意,在大多数情况下,此条件与请求条件是互斥的,因为如果仍有任务等待调度,执行器就不应该处于空闲状态。

执行器优雅下线

在动态分配之前,如果 Spark 执行器在相关应用程序退出时也退出,那么与该执行器相关的所有状态都不再需要,可以安全地丢弃。但是,通过动态分配,当执行器被显式移除时,应用程序仍在运行。如果应用程序尝试访问存储在执行器中或由执行器写入的状态,它将不得不重新计算该状态。因此,Spark 需要一种机制,通过在移除执行器之前保留其状态来优雅地下线(decommission)执行器。

此要求对于 shuffle 尤为重要。在 shuffle 期间,Spark 执行器首先将其自身的 map 输出本地写入磁盘,然后当其他执行器尝试获取这些文件时充当这些文件的服务器。如果出现落后者(即运行时间远长于同类任务的任务),动态分配可能会在 shuffle 完成之前移除执行器,在这种情况下,该执行器写入的 shuffle 文件必须被不必要地重新计算。

保留 shuffle 文件的解决方案是使用外部 Shuffle 服务,该服务也在 Spark 1.2 中引入。此服务是指运行在集群每个节点上的一个长驻进程,独立于您的 Spark 应用程序及其执行器。如果启用了该服务,Spark 执行器将从该服务而不是从彼此那里获取 shuffle 文件。这意味着任何由执行器写入的 shuffle 状态在执行器生命周期结束后仍然可以被服务。

除了写入 shuffle 文件外,执行器还会将数据缓存在磁盘或内存中。然而,当执行器被移除时,所有缓存的数据将无法再访问。为了缓解这种情况,默认情况下,包含缓存数据的执行器永远不会被移除。您可以使用 spark.dynamicAllocation.cachedExecutorIdleTimeout 来配置此行为。当将 spark.shuffle.service.fetch.rdd.enabled 设置为 true 时,Spark 可以使用 ExternalShuffleService 来获取磁盘持久化的 RDD 块。在动态分配的情况下,如果启用了此功能,仅持有磁盘持久化块的执行器在 spark.dynamicAllocation.executorIdleTimeout 后被视为“空闲”,并将被相应释放。在未来的版本中,缓存的数据可能会通过类似于通过外部 Shuffle 服务保留 shuffle 文件的方式(堆外存储)进行保留。

应用程序内调度

在给定的 Spark 应用程序(SparkContext 实例)内部,如果从不同的线程提交,多个并行作业可以同时运行。在本节中,“作业”指的是一个 Spark 操作(例如 savecollect)以及为评估该操作而需要运行的任何任务。Spark 的调度器完全线程安全,并支持此用例,从而支持服务于多个请求(例如多个用户的查询)的应用程序。

默认情况下,Spark 的调度器以 FIFO(先进先出)方式运行作业。每个作业被划分为“阶段”(例如 map 和 reduce 阶段),第一个作业在有任务要启动时优先使用所有可用资源,然后第二个作业获得优先级,依此类推。如果队列前面的作业不需要使用整个集群,后面的作业可以立即开始运行,但如果队列前面的作业很大,则后面的作业可能会被严重延迟。

从 Spark 0.8 开始,也可以配置作业之间的公平共享。在公平共享下,Spark 以“轮询(round robin)”方式在作业之间分配任务,这样所有作业都能大致平分集群资源。这意味着在长作业运行期间提交的短作业可以立即开始获得资源,而无需等待长作业完成,从而获得良好的响应时间。此模式最适合多用户环境。

此功能默认禁用,适用于所有粗粒度集群管理器,即独立模式YARN 模式K8s 模式。要启用公平调度器,只需在配置 SparkContext 时将 spark.scheduler.mode 属性设置为 FAIR

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

公平调度池

公平调度器还支持将作业分组到池(pools)中,并为每个池设置不同的调度选项(例如权重)。例如,这对于创建用于重要作业的“高优先级”池,或者将每个用户的作业组合在一起,使用户获得平等的份额(无论他们有多少并发作业),而不是让作业获得平等的份额,都非常有用。这种方法借鉴了 Hadoop 公平调度器

在没有任何干预的情况下,新提交的作业会进入默认池,但可以通过在提交作业的线程中向 SparkContext 添加 spark.scheduler.pool “本地属性”来设置作业所在的池。具体做法如下:

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

设置此本地属性后,在此线程内提交的所有作业(通过在此线程中调用 RDD.savecountcollect 等)都将使用此池名称。该设置是针对每个线程的,以便于让一个线程代表同一个用户运行多个作业。如果您想清除线程关联的池,只需调用:

sc.setLocalProperty("spark.scheduler.pool", null)

池的默认行为

默认情况下,每个池获得集群的平等份额(与默认池中的每个作业的份额也相等),但在每个池内部,作业按 FIFO 顺序运行。例如,如果您为每个用户创建一个池,这意味着每个用户将获得集群的相等份额,并且每个用户的查询将按顺序运行,而不是后续查询占用该用户早期查询的资源。

配置池属性

特定池的属性也可以通过配置文件进行修改。每个池支持三个属性:

可以通过创建类似于 conf/fairscheduler.xml.template 的 XML 文件并将其放在类路径下,或者在 SparkConf 中设置 spark.scheduler.allocation.file 属性来设置池属性。文件路径遵循 Hadoop 配置,可以是本地文件路径或 HDFS 文件路径。

// scheduler file at local
conf.set("spark.scheduler.allocation.file", "file:///path/to/file")
// scheduler file at hdfs
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")

XML 文件的格式非常简单:每个池对应一个 <pool> 元素,其中包含用于各种设置的不同子元素。例如:

<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

完整的示例也可在 conf/fairscheduler.xml.template 中找到。请注意,未在 XML 文件中配置的任何池都将仅获得所有设置的默认值(调度模式 FIFO,权重 1,minShare 0)。

使用 JDBC 连接进行调度

要为 JDBC 客户端会话设置公平调度器池,用户可以设置 spark.sql.thriftserver.scheduler.pool 变量:

SET spark.sql.thriftserver.scheduler.pool=accounting;

PySpark 中的并发作业

PySpark 默认不支持同步 PVM 线程与 JVM 线程,在多个 PVM 线程中启动多个作业不能保证在对应的 JVM 线程中启动每个作业。由于此限制,无法在单独的 PVM 线程中通过 sc.setJobGroup 设置不同的作业组,这也导致以后无法通过 sc.cancelJobGroup 取消作业。

建议在 PVM 线程中结合使用 pyspark.InheritableThread,以便继承 JVM 线程中的可继承属性(如本地属性)。