监控和仪器

有多种方法可以监控 Spark 应用程序:Web UI、指标和外部仪器。

Web 界面

每个 SparkContext 都会启动一个 Web UI(默认为端口 4040),它显示有关应用程序的有用信息。这包括

您只需在 Web 浏览器中打开 http://<driver-node>:4040 即可访问此界面。如果多个 SparkContext 在同一主机上运行,它们将绑定到从 4040 开始的连续端口(4041、4042 等)。

请注意,此信息默认情况下仅在应用程序运行期间可用。要在事后查看 Web UI,请在启动应用程序之前将 spark.eventLog.enabled 设置为 true。这将配置 Spark 将 UI 中显示的信息编码为 Spark 事件并记录到持久化存储中。

事后查看

如果应用程序的事件日志存在,仍然可以通过 Spark 的历史服务器构建应用程序的 UI。您可以通过执行以下命令启动历史服务器:

./sbin/start-history-server.sh

这会默认在 http://<server-url>:18080 创建一个 Web 界面,列出未完成和已完成的应用程序及尝试。

当使用文件系统提供者类(参见下文的 spark.history.provider)时,必须在 spark.history.fs.logDirectory 配置选项中提供基础日志目录,并且该目录应包含每个代表一个应用程序事件日志的子目录。

Spark 作业本身必须配置为记录事件,并将其记录到相同的共享可写目录。例如,如果服务器配置的日志目录是 hdfs://namenode/shared/spark-logs,则客户端选项将是

spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs

历史服务器可以配置如下:

环境变量

环境变量含义
SPARK_DAEMON_MEMORY 分配给历史服务器的内存(默认:1g)。
SPARK_DAEMON_JAVA_OPTS 历史服务器的 JVM 选项(默认:无)。
SPARK_DAEMON_CLASSPATH 历史服务器的 Classpath(默认:无)。
SPARK_PUBLIC_DNS 历史服务器的公共地址。如果未设置此项,应用程序历史记录的链接可能会使用服务器的内部地址,导致链接失效(默认:无)。
SPARK_HISTORY_OPTS 历史服务器的 spark.history.* 配置选项(默认:无)。

对滚动事件日志文件应用压缩

一个长时间运行的应用程序(例如流处理)可能会产生一个巨大的单个事件日志文件,这可能需要大量的维护成本,并且每次 Spark History Server 更新时也需要大量资源进行重放。

启用 spark.eventLog.rolling.enabledspark.eventLog.rolling.maxFileSize 将允许您拥有滚动事件日志文件,而不是单个巨大的事件日志文件,这本身可能在某些场景下有所帮助,但它仍然无法帮助您减少日志的整体大小。

Spark History Server 可以通过在 Spark History Server 上设置配置 spark.history.fs.eventLog.rolling.maxFilesToRetain,对滚动事件日志文件应用压缩以减小日志的整体大小。

详细信息将在下文描述,但请提前注意,压缩是**有损操作**。压缩会丢弃一些在 UI 上不再可见的事件——您可能需要在启用此选项之前检查哪些事件将被丢弃。

当发生压缩时,历史服务器会列出应用程序所有可用的事件日志文件,并考虑索引小于将要保留的最小索引文件的事件日志文件作为压缩目标。例如,如果应用程序 A 有 5 个事件日志文件,并且 spark.history.fs.eventLog.rolling.maxFilesToRetain 设置为 2,那么前 3 个日志文件将被选中进行压缩。

一旦选择目标,它会分析这些文件以找出哪些事件可以被排除,然后将它们重写为一个紧凑的文件,同时丢弃那些决定排除的事件。

压缩会尝试排除指向过期数据的事件。截至目前,下面描述了要排除的事件候选者:

重写完成后,原始日志文件将被尽力删除。历史服务器可能无法删除原始日志文件,但这不会影响历史服务器的运行。

请注意,如果 Spark History Server 发现压缩期间不会减少太多空间,它可能不会压缩旧的事件日志文件。对于流式查询,我们通常期望压缩会运行,因为每个微批次都会触发一个或多个很快完成的作业,但在许多情况下,批处理查询的压缩不会运行。

另请注意,这是 Spark 3.0 中引入的新功能,可能尚未完全稳定。在某些情况下,压缩可能会排除超出您预期的事件,导致应用程序在历史服务器 UI 上出现一些问题。请谨慎使用。

Spark History Server 配置选项

Spark History Server 的安全选项在安全页面中有更详细的介绍。

