作业调度
概述
Spark 具有多种在计算之间调度资源的机制。首先,回想一下,如集群模式概述中所述,每个 Spark 应用程序(SparkContext 的实例)运行一组独立的 executor 进程。 Spark 运行的集群管理器提供了跨应用程序调度的机制。其次,在每个 Spark 应用程序中,如果多个“作业”(Spark 操作)是由不同的线程提交的,则它们可能会并发运行。如果您的应用程序通过网络为请求提供服务,则这种情况很常见。 Spark 包括一个公平调度器,用于调度每个 SparkContext 中的资源。
跨应用程序调度
在集群上运行时,每个 Spark 应用程序都会获得一组独立的 executor JVM,这些 JVM 仅运行任务并存储该应用程序的数据。如果多个用户需要共享您的集群,则可以根据集群管理器使用不同的选项来管理分配。
最简单的选项是所有集群管理器都可用的资源静态分区。使用此方法,每个应用程序都会获得它可以使用的最大资源量,并将其保留在其整个持续时间内。这是 Spark standalone 和 YARN 模式以及 粗粒度 Mesos 模式中使用的方案。资源分配可以按照集群类型配置如下:
- Standalone 模式: 默认情况下,提交到 standalone 模式集群的应用程序将按 FIFO(先进先出)顺序运行,并且每个应用程序都将尝试使用所有可用节点。 您可以通过在应用程序中设置
spark.cores.max
配置属性来限制应用程序使用的节点数,或者通过spark.deploy.defaultCores
更改未设置此设置的应用程序的默认值。 最后,除了控制核心之外,每个应用程序的spark.executor.memory
设置还可以控制其内存使用情况。 - Mesos: 要在 Mesos 上使用静态分区,请将
spark.mesos.coarse
配置属性设置为true
,并可以选择设置spark.cores.max
以限制每个应用程序的资源份额,就像在 standalone 模式下一样。 您还应该设置spark.executor.memory
以控制 executor 内存。 - YARN: Spark YARN 客户端的
--num-executors
选项控制它将在集群上分配多少个 executor(作为配置属性的spark.executor.instances
),而--executor-memory
(spark.executor.memory
配置属性)和--executor-cores
(spark.executor.cores
配置属性)控制每个 executor 的资源。 有关更多信息,请参见YARN Spark 属性。
Mesos 上可用的第二个选项是 CPU 核心的动态共享。 在此模式下,每个 Spark 应用程序仍然具有固定的独立内存分配(由 spark.executor.memory
设置),但是当应用程序未在计算机上运行任务时,其他应用程序可以在这些核心上运行任务。 当您期望大量不太活跃的应用程序(例如来自不同用户的 shell 会话)时,此模式很有用。 但是,它会带来延迟的可预测性降低的风险,因为应用程序可能需要一段时间才能在某个节点上重新获得核心才能完成工作。 要使用此模式,只需使用 mesos://
URL 并将 spark.mesos.coarse
设置为 false。
请注意,当前没有任何模式提供跨应用程序的内存共享。 如果您想以这种方式共享数据,我们建议运行一个可以查询相同 RDD 以提供多个请求的单个服务器应用程序。
动态资源分配
Spark 提供了一种机制,可以根据工作负载动态调整应用程序占用的资源。 这意味着,如果您的应用程序不再使用资源,则可以将其返回给集群,并在需要时再次请求它们。 如果多个应用程序在您的 Spark 集群中共享资源,则此功能特别有用。
此功能默认处于禁用状态,并且在所有粗粒度集群管理器上均可用,即 standalone 模式, YARN 模式, Mesos 粗粒度模式和 K8s 模式。
注意事项
- 在 standalone 模式中,如果不显式设置
spark.executor.cores
,则每个 executor 将获得 worker 的所有可用核心。 在这种情况下,启用动态分配后,Spark 可能会获得比预期多得多的 executor。 当您想在 standalone 模式中使用动态分配时,建议您在问题 SPARK-30299 解决之前,显式为每个 executor 设置核心。
配置和设置
有几种方法可以使用此功能。 无论选择哪种方法,您的应用程序都必须首先将 spark.dynamicAllocation.enabled
设置为 true
,此外,
- 在同一集群中的每个 worker 节点上设置外部 shuffle 服务后,您的应用程序必须将
spark.shuffle.service.enabled
设置为true
,或者 - 您的应用程序必须将
spark.dynamicAllocation.shuffleTracking.enabled
设置为true
,或者 - 您的应用程序必须将
spark.decommission.enabled
和spark.storage.decommission.shuffleBlocks.enabled
都设置为true
,或者 - 您的应用程序必须配置
spark.shuffle.sort.io.plugin.class
以使用自定义ShuffleDataIO
,该自定义ShuffleDataIO
的ShuffleDriverComponents
支持可靠的存储。
外部 shuffle 服务或 shuffle 跟踪或 ShuffleDriverComponents
支持可靠存储的目的是允许删除 executor 而无需删除它们编写的 shuffle 文件(更多详细信息请参见下面)。 虽然启用 shuffle 跟踪很简单,但是设置外部 shuffle 服务的方式因集群管理器而异
在 standalone 模式下,只需启动 worker 并将 spark.shuffle.service.enabled
设置为 true
。
在 Mesos 粗粒度模式下,使用 spark.shuffle.service.enabled
设置为 true
在所有 worker 节点上运行 $SPARK_HOME/sbin/start-mesos-shuffle-service.sh
。 例如,您可以通过 Marathon 做到这一点。
在 YARN 模式下,请按照此处的说明进行操作。
所有其他相关配置都是可选的,并且位于 spark.dynamicAllocation.*
和 spark.shuffle.service.*
命名空间下。 有关更多详细信息,请参见配置页面。
资源分配策略
从总体上讲,Spark 应该在不再使用 executor 时放弃它们,并在需要时获取 executor。 由于没有明确的方法来预测即将删除的 executor 在不久的将来是否会运行任务,或者即将添加的新 executor 实际上是否会处于空闲状态,因此我们需要一组启发式方法来确定何时删除和请求 executor。
请求策略
启用了动态分配的 Spark 应用程序会在有等待调度的挂起任务时请求其他 executor。 此条件必然意味着现有的一组 executor 不足以同时饱和所有已提交但尚未完成的任务。
Spark 以轮次的方式请求 executor。当有未完成的任务积压超过 spark.dynamicAllocation.schedulerBacklogTimeout
秒时,会触发实际的请求。如果未完成任务队列持续存在,之后每隔 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
秒会再次触发请求。此外,每轮请求的 executor 数量以前一轮为基础呈指数增长。例如,一个应用程序会在第一轮添加 1 个 executor,然后在随后的轮次中添加 2、4、8 等 executor。
指数增长策略的动机是双重的。首先,应用程序在一开始应该谨慎地请求 executor,以防只需要少量额外的 executor 就足够了。这与 TCP 慢启动的理由相呼应。其次,如果确实需要许多 executor,应用程序应该能够及时增加其资源使用量。
移除策略
移除 executor 的策略要简单得多。当一个 Spark 应用程序中的 executor 闲置超过 spark.dynamicAllocation.executorIdleTimeout
秒时,就会被移除。请注意,在大多数情况下,此条件与请求条件是互斥的,因为如果仍然有待调度的任务,则 executor 不应处于空闲状态。
Executor 的优雅停止
在动态分配之前,如果与应用程序关联的 Spark executor 在应用程序退出时也退出了,那么与该 executor 关联的所有状态都不再需要,可以安全地丢弃。但是,使用动态分配后,当 executor 被显式移除时,应用程序仍在运行。如果应用程序尝试访问存储在 executor 中或由 executor 写入的状态,则必须重新计算该状态。因此,Spark 需要一种机制来优雅地停用 executor,并在移除之前保留其状态。
对于 shuffle 而言,此要求尤为重要。在 shuffle 期间,Spark executor 首先将其自己的 map 输出本地写入磁盘,然后在其他 executor 尝试获取这些文件时充当服务器。如果出现落后者,即运行时间比其他任务长得多的任务,动态分配可能会在 shuffle 完成之前移除 executor,在这种情况下,该 executor 写入的 shuffle 文件必须不必要地重新计算。
保存 shuffle 文件的解决方案是使用外部 shuffle 服务,该服务也在 Spark 1.2 中引入。此服务是指在集群的每个节点上独立于 Spark 应用程序及其 executor 运行的长期运行进程。如果启用了该服务,Spark executor 将从该服务而不是彼此获取 shuffle 文件。这意味着 executor 写入的任何 shuffle 状态都可以在 executor 的生命周期结束后继续提供服务。
除了写入 shuffle 文件之外,executor 还可以将数据缓存在磁盘或内存中。但是,当 executor 被移除时,所有缓存的数据都将无法访问。为了缓解这个问题,默认情况下,包含缓存数据的 executor 永远不会被移除。您可以使用 spark.dynamicAllocation.cachedExecutorIdleTimeout
配置此行为。当 spark.shuffle.service.fetch.rdd.enabled
设置为 true
时,Spark 可以使用 ExternalShuffleService 来获取磁盘持久化的 RDD 块。 在动态分配的情况下,如果启用此功能,则仅具有磁盘持久化块的 executor 在 spark.dynamicAllocation.executorIdleTimeout
之后被视为空闲,并将相应地释放。 在未来的版本中,可以通过类似于外部 shuffle 服务保存 shuffle 文件的方式,通过堆外存储来保存缓存的数据。
应用程序内的调度
在给定的 Spark 应用程序(SparkContext 实例)中,如果多个并行作业是从单独的线程提交的,则可以同时运行。在本节中,“作业”是指 Spark 操作(例如 save
,collect
)以及评估该操作需要运行的任何任务。 Spark 的调度程序是完全线程安全的,并且支持此用例,以启用为多个请求(例如,多个用户的查询)提供服务的应用程序。
默认情况下,Spark 的调度程序以 FIFO 方式运行作业。 每个作业都分为“阶段”(例如,map 和 reduce 阶段),并且第一个作业在其阶段有要启动的任务时,将获得所有可用资源的优先级,然后第二个作业获得优先级,依此类推。 如果队列头部的作业不需要使用整个集群,则后面的作业可以立即开始运行,但是如果队列头部的作业很大,则后面的作业可能会被显着延迟。
从 Spark 0.8 开始,也可以配置作业之间的公平共享。 在公平共享下,Spark 以“轮询”方式在作业之间分配任务,以便所有作业都能获得大致相等的集群资源份额。 这意味着在长时间运行的作业正在运行时提交的短作业可以立即开始接收资源,并且仍然可以获得良好的响应时间,而无需等待长时间运行的作业完成。 此模式最适合多用户设置。
默认情况下禁用此功能,并且可在所有粗粒度集群管理器上使用,即 独立模式,YARN 模式,K8s 模式 和 Mesos 粗粒度模式。 要启用公平调度程序,只需在配置 SparkContext 时将 spark.scheduler.mode
属性设置为 FAIR
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
公平调度器池
公平调度程序还支持将作业分组到池中,并为每个池设置不同的调度选项(例如,权重)。 这对于为更重要的作业创建“高优先级”池,或者将每个用户的作业组合在一起,并为用户提供相等的份额,而不管他们有多少并发作业,而不是为作业提供相等的份额都非常有用。 这种方法是仿照 Hadoop Fair Scheduler。
没有任何干预的情况下,新提交的作业将进入默认池,但是可以通过将 spark.scheduler.pool
“本地属性”添加到提交它们的线程中的 SparkContext 来设置作业的池。 这样做如下
// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")
设置此本地属性后,在此线程中提交的所有作业(通过此线程中对 RDD.save
,count
,collect
等的调用)将使用此池名称。 该设置是按线程进行的,以便可以轻松地使线程代表同一用户运行多个作业。 如果您想清除与线程关联的池,只需调用
sc.setLocalProperty("spark.scheduler.pool", null)
池的默认行为
默认情况下,每个池获得集群的相等份额(也等于默认池中每个作业的份额),但是在每个池中,作业按 FIFO 顺序运行。 例如,如果您为每个用户创建一个池,则意味着每个用户将获得集群的相等份额,并且每个用户的查询将按顺序运行,而不是后面的查询从该用户较早的查询中获取资源。
配置池属性
也可以通过配置文件修改特定池的属性。 每个池支持三个属性
schedulingMode
:可以是 FIFO 或 FAIR,用于控制池中的作业是否彼此排队(默认值)或公平地共享池的资源。weight
:控制池相对于其他池的集群份额。 默认情况下,所有池的权重均为 1。 例如,如果您给特定池的权重为 2,则它将获得其他活动池 2 倍的资源。 设置较高的权重(例如 1000)也可以实现池之间的优先级 - 本质上,权重为 1000 的池始终可以在有活动作业时首先启动任务。minShare
:除了整体权重之外,还可以为每个池提供管理员希望其拥有的最小份额(作为 CPU 核心数)。 在根据权重重新分配额外资源之前,公平调度程序始终尝试满足所有活动池的最小份额。 因此,minShare
属性可以是确保池始终可以快速获得一定数量的资源(例如 10 个核心)的另一种方法,而无需为其提供集群其余部分的高优先级。 默认情况下,每个池的minShare
为 0。
可以通过创建一个 XML 文件来设置池属性,类似于 conf/fairscheduler.xml.template
,并将一个名为 fairscheduler.xml
的文件放在 classpath 上,或在您的 SparkConf 中设置 spark.scheduler.allocation.file
属性。 文件路径遵循 Hadoop 配置,可以是本地文件路径或 HDFS 文件路径。
// scheduler file at local
conf.set("spark.scheduler.allocation.file", "file:///path/to/file")
// scheduler file at hdfs
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")
XML 文件的格式只是每个池的 <pool>
元素,其中包含用于各种设置的不同元素。 例如
<?xml version="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
一个完整的示例也可在 conf/fairscheduler.xml.template
中找到。 请注意,在 XML 文件中未配置的任何池都将仅获得所有设置的默认值(调度模式 FIFO,权重 1 和 minShare 0)。
使用 JDBC 连接进行调度
要为 JDBC 客户端会话设置 Fair Scheduler 池,用户可以设置 spark.sql.thriftserver.scheduler.pool
变量
SET spark.sql.thriftserver.scheduler.pool=accounting;
PySpark 中的并发作业
默认情况下,PySpark 不支持将 PVM 线程与 JVM 线程同步,并且在多个 PVM 线程中启动多个作业不能保证在每个相应的 JVM 线程中启动每个作业。 由于此限制,无法通过单独的 PVM 线程中的 sc.setJobGroup
设置不同的作业组,这也禁止以后通过 sc.cancelJobGroup
取消作业。
建议将 pyspark.InheritableThread
与 PVM 线程一起使用,以继承 JVM 线程中的可继承属性,例如本地属性。