监控和仪器
有多种方法可以监控 Spark 应用程序:Web UI、指标和外部仪器。
Web 界面
每个 SparkContext 都会启动一个 Web UI(默认为端口 4040),它显示有关应用程序的有用信息。这包括
- 调度器阶段和任务列表
- RDD 大小和内存使用情况的摘要
- 环境信息。
- 有关正在运行的 Executor 的信息
您只需在 Web 浏览器中打开 http://<driver-node>:4040
即可访问此界面。如果多个 SparkContext 在同一主机上运行,它们将绑定到从 4040 开始的连续端口(4041、4042 等)。
请注意,此信息默认情况下仅在应用程序运行期间可用。要在事后查看 Web UI,请在启动应用程序之前将 spark.eventLog.enabled
设置为 true。这将配置 Spark 将 UI 中显示的信息编码为 Spark 事件并记录到持久化存储中。
事后查看
如果应用程序的事件日志存在,仍然可以通过 Spark 的历史服务器构建应用程序的 UI。您可以通过执行以下命令启动历史服务器:
./sbin/start-history-server.sh
这会默认在 http://<server-url>:18080
创建一个 Web 界面,列出未完成和已完成的应用程序及尝试。
当使用文件系统提供者类(参见下文的 spark.history.provider
)时,必须在 spark.history.fs.logDirectory
配置选项中提供基础日志目录,并且该目录应包含每个代表一个应用程序事件日志的子目录。
Spark 作业本身必须配置为记录事件,并将其记录到相同的共享可写目录。例如,如果服务器配置的日志目录是 hdfs://namenode/shared/spark-logs
,则客户端选项将是
spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs
历史服务器可以配置如下:
环境变量
环境变量 | 含义 |
---|---|
SPARK_DAEMON_MEMORY |
分配给历史服务器的内存(默认:1g)。 |
SPARK_DAEMON_JAVA_OPTS |
历史服务器的 JVM 选项(默认:无)。 |
SPARK_DAEMON_CLASSPATH |
历史服务器的 Classpath(默认:无)。 |
SPARK_PUBLIC_DNS |
历史服务器的公共地址。如果未设置此项,应用程序历史记录的链接可能会使用服务器的内部地址,导致链接失效(默认:无)。 |
SPARK_HISTORY_OPTS |
历史服务器的 spark.history.* 配置选项(默认:无)。 |
对滚动事件日志文件应用压缩
一个长时间运行的应用程序(例如流处理)可能会产生一个巨大的单个事件日志文件,这可能需要大量的维护成本,并且每次 Spark History Server 更新时也需要大量资源进行重放。
启用 spark.eventLog.rolling.enabled
和 spark.eventLog.rolling.maxFileSize
将允许您拥有滚动事件日志文件,而不是单个巨大的事件日志文件,这本身可能在某些场景下有所帮助,但它仍然无法帮助您减少日志的整体大小。
Spark History Server 可以通过在 Spark History Server 上设置配置 spark.history.fs.eventLog.rolling.maxFilesToRetain
,对滚动事件日志文件应用压缩以减小日志的整体大小。
详细信息将在下文描述,但请提前注意,压缩是**有损操作**。压缩会丢弃一些在 UI 上不再可见的事件——您可能需要在启用此选项之前检查哪些事件将被丢弃。
当发生压缩时,历史服务器会列出应用程序所有可用的事件日志文件,并考虑索引小于将要保留的最小索引文件的事件日志文件作为压缩目标。例如,如果应用程序 A 有 5 个事件日志文件,并且 spark.history.fs.eventLog.rolling.maxFilesToRetain
设置为 2,那么前 3 个日志文件将被选中进行压缩。
一旦选择目标,它会分析这些文件以找出哪些事件可以被排除,然后将它们重写为一个紧凑的文件,同时丢弃那些决定排除的事件。
压缩会尝试排除指向过期数据的事件。截至目前,下面描述了要排除的事件候选者:
- 已完成作业的事件,以及相关的阶段/任务事件
- 已终止 Executor 的事件
- 已完成 SQL 执行的事件,以及相关的作业/阶段/任务事件
重写完成后,原始日志文件将被尽力删除。历史服务器可能无法删除原始日志文件,但这不会影响历史服务器的运行。
请注意,如果 Spark History Server 发现压缩期间不会减少太多空间,它可能不会压缩旧的事件日志文件。对于流式查询,我们通常期望压缩会运行,因为每个微批次都会触发一个或多个很快完成的作业,但在许多情况下,批处理查询的压缩不会运行。
另请注意,这是 Spark 3.0 中引入的新功能,可能尚未完全稳定。在某些情况下,压缩可能会排除超出您预期的事件,导致应用程序在历史服务器 UI 上出现一些问题。请谨慎使用。
Spark History Server 配置选项
Spark History Server 的安全选项在安全页面中有更详细的介绍。
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.history.provider | org.apache.spark.deploy.history.FsHistoryProvider |
实现应用程序历史记录后端的类名。目前 Spark 只提供一种实现,它查找存储在文件系统中的应用程序日志。 | 1.1.0 |
spark.history.fs.logDirectory | file:/tmp/spark-events | 对于文件系统历史提供者,这是包含要加载的应用程序事件日志的目录 URL。这可以是本地的 file:// 路径、HDFS 路径 hdfs://namenode/shared/spark-logs ,或者 Hadoop API 支持的其他文件系统的路径。 |
1.1.0 |
spark.history.fs.update.interval | 10秒 | 文件系统历史提供者检查日志目录中是否有新的或更新的日志的周期。较短的间隔可以更快地检测到新应用程序,但代价是重新读取已更新应用程序会增加服务器负载。一旦更新完成,已完成和未完成应用程序的列表将反映这些更改。 | 1.4.0 |
spark.history.retainedApplications | 50 | 在缓存中保留 UI 数据的应用程序数量。如果超出此限制,则最旧的应用程序将从缓存中移除。如果应用程序不在缓存中,则在从 UI 访问时必须从磁盘加载。 | 1.0.0 |
spark.history.ui.maxApplications | Int.MaxValue | 历史摘要页面上显示的应用程序数量。即使应用程序 UI 未显示在历史摘要页面上,也可以通过直接访问其 URL 来查看。 | 2.0.1 |
spark.history.ui.port | 18080 | 历史服务器 Web 界面绑定的端口。 | 1.0.0 |
spark.history.kerberos.enabled | false | 指示历史服务器是否应使用 Kerberos 登录。如果历史服务器正在安全 Hadoop 集群上访问 HDFS 文件,则需要此项。 | 1.0.1 |
spark.history.kerberos.principal | (无) | 当 spark.history.kerberos.enabled=true 时,指定历史服务器的 Kerberos 主体名称。 |
1.0.1 |
spark.history.kerberos.keytab | (无) | 当 spark.history.kerberos.enabled=true 时,指定历史服务器的 Kerberos keytab 文件位置。 |
1.0.1 |
spark.history.fs.cleaner.enabled | false | 指定历史服务器是否应定期从存储中清理事件日志。 | 1.4.0 |
spark.history.fs.cleaner.interval | 1天 | 当 spark.history.fs.cleaner.enabled=true 时,指定文件系统作业历史清理器检查要删除文件的频率。如果满足以下两个条件之一,文件将被删除。首先,如果文件早于 spark.history.fs.cleaner.maxAge ,则将其删除。如果文件数量超过 spark.history.fs.cleaner.maxNum ,Spark 也会删除文件,它会根据应用程序最旧的尝试时间顺序清理已完成的尝试。 |
1.4.0 |
spark.history.fs.cleaner.maxAge | 7天 | 当 spark.history.fs.cleaner.enabled=true 时,运行文件系统历史清理器时,早于此时间点的作业历史文件将被删除。 |
1.4.0 |
spark.history.fs.cleaner.maxNum | Int.MaxValue | 当 spark.history.fs.cleaner.enabled=true 时,指定事件日志目录中的最大文件数量。Spark 尝试清理已完成的尝试日志,以将日志目录保持在此限制以下。这应该小于底层文件系统的限制,例如 HDFS 中的 `dfs.namenode.fs-limits.max-directory-items`。 |
3.0.0 |
spark.history.fs.endEventReparseChunkSize | 1m | 在日志文件末尾解析的字节数,用于查找结束事件。这用于通过跳过事件日志文件中不必要的部分来加快应用程序列表的生成。可以通过将此配置设置为 0 来禁用它。 | 2.4.0 |
spark.history.fs.inProgressOptimization.enabled | true | 启用对进行中日志的优化处理。此选项可能会导致未能重命名其事件日志的已完成应用程序仍被列为进行中。 | 2.4.0 |
spark.history.fs.driverlog.cleaner.enabled | spark.history.fs.cleaner.enabled |
指定历史服务器是否应定期从存储中清理驱动程序日志。 | 3.0.0 |
spark.history.fs.driverlog.cleaner.interval | spark.history.fs.cleaner.interval |
当 spark.history.fs.driverlog.cleaner.enabled=true 时,指定文件系统驱动程序日志清理器检查要删除文件的频率。文件仅在早于 spark.history.fs.driverlog.cleaner.maxAge 时才会被删除。 |
3.0.0 |
spark.history.fs.driverlog.cleaner.maxAge | spark.history.fs.cleaner.maxAge |
当 spark.history.fs.driverlog.cleaner.enabled=true 时,运行驱动程序日志清理器时,早于此时间点的驱动程序日志文件将被删除。 |
3.0.0 |
spark.history.fs.numReplayThreads | 可用核心的 25% | 历史服务器用于处理事件日志的线程数。 | 2.0.0 |
spark.history.store.maxDiskUsage | 10g | 存储缓存应用程序历史信息的本地目录的最大磁盘使用量。 | 2.3.0 |
spark.history.store.path | (无) | 缓存应用程序历史数据的本地目录。如果设置,历史服务器会将应用程序数据存储在磁盘上,而不是保留在内存中。写入磁盘的数据将在历史服务器重启时被重用。 | 2.3.0 |
spark.history.store.serializer | JSON | 用于将内存中的 UI 对象写入/从基于磁盘的 KV 存储读取的序列化器;JSON 或 PROTOBUF。JSON 序列化器是 Spark 3.4.0 之前的唯一选择,因此它是默认值。与 JSON 序列化器相比,PROTOBUF 序列化器速度更快且更紧凑。 | 3.4.0 |
spark.history.custom.executor.log.url | (无) | 指定自定义 Spark Executor 日志 URL,用于支持外部日志服务,而不是在历史服务器中使用集群管理器的应用程序日志 URL。Spark 将通过模式支持一些路径变量,这些模式可能因集群管理器而异。请查阅您的集群管理器的文档以了解支持哪些模式(如果有)。此配置对正在运行的应用程序没有影响,它只影响历史服务器。 目前,只有 YARN 模式支持此配置 |
3.0.0 |
spark.history.custom.executor.log.url.applyIncompleteApplication | true | 指定是否也将自定义 Spark Executor 日志 URL 应用于未完成的应用程序。如果正在运行的应用程序的 Executor 日志应作为原始日志 URL 提供,请将此设置为 false 。请注意,未完成的应用程序可能包括未优雅关闭的应用程序。即使设置为 true ,此配置对正在运行的应用程序也没有影响,它只影响历史服务器。 |
3.0.0 |
spark.history.fs.eventLog.rolling.maxFilesToRetain | Int.MaxValue | 将保留为未压缩状态的事件日志文件的最大数量。默认情况下,所有事件日志文件都将被保留。出于技术原因,最低值为 1。 请阅读“对旧事件日志文件应用压缩”部分了解更多详细信息。 |
3.0.0 |
spark.history.store.hybridStore.enabled | false | 解析事件日志时是否使用 HybridStore 作为存储。HybridStore 将首先把数据写入内存存储,并在内存存储写入完成后,有一个后台线程将数据转储到磁盘存储。 | 3.1.0 |
spark.history.store.hybridStore.maxMemoryUsage | 2g | 可用于创建 HybridStore 的最大内存空间。HybridStore 共享堆内存,因此如果启用 HybridStore,应通过 SHS 的内存选项增加堆内存。 | 3.1.0 |
spark.history.store.hybridStore.diskBackend | ROCKSDB | 指定混合存储中使用的基于磁盘的存储;ROCKSDB 或 LEVELDB(已弃用)。 | 3.3.0 |
spark.history.fs.update.batchSize | Int.MaxValue | 指定更新新事件日志文件的批处理大小。这控制了每个扫描过程在合理时间内完成,从而防止初始扫描运行时间过长并在大型环境中及时阻止新事件日志文件被扫描。 | 3.4.0 |
请注意,在所有这些 UI 中,表格都可以通过单击其标题进行排序,从而轻松识别慢任务、数据倾斜等。
注意
-
历史服务器显示已完成和未完成的 Spark 作业。如果应用程序在失败后进行了多次尝试,则会显示失败的尝试,以及任何正在进行的未完成尝试或最终成功的尝试。
-
未完成的应用程序仅间歇性更新。更新之间的时间由检查更改文件 (
spark.history.fs.update.interval
) 的间隔定义。在大型集群上,更新间隔可能会设置为较大的值。查看正在运行的应用程序的实际方法是查看其自己的 Web UI。 -
退出时未将自身注册为已完成的应用程序将列为未完成——即使它们不再运行。如果应用程序崩溃,可能会发生这种情况。
-
表示 Spark 作业完成的一种方法是显式停止 Spark Context (
sc.stop()
),或者在 Python 中使用with SparkContext() as sc:
构造来处理 Spark Context 的设置和拆除。
REST API
除了在 UI 中查看指标外,它们也可以作为 JSON 提供。这为开发人员提供了一种创建新的 Spark 可视化和监控工具的简便方法。JSON 可用于正在运行的应用程序和历史服务器。端点安装在 /api/v1
。例如,对于历史服务器,它们通常可以在 http://<server-url>:18080/api/v1
访问,对于正在运行的应用程序,可以在 http://localhost:4040/api/v1
访问。
在 API 中,应用程序通过其应用程序 ID [app-id]
进行引用。在 YARN 上运行时,每个应用程序可能进行多次尝试,但只有集群模式下的应用程序有尝试 ID,客户端模式下的应用程序没有。YARN 集群模式下的应用程序可以通过其 [attempt-id]
进行识别。在下面列出的 API 中,当在 YARN 集群模式下运行时,[app-id]
实际上将是 [base-app-id]/[attempt-id]
,其中 [base-app-id]
是 YARN 应用程序 ID。
端点 | 含义 |
---|---|
/applications |
所有应用程序的列表。?status=[completed|running] 仅列出所选状态的应用程序。?minDate=[date] 列出最早的开始日期/时间。?maxDate=[date] 列出最晚的开始日期/时间。?minEndDate=[date] 列出最早的结束日期/时间。?maxEndDate=[date] 列出最晚的结束日期/时间。?limit=[limit] 限制列出的应用程序数量。示例 ?minDate=2015-02-10
?minDate=2015-02-03T16:42:40.000GMT
?maxDate=2015-02-11T20:41:30.000GMT
?minEndDate=2015-02-12
?minEndDate=2015-02-12T09:15:10.000GMT
?maxEndDate=2015-02-14T16:30:45.000GMT
?limit=10 |
/applications/[app-id]/jobs |
给定应用程序的所有作业列表。?status=[running|succeeded|failed|unknown] 仅列出特定状态的作业。 |
/applications/[app-id]/jobs/[job-id] |
给定作业的详细信息。 |
/applications/[app-id]/stages |
给定应用程序的所有阶段列表。?status=[active|complete|pending|failed] 仅列出给定状态的阶段。?details=true 列出所有包含任务数据的阶段。?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 仅列出具有指定任务状态的任务。查询参数 taskStatus 仅在 details=true 时生效。这也支持多个 taskStatus ,例如 ?details=true&taskStatus=SUCCESS&taskStatus=FAILED ,它将返回匹配任何指定任务状态的所有任务。?withSummaries=true 列出包含任务指标分布和 Executor 指标分布的阶段。?quantiles=0.0,0.25,0.5,0.75,1.0 使用给定的分位数汇总指标。查询参数 quantiles 仅在 withSummaries=true 时生效。默认值为 0.0,0.25,0.5,0.75,1.0 。 |
/applications/[app-id]/stages/[stage-id] |
给定阶段的所有尝试列表。?details=true 列出给定阶段所有包含任务数据的尝试。?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 仅列出具有指定任务状态的任务。查询参数 taskStatus 仅在 details=true 时生效。这也支持多个 taskStatus ,例如 ?details=true&taskStatus=SUCCESS&taskStatus=FAILED ,它将返回匹配任何指定任务状态的所有任务。?withSummaries=true 列出每个尝试的任务指标分布和 Executor 指标分布。?quantiles=0.0,0.25,0.5,0.75,1.0 使用给定的分位数汇总指标。查询参数 quantiles 仅在 withSummaries=true 时生效。默认值为 0.0,0.25,0.5,0.75,1.0 。示例 ?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
|
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] |
给定阶段尝试的详细信息。?details=true 列出给定阶段尝试的所有任务数据。?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 仅列出具有指定任务状态的任务。查询参数 taskStatus 仅在 details=true 时生效。这也支持多个 taskStatus ,例如 ?details=true&taskStatus=SUCCESS&taskStatus=FAILED ,它将返回匹配任何指定任务状态的所有任务。?withSummaries=true 列出给定阶段尝试的任务指标分布和 Executor 指标分布。?quantiles=0.0,0.25,0.5,0.75,1.0 使用给定的分位数汇总指标。查询参数 quantiles 仅在 withSummaries=true 时生效。默认值为 0.0,0.25,0.5,0.75,1.0 。示例 ?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
|
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary |
给定阶段尝试中所有任务的汇总指标。?quantiles 使用给定的分位数汇总指标。示例: ?quantiles=0.01,0.5,0.99 |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList |
给定阶段尝试的所有任务列表。?offset=[offset]&length=[len] 列出给定范围内的任务。?sortBy=[runtime|-runtime] 对任务进行排序。?status=[running|success|killed|failed|unknown] 仅列出处于该状态的任务。示例: ?offset=10&length=50&sortBy=runtime&status=running |
/applications/[app-id]/executors |
给定应用程序所有活跃 Executor 的列表。 |
/applications/[app-id]/executors/[executor-id]/threads |
给定活跃 Executor 中所有正在运行线程的堆栈跟踪。历史服务器不可用。 |
/applications/[app-id]/allexecutors |
给定应用程序所有(活跃和已终止)Executor 的列表。 |
/applications/[app-id]/storage/rdd |
给定应用程序的已存储 RDD 列表。 |
/applications/[app-id]/storage/rdd/[rdd-id] |
给定 RDD 存储状态的详细信息。 |
/applications/[base-app-id]/logs |
将给定应用程序所有尝试的事件日志作为 zip 文件中的文件下载。 |
/applications/[base-app-id]/[attempt-id]/logs |
将特定应用程序尝试的事件日志作为 zip 文件下载。 |
/applications/[app-id]/streaming/statistics |
流上下文的统计信息。 |
/applications/[app-id]/streaming/receivers |
所有流接收器列表。 |
/applications/[app-id]/streaming/receivers/[stream-id] |
给定接收器的详细信息。 |
/applications/[app-id]/streaming/batches |
所有保留批次列表。 |
/applications/[app-id]/streaming/batches/[batch-id] |
给定批次的详细信息。 |
/applications/[app-id]/streaming/batches/[batch-id]/operations |
给定批次的所有输出操作列表。 |
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] |
给定操作和给定批次的详细信息。 |
/applications/[app-id]/sql |
给定应用程序的所有查询列表。?details=[true (默认) | false] 列出/隐藏 Spark 计划节点的详细信息。?planDescription=[true (默认) | false] 在物理计划大小较高时按需启用/禁用物理 planDescription 。?offset=[offset]&length=[len] 列出给定范围内的查询。 |
/applications/[app-id]/sql/[execution-id] |
给定查询的详细信息。?details=[true (默认) | false] 除了给定的查询详细信息外,还列出/隐藏指标详细信息。?planDescription=[true (默认) | false] 在物理计划大小较高时,按需启用/禁用给定查询的物理 planDescription 。 |
/applications/[app-id]/environment |
给定应用程序的环境详细信息。 |
/version |
获取当前 Spark 版本。 |
可检索的作业和阶段数量受到独立 Spark UI 相同保留机制的限制;"spark.ui.retainedJobs"
定义了触发作业垃圾回收的阈值,而 spark.ui.retainedStages
则定义了阶段的阈值。请注意,垃圾回收发生在回放时:通过增加这些值并重新启动历史服务器,可以检索更多条目。
Executor 任务指标
REST API 以任务执行的粒度公开了 Spark Executor 收集的任务指标值。这些指标可用于性能故障排除和工作负载特征分析。以下是可用指标的列表,附带简要说明:
Spark Executor 任务指标名称 | 简短描述 |
---|---|
executorRunTime | Executor 运行此任务所花费的**时间**。这包括获取 shuffle 数据的时间。该值以毫秒表示。 |
executorCpuTime | Executor 运行此任务所花费的 CPU **时间**。这包括获取 shuffle 数据的时间。该值以纳秒表示。 |
executorDeserializeTime | 反序列化此任务所花费的**时间**。该值以毫秒表示。 |
executorDeserializeCpuTime | Executor 反序列化此任务所花费的 CPU **时间**。该值以纳秒表示。 |
resultSize | 此任务作为 TaskResult 传回驱动程序的字节数。 |
jvmGCTime | JVM 在执行此任务期间进行垃圾回收所花费的**时间**。该值以毫秒表示。 |
ConcurrentGCCount | 此指标返回已发生的收集总数。仅当 Java 垃圾回收器为 G1 并发 GC 时适用。 |
ConcurrentGCTime | 此指标返回以毫秒为单位的近似累积收集**时间**。仅当 Java 垃圾回收器为 G1 并发 GC 时适用。 |
resultSerializationTime | 序列化任务结果所花费的**时间**。该值以毫秒表示。 |
memoryBytesSpilled | 此任务溢出的内存中字节数。 |
diskBytesSpilled | 此任务溢出的磁盘上字节数。 |
peakExecutionMemory | 在 shuffle、聚合和 join 期间创建的内部数据结构使用的峰值内存。此累加器的值应大致等于此任务中创建的所有此类数据结构的峰值大小之和。对于 SQL 作业,这仅跟踪所有非安全操作符和 ExternalSort。 |
inputMetrics.* | 与从 org.apache.spark.rdd.HadoopRDD 或持久化数据读取数据相关的指标。 |
.bytesRead | 读取的总字节数。 |
.recordsRead | 读取的总记录数。 |
outputMetrics.* | 与向外部写入数据(例如,写入分布式文件系统)相关的指标,仅在具有输出的任务中定义。 |
.bytesWritten | 写入的总字节数 |
.recordsWritten | 写入的总记录数 |
shuffleReadMetrics.* | 与 shuffle 读取操作相关的指标。 |
.recordsRead | shuffle 操作中读取的记录数 |
.remoteBlocksFetched | shuffle 操作中获取的远程块数量 |
.localBlocksFetched | shuffle 操作中获取的本地(与从远程 Executor 读取相对)块的数量 |
.totalBlocksFetched | shuffle 操作中获取的块数量(包括本地和远程) |
.remoteBytesRead | shuffle 操作中读取的远程字节数 |
.localBytesRead | shuffle 操作中从本地磁盘读取的字节数(与从远程 Executor 读取相对) |
.totalBytesRead | shuffle 操作中读取的字节数(包括本地和远程) |
.remoteBytesReadToDisk | shuffle 操作中读取到磁盘的远程字节数。在 shuffle 读取操作中,大块数据会获取到磁盘,而不是读取到内存中(这是默认行为)。 |
.fetchWaitTime | 任务等待远程 shuffle 块的时间。这仅包括阻塞在 shuffle 输入数据上的时间。例如,如果任务在处理块 A 尚未完成时正在获取块 B,则不认为它阻塞在块 B 上。该值以毫秒表示。 |
shuffleWriteMetrics.* | 与写入 shuffle 数据操作相关的指标。 |
.bytesWritten | shuffle 操作中写入的字节数 |
.recordsWritten | shuffle 操作中写入的记录数 |
.writeTime | 阻塞在磁盘写入或缓冲区缓存写入上的时间。该值以纳秒表示。 |
Executor 指标
Executor 级别的指标作为心跳的一部分从每个 Executor 发送到驱动程序,以描述 Executor 本身的性能指标,例如 JVM 堆内存、GC 信息。Executor 指标值及其每个 Executor 测量的内存峰值通过 REST API 以 JSON 格式和 Prometheus 格式公开。JSON 端点位于:/applications/[app-id]/executors
,Prometheus 端点位于:/metrics/executors/prometheus
。此外,如果 spark.eventLog.logStageExecutorMetrics
为 true,则 Executor 内存指标的按阶段聚合峰值会写入事件日志。Executor 内存指标也通过基于 Dropwizard metrics library 的 Spark 指标系统公开。以下是可用指标的列表,附带简要描述:
Executor 级别指标名称 | 简短描述 |
---|---|
rddBlocks | 此 Executor 块管理器中的 RDD 块。 |
memoryUsed | 此 Executor 使用的存储内存。 |
diskUsed | 此 Executor 用于 RDD 存储的磁盘空间。 |
totalCores | 此 Executor 中可用的核心数量。 |
maxTasks | 此 Executor 中可同时运行的最大任务数。 |
activeTasks | 当前正在执行的任务数。 |
failedTasks | 此 Executor 中已失败的任务数。 |
completedTasks | 此 Executor 中已完成的任务数。 |
totalTasks | 此 Executor 中任务总数(运行中、失败和已完成)。 |
totalDuration | JVM 在此 Executor 中执行任务所花费的**时间**。该值以毫秒表示。 |
totalGCTime | JVM 在此 Executor 中进行垃圾回收所花费的**总时间**。该值以毫秒表示。 |
totalInputBytes | 此 Executor 中输入的总字节数。 |
totalShuffleRead | 此 Executor 中 shuffle 读取的总字节数。 |
totalShuffleWrite | 此 Executor 中 shuffle 写入的总字节数。 |
maxMemory | 可用于存储的内存总量,以字节为单位。 |
memoryMetrics.* | 内存指标的当前值 |
.usedOnHeapStorageMemory | 当前用于存储的堆内内存,以字节为单位。 |
.usedOffHeapStorageMemory | 当前用于存储的堆外内存,以字节为单位。 |
.totalOnHeapStorageMemory | 可用于存储的堆内内存总量,以字节为单位。此数量可能随 MemoryManager 实现而异。 |
.totalOffHeapStorageMemory | 可用于存储的堆外内存总量,以字节为单位。此数量可能随 MemoryManager 实现而异。 |
peakMemoryMetrics.* | 内存(和 GC)指标的峰值 |
.JVMHeapMemory | 用于对象分配的堆的峰值内存使用量。堆由一个或多个内存池组成。返回的内存使用量中已使用和已提交的大小是所有堆内存池这些值的总和,而返回的内存使用量中初始和最大大小表示堆内存的设置,可能不是所有堆内存池这些值的总和。返回的内存使用量中已使用的内存量是活对象和尚未收集的垃圾对象(如果有)所占用的内存量。 |
.JVMOffHeapMemory | Java 虚拟机使用的非堆内存的峰值内存使用量。非堆内存由一个或多个内存池组成。返回的内存使用量中已使用和已提交的大小是所有非堆内存池这些值的总和,而返回的内存使用量中初始和最大大小表示非堆内存的设置,可能不是所有非堆内存池这些值的总和。 |
.OnHeapExecutionMemory | 使用的峰值堆内执行内存,以字节为单位。 |
.OffHeapExecutionMemory | 使用的峰值堆外执行内存,以字节为单位。 |
.OnHeapStorageMemory | 使用的峰值堆内存储内存,以字节为单位。 |
.OffHeapStorageMemory | 使用的峰值堆外存储内存,以字节为单位。 |
.OnHeapUnifiedMemory | 堆内内存峰值(执行和存储)。 |
.OffHeapUnifiedMemory | 堆外内存峰值(执行和存储)。 |
.DirectPoolMemory | JVM 用于直接缓冲区池的峰值内存 (java.lang.management.BufferPoolMXBean ) |
.MappedPoolMemory | JVM 用于映射缓冲区池的峰值内存 (java.lang.management.BufferPoolMXBean ) |
.ProcessTreeJVMVMemory | 虚拟内存大小,以字节为单位。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。 |
.ProcessTreeJVMRSSMemory | 常驻集大小 (RSS):进程在实际内存中拥有的页数。这仅计算文本、数据或堆栈空间所占用的页。不包括尚未按需加载或已交换出去的页。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。 |
.ProcessTreePythonVMemory | Python 的虚拟内存大小,以字节为单位。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。 |
.ProcessTreePythonRSSMemory | Python 的常驻集大小。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。 |
.ProcessTreeOtherVMemory | 其他类型进程的虚拟内存大小,以字节为单位。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。 |
.ProcessTreeOtherRSSMemory | 其他类型进程的常驻集大小。如果 spark.executor.processTreeMetrics.enabled 为 true,则启用。 |
.MinorGCCount | 次要 GC 总数。例如,垃圾回收器可以是 Copy、PS Scavenge、ParNew、G1 Young Generation 等。 |
.MinorGCTime | 次要 GC 的总**时间**。该值以毫秒表示。 |
.MajorGCCount | 主要 GC 总数。例如,垃圾回收器可以是 MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep、G1 Old Generation 等。 |
.MajorGCTime | 主要 GC 的总**时间**。该值以毫秒表示。 |
RSS 和 Vmem 的计算基于 proc(5)
API 版本策略
这些端点已经过严格版本控制,以便于在其上开发应用程序。特别是,Spark 保证:
- 一个版本中的端点永远不会被移除
- 任何给定端点的单个字段永远不会被移除
- 可能会添加新的端点
- 可能会向现有端点添加新字段
- 未来可能会以单独的端点(例如
api/v2
)添加 API 的新版本。新版本不要求向后兼容。 - API 版本可能会被弃用,但只有在与新 API 版本共存至少一个次要版本之后。
请注意,即使在检查正在运行的应用程序的 UI 时,applications/[app-id]
部分仍然是必需的,尽管只有一个可用的应用程序。例如,要查看正在运行的应用程序的作业列表,您需要访问 http://localhost:4040/api/v1/applications/[app-id]/jobs
。这是为了保持两种模式下路径的一致性。
指标
Spark 有一个基于 Dropwizard Metrics Library 的可配置指标系统。这允许用户将 Spark 指标报告给各种接收器,包括 HTTP、JMX 和 CSV 文件。这些指标由嵌入在 Spark 代码库中的源生成。它们为特定活动和 Spark 组件提供仪器。指标系统通过 Spark 期望存在于 $SPARK_HOME/conf/metrics.properties
的配置文件进行配置。可以通过 spark.metrics.conf
配置属性指定自定义文件位置。除了使用配置文件外,还可以使用一组带有前缀 spark.metrics.conf.
的配置参数。默认情况下,用于驱动程序或 Executor 指标的根命名空间是 spark.app.id
的值。然而,很多时候,用户希望能够跟踪驱动程序和 Executor 在应用程序之间的指标,这在使用应用程序 ID(即 spark.app.id
)时很难做到,因为它会随着应用程序的每次调用而改变。对于此类用例,可以使用 spark.metrics.namespace
配置属性为指标报告指定自定义命名空间。例如,如果用户希望将指标命名空间设置为应用程序的名称,他们可以将 spark.metrics.namespace
属性设置为类似 ${spark.app.name}
的值。然后 Spark 会适当地展开此值,并将其用作指标系统的根命名空间。非驱动程序和 Executor 指标永远不会以 spark.app.id
为前缀,spark.metrics.namespace
属性对此类指标也没有任何影响。
Spark 的指标被解耦为与 Spark 组件对应的不同**实例**。在每个实例中,您可以配置一组用于报告指标的接收器。目前支持以下实例:
master
: Spark standalone master 进程。applications
: master 中的一个组件,用于报告各种应用程序。worker
: Spark standalone worker 进程。executor
: Spark Executor。driver
: Spark 驱动程序进程(创建 SparkContext 的进程)。shuffleService
: Spark Shuffle 服务。applicationMaster
: 在 YARN 上运行时,Spark 的 ApplicationMaster。
每个实例可以向零个或多个**接收器**报告。接收器包含在 org.apache.spark.metrics.sink
包中:
ConsoleSink
: 将指标信息记录到控制台。CSVSink
: 定期将指标数据导出到 CSV 文件。JmxSink
: 注册指标以在 JMX 控制台中查看。MetricsServlet
: 在现有 Spark UI 中添加一个 Servlet,用于将指标数据作为 JSON 数据提供服务。PrometheusServlet
: (实验性) 在现有 Spark UI 中添加一个 Servlet,用于以 Prometheus 格式提供指标数据。GraphiteSink
: 将指标发送到 Graphite 节点。Slf4jSink
: 将指标作为日志条目发送到 slf4j。StatsdSink
: 将指标发送到 StatsD 节点。
Prometheus Servlet 镜像了 Metrics Servlet
和 REST API 公开的 JSON 数据,但采用时间序列格式。以下是等效的 Prometheus Servlet 端点。
组件 | 端口 | JSON 端点 | Prometheus 端点 |
---|---|---|---|
Master | 8080 | /metrics/master/json/ |
/metrics/master/prometheus/ |
Master | 8080 | /metrics/applications/json/ |
/metrics/applications/prometheus/ |
Worker | 8081 | /metrics/json/ |
/metrics/prometheus/ |
Driver | 4040 | /metrics/json/ |
/metrics/prometheus/ |
Driver | 4040 | /api/v1/applications/{id}/executors/ |
/metrics/executors/prometheus/ |
Spark 还支持 Ganglia 接收器,由于许可限制,它不包含在默认构建中
GangliaSink
: 将指标发送到 Ganglia 节点或多播组。
要安装 GangliaSink
,您需要执行 Spark 的自定义构建。请注意,通过嵌入此库,您的 Spark 包将包含 LGPL 许可的代码。对于 sbt 用户,请在构建之前设置 SPARK_GANGLIA_LGPL
环境变量。对于 Maven 用户,请启用 -Pspark-ganglia-lgpl
配置文件。除了修改集群的 Spark 构建之外,用户应用程序还需要链接到 spark-ganglia-lgpl
工件。
指标配置文件的语法以及每个接收器可用的参数在示例配置文件 $SPARK_HOME/conf/metrics.properties.template
中定义。
当使用 Spark 配置参数而不是指标配置文件时,相关的参数名称由前缀 spark.metrics.conf.
后跟配置详细信息组成,即参数采用以下形式:spark.metrics.conf.[instance|*].sink.[sink_name].[parameter_name]
。此示例显示了 Graphite 接收器的 Spark 配置参数列表:
"spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"
"spark.metrics.conf.*.sink.graphite.host"="graphiteEndPoint_hostName>"
"spark.metrics.conf.*.sink.graphite.port"=<graphite_listening_port>
"spark.metrics.conf.*.sink.graphite.period"=10
"spark.metrics.conf.*.sink.graphite.unit"=seconds
"spark.metrics.conf.*.sink.graphite.prefix"="optional_prefix"
"spark.metrics.conf.*.sink.graphite.regex"="optional_regex_to_send_matching_metrics"
Spark 指标配置的默认值如下:
"*.sink.servlet.class" = "org.apache.spark.metrics.sink.MetricsServlet"
"*.sink.servlet.path" = "/metrics/json"
"master.sink.servlet.path" = "/metrics/master/json"
"applications.sink.servlet.path" = "/metrics/applications/json"
可以使用指标配置文件或配置参数 spark.metrics.conf.[component_name].source.jvm.class=[source_name]
配置其他源。目前,JVM 源是唯一可用的可选源。例如,以下配置参数激活 JVM 源:"spark.metrics.conf.*.source.jvm.class"="org.apache.spark.metrics.source.JvmSource"
可用指标提供者列表
Spark 使用的指标有多种类型:gauge、counter、histogram、meter 和 timer,详见 Dropwizard 库文档。以下组件和指标列表报告了可用指标的名称和一些详细信息,按组件实例和源命名空间分组。Spark 仪器中最常见的指标类型是 gauge 和 counter。Counter 可以通过其 .count
后缀识别。Timer、meter 和 histogram 在列表中有标注,列表的其余元素是 gauge 类型的指标。绝大多数指标在其父组件实例配置后立即激活,某些指标还需要通过额外的配置参数启用,详细信息在列表中报告。
组件实例 = Driver
这是具有最多仪器化指标的组件
- namespace=BlockManager
- disk.diskSpaceUsed_MB
- memory.maxMem_MB
- memory.maxOffHeapMem_MB
- memory.maxOnHeapMem_MB
- memory.memUsed_MB
- memory.offHeapMemUsed_MB
- memory.onHeapMemUsed_MB
- memory.remainingMem_MB
- memory.remainingOffHeapMem_MB
- memory.remainingOnHeapMem_MB
- namespace=HiveExternalCatalog
- **注意:** 这些指标取决于一个配置参数:
spark.metrics.staticSources.enabled
(默认为 true) - fileCacheHits.count
- filesDiscovered.count
- hiveClientCalls.count
- parallelListingJobCount.count
- partitionsFetched.count
- **注意:** 这些指标取决于一个配置参数:
- namespace=CodeGenerator
- **注意:** 这些指标取决于一个配置参数:
spark.metrics.staticSources.enabled
(默认为 true) - compilationTime (直方图)
- generatedClassSize (直方图)
- generatedMethodSize (直方图)
- sourceCodeSize (直方图)
- **注意:** 这些指标取决于一个配置参数:
- namespace=DAGScheduler
- job.activeJobs
- job.allJobs
- messageProcessingTime (计时器)
- stage.failedStages
- stage.runningStages
- stage.waitingStages
- namespace=LiveListenerBus
- listenerProcessingTime.org.apache.spark.HeartbeatReceiver (计时器)
- listenerProcessingTime.org.apache.spark.scheduler.EventLoggingListener (计时器)
- listenerProcessingTime.org.apache.spark.status.AppStatusListener (计时器)
- numEventsPosted.count
- queue.appStatus.listenerProcessingTime (计时器)
- queue.appStatus.numDroppedEvents.count
- queue.appStatus.size
- queue.eventLog.listenerProcessingTime (计时器)
- queue.eventLog.numDroppedEvents.count
- queue.eventLog.size
- queue.executorManagement.listenerProcessingTime (计时器)
- namespace=appStatus (所有指标类型均为 counter)
- **注意:** 在 Spark 3.0 中引入。取决于一个配置参数:
spark.metrics.appStatusSource.enabled
(默认为 true) - stages.failedStages.count
- stages.skippedStages.count
- stages.completedStages.count
- tasks.blackListedExecutors.count // 已弃用,请改用 excludedExecutors
- tasks.excludedExecutors.count
- tasks.completedTasks.count
- tasks.failedTasks.count
- tasks.killedTasks.count
- tasks.skippedTasks.count
- tasks.unblackListedExecutors.count // 已弃用,请改用 unexcludedExecutors
- tasks.unexcludedExecutors.count
- jobs.succeededJobs
- jobs.failedJobs
- jobDuration
- **注意:** 在 Spark 3.0 中引入。取决于一个配置参数:
- namespace=AccumulatorSource
- **注意:** 用户可配置的源,用于将累加器附加到指标系统
- DoubleAccumulatorSource
- LongAccumulatorSource
- namespace=spark.streaming
- **注意:** 这仅适用于 Spark Structured Streaming。取决于一个配置参数:
spark.sql.streaming.metricsEnabled=true
(默认为 false) - eventTime-watermark
- inputRate-total
- latency
- processingRate-total
- states-rowsTotal
- states-usedBytes
- **注意:** 这仅适用于 Spark Structured Streaming。取决于一个配置参数:
- namespace=JVMCPU
- jvmCpuTime
- namespace=executor
- **注意:** 这些指标仅在本地模式下在驱动程序中可用。
- 此命名空间中可用指标的完整列表可在 Executor 组件实例的相应条目中找到。
- namespace=ExecutorMetrics
- **注意:** 这些指标取决于一个配置参数:
spark.metrics.executorMetricsSource.enabled
(默认为 true) - 此源包含与内存相关的指标。此命名空间中可用指标的完整列表可在 Executor 组件实例的相应条目中找到。
- **注意:** 这些指标取决于一个配置参数:
- namespace=ExecutorAllocationManager
- **注意:** 这些指标仅在使用动态分配时发出。取决于一个配置参数
spark.dynamicAllocation.enabled
(默认为 false) - executors.numberExecutorsToAdd
- executors.numberExecutorsPendingToRemove
- executors.numberAllExecutors
- executors.numberTargetExecutors
- executors.numberMaxNeededExecutors
- executors.numberDecommissioningExecutors
- executors.numberExecutorsGracefullyDecommissioned.count
- executors.numberExecutorsDecommissionUnfinished.count
- executors.numberExecutorsExitedUnexpectedly.count
- executors.numberExecutorsKilledByDriver.count
- **注意:** 这些指标仅在使用动态分配时发出。取决于一个配置参数
- namespace=plugin.<Plugin Class Name>
- 可选命名空间。此命名空间中的指标由用户提供的代码定义,并使用 Spark 插件 API 配置。有关如何将自定义插件加载到 Spark 中,请参见下面的“高级仪器”部分。
组件实例 = Executor
这些指标由 Spark Executor 公开。
- namespace=executor (指标类型为 counter 或 gauge)
- 备注
spark.executor.metrics.fileSystemSchemes
(默认:file,hdfs
)决定了公开的文件系统指标。
- bytesRead.count
- bytesWritten.count
- cpuTime.count
- deserializeCpuTime.count
- deserializeTime.count
- diskBytesSpilled.count
- filesystem.file.largeRead_ops
- filesystem.file.read_bytes
- filesystem.file.read_ops
- filesystem.file.write_bytes
- filesystem.file.write_ops
- filesystem.hdfs.largeRead_ops
- filesystem.hdfs.read_bytes
- filesystem.hdfs.read_ops
- filesystem.hdfs.write_bytes
- filesystem.hdfs.write_ops
- jvmGCTime.count
- memoryBytesSpilled.count
- recordsRead.count
- recordsWritten.count
- resultSerializationTime.count
- resultSize.count
- runTime.count
- shuffleBytesWritten.count
- shuffleFetchWaitTime.count
- shuffleLocalBlocksFetched.count
- shuffleLocalBytesRead.count
- shuffleRecordsRead.count
- shuffleRecordsWritten.count
- shuffleRemoteBlocksFetched.count
- shuffleRemoteBytesRead.count
- shuffleRemoteBytesReadToDisk.count
- shuffleTotalBytesRead.count
- shuffleWriteTime.count
- 与基于推送的 Shuffle 相关的指标
- shuffleCorruptMergedBlockChunks
- shuffleMergedFetchFallbackCount
- shuffleMergedRemoteBlocksFetched
- shuffleMergedLocalBlocksFetched
- shuffleMergedRemoteChunksFetched
- shuffleMergedLocalChunksFetched
- shuffleMergedRemoteBytesRead
- shuffleMergedLocalBytesRead
- shuffleRemoteReqsDuration
- shuffleMergedRemoteReqsDuration
- succeededTasks.count
- threadpool.activeTasks
- threadpool.completeTasks
- threadpool.currentPool_size
- threadpool.maxPool_size
- threadpool.startedTasks
- 备注
- namespace=ExecutorMetrics
- 备注
- 这些指标取决于一个配置参数:
spark.metrics.executorMetricsSource.enabled
(默认值为 true) - ExecutorMetrics 作为心跳进程的一部分进行更新,这些进程会定期为 Executor 和驱动程序调度:
spark.executor.heartbeatInterval
(默认值为 10 秒) - Executor 内存指标提供了一个可选的更快的轮询机制,可以通过使用配置参数
spark.executor.metrics.pollingInterval
设置轮询间隔(以毫秒为单位)来激活它。
- 这些指标取决于一个配置参数:
- JVMHeapMemory
- JVMOffHeapMemory
- OnHeapExecutionMemory
- OnHeapStorageMemory
- OnHeapUnifiedMemory
- OffHeapExecutionMemory
- OffHeapStorageMemory
- OffHeapUnifiedMemory
- DirectPoolMemory
- MappedPoolMemory
- MinorGCCount
- MinorGCTime
- MajorGCCount
- MajorGCTime
- “ProcessTree*” 指标计数器
- ProcessTreeJVMVMemory
- ProcessTreeJVMRSSMemory
- ProcessTreePythonVMemory
- ProcessTreePythonRSSMemory
- ProcessTreeOtherVMemory
- ProcessTreeOtherRSSMemory
- **注意:** “ProcessTree*” 指标仅在特定条件下收集。这些条件是以下条件的逻辑与:
/proc
文件系统存在,spark.executor.processTreeMetrics.enabled=true
。“ProcessTree*” 指标在不满足这些条件时报告 0。
- 备注
- namespace=JVMCPU
- jvmCpuTime
- namespace=NettyBlockTransfer
- shuffle-client.usedDirectMemory
- shuffle-client.usedHeapMemory
- shuffle-server.usedDirectMemory
- shuffle-server.usedHeapMemory
- namespace=HiveExternalCatalog
- **注意:** 这些指标取决于一个配置参数:
spark.metrics.staticSources.enabled
(默认为 true) - fileCacheHits.count
- filesDiscovered.count
- hiveClientCalls.count
- parallelListingJobCount.count
- partitionsFetched.count
- **注意:** 这些指标取决于一个配置参数:
- namespace=CodeGenerator
- **注意:** 这些指标取决于一个配置参数:
spark.metrics.staticSources.enabled
(默认为 true) - compilationTime (直方图)
- generatedClassSize (直方图)
- generatedMethodSize (直方图)
- sourceCodeSize (直方图)
- **注意:** 这些指标取决于一个配置参数:
- namespace=plugin.<Plugin Class Name>
- 可选命名空间。此命名空间中的指标由用户提供的代码定义,并使用 Spark 插件 API 配置。有关如何将自定义插件加载到 Spark 中,请参见下面的“高级仪器”部分。
源 = JVM 源
备注
- 通过设置相关的
metrics.properties
文件条目或配置参数来激活此源:spark.metrics.conf.*.source.jvm.class=org.apache.spark.metrics.source.JvmSource
- 这些指标取决于一个配置参数:
spark.metrics.staticSources.enabled
(默认为 true) - 此源适用于驱动程序和 Executor 实例,也适用于其他实例。
- 此源使用 Dropwizard/Codahale Metric Sets for JVM instrumentation 提供 JVM 指标信息,特别是 BufferPoolMetricSet、GarbageCollectorMetricSet 和 MemoryUsageGaugeSet 指标集。
组件实例 = ApplicationMaster
注意:在 YARN 上运行时适用
- numContainersPendingAllocate
- numExecutorsFailed
- numExecutorsRunning
- numLocalityAwareTasks
- numReleasedContainers
组件实例 = master
注意:作为 Master 在 Spark Standalone 中运行时适用
- workers
- aliveWorkers
- apps
- waitingApps
组件实例 = ApplicationSource
注意:作为 Master 在 Spark Standalone 中运行时适用
- status
- runtime_ms
- cores
组件实例 = worker
注意:作为 Worker 在 Spark Standalone 中运行时适用
- executors
- coresUsed
- memUsed_MB
- coresFree
- memFree_MB
组件实例 = ShuffleService
注意:适用于 Shuffle 服务
- blockTransferRate (计量器) - 传输块的速率
- blockTransferMessageRate (计量器) - 块传输消息的速率,即如果启用了批量获取,这表示批次的数量而不是块的数量
- blockTransferRateBytes (计量器)
- blockTransferAvgSize_1min (仪表盘 - 1分钟移动平均值)
- numActiveConnections.count
- numRegisteredConnections.count
- numCaughtExceptions.count
- openBlockRequestLatencyMillis (计时器)
- registerExecutorRequestLatencyMillis (计时器)
- fetchMergedBlocksMetaLatencyMillis (计时器)
- finalizeShuffleMergeLatencyMillis (计时器)
- registeredExecutorsSize
- shuffle-server.usedDirectMemory
-
shuffle-server.usedHeapMemory
- **注意:** 当服务器端配置
spark.shuffle.push.server.mergedShuffleFileManagerImpl
设置为org.apache.spark.network.shuffle.MergedShuffleFileManager
用于基于推送的 Shuffle 时,以下指标适用: - blockBytesWritten - 写入文件的推送块数据大小,以字节为单位
- blockAppendCollisions - 在 shuffle 服务中,当同一 reduce 分区的另一个块正在写入时,发生冲突的 shuffle 推送块的数量
- lateBlockPushes - 在特定 shuffle 合并完成后,shuffle 服务中收到的 shuffle 推送块的数量
- deferredBlocks - 当前在内存中缓冲的延迟块部分的数量
- deferredBlockBytes - 当前在内存中缓冲的延迟块部分的大小
- staleBlockPushes - 过时 shuffle 块推送请求的数量
- ignoredBlockBytes - 已传输到 ESS 但被忽略的推送块数据大小。推送的块数据在以下情况下被视为忽略:1. 在 shuffle 完成后收到;2. 推送请求是针对重复块的;3. ESS 无法写入该块。
高级仪器
可以使用多种外部工具来帮助分析 Spark 作业的性能:
- 集群范围的监控工具,例如 Ganglia,可以深入了解整体集群利用率和资源瓶颈。例如,Ganglia 仪表板可以快速揭示特定工作负载是磁盘受限、网络受限还是 CPU 受限。
- 操作系统分析工具,例如 dstat、iostat 和 iotop,可以在单个节点上提供细粒度分析。
- JVM 工具,例如用于提供堆栈跟踪的
jstack
、用于创建堆转储的jmap
、用于报告时间序列统计信息的jstat
和用于可视化探索各种 JVM 属性的jconsole
,对于熟悉 JVM 内部的人来说非常有用。
Spark 还提供了插件 API,以便可以将自定义仪器代码添加到 Spark 应用程序中。有两个可用于将插件加载到 Spark 中的配置键:
spark.plugins
spark.plugins.defaultList
两者都接受一个逗号分隔的类名列表,这些类名实现了 org.apache.spark.api.plugin.SparkPlugin
接口。存在这两个名称是为了可以将一个列表放置在 Spark 默认配置文件中,允许用户轻松地从命令行添加其他插件,而无需覆盖配置文件的列表。重复的插件将被忽略。