属性名称 默认值 含义 起始版本
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider 实现应用程序历史记录后端的类名。目前 Spark 只提供一种实现,它查找存储在文件系统中的应用程序日志。 1.1.0
spark.history.fs.logDirectory file:/tmp/spark-events 对于文件系统历史提供者,这是包含要加载的应用程序事件日志的目录 URL。这可以是本地的 file:// 路径、HDFS 路径 hdfs://namenode/shared/spark-logs,或者 Hadoop API 支持的其他文件系统的路径。 1.1.0
spark.history.fs.update.interval 10秒 文件系统历史提供者检查日志目录中是否有新的或更新的日志的周期。较短的间隔可以更快地检测到新应用程序,但代价是重新读取已更新应用程序会增加服务器负载。一旦更新完成,已完成和未完成应用程序的列表将反映这些更改。 1.4.0
spark.history.retainedApplications 50 在缓存中保留 UI 数据的应用程序数量。如果超出此限制,则最旧的应用程序将从缓存中移除。如果应用程序不在缓存中,则在从 UI 访问时必须从磁盘加载。 1.0.0
spark.history.ui.maxApplications Int.MaxValue 历史摘要页面上显示的应用程序数量。即使应用程序 UI 未显示在历史摘要页面上,也可以通过直接访问其 URL 来查看。 2.0.1
spark.history.ui.port 18080 历史服务器 Web 界面绑定的端口。 1.0.0
spark.history.kerberos.enabled false 指示历史服务器是否应使用 Kerberos 登录。如果历史服务器正在安全 Hadoop 集群上访问 HDFS 文件,则需要此项。 1.0.1
spark.history.kerberos.principal (无) spark.history.kerberos.enabled=true 时,指定历史服务器的 Kerberos 主体名称。 1.0.1
spark.history.kerberos.keytab (无) spark.history.kerberos.enabled=true 时,指定历史服务器的 Kerberos keytab 文件位置。 1.0.1
spark.history.fs.cleaner.enabled false 指定历史服务器是否应定期从存储中清理事件日志。 1.4.0
spark.history.fs.cleaner.interval 1天 spark.history.fs.cleaner.enabled=true 时,指定文件系统作业历史清理器检查要删除文件的频率。如果满足以下两个条件之一,文件将被删除。首先,如果文件早于 spark.history.fs.cleaner.maxAge,则将其删除。如果文件数量超过 spark.history.fs.cleaner.maxNum,Spark 也会删除文件,它会根据应用程序最旧的尝试时间顺序清理已完成的尝试。 1.4.0
spark.history.fs.cleaner.maxAge 7天 spark.history.fs.cleaner.enabled=true 时,运行文件系统历史清理器时,早于此时间点的作业历史文件将被删除。 1.4.0
spark.history.fs.cleaner.maxNum Int.MaxValue spark.history.fs.cleaner.enabled=true 时,指定事件日志目录中的最大文件数量。Spark 尝试清理已完成的尝试日志,以将日志目录保持在此限制以下。这应该小于底层文件系统的限制,例如 HDFS 中的 `dfs.namenode.fs-limits.max-directory-items`。 3.0.0
spark.history.fs.endEventReparseChunkSize 1m 在日志文件末尾解析的字节数,用于查找结束事件。这用于通过跳过事件日志文件中不必要的部分来加快应用程序列表的生成。可以通过将此配置设置为 0 来禁用它。 2.4.0
spark.history.fs.inProgressOptimization.enabled true 启用对进行中日志的优化处理。此选项可能会导致未能重命名其事件日志的已完成应用程序仍被列为进行中。 2.4.0
spark.history.fs.driverlog.cleaner.enabled spark.history.fs.cleaner.enabled 指定历史服务器是否应定期从存储中清理驱动程序日志。 3.0.0
spark.history.fs.driverlog.cleaner.interval spark.history.fs.cleaner.interval spark.history.fs.driverlog.cleaner.enabled=true 时,指定文件系统驱动程序日志清理器检查要删除文件的频率。文件仅在早于 spark.history.fs.driverlog.cleaner.maxAge 时才会被删除。 3.0.0
spark.history.fs.driverlog.cleaner.maxAge spark.history.fs.cleaner.maxAge spark.history.fs.driverlog.cleaner.enabled=true 时,运行驱动程序日志清理器时,早于此时间点的驱动程序日志文件将被删除。 3.0.0
spark.history.fs.numReplayThreads 可用核心的 25% 历史服务器用于处理事件日志的线程数。 2.0.0
spark.history.store.maxDiskUsage 10g 存储缓存应用程序历史信息的本地目录的最大磁盘使用量。 2.3.0
spark.history.store.path (无) 缓存应用程序历史数据的本地目录。如果设置,历史服务器会将应用程序数据存储在磁盘上,而不是保留在内存中。写入磁盘的数据将在历史服务器重启时被重用。 2.3.0
spark.history.store.serializer JSON 用于将内存中的 UI 对象写入/从基于磁盘的 KV 存储读取的序列化器;JSON 或 PROTOBUF。JSON 序列化器是 Spark 3.4.0 之前的唯一选择,因此它是默认值。与 JSON 序列化器相比,PROTOBUF 序列化器速度更快且更紧凑。 3.4.0
spark.history.custom.executor.log.url (无) 指定自定义 Spark Executor 日志 URL,用于支持外部日志服务,而不是在历史服务器中使用集群管理器的应用程序日志 URL。Spark 将通过模式支持一些路径变量,这些模式可能因集群管理器而异。请查阅您的集群管理器的文档以了解支持哪些模式(如果有)。此配置对正在运行的应用程序没有影响,它只影响历史服务器。

目前,只有 YARN 模式支持此配置

3.0.0
spark.history.custom.executor.log.url.applyIncompleteApplication true 指定是否也将自定义 Spark Executor 日志 URL 应用于未完成的应用程序。如果正在运行的应用程序的 Executor 日志应作为原始日志 URL 提供,请将此设置为 false。请注意,未完成的应用程序可能包括未优雅关闭的应用程序。即使设置为 true,此配置对正在运行的应用程序也没有影响,它只影响历史服务器。 3.0.0
spark.history.fs.eventLog.rolling.maxFilesToRetain Int.MaxValue 将保留为未压缩状态的事件日志文件的最大数量。默认情况下,所有事件日志文件都将被保留。出于技术原因,最低值为 1。
请阅读“对旧事件日志文件应用压缩”部分了解更多详细信息。
3.0.0
spark.history.store.hybridStore.enabled false 解析事件日志时是否使用 HybridStore 作为存储。HybridStore 将首先把数据写入内存存储,并在内存存储写入完成后,有一个后台线程将数据转储到磁盘存储。 3.1.0
spark.history.store.hybridStore.maxMemoryUsage 2g 可用于创建 HybridStore 的最大内存空间。HybridStore 共享堆内存,因此如果启用 HybridStore,应通过 SHS 的内存选项增加堆内存。 3.1.0
spark.history.store.hybridStore.diskBackend ROCKSDB 指定混合存储中使用的基于磁盘的存储;ROCKSDB 或 LEVELDB(已弃用)。 3.3.0
spark.history.fs.update.batchSize Int.MaxValue 指定更新新事件日志文件的批处理大小。这控制了每个扫描过程在合理时间内完成,从而防止初始扫描运行时间过长并在大型环境中及时阻止新事件日志文件被扫描。 3.4.0

请注意,在所有这些 UI 中,表格都可以通过单击其标题进行排序,从而轻松识别慢任务、数据倾斜等。

注意

  1. 历史服务器显示已完成和未完成的 Spark 作业。如果应用程序在失败后进行了多次尝试,则会显示失败的尝试,以及任何正在进行的未完成尝试或最终成功的尝试。

  2. 未完成的应用程序仅间歇性更新。更新之间的时间由检查更改文件 (spark.history.fs.update.interval) 的间隔定义。在大型集群上,更新间隔可能会设置为较大的值。查看正在运行的应用程序的实际方法是查看其自己的 Web UI。

  3. 退出时未将自身注册为已完成的应用程序将列为未完成——即使它们不再运行。如果应用程序崩溃,可能会发生这种情况。

  4. 表示 Spark 作业完成的一种方法是显式停止 Spark Context (sc.stop()),或者在 Python 中使用 with SparkContext() as sc: 构造来处理 Spark Context 的设置和拆除。

REST API

除了在 UI 中查看指标外,它们也可以作为 JSON 提供。这为开发人员提供了一种创建新的 Spark 可视化和监控工具的简便方法。JSON 可用于正在运行的应用程序和历史服务器。端点安装在 /api/v1。例如,对于历史服务器,它们通常可以在 http://<server-url>:18080/api/v1 访问,对于正在运行的应用程序,可以在 http://localhost:4040/api/v1 访问。

在 API 中,应用程序通过其应用程序 ID [app-id] 进行引用。在 YARN 上运行时,每个应用程序可能进行多次尝试,但只有集群模式下的应用程序有尝试 ID,客户端模式下的应用程序没有。YARN 集群模式下的应用程序可以通过其 [attempt-id] 进行识别。在下面列出的 API 中,当在 YARN 集群模式下运行时,[app-id] 实际上将是 [base-app-id]/[attempt-id],其中 [base-app-id] 是 YARN 应用程序 ID。

端点含义
/applications 所有应用程序的列表。
?status=[completed|running] 仅列出所选状态的应用程序。
?minDate=[date] 列出最早的开始日期/时间。
?maxDate=[date] 列出最晚的开始日期/时间。
?minEndDate=[date] 列出最早的结束日期/时间。
?maxEndDate=[date] 列出最晚的结束日期/时间。
?limit=[limit] 限制列出的应用程序数量。
示例
?minDate=2015-02-10
?minDate=2015-02-03T16:42:40.000GMT
?maxDate=2015-02-11T20:41:30.000GMT
?minEndDate=2015-02-12
?minEndDate=2015-02-12T09:15:10.000GMT
?maxEndDate=2015-02-14T16:30:45.000GMT
?limit=10
/applications/[app-id]/jobs 给定应用程序的所有作业列表。
?status=[running|succeeded|failed|unknown] 仅列出特定状态的作业。
/applications/[app-id]/jobs/[job-id] 给定作业的详细信息。
/applications/[app-id]/stages 给定应用程序的所有阶段列表。
?status=[active|complete|pending|failed] 仅列出给定状态的阶段。
?details=true 列出所有包含任务数据的阶段。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 仅列出具有指定任务状态的任务。查询参数 taskStatus 仅在 details=true 时生效。这也支持多个 taskStatus,例如 ?details=true&taskStatus=SUCCESS&taskStatus=FAILED,它将返回匹配任何指定任务状态的所有任务。
?withSummaries=true 列出包含任务指标分布和 Executor 指标分布的阶段。
?quantiles=0.0,0.25,0.5,0.75,1.0 使用给定的分位数汇总指标。查询参数 quantiles 仅在 withSummaries=true 时生效。默认值为 0.0,0.25,0.5,0.75,1.0
/applications/[app-id]/stages/[stage-id] 给定阶段的所有尝试列表。
?details=true 列出给定阶段所有包含任务数据的尝试。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 仅列出具有指定任务状态的任务。查询参数 taskStatus 仅在 details=true 时生效。这也支持多个 taskStatus,例如 ?details=true&taskStatus=SUCCESS&taskStatus=FAILED,它将返回匹配任何指定任务状态的所有任务。
?withSummaries=true 列出每个尝试的任务指标分布和 Executor 指标分布。
?quantiles=0.0,0.25,0.5,0.75,1.0 使用给定的分位数汇总指标。查询参数 quantiles 仅在 withSummaries=true 时生效。默认值为 0.0,0.25,0.5,0.75,1.0
示例
?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] 给定阶段尝试的详细信息。
?details=true 列出给定阶段尝试的所有任务数据。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 仅列出具有指定任务状态的任务。查询参数 taskStatus 仅在 details=true 时生效。这也支持多个 taskStatus,例如 ?details=true&taskStatus=SUCCESS&taskStatus=FAILED,它将返回匹配任何指定任务状态的所有任务。
?withSummaries=true 列出给定阶段尝试的任务指标分布和 Executor 指标分布。
?quantiles=0.0,0.25,0.5,0.75,1.0 使用给定的分位数汇总指标。查询参数 quantiles 仅在 withSummaries=true 时生效。默认值为 0.0,0.25,0.5,0.75,1.0
示例
?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary 给定阶段尝试中所有任务的汇总指标。
?quantiles 使用给定的分位数汇总指标。
示例:?quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList 给定阶段尝试的所有任务列表。
?offset=[offset]&length=[len] 列出给定范围内的任务。
?sortBy=[runtime|-runtime] 对任务进行排序。
?status=[running|success|killed|failed|unknown] 仅列出处于该状态的任务。
示例:?offset=10&length=50&sortBy=runtime&status=running
/applications/[app-id]/executors 给定应用程序所有活跃 Executor 的列表。
/applications/[app-id]/executors/[executor-id]/threads 给定活跃 Executor 中所有正在运行线程的堆栈跟踪。历史服务器不可用。
/applications/[app-id]/allexecutors 给定应用程序所有(活跃和已终止)Executor 的列表。
/applications/[app-id]/storage/rdd 给定应用程序的已存储 RDD 列表。
/applications/[app-id]/storage/rdd/[rdd-id] 给定 RDD 存储状态的详细信息。
/applications/[base-app-id]/logs 将给定应用程序所有尝试的事件日志作为 zip 文件中的文件下载。
/applications/[base-app-id]/[attempt-id]/logs 将特定应用程序尝试的事件日志作为 zip 文件下载。
/applications/[app-id]/streaming/statistics 流上下文的统计信息。
/applications/[app-id]/streaming/receivers 所有流接收器列表。
/applications/[app-id]/streaming/receivers/[stream-id] 给定接收器的详细信息。
/applications/[app-id]/streaming/batches 所有保留批次列表。
/applications/[app-id]/streaming/batches/[batch-id] 给定批次的详细信息。
/applications/[app-id]/streaming/batches/[batch-id]/operations 给定批次的所有输出操作列表。
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] 给定操作和给定批次的详细信息。
/applications/[app-id]/sql 给定应用程序的所有查询列表。
?details=[true (默认) | false] 列出/隐藏 Spark 计划节点的详细信息。
?planDescription=[true (默认) | false] 在物理计划大小较高时按需启用/禁用物理 planDescription
?offset=[offset]&length=[len] 列出给定范围内的查询。
/applications/[app-id]/sql/[execution-id] 给定查询的详细信息。
?details=[true (默认) | false] 除了给定的查询详细信息外,还列出/隐藏指标详细信息。
?planDescription=[true (默认) | false] 在物理计划大小较高时,按需启用/禁用给定查询的物理 planDescription
/applications/[app-id]/environment 给定应用程序的环境详细信息。
/version 获取当前 Spark 版本。

可检索的作业和阶段数量受到独立 Spark UI 相同保留机制的限制;"spark.ui.retainedJobs" 定义了触发作业垃圾回收的阈值,而 spark.ui.retainedStages 则定义了阶段的阈值。请注意,垃圾回收发生在回放时:通过增加这些值并重新启动历史服务器,可以检索更多条目。

Executor 任务指标

REST API 以任务执行的粒度公开了 Spark Executor 收集的任务指标值。这些指标可用于性能故障排除和工作负载特征分析。以下是可用指标的列表,附带简要说明:

Spark Executor 任务指标名称 简短描述
executorRunTime Executor 运行此任务所花费的**时间**。这包括获取 shuffle 数据的时间。该值以毫秒表示。
executorCpuTime Executor 运行此任务所花费的 CPU **时间**。这包括获取 shuffle 数据的时间。该值以纳秒表示。
executorDeserializeTime 反序列化此任务所花费的**时间**。该值以毫秒表示。
executorDeserializeCpuTime Executor 反序列化此任务所花费的 CPU **时间**。该值以纳秒表示。
resultSize 此任务作为 TaskResult 传回驱动程序的字节数。
jvmGCTime JVM 在执行此任务期间进行垃圾回收所花费的**时间**。该值以毫秒表示。
ConcurrentGCCount 此指标返回已发生的收集总数。仅当 Java 垃圾回收器为 G1 并发 GC 时适用。
ConcurrentGCTime 此指标返回以毫秒为单位的近似累积收集**时间**。仅当 Java 垃圾回收器为 G1 并发 GC 时适用。
resultSerializationTime 序列化任务结果所花费的**时间**。该值以毫秒表示。
memoryBytesSpilled 此任务溢出的内存中字节数。
diskBytesSpilled 此任务溢出的磁盘上字节数。
peakExecutionMemory 在 shuffle、聚合和 join 期间创建的内部数据结构使用的峰值内存。此累加器的值应大致等于此任务中创建的所有此类数据结构的峰值大小之和。对于 SQL 作业,这仅跟踪所有非安全操作符和 ExternalSort。
inputMetrics.* 与从 org.apache.spark.rdd.HadoopRDD 或持久化数据读取数据相关的指标。
    .bytesRead 读取的总字节数。
    .recordsRead 读取的总记录数。
outputMetrics.* 与向外部写入数据(例如,写入分布式文件系统)相关的指标,仅在具有输出的任务中定义。
    .bytesWritten 写入的总字节数
    .recordsWritten 写入的总记录数
shuffleReadMetrics.* 与 shuffle 读取操作相关的指标。
    .recordsRead shuffle 操作中读取的记录数
    .remoteBlocksFetched shuffle 操作中获取的远程块数量
    .localBlocksFetched shuffle 操作中获取的本地(与从远程 Executor 读取相对)块的数量
    .totalBlocksFetched shuffle 操作中获取的块数量(包括本地和远程)
    .remoteBytesRead shuffle 操作中读取的远程字节数
    .localBytesRead shuffle 操作中从本地磁盘读取的字节数(与从远程 Executor 读取相对)
    .totalBytesRead shuffle 操作中读取的字节数(包括本地和远程)
    .remoteBytesReadToDisk shuffle 操作中读取到磁盘的远程字节数。在 shuffle 读取操作中,大块数据会获取到磁盘,而不是读取到内存中(这是默认行为)。
    .fetchWaitTime 任务等待远程 shuffle 块的时间。这仅包括阻塞在 shuffle 输入数据上的时间。例如,如果任务在处理块 A 尚未完成时正在获取块 B,则不认为它阻塞在块 B 上。该值以毫秒表示。
shuffleWriteMetrics.* 与写入 shuffle 数据操作相关的指标。
    .bytesWritten shuffle 操作中写入的字节数
    .recordsWritten shuffle 操作中写入的记录数
    .writeTime 阻塞在磁盘写入或缓冲区缓存写入上的时间。该值以纳秒表示。

Executor 指标

Executor 级别的指标作为心跳的一部分从每个 Executor 发送到驱动程序,以描述 Executor 本身的性能指标,例如 JVM 堆内存、GC 信息。Executor 指标值及其每个 Executor 测量的内存峰值通过 REST API 以 JSON 格式和 Prometheus 格式公开。JSON 端点位于:/applications/[app-id]/executors,Prometheus 端点位于:/metrics/executors/prometheus。此外,如果 spark.eventLog.logStageExecutorMetrics 为 true,则 Executor 内存指标的按阶段聚合峰值会写入事件日志。Executor 内存指标也通过基于 Dropwizard metrics library 的 Spark 指标系统公开。以下是可用指标的列表,附带简要描述:

Executor 级别指标名称 简短描述
rddBlocks 此 Executor 块管理器中的 RDD 块。
memoryUsed 此 Executor 使用的存储内存。
diskUsed 此 Executor 用于 RDD 存储的磁盘空间。
totalCores 此 Executor 中可用的核心数量。
maxTasks 此 Executor 中可同时运行的最大任务数。
activeTasks 当前正在执行的任务数。
failedTasks 此 Executor 中已失败的任务数。
completedTasks 此 Executor 中已完成的任务数。
totalTasks 此 Executor 中任务总数(运行中、失败和已完成)。
totalDuration JVM 在此 Executor 中执行任务所花费的**时间**。该值以毫秒表示。
totalGCTime JVM 在此 Executor 中进行垃圾回收所花费的**总时间**。该值以毫秒表示。
totalInputBytes 此 Executor 中输入的总字节数。
totalShuffleRead 此 Executor 中 shuffle 读取的总字节数。
totalShuffleWrite 此 Executor 中 shuffle 写入的总字节数。
maxMemory 可用于存储的内存总量,以字节为单位。
memoryMetrics.* 内存指标的当前值
    .usedOnHeapStorageMemory 当前用于存储的堆内内存,以字节为单位。
    .usedOffHeapStorageMemory 当前用于存储的堆外内存,以字节为单位。
    .totalOnHeapStorageMemory 可用于存储的堆内内存总量,以字节为单位。此数量可能随 MemoryManager 实现而异。
    .totalOffHeapStorageMemory 可用于存储的堆外内存总量,以字节为单位。此数量可能随 MemoryManager 实现而异。
peakMemoryMetrics.* 内存(和 GC)指标的峰值
    .JVMHeapMemory 用于对象分配的堆的峰值内存使用量。堆由一个或多个内存池组成。返回的内存使用量中已使用和已提交的大小是所有堆内存池这些值的总和,而返回的内存使用量中初始和最大大小表示堆内存的设置,可能不是所有堆内存池这些值的总和。返回的内存使用量中已使用的内存量是活对象和尚未收集的垃圾对象(如果有)所占用的内存量。
    .JVMOffHeapMemory Java 虚拟机使用的非堆内存的峰值内存使用量。非堆内存由一个或多个内存池组成。返回的内存使用量中已使用和已提交的大小是所有非堆内存池这些值的总和,而返回的内存使用量中初始和最大大小表示非堆内存的设置,可能不是所有非堆内存池这些值的总和。
    .OnHeapExecutionMemory 使用的峰值堆内执行内存,以字节为单位。
    .OffHeapExecutionMemory 使用的峰值堆外执行内存,以字节为单位。
    .OnHeapStorageMemory 使用的峰值堆内存储内存,以字节为单位。
    .OffHeapStorageMemory 使用的峰值堆外存储内存,以字节为单位。
    .OnHeapUnifiedMemory 堆内内存峰值(执行和存储)。
    .OffHeapUnifiedMemory 堆外内存峰值(执行和存储)。
    .DirectPoolMemory JVM 用于直接缓冲区池的峰值内存 (java.lang.management.BufferPoolMXBean)
    .MappedPoolMemory JVM 用于映射缓冲区池的峰值内存 (java.lang.management.BufferPoolMXBean)
    .ProcessTreeJVMVMemory 虚拟内存大小,以字节为单位。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。
    .ProcessTreeJVMRSSMemory 常驻集大小 (RSS):进程在实际内存中拥有的页数。这仅计算文本、数据或堆栈空间所占用的页。不包括尚未按需加载或已交换出去的页。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。
    .ProcessTreePythonVMemory Python 的虚拟内存大小,以字节为单位。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。
    .ProcessTreePythonRSSMemory Python 的常驻集大小。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。
    .ProcessTreeOtherVMemory 其他类型进程的虚拟内存大小,以字节为单位。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。
    .ProcessTreeOtherRSSMemory 其他类型进程的常驻集大小。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。
    .MinorGCCount 次要 GC 总数。例如,垃圾回收器可以是 Copy、PS Scavenge、ParNew、G1 Young Generation 等。
    .MinorGCTime 次要 GC 的总**时间**。该值以毫秒表示。
    .MajorGCCount 主要 GC 总数。例如,垃圾回收器可以是 MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep、G1 Old Generation 等。
    .MajorGCTime 主要 GC 的总**时间**。该值以毫秒表示。

RSS 和 Vmem 的计算基于 proc(5)

API 版本策略

这些端点已经过严格版本控制,以便于在其上开发应用程序。特别是,Spark 保证:

请注意,即使在检查正在运行的应用程序的 UI 时,applications/[app-id] 部分仍然是必需的,尽管只有一个可用的应用程序。例如,要查看正在运行的应用程序的作业列表,您需要访问 http://localhost:4040/api/v1/applications/[app-id]/jobs。这是为了保持两种模式下路径的一致性。

指标

Spark 有一个基于 Dropwizard Metrics Library 的可配置指标系统。这允许用户将 Spark 指标报告给各种接收器,包括 HTTP、JMX 和 CSV 文件。这些指标由嵌入在 Spark 代码库中的源生成。它们为特定活动和 Spark 组件提供仪器。指标系统通过 Spark 期望存在于 $SPARK_HOME/conf/metrics.properties 的配置文件进行配置。可以通过 spark.metrics.conf 配置属性指定自定义文件位置。除了使用配置文件外,还可以使用一组带有前缀 spark.metrics.conf. 的配置参数。默认情况下,用于驱动程序或 Executor 指标的根命名空间是 spark.app.id 的值。然而,很多时候,用户希望能够跟踪驱动程序和 Executor 在应用程序之间的指标,这在使用应用程序 ID(即 spark.app.id)时很难做到,因为它会随着应用程序的每次调用而改变。对于此类用例,可以使用 spark.metrics.namespace 配置属性为指标报告指定自定义命名空间。例如,如果用户希望将指标命名空间设置为应用程序的名称,他们可以将 spark.metrics.namespace 属性设置为类似 ${spark.app.name} 的值。然后 Spark 会适当地展开此值,并将其用作指标系统的根命名空间。非驱动程序和 Executor 指标永远不会以 spark.app.id 为前缀,spark.metrics.namespace 属性对此类指标也没有任何影响。

Spark 的指标被解耦为与 Spark 组件对应的不同**实例**。在每个实例中,您可以配置一组用于报告指标的接收器。目前支持以下实例:

每个实例可以向零个或多个**接收器**报告。接收器包含在 org.apache.spark.metrics.sink 包中:

Prometheus Servlet 镜像了 Metrics Servlet 和 REST API 公开的 JSON 数据,但采用时间序列格式。以下是等效的 Prometheus Servlet 端点。

组件 端口 JSON 端点 Prometheus 端点
Master 8080 /metrics/master/json/ /metrics/master/prometheus/
Master 8080 /metrics/applications/json/ /metrics/applications/prometheus/
Worker 8081 /metrics/json/ /metrics/prometheus/
Driver 4040 /metrics/json/ /metrics/prometheus/
Driver 4040 /api/v1/applications/{id}/executors/ /metrics/executors/prometheus/

Spark 还支持 Ganglia 接收器,由于许可限制,它不包含在默认构建中

要安装 GangliaSink,您需要执行 Spark 的自定义构建。请注意,通过嵌入此库,您的 Spark 包将包含 LGPL 许可的代码。对于 sbt 用户,请在构建之前设置 SPARK_GANGLIA_LGPL 环境变量。对于 Maven 用户,请启用 -Pspark-ganglia-lgpl 配置文件。除了修改集群的 Spark 构建之外,用户应用程序还需要链接到 spark-ganglia-lgpl 工件。

指标配置文件的语法以及每个接收器可用的参数在示例配置文件 $SPARK_HOME/conf/metrics.properties.template 中定义。

当使用 Spark 配置参数而不是指标配置文件时,相关的参数名称由前缀 spark.metrics.conf. 后跟配置详细信息组成,即参数采用以下形式:spark.metrics.conf.[instance|*].sink.[sink_name].[parameter_name]。此示例显示了 Graphite 接收器的 Spark 配置参数列表:

"spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"
"spark.metrics.conf.*.sink.graphite.host"="graphiteEndPoint_hostName>"
"spark.metrics.conf.*.sink.graphite.port"=<graphite_listening_port>
"spark.metrics.conf.*.sink.graphite.period"=10
"spark.metrics.conf.*.sink.graphite.unit"=seconds
"spark.metrics.conf.*.sink.graphite.prefix"="optional_prefix"
"spark.metrics.conf.*.sink.graphite.regex"="optional_regex_to_send_matching_metrics"

Spark 指标配置的默认值如下:

"*.sink.servlet.class" = "org.apache.spark.metrics.sink.MetricsServlet"
"*.sink.servlet.path" = "/metrics/json"
"master.sink.servlet.path" = "/metrics/master/json"
"applications.sink.servlet.path" = "/metrics/applications/json"

可以使用指标配置文件或配置参数 spark.metrics.conf.[component_name].source.jvm.class=[source_name] 配置其他源。目前,JVM 源是唯一可用的可选源。例如,以下配置参数激活 JVM 源:"spark.metrics.conf.*.source.jvm.class"="org.apache.spark.metrics.source.JvmSource"

可用指标提供者列表

Spark 使用的指标有多种类型:gauge、counter、histogram、meter 和 timer,详见 Dropwizard 库文档。以下组件和指标列表报告了可用指标的名称和一些详细信息,按组件实例和源命名空间分组。Spark 仪器中最常见的指标类型是 gauge 和 counter。Counter 可以通过其 .count 后缀识别。Timer、meter 和 histogram 在列表中有标注,列表的其余元素是 gauge 类型的指标。绝大多数指标在其父组件实例配置后立即激活,某些指标还需要通过额外的配置参数启用,详细信息在列表中报告。

组件实例 = Driver

这是具有最多仪器化指标的组件

组件实例 = Executor

这些指标由 Spark Executor 公开。

源 = JVM 源

备注

组件实例 = ApplicationMaster

注意:在 YARN 上运行时适用

组件实例 = master

注意:作为 Master 在 Spark Standalone 中运行时适用

组件实例 = ApplicationSource

注意:作为 Master 在 Spark Standalone 中运行时适用

组件实例 = worker

注意:作为 Worker 在 Spark Standalone 中运行时适用

组件实例 = ShuffleService

注意:适用于 Shuffle 服务

高级仪器

可以使用多种外部工具来帮助分析 Spark 作业的性能:

Spark 还提供了插件 API,以便可以将自定义仪器代码添加到 Spark 应用程序中。有两个可用于将插件加载到 Spark 中的配置键:

两者都接受一个逗号分隔的类名列表,这些类名实现了 org.apache.spark.api.plugin.SparkPlugin 接口。存在这两个名称是为了可以将一个列表放置在 Spark 默认配置文件中,允许用户轻松地从命令行添加其他插件,而无需覆盖配置文件的列表。重复的插件将被忽略。