Spark 配置
Spark 提供了三个位置来配置系统:
- Spark 属性 控制大多数应用程序参数,可以通过 SparkConf 对象或 Java 系统属性进行设置。
- 环境变量 可用于设置单机配置,例如通过每个节点上的
conf/spark-env.sh脚本设置 IP 地址。 - 日志记录 可以通过
log4j2.properties进行配置。
Spark 属性
Spark 属性控制大多数应用程序设置,并针对每个应用程序分别进行配置。这些属性可以直接在传递给 SparkContext 的 SparkConf 上设置。SparkConf 允许您配置一些常见属性(例如主 URL 和应用程序名称),以及通过 set() 方法设置任意键值对。例如,我们可以按如下方式初始化一个具有两个线程的应用程序:
请注意,我们在 local[2] 下运行,意味着两个线程——这代表“最小”并行度,有助于发现仅在分布式环境下才会出现的错误。
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)请注意,在本地模式下我们可以拥有超过 1 个线程,而在 Spark Streaming 等情况下,我们可能实际上需要超过 1 个线程来防止任何形式的饥饿问题。
指定时间段的属性应配置为时间单位。接受以下格式:
25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)
指定字节大小的属性应配置为大小单位。接受以下格式:
1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)
虽然没有单位的数字通常被解释为字节,但少数会被解释为 KiB 或 MiB。请参阅各配置属性的文档。尽可能明确指定单位。
动态加载 Spark 属性
在某些情况下,您可能希望避免在 SparkConf 中硬编码某些配置。例如,如果您想使用不同的 master 或不同的内存量运行同一个应用程序。Spark 允许您创建一个空的 conf:
val sc = new SparkContext(new SparkConf())然后,您可以在运行时提供配置值:
./bin/spark-submit \
--name "My app" \
--master "local[4]" \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
myApp.jar
Spark shell 和 spark-submit 工具支持两种动态加载配置的方式。第一种是命令行选项,例如上面所示的 --master。spark-submit 可以使用 --conf/-c 标志接受任何 Spark 属性,但对在启动 Spark 应用程序时起作用的属性使用特殊标志。运行 ./bin/spark-submit --help 将显示这些选项的完整列表。
当通过 --conf/-c 标志指定配置时,bin/spark-submit 还会从 conf/spark-defaults.conf 读取配置选项,其中每一行由以空格分隔的键和值组成。例如:
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
此外,可以通过 --properties-file 参数将带有 Spark 配置的属性文件传递给 bin/spark-submit。设置此项后,除非提供了另一个参数 --load-spark-defaults,否则 Spark 将不再从 conf/spark-defaults.conf 加载配置。
作为标志或在属性文件中指定的任何值都将传递给应用程序,并与通过 SparkConf 指定的值合并。直接在 SparkConf 上设置的属性优先级最高,其次是通过 spark-submit 或 spark-shell 传递的 --conf 标志或 --properties-file,最后是 spark-defaults.conf 文件中的选项。自 Spark 早期版本以来,一些配置键已重命名;在这种情况下,旧的键名仍然被接受,但优先级低于新键的任何实例。
Spark 属性主要分为两类:一类与部署相关,例如“spark.driver.memory”、“spark.executor.instances”,这类属性在运行时通过 SparkConf 以编程方式设置时可能不会生效,或者其行为取决于您选择的集群管理器和部署模式,因此建议通过配置文件或 spark-submit 命令行选项进行设置;另一类主要与 Spark 运行时控制相关,例如“spark.task.maxFailures”,这类属性可以用任何一种方式设置。
查看 Spark 属性
位于 http://<driver>:4040 的应用程序 Web UI 在“Environment”选项卡中列出了 Spark 属性。这是检查您的属性是否已正确设置的有用地方。请注意,只有明确通过 spark-defaults.conf、SparkConf 或命令行指定的值才会显示。对于所有其他配置属性,您可以假设使用了默认值。
可用属性
大多数控制内部设置的属性都有合理的默认值。一些最常见的设置选项包括:
应用程序属性
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.app.name |
(无) | 您的应用程序名称。这将显示在 UI 和日志数据中。 | 0.9.0 |
spark.driver.cores |
1 | 仅在集群模式下,驱动程序进程使用的核心数。 | 1.3.0 |
spark.driver.maxResultSize |
1g | 每个 Spark 操作(例如 collect)的所有分区序列化结果的总大小限制(以字节为单位)。至少应为 1M,或设为 0 表示无限制。如果总大小超过此限制,作业将被中止。设置过高的限制可能会导致驱动程序内存不足错误(取决于 spark.driver.memory 和 JVM 中对象的内存开销)。设置适当的限制可以保护驱动程序免受内存不足错误的影响。 | 1.2.0 |
spark.driver.memory |
1g | 驱动程序进程(即初始化 SparkContext 的位置)使用的内存量,格式与带有大小单位后缀(“k”、“m”、“g”或“t”)的 JVM 内存字符串相同(例如 512m, 2g)。注意: 在客户端模式下,此配置不得直接通过应用程序中的 SparkConf 设置,因为驱动程序 JVM 在此时已经启动。请改为通过 --driver-memory 命令行选项或在默认属性文件中进行设置。 |
1.1.1 |
spark.driver.memoryOverhead |
driverMemory * spark.driver.memoryOverheadFactor,最小值为 spark.driver.minMemoryOverhead |
集群模式下每个驱动程序进程分配的非堆内存量,除非另有说明,单位为 MiB。这是用于虚拟机开销、驻留字符串、其他本机开销等的内存。这往往会随着容器大小而增长(通常为 6-10%)。此选项目前在 YARN 和 Kubernetes 上受支持。注意: 非堆内存包括堆外内存(当 spark.memory.offHeap.enabled=true 时)、其他驱动程序进程使用的内存(例如随 PySpark 驱动程序运行的 python 进程)以及在同一容器中运行的其他非驱动程序进程使用的内存。运行驱动程序的容器的最大内存大小由 spark.driver.memoryOverhead 和 spark.driver.memory 的总和决定。 |
2.3.0 |
spark.driver.minMemoryOverhead |
384m | 如果未定义 spark.driver.memoryOverhead,则集群模式下每个驱动程序进程分配的最小非堆内存量,除非另有说明,单位为 MiB。此选项目前在 YARN 和 Kubernetes 上受支持。 |
4.0.0 |
spark.driver.memoryOverheadFactor |
0.10 | 集群模式下分配为驱动程序进程额外非堆内存的驱动程序内存比例。这是用于虚拟机开销、驻留字符串、其他本机开销等的内存。这往往会随着容器大小而增长。此值默认为 0.10,但对于 Kubernetes 非 JVM 作业,默认值为 0.40。这是因为非 JVM 任务需要更多的非 JVM 堆空间,此类任务通常会因“内存开销超出 (Memory Overhead Exceeded)”错误而失败。此设置通过更高的默认值来预防此错误。如果直接设置了 spark.driver.memoryOverhead,则忽略此值。 |
3.3.0 |
spark.driver.resource.{resourceName}.amount |
0 | 驱动程序上使用的特定资源类型的数量。如果使用此项,您还必须指定 spark.driver.resource.{resourceName}.discoveryScript,以便驱动程序在启动时查找资源。 |
3.0.0 |
spark.driver.resource.{resourceName}.discoveryScript |
None | 驱动程序运行以发现特定资源类型的脚本。这应向 STDOUT 写入 JSON 字符串,格式为 ResourceInformation 类。它包含一个名称和一个地址数组。对于客户端提交的驱动程序,发现脚本必须为该驱动程序分配与同一主机上的其他驱动程序不同的资源地址。 | 3.0.0 |
spark.driver.resource.{resourceName}.vendor |
None | 驱动程序使用的资源供应商。此选项目前仅在 Kubernetes 上受支持,实际上是遵循 Kubernetes 设备插件命名约定的供应商和域名。(例如,对于 Kubernetes 上的 GPU,此配置将设置为 nvidia.com 或 amd.com) | 3.0.0 |
spark.resources.discoveryPlugin |
org.apache.spark.resource.ResourceDiscoveryScriptPlugin | 实现 org.apache.spark.api.resource.ResourceDiscoveryPlugin 并加载到应用程序中的类名的逗号分隔列表。这是供高级用户使用自定义实现替换资源发现类。Spark 将尝试指定的每个类,直到其中一个类返回该资源的资源信息。如果没有任何插件返回该资源的信息,它最后会尝试发现脚本。 | 3.0.0 |
spark.executor.memory |
1g | 每个 executor 进程使用的内存量,格式与带有大小单位后缀(“k”、“m”、“g”或“t”)的 JVM 内存字符串相同(例如 512m, 2g)。 |
0.7.0 |
spark.executor.pyspark.memory |
未设置 | 每个 executor 中分配给 PySpark 的内存量,除非另有说明,单位为 MiB。如果设置,executor 的 PySpark 内存将限制为此数量。如果未设置,Spark 将不会限制 Python 的内存使用,并且由应用程序负责避免超过与其他非 JVM 进程共享的开销内存空间。当 PySpark 在 YARN 或 Kubernetes 中运行时,此内存将添加到 executor 资源请求中。 注意: 此功能依赖于 Python 的 resource 模块;因此,它继承了相应的行为和限制。例如,Windows 不支持资源限制,并且 MacOS 上实际资源不受限制。 |
2.4.0 |
spark.executor.memoryOverhead |
executorMemory * spark.executor.memoryOverheadFactor,最小值为 spark.executor.minMemoryOverhead |
每个 executor 进程分配的额外内存量,除非另有说明,单位为 MiB。这是用于虚拟机开销、驻留字符串、其他本机开销等的内存。这往往会随着 executor 大小而增长(通常为 6-10%)。此选项目前在 YARN 和 Kubernetes 上受支持。 注意: 额外内存包括 PySpark executor 内存(当未配置 spark.executor.pyspark.memory 时)以及在同一容器中运行的其他非 executor 进程使用的内存。运行 executor 的容器的最大内存大小由 spark.executor.memoryOverhead、spark.executor.memory、spark.memory.offHeap.size 和 spark.executor.pyspark.memory 的总和决定。 |
2.3.0 |
spark.executor.minMemoryOverhead |
384m | 如果未定义 spark.executor.memoryOverhead,则每个 executor 进程分配的最小非堆内存量,除非另有说明,单位为 MiB。此选项目前在 YARN 和 Kubernetes 上受支持。 |
4.0.0 |
spark.executor.memoryOverheadFactor |
0.10 | 分配为每个 executor 进程额外非堆内存的 executor 内存比例。这是用于虚拟机开销、驻留字符串、其他本机开销等的内存。这往往会随着容器大小而增长。此值默认为 0.10,但对于 Kubernetes 非 JVM 作业,默认值为 0.40。这是因为非 JVM 任务需要更多的非 JVM 堆空间,此类任务通常会因“内存开销超出 (Memory Overhead Exceeded)”错误而失败。此设置通过更高的默认值来预防此错误。如果直接设置了 spark.executor.memoryOverhead,则忽略此值。 |
3.3.0 |
spark.executor.resource.{resourceName}.amount |
0 | 每个 executor 进程使用的特定资源类型的数量。如果使用此项,您还必须指定 spark.executor.resource.{resourceName}.discoveryScript,以便 executor 在启动时查找资源。 |
3.0.0 |
spark.executor.resource.{resourceName}.discoveryScript |
None | executor 运行以发现特定资源类型的脚本。这应向 STDOUT 写入 JSON 字符串,格式为 ResourceInformation 类。它包含一个名称和一个地址数组。 | 3.0.0 |
spark.executor.resource.{resourceName}.vendor |
None | executor 使用的资源供应商。此选项目前仅在 Kubernetes 上受支持,实际上是遵循 Kubernetes 设备插件命名约定的供应商和域名。(例如,对于 Kubernetes 上的 GPU,此配置将设置为 nvidia.com 或 amd.com) | 3.0.0 |
spark.extraListeners |
(无) | 实现 SparkListener 的类的逗号分隔列表;初始化 SparkContext 时,将创建这些类的实例并将其注册到 Spark 的监听器总线上。如果一个类有一个接受 SparkConf 的单参数构造函数,则会调用该构造函数;否则,将调用零参数构造函数。如果找不到有效的构造函数,SparkContext 的创建将失败并抛出异常。 |
1.3.0 |
spark.local.dir |
/tmp | 用于 Spark 中“暂存”空间的目录,包括映射输出文件和存储在磁盘上的 RDD。这应位于系统上的高速本地磁盘上。它也可以是位于不同磁盘上的多个目录的逗号分隔列表。 注意: 这将被集群管理器设置的 SPARK_LOCAL_DIRS (Standalone) 或 LOCAL_DIRS (YARN) 环境变量覆盖。 |
0.5.0 |
spark.logConf |
false | 在启动 SparkContext 时,将有效的 SparkConf 作为 INFO 日志记录。 | 0.9.0 |
spark.master |
(无) | 要连接到的集群管理器。请参阅 允许的 master URL 列表。 | 0.9.0 |
spark.submit.deployMode |
client | Spark 驱动程序程序的部署模式,即“client”或“cluster”,这意味着在本地(“client”)启动驱动程序程序,或者远程(“cluster”)在集群内的节点之一上启动。 | 1.5.0 |
spark.log.callerContext |
(无) | 在 Yarn/HDFS 上运行时将写入 Yarn RM 日志/HDFS 审计日志的应用程序信息。其长度取决于 Hadoop 配置 hadoop.caller.context.max.size。它应该是简洁的,通常最多可以有 50 个字符。 |
2.2.0 |
spark.log.level |
(无) | 设置后,会覆盖任何用户定义的日志设置,就像在 Spark 启动时调用 SparkContext.setLogLevel() 一样。有效的日志级别包括:“ALL”、“DEBUG”、“ERROR”、“FATAL”、“INFO”、“OFF”、“TRACE”、“WARN”。 |
3.5.0 |
spark.driver.supervise |
false | 如果为 true,则在驱动程序以非零退出状态失败时自动重新启动它。仅在 Spark 独立模式下有效。 | 1.3.0 |
spark.driver.timeout |
0min | Spark 驱动程序的超时时间,以分钟为单位。0 表示无限。对于正的时间值,如果驱动程序在超时时间后仍在运行,则以退出代码 124 终止驱动程序。要使用此功能,必须将 spark.plugins 设置为 org.apache.spark.deploy.DriverTimeoutPlugin。 |
4.0.0 |
spark.driver.log.localDir |
(无) | 指定用于写入驱动程序日志并启用驱动程序日志 UI 选项卡的本地目录。 | 4.0.0 |
spark.driver.log.dfsDir |
(无) | 如果 spark.driver.log.persistToDfs.enabled 为 true,则同步 Spark 驱动程序日志的基础目录。在此基础目录中,每个应用程序将驱动程序日志记录到特定于应用程序的文件中。用户可能希望将其设置为统一位置(如 HDFS 目录),以便驱动程序日志文件可以保留供以后使用。此目录应允许任何 Spark 用户读/写文件,并允许 Spark 历史服务器用户删除文件。此外,如果 spark.history.fs.driverlog.cleaner.enabled 为 true,且日志早于通过设置 spark.history.fs.driverlog.cleaner.maxAge 配置的最大时长,则此目录中的旧日志将由 Spark 历史服务器 清理。 |
3.0.0 |
spark.driver.log.persistToDfs.enabled |
false | 如果为 true,则以客户端模式运行的 Spark 应用程序会将驱动程序日志写入持久存储,该存储在 spark.driver.log.dfsDir 中配置。如果未配置 spark.driver.log.dfsDir,驱动程序日志将不会持久化。此外,通过在 Spark 历史服务器 中将 spark.history.fs.driverlog.cleaner.enabled 设置为 true 来启用清理器。 |
3.0.0 |
spark.driver.log.layout |
%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex | 同步到 spark.driver.log.localDir 和 spark.driver.log.dfsDir 的驱动程序日志的布局。如果未配置此项,它将使用 log4j2.properties 中定义的第一个 appender 的布局。如果也未配置该布局,驱动程序日志将使用默认布局。 |
3.0.0 |
spark.driver.log.allowErasureCoding |
false | 是否允许驱动程序日志使用纠删码(erasure coding)。在 HDFS 上,纠删码文件的更新速度不如常规副本文件快,因此它们可能需要更长时间才能反映应用程序写入的更改。请注意,即使此项为 true,Spark 仍不会强制文件使用纠删码,它只会使用文件系统的默认设置。 | 3.0.0 |
spark.driver.log.redirectConsoleOutputs |
stdout,stderr | 需要重定向到日志系统的驱动程序控制台输出种类的逗号分隔列表。支持的值为 `stdout`、`stderr`。它仅在配置了 `spark.plugins` 为 `org.apache.spark.deploy.RedirectConsolePlugin` 时生效。 | 4.1.0 |
spark.decommission.enabled |
false | 当启用退役(decommission)时,Spark 将尽力优雅地关闭 executor。当启用 spark.storage.decommission.enabled 时,Spark 将尝试将所有 RDD 块(受 spark.storage.decommission.rddBlocks.enabled 控制)和 Shuffle 块(受 spark.storage.decommission.shuffleBlocks.enabled 控制)从正在退役的 executor 迁移到远程 executor。启用退役后,当启用 spark.dynamicAllocation.enabled 时,Spark 也将退役 executor 而不是杀死它们。 |
3.1.0 |
spark.executor.decommission.killInterval |
(无) | 退役的 executor 将被外部(例如非 spark)服务强制杀死的时间间隔。 | 3.1.0 |
spark.executor.decommission.forceKillTimeout |
(无) | Spark 将强制正在退役的 executor 退出之前的持续时间。在大多数情况下,这应该设置为一个较大的值,因为过小的值将阻止块迁移有足够的时间完成。 | 3.2.0 |
spark.executor.decommission.signal |
PWR | 用于触发 executor 开始退役的信号。 | 3.2.0 |
spark.executor.maxNumFailures |
numExecutors * 2,最小值为 3 | 在使应用程序失败之前的最大 executor 失败次数。此配置仅在 YARN 和 Kubernetes 上生效。 | 3.5.0 |
spark.executor.failuresValidityInterval |
(无) | 在此时间间隔之后,executor 失败将被视为独立的,并且不会累积到尝试计数中。此配置仅在 YARN 和 Kubernetes 上生效。 | 3.5.0 |
除了这些,以下属性也可用,在某些情况下可能很有用:
运行环境
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.driver.extraClassPath |
(无) | 要预置到驱动程序类路径的额外类路径条目。 注意: 在客户端模式下,此配置不得直接通过应用程序中的 SparkConf 设置,因为驱动程序 JVM 在此时已经启动。请改为通过 --driver-class-path 命令行选项或在默认属性文件中进行设置。 |
1.0.0 |
spark.driver.defaultJavaOptions |
(无) | 要预置到 spark.driver.extraJavaOptions 的默认 JVM 选项字符串。这旨在由管理员设置。例如,GC 设置或其他日志记录。请注意,使用此选项设置最大堆大小 (-Xmx) 设置是非法的。最大堆大小设置可以在集群模式下通过 spark.driver.memory 设置,并在客户端模式下通过 --driver-memory 命令行选项设置。注意: 在客户端模式下,此配置不得直接通过应用程序中的 SparkConf 设置,因为驱动程序 JVM 在此时已经启动。请改为通过 --driver-java-options 命令行选项或在默认属性文件中进行设置。 |
3.0.0 |
spark.driver.extraJavaOptions |
(无) | 传递给驱动程序的额外 JVM 选项字符串。这旨在由用户设置。例如,GC 设置或其他日志记录。请注意,使用此选项设置最大堆大小 (-Xmx) 设置是非法的。最大堆大小设置可以在集群模式下通过 spark.driver.memory 设置,并在客户端模式下通过 --driver-memory 命令行选项设置。注意: 在客户端模式下,此配置不得直接通过应用程序中的 SparkConf 设置,因为驱动程序 JVM 在此时已经启动。请改为通过 --driver-java-options 命令行选项或在默认属性文件中进行设置。spark.driver.defaultJavaOptions 将被预置到此配置中。 |
1.0.0 |
spark.driver.extraLibraryPath |
(无) | 设置启动驱动程序 JVM 时使用的特殊库路径。 注意: 在客户端模式下,此配置不得直接通过应用程序中的 SparkConf 设置,因为驱动程序 JVM 在此时已经启动。请改为通过 --driver-library-path 命令行选项或在默认属性文件中进行设置。 |
1.0.0 |
spark.driver.userClassPathFirst |
false | (实验性) 在驱动程序中加载类时,是否给予用户添加的 jar 比 Spark 自己的 jar 更高的优先级。此功能可用于减轻 Spark 依赖项与用户依赖项之间的冲突。它目前是一个实验性功能。此项仅在集群模式下使用。 | 1.3.0 |
spark.executor.extraClassPath |
(无) | 要预置到 executors 类路径的额外类路径条目。此项的存在主要是为了向后兼容旧版本的 Spark。用户通常不需要设置此选项。 | 1.0.0 |
spark.executor.defaultJavaOptions |
(无) | 要预置到 spark.executor.extraJavaOptions 的默认 JVM 选项字符串。这旨在由管理员设置。例如,GC 设置或其他日志记录。请注意,使用此选项设置 Spark 属性或最大堆大小 (-Xmx) 设置是非法的。Spark 属性应使用 SparkConf 对象或与 spark-submit 脚本一起使用的 spark-defaults.conf 文件进行设置。最大堆大小设置可以通过 spark.executor.memory 设置。如果存在以下符号,它们将被插值:将替换为应用程序 ID,将替换为 executor ID。例如,要启用将冗长的 gc 日志记录到以应用程序 executor ID 命名并位于 /tmp 中的文件中,请传递值:-verbose:gc -Xloggc:/tmp/-.gc |
3.0.0 |
spark.executor.extraJavaOptions |
(无) | 传递给 executors 的额外 JVM 选项字符串。这旨在由用户设置。例如,GC 设置或其他日志记录。请注意,使用此选项设置 Spark 属性或最大堆大小 (-Xmx) 设置是非法的。Spark 属性应使用 SparkConf 对象或与 spark-submit 脚本一起使用的 spark-defaults.conf 文件进行设置。最大堆大小设置可以通过 spark.executor.memory 设置。如果存在以下符号,它们将被插值:将替换为应用程序 ID,将替换为 executor ID。例如,要启用将冗长的 gc 日志记录到以应用程序 executor ID 命名并位于 /tmp 中的文件中,请传递值:-verbose:gc -Xloggc:/tmp/-.gc。spark.executor.defaultJavaOptions 将被预置到此配置中。 |
1.0.0 |
spark.executor.extraLibraryPath |
(无) | 设置启动 executor JVM 时使用的特殊库路径。 | 1.0.0 |
spark.executor.logs.rolling.maxRetainedFiles |
-1 | 设置系统将保留的最新滚动日志文件的数量。旧的日志文件将被删除。默认禁用。 | 1.1.0 |
spark.executor.logs.rolling.enableCompression |
false | 启用 executor 日志压缩。如果启用,滚动后的 executor 日志将被压缩。默认禁用。 | 2.0.2 |
spark.executor.logs.rolling.maxSize |
1024 * 1024 | 设置 executor 日志将滚动的文件最大大小(以字节为单位)。默认禁用滚动。请参阅 spark.executor.logs.rolling.maxRetainedFiles 以了解旧日志的自动清理。 |
1.4.0 |
spark.executor.logs.rolling.strategy |
"" (禁用) | 设置 executor 日志的滚动策略。默认情况下它是禁用的。它可以设置为 "time"(基于时间的滚动)、"size"(基于大小的滚动)或 ""(禁用)。对于 "time",使用 spark.executor.logs.rolling.time.interval 设置滚动间隔。对于 "size",使用 spark.executor.logs.rolling.maxSize 设置滚动的最大文件大小。 |
1.1.0 |
spark.executor.logs.rolling.time.interval |
daily | 设置 executor 日志滚动的间隔时间。默认禁用滚动。有效值为 daily、hourly、minutely 或任何以秒为单位的间隔。请参阅 spark.executor.logs.rolling.maxRetainedFiles 以了解旧日志的自动清理。 |
1.1.0 |
spark.executor.logs.redirectConsoleOutputs |
stdout,stderr | 需要重定向到日志系统的 executor 控制台输出种类的逗号分隔列表。支持的值为 `stdout`、`stderr`。它仅在配置了 `spark.plugins` 为 `org.apache.spark.deploy.RedirectConsolePlugin` 时生效。 | 4.1.0 |
spark.executor.userClassPathFirst |
false | (实验性) 与 spark.driver.userClassPathFirst 功能相同,但应用于 executor 实例。 |
1.3.0 |
spark.executorEnv.[EnvironmentVariableName] |
(无) | 将由 EnvironmentVariableName 指定的环境变量添加到 Executor 进程。用户可以指定多个此类变量以设置多个环境变量。 |
0.9.0 |
spark.redaction.regex |
(?i)secret|password|token|access[.]?key | 用于决定驱动程序和 executor 环境中的哪些 Spark 配置属性和环境变量包含敏感信息的正则表达式。当此正则表达式匹配属性键或值时,该值将从环境 UI 和各种日志(如 YARN 和事件日志)中脱敏。 | 2.1.2 |
spark.redaction.string.regex |
(无) | 用于决定 Spark 生成的字符串的哪些部分包含敏感信息的正则表达式。当此正则表达式匹配字符串的一部分时,该部分将被替换为虚拟值。此项目前用于脱敏 SQL explain 命令的输出。 | 2.2.0 |
spark.python.profile |
false | 启用 Python worker 中的性能分析,性能分析结果将通过 sc.show_profiles() 显示,或者在驱动程序退出前显示。它也可以通过 sc.dump_profiles(path) 转储到磁盘。如果某些性能分析结果已手动显示,它们将不会在驱动程序退出前自动显示。默认情况下将使用 pyspark.profiler.BasicProfiler,但可以通过将 profiler 类作为参数传递给 SparkContext 构造函数来覆盖它。 |
1.2.0 |
spark.python.profile.dump |
(无) | 驱动程序退出前用于转储性能分析结果的目录。结果将作为每个 RDD 的单独文件转储。它们可以通过 pstats.Stats() 加载。如果指定了此项,性能分析结果将不会自动显示。 |
1.2.0 |
spark.python.worker.memory |
512m | 聚合期间每个 python worker 进程使用的内存量,格式与带有大小单位后缀(“k”、“m”、“g”或“t”)的 JVM 内存字符串相同(例如 512m, 2g)。如果聚合期间使用的内存超过此数量,它会将数据溢出到磁盘。 |
1.1.0 |
spark.python.worker.reuse |
true | 是否重用 Python worker。如果为是,它将使用固定数量的 Python worker,不需要为每个任务 fork() 一个 Python 进程。如果存在大量广播,这非常有用,这样广播就不需要为每个任务从 JVM 传输到 Python worker。 | 1.2.0 |
spark.python.factory.idleWorkerMaxPoolSize |
(无) | 保持的最大空闲 Python worker 数量。如果未设置,则数量不受限制。如果设置为正整数 N,则最多保留 N 个空闲 worker;最久未使用的 worker 将首先被驱逐。 | 4.1.0 |
spark.python.worker.killOnIdleTimeout |
false | 当达到空闲超时(由 spark.python.worker.idleTimeoutSeconds 定义)时,Spark 是否应终止 Python worker 进程。如果启用,Spark 将在记录状态的同时终止 Python worker 进程。 |
4.1.0 |
spark.python.worker.tracebackDumpIntervalSeconds |
0 | Python worker 转储其回溯(traceback)的时间间隔(以秒为单位)。如果为正数,Python worker 将定期将回溯转储到其 `stderr` 中。默认值为 `0`,表示已禁用。 | 4.1.0 |
spark.python.unix.domain.socket.enabled |
false | 设置为 true 时,Python 驱动程序将 Unix 域套接字用于诸如从本地数据创建或收集 DataFrame、使用累加器以及使用 PySpark 执行 Python 函数(如 Python UDF)等操作。此配置仅适用于 Spark Classic 和 Spark Connect 服务器。 | 4.1.0 |
spark.files |
要放置在每个 executor 工作目录中的文件的逗号分隔列表。允许使用 glob 通配符。 | 1.0.0 | |
spark.submit.pyFiles |
要放置在 Python 应用程序 PYTHONPATH 上的 .zip、.egg 或 .py 文件的逗号分隔列表。允许使用 glob 通配符。 | 1.0.1 | |
spark.submit.callSystemExitOnMainExit |
false | 如果为 true,SparkSubmit 将在用户的主方法退出后调用 System.exit() 来启动 JVM 关闭。这在非守护进程 JVM 线程可能阻止 JVM 自行关闭的情况下非常有用。 | 4.1.0 |
spark.jars |
要包含在驱动程序和 executor 类路径中的 jar 文件的逗号分隔列表。允许使用 glob 通配符。 | 0.9.0 | |
spark.jars.packages |
要包含在驱动程序和 executor 类路径中的 jar 的 Maven 坐标的逗号分隔列表。坐标应为 groupId:artifactId:version。如果提供了 spark.jars.ivySettings,则工件将根据该文件中的配置进行解析,否则将在本地 maven 仓库中搜索工件,然后是 maven central,最后是命令行选项 --repositories 给出的任何额外远程仓库。有关更多详细信息,请参阅 高级依赖管理。 |
1.5.0 | |
spark.jars.excludes |
groupId:artifactId 的逗号分隔列表,用于在解析 spark.jars.packages 中提供的依赖项时排除,以避免依赖冲突。 |
1.5.0 | |
spark.jars.ivy |
指定 Ivy 用户目录的路径,用于本地 Ivy 缓存和来自 spark.jars.packages 的包文件。这将覆盖默认值为 ~/.ivy2 的 Ivy 属性 ivy.default.ivy.user.dir。 |
1.3.0 | |
spark.jars.ivySettings |
指向 Ivy 设置文件的路径,用于自定义使用 spark.jars.packages 指定的 jar 的解析,而不是内置的默认值(如 maven central)。命令行选项 --repositories 或 spark.jars.repositories 给出的额外仓库也将被包含。这对于允许 Spark 从防火墙后(例如通过内部工件服务器如 Artifactory)解析工件很有用。有关设置文件格式的详细信息,请参阅 设置文件。仅支持具有 file:// 方案的路径。没有方案的路径被假定具有 file:// 方案。在 YARN 集群模式下运行时,此文件也将被本地化到远程驱动程序,用于 |
2.2.0 | |
spark.jars.repositories |
逗号分隔的额外远程仓库列表,用于搜索 --packages 或 spark.jars.packages 提供的 maven 坐标。 |
2.3.0 | |
spark.archives |
要解压到每个 executor 工作目录中的存档的逗号分隔列表。支持 .jar, .tar.gz, .tgz 和 .zip。您可以通过在文件名后添加 # 来指定解压的目录名称,例如 file.zip#directory。此配置是实验性的。 |
3.1.0 | |
spark.pyspark.driver.python |
驱动程序中用于 PySpark 的 Python 二进制可执行文件。(默认为 spark.pyspark.python) |
2.1.0 | |
spark.pyspark.python |
驱动程序和 executor 中用于 PySpark 的 Python 二进制可执行文件。 | 2.1.0 |
Shuffle 行为
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.reducer.maxSizeInFlight |
48m | 从每个 reduce 任务同时获取的映射输出的最大大小,除非另有说明,单位为 MiB。由于每个输出都需要我们创建一个缓冲区来接收它,这代表了每个 reduce 任务固定的内存开销,因此除非您有大量内存,否则请保持较小的值。 | 1.4.0 |
spark.reducer.maxReqsInFlight |
Int.MaxValue | 此配置限制了在任何给定点获取块的远程请求数量。当集群中的主机数量增加时,这可能导致到一个或多个节点的极大量入站连接,从而导致 worker 在负载下失败。通过允许限制获取请求的数量,可以缓解这种情况。 | 2.0.0 |
spark.reducer.maxBlocksInFlightPerAddress |
Int.MaxValue | 此配置限制了每个 reduce 任务从给定主机端口获取的远程块的数量。当在单次获取中或同时从给定地址请求大量块时,这可能会导致提供服务的 executor 或 Node Manager 崩溃。这对于在启用外部 shuffle 时减少 Node Manager 上的负载特别有用。您可以通过将其设置为较低的值来缓解此问题。 | 2.2.1 |
spark.shuffle.compress |
true | 是否压缩映射输出文件。通常是个好主意。压缩将使用 spark.io.compression.codec。 |
0.6.0 |
spark.shuffle.file.buffer |
32k | 每个 shuffle 文件输出流的内存缓冲区大小,除非另有说明,单位为 KiB。这些缓冲区减少了创建中间 shuffle 文件时产生的磁盘寻道和系统调用次数。 | 1.4.0 |
spark.shuffle.file.merge.buffer |
32k | 每个 shuffle 文件输入流的内存缓冲区大小,除非另有说明,单位为 KiB。这些缓冲区使用堆外缓冲区,并与 shuffle 文件中的文件数量有关。应避免过大的缓冲区。 | 4.0.0 |
spark.shuffle.unsafe.file.output.buffer |
32k | 自 Spark 4.0 起弃用,请使用 spark.shuffle.localDisk.file.output.buffer。 |
2.3.0 |
spark.shuffle.localDisk.file.output.buffer |
32k | 在所有本地磁盘 shuffle writer 中写入每个分区后,此缓冲区大小的文件系统配置。除非另有说明,单位为 KiB。 | 4.0.0 |
spark.shuffle.spill.diskWriteBufferSize |
1024 * 1024 | 将排序后的记录写入磁盘文件时使用的缓冲区大小(以字节为单位)。 | 2.3.0 |
spark.shuffle.io.maxRetries |
3 | (仅限 Netty) 如果设置为非零值,因 IO 相关异常而失败的获取将自动重试。此重试逻辑有助于在面临长时间 GC 暂停或瞬时网络连接问题时稳定大规模 shuffle。 | 1.2.0 |
spark.shuffle.io.numConnectionsPerPeer |
1 | (仅限 Netty) 主机之间的连接被重用,以减少大规模集群的连接建立。对于具有许多硬盘和少数主机的集群,这可能导致并发性不足以使所有磁盘达到饱和,因此用户可以考虑增加此值。 | 1.2.1 |
spark.shuffle.io.preferDirectBufs |
true | (仅限 Netty) 使用堆外缓冲区来减少 shuffle 和缓存块传输期间的垃圾回收。对于堆外内存受限的环境,用户可能希望关闭此项,以强制 Netty 的所有分配都在堆上进行。 | 1.2.0 |
spark.shuffle.io.retryWait |
5s | (仅限 Netty) 获取重试之间的等待时间。重试导致的默认最大延迟为 15 秒,计算方式为 maxRetries * retryWait。 |
1.2.1 |
spark.shuffle.io.backLog |
-1 | shuffle 服务的接受队列长度。对于大型应用程序,可能需要增加此值,以便在服务无法跟上在短时间内到达的大量连接时,传入的连接不会被丢弃。这需要在 shuffle 服务本身运行的任何地方进行配置,这可能在应用程序之外(请参阅下面的 spark.shuffle.service.enabled 选项)。如果设置为小于 1,将回退到 Netty 的 io.netty.util.NetUtil#SOMAXCONN 定义的操作系统默认值。 |
1.1.1 |
spark.shuffle.io.connectionTimeout |
spark.network.timeout 的值 |
对于建立的 shuffle 服务器和客户端之间的连接,如果仍有未完成的获取请求但在至少 connectionTimeout 时间内通道上没有流量,则连接被标记为空闲并关闭。 |
1.2.0 |
spark.shuffle.io.connectionCreationTimeout |
spark.shuffle.io.connectionTimeout 的值 |
建立 shuffle 服务器和客户端之间连接的超时时间。 | 3.2.0 |
spark.shuffle.service.enabled |
false | 启用外部 shuffle 服务。此服务保留由 executors 写入的 shuffle 文件,例如,以便可以安全地移除 executors,或者在 executor 失败的情况下 shuffle 获取可以继续。必须设置外部 shuffle 服务才能启用它。有关更多信息,请参阅 动态分配配置和设置文档。 | 1.2.0 |
spark.shuffle.service.port |
7337 | 外部 shuffle 服务将运行的端口。 | 1.2.0 |
spark.shuffle.service.name |
spark_shuffle | 配置的客户端应与之通信的 Spark shuffle 服务名称。这必须与 YARN NodeManager 配置中用于配置 Shuffle 的名称(yarn.nodemanager.aux-services)匹配。仅在 spark.shuffle.service.enabled 设置为 true 时生效。 |
3.2.0 |
spark.shuffle.service.index.cache.size |
100m | 缓存条目限制为指定的内存占用,除非另有说明,单位为字节。 | 2.3.0 |
spark.shuffle.service.removeShuffle |
true | 当 shuffle 不再需要时,是否使用 ExternalShuffleService 删除已分配 executor 的 shuffle 块。如果不启用此项,被解除分配的 executor 上的 shuffle 数据将保留在磁盘上,直到应用程序结束。 | 3.3.0 |
spark.shuffle.maxChunksBeingTransferred |
Long.MAX_VALUE | shuffle 服务上允许同时传输的最大块数。请注意,当达到最大数量时,新的传入连接将被关闭。客户端将根据 shuffle 重试配置(请参阅 spark.shuffle.io.maxRetries 和 spark.shuffle.io.retryWait)进行重试,如果达到这些限制,任务将因获取失败而失败。 |
2.3.0 |
spark.shuffle.sort.bypassMergeThreshold |
200 | (高级) 在基于排序的 shuffle 管理器中,如果没有 map 端聚合并且 reduce 分区最多为此数量,则避免归并排序数据。 | 1.1.1 |
spark.shuffle.sort.io.plugin.class |
org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO | 用于 shuffle IO 的类名。 | 3.0.0 |
spark.shuffle.spill.compress |
true | 是否压缩 shuffle 期间溢出的数据。压缩将使用 spark.io.compression.codec。 |
0.9.0 |
spark.shuffle.accurateBlockThreshold |
100 * 1024 * 1024 | 阈值(以字节为单位),超过此阈值,HighlyCompressedMapStatus 中的 shuffle 块大小将被准确记录。这有助于防止在获取 shuffle 块时由于低估 shuffle 块大小而导致的 OOM。 | 2.2.1 |
spark.shuffle.accurateBlockSkewedFactor |
-1.0 | 如果 shuffle 块的大小大于此因子乘以中位数 shuffle 块大小或 spark.shuffle.accurateBlockThreshold,则该块被视为倾斜的,并将被准确记录在 HighlyCompressedMapStatus 中。建议将此参数设置为与 spark.sql.adaptive.skewJoin.skewedPartitionFactor 相同的值。设置为 -1.0 可默认禁用此功能。 |
3.3.0 |
spark.shuffle.registration.timeout |
5000 | 向外部 shuffle 服务注册的超时时间(以毫秒为单位)。 | 2.3.0 |
spark.shuffle.registration.maxAttempts |
3 | 当我们无法注册到外部 shuffle 服务时,我们将重试 maxAttempts 次。 | 2.3.0 |
spark.shuffle.reduceLocality.enabled |
true | 是否计算 reduce 任务的局部性偏好。 | 1.5.0 |
spark.shuffle.mapOutput.minSizeForBroadcast |
512k | 我们将使用 Broadcast 将 map 输出状态发送到 executors 的大小。 | 2.0.0 |
spark.shuffle.detectCorrupt |
true | 是否检测获取块中的任何损坏。 | 2.2.0 |
spark.shuffle.detectCorrupt.useExtraMemory |
false | 如果启用,部分压缩/加密流将通过使用额外内存进行解压缩/解密以尽早检测损坏。任何抛出的 IOException 将导致任务被重试一次,如果再次以相同异常失败,则抛出 FetchFailedException 以重试前一个阶段。 | 3.0.0 |
spark.shuffle.useOldFetchProtocol |
false | 在执行 shuffle 块获取时是否使用旧协议。它仅在我们需要在新版本 Spark 作业从旧版本外部 shuffle 服务获取 shuffle 块的情况下保持兼容性时启用。 | 3.0.0 |
spark.shuffle.readHostLocalDisk |
true | 如果启用(且 spark.shuffle.useOldFetchProtocol 被禁用),则从那些运行在同一主机上的块管理器请求的 shuffle 块将直接从磁盘读取,而不是作为远程块通过网络获取。 |
3.0.0 |
spark.files.io.connectionTimeout |
spark.network.timeout 的值 |
对于在 Spark RPC 环境中获取文件的已建立连接,如果仍有未完成的文件正在下载但在至少 connectionTimeout 时间内通道上没有流量,则连接被标记为空闲并关闭。 |
1.6.0 |
spark.files.io.connectionCreationTimeout |
spark.files.io.connectionTimeout 的值 |
在 Spark RPC 环境中获取文件时建立连接的超时时间。 | 3.2.0 |
spark.shuffle.checksum.enabled |
true | 是否计算 shuffle 数据的校验和。如果启用,Spark 将计算映射输出文件中每个分区数据的校验和值,并将这些值存储在磁盘上的校验和文件中。当检测到 shuffle 数据损坏时,Spark 将尝试通过使用校验和文件来诊断损坏的原因(例如,网络问题、磁盘问题等)。 | 3.2.0 |
spark.shuffle.checksum.algorithm |
ADLER32 | 用于计算 shuffle 校验和的算法。目前,它仅支持 JDK 的内置算法,例如 ADLER32、CRC32 和 CRC32C。 | 3.2.0 |
spark.shuffle.service.fetch.rdd.enabled |
false | 是否使用 ExternalShuffleService 来获取磁盘持久化的 RDD 块。在动态分配的情况下,如果启用此功能,仅具有磁盘持久化块的 executors 在 spark.dynamicAllocation.executorIdleTimeout 之后将被视为闲置并将被相应地释放。 |
3.0.0 |
spark.shuffle.service.db.enabled |
true | 是否在 ExternalShuffleService 中使用数据库。请注意,这仅影响独立模式。 | 3.0.0 |
spark.shuffle.service.db.backend |
ROCKSDB | 指定用于 shuffle 服务本地数据库的基于磁盘的存储。设置为 ROCKSDB 或 LEVELDB (弃用)。 | 3.4.0 |
Spark UI
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.eventLog.logBlockUpdates.enabled |
false | 如果 spark.eventLog.enabled 为 true,是否为每个块更新记录事件。*警告*:这将显著增加事件日志的大小。 |
2.3.0 |
spark.eventLog.longForm.enabled |
false | 如果为 true,则在事件日志中使用长格式的调用点(call sites)。否则使用短格式。 | 2.4.0 |
spark.eventLog.compress |
true | 如果 spark.eventLog.enabled 为 true,是否压缩已记录的事件。 |
1.0.0 |
spark.eventLog.compression.codec |
zstd | 压缩已记录事件的编解码器。默认情况下,Spark 提供四种编解码器:lz4, lzf, snappy 和 zstd。您也可以使用完全限定的类名来指定编解码器,例如 org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, org.apache.spark.io.SnappyCompressionCodec 和 org.apache.spark.io.ZStdCompressionCodec。 |
3.0.0 |
spark.eventLog.erasureCoding.enabled |
false | 无论文件系统默认设置如何,是否允许事件日志使用纠删码,或关闭纠删码。在 HDFS 上,纠删码文件的更新速度不如常规副本文件快,因此应用程序更新需要更长时间才能出现在历史服务器中。请注意,即使此项为 true,Spark 仍不会强制文件使用纠删码,它只会使用文件系统的默认设置。 | 3.0.0 |
spark.eventLog.excludedPatterns |
(无) | 指定要从事件日志中排除的事件名称的逗号分隔列表。 | 4.1.0 |
spark.eventLog.dir |
file:///tmp/spark-events | 如果 spark.eventLog.enabled 为 true,则 Spark 事件记录到的基础目录。在此基础目录中,Spark 为每个应用程序创建一个子目录,并将该应用程序特定的事件记录在此目录中。用户可能希望将其设置为统一位置(如 HDFS 目录),以便历史记录文件可以由历史服务器读取。 |
1.0.0 |
spark.eventLog.enabled |
false | 是否记录 Spark 事件,这对于在应用程序结束后重建 Web UI 非常有用。 | 1.0.0 |
spark.eventLog.overwrite |
false | 是否覆盖任何现有文件。 | 1.0.0 |
spark.eventLog.buffer.kb |
100k | 写入输出流时使用的缓冲区大小,除非另有说明,单位为 KiB。 | 1.0.0 |
spark.eventLog.rolling.enabled |
true | 是否启用事件日志文件的滚动。如果设置为 true,它将把每个事件日志文件截断为配置的大小。 | 3.0.0 |
spark.eventLog.rolling.maxFileSize |
128m | 当 spark.eventLog.rolling.enabled=true 时,指定滚动前的事件日志文件最大大小。 |
3.0.0 |
spark.ui.dagGraph.retainedRootRDDs |
Int.MaxValue | 在垃圾回收之前,Spark UI 和状态 API 记住多少个 DAG 图节点。 | 2.1.0 |
spark.ui.groupSQLSubExecutionEnabled |
true | 当 SQL 子执行属于同一个根执行时,是否在 SQL UI 中将它们分组在一起。 | 3.4.0 |
spark.ui.enabled |
true | 是否为 Spark 应用程序运行 Web UI。 | 1.1.1 |
spark.ui.store.path |
None | 用于缓存实时 UI 应用程序信息的本地目录。默认情况下未设置,意味着所有应用程序信息将保存在内存中。 | 3.4.0 |
spark.ui.killEnabled |
true | 允许从 Web UI 杀死作业和阶段。 | 1.0.0 |
spark.ui.threadDumpsEnabled |
true | 是否在阶段和 Executor 页面显示 executor 线程转储的链接。 | 1.2.0 |
spark.ui.threadDump.flamegraphEnabled |
true | 是否为 executor 线程转储渲染火焰图(Flamegraph)。 | 4.0.0 |
spark.ui.heapHistogramEnabled |
true | 是否在 Executor 页面显示 executor 堆直方图的链接。 | 3.5.0 |
spark.ui.liveUpdate.period |
100ms | 更新实时实体的频率。-1 表示重放应用程序时“从不更新”,这意味着只会发生最后一次写入。对于实时应用程序,这避免了在快速处理传入任务事件时我们可以省略的一些操作。 | 2.3.0 |
spark.ui.liveUpdate.minFlushPeriod |
1s | 刷新陈旧 UI 数据之前经过的最小时间。当传入的任务事件不频繁触发时,这避免了 UI 陈旧。 | 2.4.2 |
spark.ui.port |
4040 | 应用程序仪表板的端口,显示内存和工作负载数据。 | 0.7.0 |
spark.ui.retainedJobs |
1000 | 在垃圾回收之前,Spark UI 和状态 API 记住多少个作业。这是一个目标最大值,在某些情况下保留的元素可能会更少。 | 1.2.0 |
spark.ui.retainedStages |
1000 | 在垃圾回收之前,Spark UI 和状态 API 记住多少个阶段。这是一个目标最大值,在某些情况下保留的元素可能会更少。 | 0.9.0 |
spark.ui.retainedTasks |
100000 | 在垃圾回收之前,Spark UI 和状态 API 在一个阶段中记住多少个任务。这是一个目标最大值,在某些情况下保留的元素可能会更少。 | 2.0.1 |
spark.ui.reverseProxy |
false | 启用将 Spark Master 作为 worker 和应用程序 UI 的反向代理运行。在此模式下,Spark Master 将对 worker 和应用程序 UI 进行反向代理,以实现无需直接访问其主机即可访问。请谨慎使用,因为 worker 和应用程序 UI 将无法直接访问,您只能通过 Spark Master/代理公共 URL 访问它们。此设置影响集群中运行的所有 worker 和应用程序 UI,并且必须在所有 worker、驱动程序和 master 上进行设置。 | 2.1.0 |
spark.ui.reverseProxyUrl |
如果 Spark UI 应通过另一个前端反向代理提供服务,这是通过该反向代理访问 Spark Master UI 的 URL。这在为身份验证(例如 OAuth 代理)运行代理时非常有用。URL 可能包含路径前缀,例如 http://mydomain.com/path/to/spark/,允许您通过同一虚拟主机和端口为多个 Spark 集群和其他 Web 应用程序提供 UI。通常,这应该是一个包含方案(http/https)、主机和端口的绝对 URL。在此处指定以 "/" 开头的相对 URL 也是可能的。在这种情况下,Spark UI 和 Spark REST API 生成的所有 URL 都将是服务器相对链接——这仍然有效,因为整个 Spark UI 都是通过同一主机和端口提供的。该设置影响 Spark UI 中的链接生成,但前端反向代理负责:
spark.ui.reverseProxy 打开时有效。当 Spark Master Web UI 可直接访问时,不需要此设置。请注意,该设置的值在按 "/" 分割后不能包含关键字 proxy 或 history。Spark UI 依赖这两个关键字从 URI 获取 REST API 端点。 |
2.1.0 | |
spark.ui.proxyRedirectUri |
当 Spark 在代理后运行时,重定向的目标地址。这将使 Spark 修改重定向响应,使其指向代理服务器,而不是 Spark UI 自身的地址。这应该仅是服务器的地址,不包含应用程序的任何前缀路径;前缀应由代理服务器本身(通过添加 X-Forwarded-Context 请求头)设置,或者通过在 Spark 应用程序的配置中设置代理基路径来设置。 |
3.0.0 | |
spark.ui.showConsoleProgress |
false | 在控制台中显示进度条。进度条显示运行时间超过 500ms 的阶段进度。如果同时运行多个阶段,多个进度条将显示在同一行上。 注意: 在 shell 环境中,spark.ui.showConsoleProgress 的默认值为 true。 |
1.2.1 |
spark.ui.consoleProgress.update.interval |
200 | 更新控制台中进度条的时间间隔(以毫秒为单位)。 | 2.1.0 |
spark.ui.custom.executor.log.url |
(无) | 指定自定义 Spark executor 日志 URL,用于支持外部日志服务,而不是在 Spark UI 中使用集群管理器的应用程序日志 URL。Spark 将通过模式支持一些路径变量,这些模式可能因集群管理器而异。请查看您的集群管理器文档,了解支持哪些模式(如果有的话)。 请注意,此配置也会替换事件日志中的原始日志 URL,当在历史服务器上访问应用程序时,这也将生效。新的日志 URL 必须是永久的,否则您可能会得到 executor 日志 URL 的死链。 目前,仅 YARN 和 K8s 集群管理器支持此配置。 |
3.0.0 |
spark.ui.prometheus.enabled |
true | 在驱动程序 Web 页面上暴露 /metrics/executors/prometheus 处的 executor 指标。 | 3.0.0 |
spark.worker.ui.retainedExecutors |
1000 | 在垃圾回收之前,Spark UI 和状态 API 记住多少个已完成的 executors。 | 1.5.0 |
spark.worker.ui.retainedDrivers |
1000 | 在垃圾回收之前,Spark UI 和状态 API 记住多少个已完成的驱动程序。 | 1.5.0 |
spark.sql.ui.retainedExecutions |
1000 | 在垃圾回收之前,Spark UI 和状态 API 记住多少个已完成的执行。 | 1.5.0 |
spark.streaming.ui.retainedBatches |
1000 | 在垃圾回收之前,Spark UI 和状态 API 记住多少个已完成的批次。 | 1.0.0 |
spark.ui.retainedDeadExecutors |
100 | 在垃圾回收之前,Spark UI 和状态 API 记住多少个死掉的 executors。 | 2.0.0 |
spark.ui.filters |
None | 应用于 Spark Web UI 的过滤器类名的逗号分隔列表。过滤器应该是标准的 javax servlet Filter。 过滤器参数也可以在配置中指定,通过设置以下格式的配置条目: spark.<过滤器类名>.param.<参数名>=<值>例如: spark.ui.filters=com.test.filter1
spark.com.test.filter1.param.name1=foo
spark.com.test.filter1.param.name2=bar
|
1.0.0 |
spark.ui.requestHeaderSize |
8k | HTTP 请求头的最大允许大小,除非另有说明,单位为字节。此设置也适用于 Spark 历史服务器。 | 2.2.3 |
spark.ui.timelineEnabled |
true | 是否在 UI 页面上显示事件时间线数据。 | 3.4.0 |
spark.ui.timeline.executors.maximum |
250 | 事件时间线中显示的最大 executors 数量。 | 3.2.0 |
spark.ui.timeline.jobs.maximum |
500 | 事件时间线中显示的最大作业数量。 | 3.2.0 |
spark.ui.timeline.stages.maximum |
500 | 事件时间线中显示的最大阶段数量。 | 3.2.0 |
spark.ui.timeline.tasks.maximum |
1000 | 事件时间线中显示的最大任务数量。 | 1.4.0 |
压缩与序列化
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.broadcast.compress |
true | 是否在发送广播变量之前对其进行压缩。通常是个好主意。压缩将使用 spark.io.compression.codec。 |
0.6.0 |
spark.checkpoint.dir |
(无) | 设置检查点的默认目录。它可以通过 SparkContext.setCheckpointDir 覆盖。 | 4.0.0 |
spark.checkpoint.compress |
true | 是否压缩 RDD 检查点。通常是个好主意。压缩将使用 spark.io.compression.codec。 |
2.2.0 |
spark.io.compression.codec |
lz4 | 用于压缩内部数据(如 RDD 分区、事件日志、广播变量和 shuffle 输出)的编解码器。默认情况下,Spark 提供四种编解码器:lz4, lzf, snappy 和 zstd。您也可以使用完全限定的类名来指定编解码器,例如 org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, org.apache.spark.io.SnappyCompressionCodec 和 org.apache.spark.io.ZStdCompressionCodec。 |
0.8.0 |
spark.io.compression.lz4.blockSize |
32k | 使用 LZ4 压缩编解码器时,LZ4 压缩中使用的块大小。降低此块大小也会在 LZ4 被使用时降低 shuffle 内存使用量。默认单位是字节,除非另有说明。此配置仅适用于 spark.io.compression.codec。 |
1.4.0 |
spark.io.compression.snappy.blockSize |
32k | 使用 Snappy 压缩编解码器时,Snappy 压缩中的块大小。降低此块大小也会在 Snappy 被使用时降低 shuffle 内存使用量。默认单位是字节,除非另有说明。此配置仅适用于 spark.io.compression.codec。 |
1.4.0 |
spark.io.compression.zstd.level |
1 | Zstd 压缩编解码器的压缩级别。提高压缩级别将以牺牲更多的 CPU 和内存为代价来获得更好的压缩效果。此配置仅适用于 spark.io.compression.codec。 |
2.3.0 |
spark.io.compression.zstd.bufferSize |
32k | 使用 Zstd 压缩编解码器时,Zstd 压缩中使用的缓冲区大小(以字节为单位)。降低此大小会降低 Zstd 被使用时的 shuffle 内存使用量,但由于过多的 JNI 调用开销,它可能会增加压缩成本。此配置仅适用于 spark.io.compression.codec。 |
2.3.0 |
spark.io.compression.zstd.bufferPool.enabled |
true | 如果为 true,则启用 ZSTD JNI 库的缓冲区池。 | 3.2.0 |
spark.io.compression.zstd.strategy |
(无) | Zstd 压缩编解码器的压缩策略。值越高,复杂度越高,通常会导致更强但更慢的压缩或更高的 CPU 成本。 | 4.1.0 |
spark.io.compression.zstd.workers |
0 | 使用 Zstd 时并行压缩所生成的线程大小。当值为 0 时,不会生成 worker,它以单线程模式工作。当值 > 0 时,触发异步模式,并生成相应数量的线程。更多的 worker 可以提高性能,但也会增加内存成本。 | 4.0.0 |
spark.io.compression.lzf.parallel.enabled |
true | 当为 true 时,LZF 压缩将使用多个线程并行压缩数据。 | 4.0.0 |
spark.kryo.classesToRegister |
(无) | 如果您使用 Kryo 序列化,请提供要向 Kryo 注册的自定义类名的逗号分隔列表。有关更多详细信息,请参阅 调优指南。 | 1.2.0 |
spark.kryo.referenceTracking |
true | 使用 Kryo 序列化数据时是否跟踪对同一对象的引用,如果您的对象图有循环,这很有必要,如果它们包含同一对象的多个副本,这有助于效率。如果您知道并非如此,则可以禁用它以提高性能。 | 0.8.0 |
spark.kryo.registrationRequired |
false | 是否要求在 Kryo 中进行注册。如果设置为 'true',则如果序列化了未注册的类,Kryo 将抛出异常。如果设置为 false(默认),Kryo 将连同每个对象一起写入未注册的类名。写入类名可能会导致显著的性能开销,因此启用此选项可以严格执行用户未遗漏注册类。 | 1.1.0 |
spark.kryo.registrator |
(无) | 如果您使用 Kryo 序列化,请提供向 Kryo 注册自定义类的类的逗号分隔列表。如果您需要以自定义方式注册类(例如指定自定义字段序列化器),此属性非常有用。否则 spark.kryo.classesToRegister 更简单。它应该设置为扩展 KryoRegistrator 的类。有关更多详细信息,请参阅 调优指南。 |
0.5.0 |
spark.kryo.unsafe |
true | 是否使用基于 unsafe 的 Kryo 序列化器。通过使用基于 Unsafe 的 IO,可以大幅提升速度。 | 2.1.0 |
spark.kryoserializer.buffer.max |
64m | Kryo 序列化缓冲区的最大允许大小,除非另有说明,单位为 MiB。这必须大于您尝试序列化的任何对象,并且必须小于 2048m。如果您在 Kryo 内收到“buffer limit exceeded”异常,请增加此值。 | 1.4.0 |
spark.kryoserializer.buffer |
64k | Kryo 序列化缓冲区的初始大小,除非另有说明,单位为 KiB。请注意,每个 worker 的 每个内核 将有一个缓冲区。如果需要,此缓冲区将增长到 spark.kryoserializer.buffer.max。 |
1.4.0 |
spark.rdd.compress |
false | 是否压缩序列化的 RDD 分区(例如 Java 和 Scala 中的 StorageLevel.MEMORY_ONLY_SER 或 Python 中的 StorageLevel.MEMORY_ONLY)。以一些额外的 CPU 时间为代价,可以节省大量空间。压缩将使用 spark.io.compression.codec。 |
0.6.0 |
spark.serializer |
org.apache.spark.serializer. JavaSerializer |
用于序列化将通过网络发送或需要以序列化形式缓存的对象的类。Java 序列化的默认设置适用于任何可序列化的 Java 对象,但速度相当慢,因此当速度是必要的时候,我们建议 使用 org.apache.spark.serializer.KryoSerializer 并配置 Kryo 序列化。可以是 org.apache.spark.Serializer 的任何子类。 |
0.5.0 |
spark.serializer.objectStreamReset |
100 | 当使用 org.apache.spark.serializer.JavaSerializer 进行序列化时,序列化器会缓存对象以防止写入冗余数据,然而这会停止这些对象的垃圾回收。通过调用 'reset',您可以从序列化器中刷新该信息,并允许旧对象被收集。要关闭此定期重置,请将其设置为 -1。默认情况下,它每 100 个对象重置一次序列化器。 | 1.0.0 |
内存管理
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.memory.fraction |
0.6 | 用于执行和存储的(堆空间 - 300MB)的部分。此值越低,溢出和缓存数据逐出的频率就越高。此配置的目的是在稀疏且异常大的记录的情况下,为内部元数据、用户数据结构和不精确的大小估计预留内存。建议将其保留为默认值。有关更多详细信息,包括在增加此值时正确调优 JVM 垃圾回收的重要信息,请参阅 此描述。 | 1.6.0 |
spark.memory.storageFraction |
0.5 | 此项为不受驱逐影响的存储内存量,表示为 spark.memory.fraction 所设区域大小的分数。该值越大,可供执行任务的内存就越少,任务溢出到磁盘的频率可能会增加。建议保留默认值。更多详情,请参见 此说明。 |
1.6.0 |
spark.memory.offHeap.enabled |
false | 如果设为 true,Spark 将尝试对某些操作使用堆外内存。如果启用了堆外内存使用,则 spark.memory.offHeap.size 必须为正数。 |
1.6.0 |
spark.memory.offHeap.size |
0 | 可用于堆外分配的绝对内存量(单位为字节,除非另有说明)。此设置对堆内存使用没有影响,因此如果您的执行器总内存消耗必须符合硬性限制,请务必相应地缩减 JVM 堆大小。当 spark.memory.offHeap.enabled=true 时,此值必须设置为正数。 |
1.6.0 |
spark.memory.unmanagedMemoryPollingInterval |
0s | 轮询非托管内存用户以跟踪其内存使用情况的间隔。非托管内存用户是指在 Spark 核心内存管理之外管理自身内存的组件,例如流状态存储(Streaming State Store)的 RocksDB。将其设置为 0 可禁用非托管内存轮询。 | 4.1.0 |
spark.storage.unrollMemoryThreshold |
1024 * 1024 | 在展开任何块之前请求的初始内存。 | 1.1.0 |
spark.storage.replication.proactive |
true | 为 RDD 块启用主动块复制。如果存在任何现有的可用副本,由于执行器故障而丢失的缓存 RDD 块副本将得到补充。此功能尝试将块的复制级别恢复到初始数量。 | 2.2.0 |
spark.storage.localDiskByExecutors.cacheSize |
1000 | 存储本地目录的最大执行器数量。此大小适用于驱动程序(Driver)和执行器(Executor)侧,以避免存储无限增长。当从同一主机获取磁盘持久化的 RDD 块或 Shuffle 块(设置 spark.shuffle.readHostLocalDisk 时)时,此缓存将用于避免网络传输。 |
3.0.0 |
spark.cleaner.periodicGC.interval |
30min | 控制触发垃圾回收的频率。 此上下文清理器仅在弱引用被垃圾回收时触发清理。在长时间运行且具有大型驱动程序 JVM 的应用程序中,如果驱动程序的内存压力很小,这种情况可能偶尔发生,甚至根本不发生。完全不清理可能会导致执行器一段时间后磁盘空间耗尽。 |
1.6.0 |
spark.cleaner.referenceTracking |
true | 启用或禁用上下文清理。 | 1.0.0 |
spark.cleaner.referenceTracking.blocking |
true | 控制清理线程是否应在清理任务上阻塞(Shuffle 除外,Shuffle 由 spark.cleaner.referenceTracking.blocking.shuffle Spark 属性控制)。 |
1.0.0 |
spark.cleaner.referenceTracking.blocking.shuffle |
false | 控制清理线程是否应在 Shuffle 清理任务上阻塞。 | 1.1.1 |
spark.cleaner.referenceTracking.cleanCheckpoints |
false | 控制如果引用超出范围,是否清理检查点文件。 | 1.4.0 |
执行行为
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.broadcast.blockSize |
4m | TorrentBroadcastFactory 块中每个片段的大小,单位为 KiB(除非另有说明)。值过大会降低广播期间的并行度(使其变慢);然而,如果值太小,BlockManager 的性能可能会受到影响。 |
0.5.0 |
spark.broadcast.checksum |
true | 是否为广播启用校验和。如果启用,广播将包含一个校验和,这有助于检测损坏的块,代价是计算和发送更多的数据。如果网络有其他机制保证数据在广播期间不会损坏,则可以禁用此功能。 | 2.1.1 |
spark.broadcast.UDFCompressionThreshold |
1 * 1024 * 1024 | 用户定义函数 (UDF) 和 Python RDD 命令通过广播压缩的阈值,单位为字节(除非另有说明)。 | 3.0.0 |
spark.executor.cores |
YARN 模式下为 1,Standalone 模式下为工作节点上所有可用的核心数。 | 每个执行器上使用的核心数。 | 1.0.0 |
spark.default.parallelism |
对于分布式 Shuffle 操作(如 reduceByKey 和 join),这是父 RDD 中最大的分区数。对于没有父 RDD 的操作(如 parallelize),则取决于集群管理器。
|
当用户未设置时,由 join、reduceByKey 和 parallelize 等转换返回的 RDD 中的默认分区数。 |
0.5.0 |
spark.executor.heartbeatInterval |
10s | 每个执行器向驱动程序发送心跳的间隔。心跳让驱动程序知道执行器仍然存活,并向其更新正在进行任务的指标。spark.executor.heartbeatInterval 应显著小于 spark.network.timeout。 | 1.1.0 |
spark.files.fetchTimeout |
60s | 从驱动程序获取通过 SparkContext.addFile() 添加的文件时使用的通信超时。 | 1.0.0 |
spark.files.useFetchCache |
true | 如果设置为 true(默认值),文件获取将使用同一应用程序的执行器共享的本地缓存,这可以在同一主机上运行多个执行器时提高任务启动性能。如果设置为 false,这些缓存优化将被禁用,所有执行器将获取自己的文件副本。为了使用驻留在 NFS 文件系统上的 Spark 本地目录,可能需要禁用此优化(更多详情请参阅 SPARK-6313)。 | 1.2.2 |
spark.files.overwrite |
false | 是否覆盖启动时存在的任何文件。即使此选项设置为 true,用户也不能覆盖之前由 SparkContext.addFile 或 SparkContext.addJar 添加的文件。 |
1.0.0 |
spark.files.ignoreCorruptFiles |
false | 是否忽略损坏的文件。如果为 true,当遇到损坏或不存在的文件时,Spark 作业将继续运行,并且已读取的内容仍将返回。 | 2.1.0 |
spark.files.ignoreMissingFiles |
false | 是否忽略缺失的文件。如果为 true,当遇到缺失的文件时,Spark 作业将继续运行,并且已读取的内容仍将返回。 | 2.4.0 |
spark.files.maxPartitionBytes |
134217728 (128 MiB) | 读取文件时打包到单个分区中的最大字节数。 | 2.1.0 |
spark.files.openCostInBytes |
4194304 (4 MiB) | 打开文件的估计成本,以同时可扫描的字节数衡量。这在将多个文件放入一个分区时使用。最好高估,这样小文件的分区将比大文件的分区处理得更快。 | 2.1.0 |
spark.hadoop.cloneConf |
false | 如果设置为 true,则为每个任务克隆一个新的 Hadoop Configuration 对象。应启用此选项以解决 Configuration 线程安全问题(更多详情请参阅 SPARK-2546)。默认情况下禁用此选项,以避免对不受这些问题影响的作业造成意外的性能回退。 |
1.0.3 |
spark.hadoop.validateOutputSpecs |
true | 如果设置为 true,则验证 saveAsHadoopFile 和其他变体中使用的输出规范(例如检查输出目录是否已存在)。可以禁用此选项以屏蔽因先前存在的输出目录而产生的异常。除非是为了与之前版本的 Spark 兼容,否则我们建议用户不要禁用此功能。只需使用 Hadoop 的 FileSystem API 手动删除输出目录即可。对于通过 Spark Streaming 的 StreamingContext 生成的作业,此设置将被忽略,因为在检查点恢复期间,数据可能需要重新写入到先前存在的输出目录中。 | 1.0.1 |
spark.storage.memoryMapThreshold |
2m | 从磁盘读取块时触发 Spark 内存映射的块大小阈值。默认单位为字节,除非另有说明。这可以防止 Spark 对非常小的块进行内存映射。通常,对于接近或低于操作系统页面大小的块,内存映射的开销很高。 | 0.9.2 |
spark.storage.decommission.enabled |
false | 在退役执行器时是否退役块管理器(Block Manager)。 | 3.1.0 |
spark.storage.decommission.shuffleBlocks.enabled |
true | 在块管理器退役期间是否传输 Shuffle 块。需要可迁移的 Shuffle 解析器(如基于排序的 Shuffle)。 | 3.1.0 |
spark.storage.decommission.shuffleBlocks.maxThreads |
8 | 迁移 Shuffle 文件时使用的最大线程数。 | 3.1.0 |
spark.storage.decommission.rddBlocks.enabled |
true | 在块管理器退役期间是否传输 RDD 块。 | 3.1.0 |
spark.storage.decommission.fallbackStorage.path |
(无) | 块管理器退役期间回退存储的位置。例如,s3a://spark-storage/。如果为空,则禁用回退存储。该存储应通过 TTL 管理,因为 Spark 不会对其进行清理。 |
3.1.0 |
spark.storage.decommission.fallbackStorage.cleanUp |
false | 如果为 true,Spark 会在关闭时清理其回退存储数据。 | 3.2.0 |
spark.storage.decommission.shuffleBlocks.maxDiskSize |
(无) | 在拒绝远程 Shuffle 块之前用于存储 Shuffle 块的最大磁盘空间。拒绝远程 Shuffle 块意味着执行器将不会接收任何 Shuffle 迁移,如果此时没有其他可用的执行器用于迁移,除非配置了 spark.storage.decommission.fallbackStorage.path,否则 Shuffle 块将会丢失。 |
3.2.0 |
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version |
1 | 文件输出提交器算法版本,有效版本号为 1 或 2。注意,2 可能会导致正确性问题,如 MAPREDUCE-7282。 | 2.2.0 |
Executor 指标
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.eventLog.logStageExecutorMetrics |
false | 是否将执行器指标(针对每个执行器)的阶段峰值写入事件日志。 注意:指标会在执行器心跳中轮询(收集)并发送,这是始终执行的;此配置仅用于确定聚合指标峰值是否写入事件日志。 |
3.0.0 |
spark.executor.processTreeMetrics.enabled |
false | 在收集执行器指标时是否收集进程树指标(来自 /proc 文件系统)。 注意:仅在存在 /proc 文件系统时才会收集进程树指标。 |
3.0.0 |
spark.executor.metrics.pollingInterval |
0 | 收集执行器指标的频率(以毫秒为单位)。 如果为 0,则在执行器心跳时执行轮询(即按 spark.executor.heartbeatInterval 指定的心跳间隔)。如果为正数,则按此间隔执行轮询。 |
3.0.0 |
spark.eventLog.gcMetrics.youngGenerationGarbageCollectors |
Copy,PS Scavenge,ParNew,G1 Young Generation | 支持的年轻代垃圾回收器的名称。名称通常是 GarbageCollectorMXBean.getName 的返回值。内置的年轻代垃圾回收器包括 Copy、PS Scavenge、ParNew 和 G1 Young Generation。 | 3.0.0 |
spark.eventLog.gcMetrics.oldGenerationGarbageCollectors |
MarkSweepCompact,PS MarkSweep,ConcurrentMarkSweep,G1 Old Generation | 支持的老年代垃圾回收器的名称。名称通常是 GarbageCollectorMXBean.getName 的返回值。内置的老年代垃圾回收器包括 MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep 和 G1 Old Generation。 | 3.0.0 |
spark.executor.metrics.fileSystemSchemes |
file,hdfs | 要在执行器指标中报告的文件系统方案。 | 3.1.0 |
网络
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.rpc.message.maxSize |
128 | 允许在“控制平面”通信中使用的最大消息大小(以 MiB 为单位);通常仅适用于在执行器和驱动程序之间发送的 Map 输出大小信息。如果您运行的作业有数以千计的 Map 和 Reduce 任务并看到有关 RPC 消息大小的消息,请增加此值。 | 2.0.0 |
spark.blockManager.port |
(随机) | 所有块管理器监听的端口。这些端口存在于驱动程序和执行器上。 | 1.1.0 |
spark.driver.blockManager.port |
(spark.blockManager.port 的值) | 块管理器监听的驱动程序专用端口,适用于无法使用与执行器相同配置的情况。 | 2.1.0 |
spark.driver.bindAddress |
(spark.driver.host 的值) | 绑定监听套接字的主机名或 IP 地址。此配置覆盖 SPARK_LOCAL_IP 环境变量(见下文)。 它还允许向执行器或外部系统通告与本地地址不同的地址。例如,这在使用网桥网络运行容器时非常有用。为了使其正常工作,驱动程序使用的不同端口(RPC、块管理器和 UI)需要从容器的主机进行转发。 |
2.1.0 |
spark.driver.host |
(本地主机名) | 驱动程序的主机名或 IP 地址。这用于与执行器和独立主节点(Standalone Master)进行通信。 | 0.7.0 |
spark.driver.port |
(随机) | 驱动程序监听的端口。这用于与执行器和独立主节点(Standalone Master)进行通信。 | 0.7.0 |
spark.driver.metrics.pollingInterval |
10s | 收集驱动程序指标的频率(以毫秒为单位)。如果未设置,则按执行器心跳间隔执行轮询。如果设置了,则按此间隔执行轮询。 | 4.1.0 |
spark.io.mode.default |
AUTO | Netty 传输的默认 IO 模式。可以是 NIO、EPOLL、KQUEUE 或 AUTO 之一。默认值为 AUTO,表示如果可用,则使用原生 Netty 库。换句话说,对于 Linux 环境,如果可用,优先使用 EPOLL,然后再使用 NIO。对于 MacOS/BSD 环境,如果可用,优先使用 KQUEUE,然后再使用 NIO。 |
4.1.0 |
spark.rpc.io.backLog |
64 | RPC 服务器的接受队列长度。对于大型应用程序,可能需要增加此值,以确保在短时间内有大量连接到达时不会丢弃传入的连接。 | 3.0.0 |
spark.network.timeout |
120s | 所有网络交互的默认超时时间。如果未配置 spark.storage.blockManagerHeartbeatTimeoutMs、spark.shuffle.io.connectionTimeout、spark.rpc.askTimeout 或 spark.rpc.lookupTimeout,将使用此配置来代替它们。 |
1.3.0 |
spark.network.timeoutInterval |
60s | 驱动程序检查并终止死执行器的间隔。 | 1.3.2 |
spark.network.io.preferDirectBufs |
true | 如果启用,共享分配器将优先进行堆外缓冲区的分配。堆外缓冲区用于减少 Shuffle 和缓存块传输期间的垃圾回收。对于堆外内存非常有限的环境,用户可能希望关闭此功能以强制所有分配都在堆内进行。 | 3.0.0 |
spark.port.maxRetries |
16 | 绑定到端口失败时的最大重试次数。当端口被赋予特定值(非 0)时,每次后续重试将在重试前将前一次尝试使用的端口增加 1。这本质上允许它尝试从指定的起始端口到 port + maxRetries 的端口范围。 | 1.1.1 |
spark.rpc.askTimeout |
spark.network.timeout |
RPC 询问操作在超时前等待的持续时间。 | 1.4.0 |
spark.rpc.lookupTimeout |
120s | RPC 远程端点查找操作在超时前等待的持续时间。 | 1.4.0 |
spark.network.maxRemoteBlockSizeFetchToMem |
200m | 当块大小超过此字节阈值时,远程块将被获取到磁盘。这是为了避免巨大的请求占用过多的内存。注意,此配置将影响 Shuffle 获取和块管理器远程块获取。对于启用了外部 Shuffle 服务的用户,此功能仅在外部 Shuffle 服务至少为 2.3.0 时才有效。 | 3.0.0 |
spark.rpc.io.connectionTimeout |
spark.network.timeout 的值 |
如果存在未完成的 RPC 请求,但通道上至少在 connectionTimeout 时间内没有流量,则 RPC 对等点之间建立的连接被标记为空闲并关闭的超时时间。 |
1.2.0 |
spark.rpc.io.connectionCreationTimeout |
spark.rpc.io.connectionTimeout 的值 |
在 RPC 对等点之间建立连接的超时时间。 | 3.2.0 |
调度
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.cores.max |
(未设置) | 当在 独立部署集群 上运行时,从整个集群(而非每台机器)中为应用程序请求的最大 CPU 核心数。如果未设置,默认值为 Spark 独立集群管理器上的 spark.deploy.defaultCores。 |
0.6.0 |
spark.locality.wait |
3s | 在放弃并转而在较不本地的节点上启动任务之前,等待启动数据本地任务的时间。相同的等待时间将用于逐步跨越多个本地级别(进程本地、节点本地、机架本地,然后是任意节点)。也可以通过设置 spark.locality.wait.node 等来自定义每个级别的等待时间。如果您的任务很长且本地性较差,您应该增加此设置,但默认值通常效果良好。 |
0.5.0 |
spark.locality.wait.node |
spark.locality.wait | 自定义节点本地性的本地性等待时间。例如,您可以将其设置为 0 以跳过节点本地性并立即搜索机架本地性(如果您的集群具有机架信息)。 | 0.8.0 |
spark.locality.wait.process |
spark.locality.wait | 自定义进程本地性的本地性等待时间。这会影响尝试访问特定执行器进程中缓存数据的任务。 | 0.8.0 |
spark.locality.wait.rack |
spark.locality.wait | 自定义机架本地性的本地性等待时间。 | 0.8.0 |
spark.scheduler.maxRegisteredResourcesWaitingTime |
30s | 在开始调度之前,等待资源注册的最长时间。 | 1.1.1 |
spark.scheduler.minRegisteredResourcesRatio |
KUBERNETES 模式为 0.8;YARN 模式为 0.8;独立模式为 0.0 | 在开始调度之前要等待的已注册资源(已注册资源 / 总预期资源)的最小比率(在 Yarn 和 Kubernetes 模式下资源为执行器,在独立模式下为 CPU 核心)。指定为 0.0 到 1.0 之间的双精度浮点数。无论是否达到最小资源比率,开始调度前等待的最长时间由配置 spark.scheduler.maxRegisteredResourcesWaitingTime 控制。 |
1.1.1 |
spark.scheduler.mode |
FIFO | 提交到同一 SparkContext 的作业之间的 调度模式。可以设置为 FAIR 以使用公平共享,而不是一个接一个地排队作业。这对于多用户服务非常有用。 |
0.8.0 |
spark.scheduler.revive.interval |
1s | 调度器恢复工作节点资源以运行任务的间隔长度。 | 0.8.1 |
spark.scheduler.listenerbus.eventqueue.capacity |
10000 | 事件队列的默认容量。Spark 首先会尝试使用 spark.scheduler.listenerbus.eventqueue.queueName.capacity 指定的容量来初始化事件队列。如果未配置,Spark 将使用此配置指定的默认容量。注意,容量必须大于 0。如果监听器事件被丢弃,请考虑增加此值(例如 20000)。增加此值可能会导致驱动程序使用更多内存。 |
2.3.0 |
spark.scheduler.listenerbus.eventqueue.shared.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark 监听器总线中共享事件队列的容量,该队列持有注册到监听器总线的外部监听器的事件。如果对应共享队列的监听器事件被丢弃,请考虑增加此值。增加此值可能会导致驱动程序使用更多内存。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.appStatus.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
appStatus 事件队列的容量,该队列持有内部应用程序状态监听器的事件。如果对应 appStatus 队列的监听器事件被丢弃,请考虑增加此值。增加此值可能会导致驱动程序使用更多内存。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.executorManagement.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark 监听器总线中 executorManagement 事件队列的容量,该队列持有内部执行器管理监听器的事件。如果对应 executorManagement 队列的监听器事件被丢弃,请考虑增加此值。增加此值可能会导致驱动程序使用更多内存。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.eventLog.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark 监听器总线中 eventLog 队列的容量,该队列持有将事件写入 eventLogs 的事件记录监听器的事件。如果对应 eventLog 队列的监听器事件被丢弃,请考虑增加此值。增加此值可能会导致驱动程序使用更多内存。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.streams.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
streams 事件队列的容量,该队列持有内部流处理监听器的事件。如果对应 streams 队列的监听器事件被丢弃,请考虑增加此值。增加此值可能会导致驱动程序使用更多内存。 | 3.0.0 |
spark.scheduler.resource.profileMergeConflicts |
false | 如果设置为“true”,当 RDD 在被合并成单个阶段时指定了不同的 ResourceProfiles,Spark 将合并这些资源配置。合并时,Spark 会选择每种资源的最大值并创建一个新的 ResourceProfile。默认为 false,如果 RDD 进入同一阶段时发现多个不同的 ResourceProfiles,Spark 会抛出异常。 | 3.1.0 |
spark.scheduler.excludeOnFailure.unschedulableTaskSetTimeout |
120s | 由于所有执行器都因任务失败而被排除,导致 TaskSet 无法调度时,在放弃任务集之前,等待获取新执行器并调度任务的超时时间(以秒为单位)。 | 2.4.1 |
spark.standalone.submit.waitAppCompletion |
false | 如果设置为 true,当 RDD 在被合并成单个阶段时指定了不同的 ResourceProfiles,Spark 将合并这些资源配置。合并时,Spark 会选择每种资源的最大值并创建一个新的 ResourceProfile。默认为 false,如果 RDD 进入同一阶段时发现多个不同的 ResourceProfiles,Spark 会抛出异常。 | 3.1.0 |
spark.excludeOnFailure.enabled |
false | 如果设置为“true”,则阻止 Spark 在因任务失败过多而被排除的执行器上调度任务。用于排除执行器和节点的算法可以通过其他“spark.excludeOnFailure”配置选项进一步控制。此配置将被“spark.excludeOnFailure.application.enabled”和“spark.excludeOnFailure.taskAndStage.enabled”覆盖,以指定各级别的排除启用情况。 | 2.1.0 |
spark.excludeOnFailure.application.enabled |
false | 如果设置为“true”,则允许因任务失败过多而在整个应用程序范围内排除执行器,并阻止 Spark 在这些执行器上调度任务。此配置将覆盖“spark.excludeOnFailure.enabled”。 | 4.0.0 |
spark.excludeOnFailure.taskAndStage.enabled |
false | 如果设置为“true”,则允许在任务集级别因任务失败过多而排除执行器,并阻止 Spark 在这些执行器上调度任务。此配置将覆盖“spark.excludeOnFailure.enabled”。 | 4.0.0 |
spark.excludeOnFailure.timeout |
1h | (实验性) 节点或执行器在整个应用程序中被排除多长时间后,会被无条件地从排除列表中移除以尝试运行新任务。 | 2.1.0 |
spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor |
1 | (实验性) 对于给定的任务,在一个执行器上可以重试多少次,之后该执行器会被排除在该任务之外。 | 2.1.0 |
spark.excludeOnFailure.task.maxTaskAttemptsPerNode |
2 | (实验性) 对于给定的任务,在一个节点上可以重试多少次,之后整个节点会被排除在该任务之外。 | 2.1.0 |
spark.excludeOnFailure.stage.maxFailedTasksPerExecutor |
2 | (实验性) 在一个阶段中,一个执行器上必须失败多少个不同的任务,该执行器才会被排除在该阶段之外。 | 2.1.0 |
spark.excludeOnFailure.stage.maxFailedExecutorsPerNode |
2 | (实验性) 在一个阶段中,有多少个不同的执行器被标记为排除,整个节点才会被标记为该阶段失败。 | 2.1.0 |
spark.excludeOnFailure.application.maxFailedTasksPerExecutor |
2 | (实验性) 在成功的任务集中,一个执行器上必须失败多少个不同的任务,该执行器才会被整个应用程序排除。被排除的执行器将在 spark.excludeOnFailure.timeout 指定的超时后自动添加回可用资源池。但请注意,对于动态分配,执行器可能会被标记为空闲并被集群管理器回收。 |
2.2.0 |
spark.excludeOnFailure.application.maxFailedExecutorsPerNode |
2 | (实验性) 在整个应用程序中,必须排除多少个不同的执行器,节点才会被整个应用程序排除。被排除的节点将在 spark.excludeOnFailure.timeout 指定的超时后自动添加回可用资源池。但请注意,对于动态分配,节点上的执行器可能会被标记为空闲并被集群管理器回收。 |
2.2.0 |
spark.excludeOnFailure.killExcludedExecutors |
false | (实验性) 如果设置为“true”,允许 Spark 在执行器因获取失败而被排除或被整个应用程序排除时自动杀死它们(由 spark.killExcludedExecutors.application.* 控制)。注意,当整个节点被添加为排除项时,该节点上的所有执行器都将被杀死。 | 2.2.0 |
spark.excludeOnFailure.application.fetchFailure.enabled |
false | (实验性) 如果设置为“true”,Spark 将在发生获取失败时立即排除执行器。如果启用了外部 Shuffle 服务,则整个节点将被排除。 | 2.3.0 |
spark.speculation |
false | 如果设置为“true”,则执行任务的投机执行。这意味着如果一个阶段中有任务运行缓慢,它们将被重新启动。 | 0.6.0 |
spark.speculation.interval |
100ms | Spark 检查任务以进行投机执行的频率。 | 0.6.0 |
spark.speculation.multiplier |
3 | 任务比中位数慢多少倍才会被考虑进行投机执行。 | 0.6.0 |
spark.speculation.quantile |
0.9 | 对于特定阶段,必须完成多少比例的任务后,才能启用投机执行。 | 0.6.0 |
spark.speculation.minTaskRuntime |
100ms | 任务运行的最短时间,超过此时间后才会被考虑进行投机执行。这可用于避免为非常短的任务启动投机副本。 | 3.2.0 |
spark.speculation.task.duration.threshold |
None | 任务持续时间,超过此时间后调度器将尝试投机运行任务。如果提供了此值,且当前阶段的任务数量少于或等于单个执行器上的槽位数量,且任务运行时间长于阈值,则将投机运行任务。此配置有助于在任务很少的情况下进行投机。如果执行器槽位足够多,常规投机配置也可能适用。例如,如果运行成功次数足够多,即使未达到阈值,任务也可能被重新启动。槽位数量基于 spark.executor.cores 和 spark.task.cpus 的最小值(至少为 1)计算得出。默认单位为字节,除非另有说明。 | 3.0.0 |
spark.speculation.efficiency.processRateMultiplier |
0.75 | 评估低效任务时使用的乘数。乘数越高,被认为低效的任务就越多。 | 3.4.0 |
spark.speculation.efficiency.longRunTaskFactor |
2 | 只要任务持续时间超过了乘数因子与时间阈值(spark.speculation.multiplier * successfulTaskDurations.median 或 spark.speculation.minTaskRuntime)的乘积,无论数据处理速率是否良好,该任务都将被投机执行。这避免了在任务缓慢与数据处理速率无关时遗漏低效任务。 |
3.4.0 |
spark.speculation.efficiency.enabled |
true | 当设置为 true 时,spark 将通过阶段任务指标或其持续时间评估任务处理效率,并且仅对低效任务进行投机。当满足以下任一条件时,任务即为低效:1) 其数据处理速率小于阶段内所有成功任务的平均数据处理速率乘以乘数,或 2) 其持续时间超过了 spark.speculation.efficiency.longRunTaskFactor 与时间阈值(spark.speculation.multiplier * successfulTaskDurations.median 或 spark.speculation.minTaskRuntime)的乘积。 |
3.4.0 |
spark.task.cpus |
1 | 为每个任务分配的核心数。 | 0.5.0 |
spark.task.resource.{resourceName}.amount |
1 | 为每个任务分配的特定资源类型的量,注意这可以是双精度浮点数。如果指定了此项,您还必须提供执行器配置 spark.executor.resource.{resourceName}.amount 及任何相应的发现配置,以便您的执行器使用该资源类型创建。除整数外,还可以指定分数(例如 0.25,意为 1/4 的资源)。分数必须小于或等于 0.5,换句话说,资源共享的最低限度是每个资源 2 个任务。此外,分数会被向下取整以分配资源槽(例如 0.2222 的配置,即 1/0.2222 槽,将变为每个资源 4 个任务,而不是 5 个)。 |
3.0.0 |
spark.task.maxFailures |
4 | 在放弃作业之前,任何特定任务连续失败的次数。跨不同任务分散的失败总数不会导致作业失败;特定任务必须连续失败此次数的尝试。如果任何尝试成功,该任务的失败计数将重置。应大于或等于 1。允许的重试次数 = 此值 - 1。 | 0.8.0 |
spark.task.reaper.enabled |
false | 启用对被杀死/中断任务的监控。设置为 true 时,任何被杀死的任务都将由执行器监控,直到该任务实际执行完毕。有关如何控制此监控行为的详细信息,请参阅其他 spark.task.reaper.* 配置。设置为 false(默认值)时,任务杀死将使用缺乏此类监控的旧代码路径。 |
2.0.3 |
spark.task.reaper.pollingInterval |
10s | 当 spark.task.reaper.enabled = true 时,此设置控制执行器轮询被杀死任务状态的频率。如果轮询时被杀死任务仍在运行,则会记录警告,并且默认情况下,将记录任务的线程转储(此线程转储可通过下文记录的 spark.task.reaper.threadDump 设置禁用)。 |
2.0.3 |
spark.task.reaper.threadDump |
true | 当 spark.task.reaper.enabled = true 时,此设置控制在轮询被杀死任务时是否记录任务线程转储。将其设置为 false 以禁用线程转储收集。 |
2.0.3 |
spark.task.reaper.killTimeout |
-1 | 当 spark.task.reaper.enabled = true 时,此设置指定一个超时时间,如果被杀死的任务仍未停止运行,执行器 JVM 将在此之后自杀。默认值 -1 会禁用此机制并防止执行器自毁。此设置的目的是作为安全网,防止失控的不可取消任务导致执行器不可用。 |
2.0.3 |
spark.stage.maxConsecutiveAttempts |
4 | 在放弃阶段之前允许的连续阶段尝试次数。 | 2.2.0 |
spark.stage.ignoreDecommissionFetchFailure |
true | 在统计 spark.stage.maxConsecutiveAttempts 时,是否忽略由执行器退役引起的阶段获取失败。 |
3.4.0 |
Barrier 执行模式
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.barrier.sync.timeout |
365d | 屏障任务(barrier task)中每个 barrier() 调用的超时时间(秒)。如果协调器未能在配置时间内从屏障任务接收到所有同步消息,则抛出 SparkException 以终止所有任务。默认值设置为 31536000 (3600 * 24 * 365),因此 barrier() 调用将等待一年。 |
2.4.0 |
spark.scheduler.barrier.maxConcurrentTasksCheck.interval |
15s | 最大并发任务检查失败与下一次检查之间等待的时间(秒)。最大并发任务检查可确保集群在提交作业时能够启动比屏障阶段要求更多的并发任务。如果集群刚刚启动且未注册足够的执行器,则检查可能会失败,因此我们会等待一会儿并尝试再次执行检查。如果作业的检查失败超过配置的最大失败次数,则使当前的作业提交失败。注意,此配置仅适用于包含一个或多个屏障阶段的作业,对于非屏障作业,我们不会执行该检查。 | 2.4.0 |
spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures |
40 | 在使作业提交失败之前允许的最大并发任务检查失败次数。最大并发任务检查可确保集群在提交作业时能够启动比屏障阶段要求更多的并发任务。如果集群刚刚启动且未注册足够的执行器,则检查可能会失败,因此我们会等待一会儿并尝试再次执行检查。如果作业的检查失败超过配置的最大失败次数,则使当前的作业提交失败。注意,此配置仅适用于包含一个或多个屏障阶段的作业,对于非屏障作业,我们不会执行该检查。 | 2.4.0 |
动态分配
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.dynamicAllocation.enabled |
false | 是否使用动态资源分配,根据工作负载向上或向下扩展为此应用程序注册的执行器数量。更多详情,请参见此处的说明。 这需要满足以下条件之一:1) 通过 spark.shuffle.service.enabled 启用外部 Shuffle 服务,或者 2) 通过 spark.dynamicAllocation.shuffleTracking.enabled 启用 Shuffle 跟踪,或者 3) 通过 spark.decommission.enabled 和 spark.storage.decommission.shuffleBlocks.enabled 启用 Shuffle 块退役,或者 4) (实验性) 配置 spark.shuffle.sort.io.plugin.class 以使用支持可靠存储的自定义 ShuffleDataIO 的 ShuffleDriverComponents。以下配置也与之相关:spark.dynamicAllocation.minExecutors、spark.dynamicAllocation.maxExecutors、spark.dynamicAllocation.initialExecutors 和 spark.dynamicAllocation.executorAllocationRatio。 |
1.2.0 |
spark.dynamicAllocation.executorIdleTimeout |
60s | 如果启用了动态分配且执行器空闲时间超过此持续时间,则该执行器将被移除。更多详情,请参见此说明。 | 1.2.0 |
spark.dynamicAllocation.cachedExecutorIdleTimeout |
无穷大 | 如果启用了动态分配且具有缓存数据块的执行器空闲时间超过此持续时间,则该执行器将被移除。更多详情,请参见此说明。 | 1.4.0 |
spark.dynamicAllocation.initialExecutors |
spark.dynamicAllocation.minExecutors |
如果启用了动态分配,则运行的执行器初始数量。 如果设置了 --num-executors(或 spark.executor.instances)且大于此值,它将用作初始执行器数量。 |
1.3.0 |
spark.dynamicAllocation.maxExecutors |
无穷大 | 如果启用了动态分配,则执行器数量的上限。 | 1.2.0 |
spark.dynamicAllocation.minExecutors |
0 | 如果启用了动态分配,则执行器数量的下限。 | 1.2.0 |
spark.dynamicAllocation.executorAllocationRatio |
1 | 默认情况下,动态分配将请求足够的执行器,以便根据要处理的任务数量最大化并行度。虽然这最大限度地减少了作业的延迟,但在小任务的情况下,由于执行器分配开销,此设置可能会浪费大量资源,因为某些执行器甚至可能不做任何工作。此设置允许设置一个比率,该比率将用于相对于全并行度减少执行器数量。默认为 1.0 以提供最大并行度。0.5 将使目标执行器数量除以 2。动态分配计算的目标执行器数量仍可被 spark.dynamicAllocation.minExecutors 和 spark.dynamicAllocation.maxExecutors 设置覆盖。 |
2.4.0 |
spark.dynamicAllocation.schedulerBacklogTimeout |
1s | 如果启用了动态分配,且有待处理任务积压时间超过此持续时间,将请求新的执行器。更多详情,请参见此说明。 | 1.2.0 |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout |
schedulerBacklogTimeout |
与 spark.dynamicAllocation.schedulerBacklogTimeout 相同,但仅用于后续执行器请求。更多详情,请参见此说明。 |
1.2.0 |
spark.dynamicAllocation.shuffleTracking.enabled |
true |
为执行器启用 Shuffle 文件跟踪,这允许动态分配而无需外部 Shuffle 服务。此选项将尝试保持存储正在运行的作业的 Shuffle 数据的执行器处于活动状态。 | 3.0.0 |
spark.dynamicAllocation.shuffleTracking.timeout |
无穷大 |
启用 Shuffle 跟踪后,控制持有 Shuffle 数据的执行器的超时时间。默认值意味着 Spark 将依赖 Shuffle 被垃圾回收来释放执行器。如果由于某种原因垃圾回收无法足够快地清理 Shuffle,则此选项可用于控制即使在存储 Shuffle 数据时何时让执行器超时。 | 3.0.0 |
线程配置
根据作业和集群配置,我们可以设置 Spark 中几个地方的线程数,以有效利用可用资源获得更好的性能。在 Spark 3.0 之前,这些线程配置适用于 Spark 的所有角色,例如驱动程序、执行器、工作节点和主节点。从 Spark 3.0 开始,我们可以从驱动程序和执行器开始,以更细的粒度配置线程。以下表中的 RPC 模块为例。对于其他模块(如 Shuffle),只需将属性名称中的“rpc”替换为“shuffle”,但 spark.{driver|executor}.rpc.netty.dispatcher.numThreads 除外(仅适用于 RPC 模块)。
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.{driver|executor}.rpc.io.serverThreads |
回退到 spark.rpc.io.serverThreads |
服务器线程池中使用的线程数 | 1.6.0 |
spark.{driver|executor}.rpc.io.clientThreads |
回退到 spark.rpc.io.clientThreads |
客户端线程池中使用的线程数 | 1.6.0 |
spark.{driver|executor}.rpc.netty.dispatcher.numThreads |
回退到 spark.rpc.netty.dispatcher.numThreads |
RPC 消息分发器线程池中使用的线程数 | 3.0.0 |
线程相关配置键的默认值是驱动程序或执行器请求的核心数的最小值,或者在没有该值的情况下,是 JVM 可用的核心数(硬编码上限为 8)。
Spark Connect
服务器配置
服务器配置是在 Spark Connect 服务器中设置的,例如,当您使用 ./sbin/start-connect-server.sh 启动 Spark Connect 服务器时。它们通常通过配置文件和带有 --conf/-c 的命令行选项进行设置。
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.api.mode |
classic | 对于 Spark Classic 应用程序,指定是否通过运行本地 Spark Connect 服务器自动使用 Spark Connect。该值可以是 classic 或 connect。 |
4.0.0 |
spark.connect.grpc.binding.address |
(无) | Spark Connect 服务器绑定的地址。 | 4.0.0 |
spark.connect.grpc.binding.port |
15002 | Spark Connect 服务器绑定的端口。 | 3.4.0 |
spark.connect.grpc.port.maxRetries |
0 | gRPC 服务器绑定的最大端口重试次数。默认情况下设置为 0,如果发生端口冲突,服务器将快速失败。 | 4.0.0 |
spark.connect.grpc.interceptor.classes |
(无) | 必须实现 io.grpc.ServerInterceptor 接口的类名列表(以逗号分隔)。 |
3.4.0 |
spark.connect.grpc.arrow.maxBatchSize |
4m | 使用 Apache Arrow 时,限制从服务器端发送到客户端的一个 Arrow 批次的最大大小。目前,我们保守地使用其 70%,因为该大小是不准确的估计值。 | 3.4.0 |
spark.connect.grpc.maxInboundMessageSize |
134217728 | 设置 gRPC 请求的最大入站消息大小。具有更大负载的请求将会失败。 | 3.4.0 |
spark.connect.extensions.relation.classes |
(无) | 实现 org.apache.spark.sql.connect.plugin.RelationPlugin 特征以支持 proto 中自定义 Relation 类型的类列表(以逗号分隔)。 |
3.4.0 |
spark.connect.extensions.expression.classes |
(无) | 实现 org.apache.spark.sql.connect.plugin.ExpressionPlugin 特征以支持 proto 中自定义 Expression 类型的类列表(以逗号分隔)。 |
3.4.0 |
spark.connect.extensions.command.classes |
(无) | 实现 org.apache.spark.sql.connect.plugin.CommandPlugin 特征以支持 proto 中自定义 Command 类型的类列表(以逗号分隔)。 |
3.4.0 |
spark.connect.ml.backend.classes |
(无) | 实现 org.apache.spark.sql.connect.plugin.MLBackendPlugin 特征以用后端特定实现替换指定 Spark ML 操作符的类列表(以逗号分隔)。 |
4.0.0 |
spark.connect.jvmStacktrace.maxSize |
1024 | 当 `spark.sql.pyspark.jvmStacktrace.enabled` 为 true 时,设置要显示的最大堆栈跟踪大小。 | 3.5.0 |
spark.sql.connect.ui.retainedSessions |
200 | Spark Connect UI 历史记录中保留的客户端会话数量。 | 3.5.0 |
spark.sql.connect.ui.retainedStatements |
200 | Spark Connect UI 历史记录中保留的语句数量。 | 3.5.0 |
spark.sql.connect.enrichError.enabled |
true | 当为 true 时,通过额外的 RPC 在客户端丰富包含完整异常消息和可选服务器端堆栈跟踪的错误信息。 | 4.0.0 |
spark.sql.connect.serverStacktrace.enabled |
true | 当为 true 时,在用户面临的 Spark 异常中设置服务器端堆栈跟踪。 | 4.0.0 |
spark.connect.grpc.maxMetadataSize |
1024 | 设置元数据字段的最大大小。例如,它限制 `ErrorInfo` 中的元数据字段。 | 4.0.0 |
spark.connect.progress.reportInterval |
2s | 向客户端报告查询进度的间隔。如果该值设置为负值,进度报告将被禁用。 | 4.0.0 |
安全性
请参考 安全 页面以了解如何保护不同 Spark 子系统的可用选项。
Spark SQL
运行时 SQL 配置
运行时 SQL 配置是每个会话可变的 Spark SQL 配置。它们可以通过带有 --conf/-c 前缀的配置文件和命令行选项设置初始值,或者通过设置用于创建 SparkSession 的 SparkConf 来设置。此外,它们可以通过 SET 命令进行设置和查询,并可以通过 RESET 命令或在运行时通过 SparkSession.conf 的 setter 和 getter 方法将其重置为初始值。
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.sql.adaptive.advisoryPartitionSizeInBytes |
(spark.sql.adaptive.shuffle.targetPostShuffleInputSize 的值) |
自适应优化期间 Shuffle 分区的建议大小(以字节为单位,当 spark.sql.adaptive.enabled 为 true 时)。当 Spark 合并小 Shuffle 分区或拆分倾斜 Shuffle 分区时,此配置生效。 |
3.0.0 |
spark.sql.adaptive.autoBroadcastJoinThreshold |
(无) | 配置执行连接(Join)时广播到所有工作节点的最大表大小(以字节为单位)。将此值设置为 -1 可禁用广播。默认值与 spark.sql.autoBroadcastJoinThreshold 相同。注意,此配置仅在自适应框架中使用。 |
3.2.0 |
spark.sql.adaptive.coalescePartitions.enabled |
true | 当为 true 且 'spark.sql.adaptive.enabled' 为 true 时,Spark 将根据目标大小(由 'spark.sql.adaptive.advisoryPartitionSizeInBytes' 指定)合并连续的 Shuffle 分区,以避免过多的小任务。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum |
(无) | 合并前的初始 Shuffle 分区数。如果未设置,则等于 spark.sql.shuffle.partitions。此配置仅在 'spark.sql.adaptive.enabled' 和 'spark.sql.adaptive.coalescePartitions.enabled' 均为 true 时才有效。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.minPartitionSize |
1MB | 合并后 Shuffle 分区的最小大小。当分区合并期间自适应计算的目标大小太小时,此配置非常有用。 |
3.2.0 |
spark.sql.adaptive.coalescePartitions.parallelismFirst |
true | 当为 true 时,Spark 在合并连续 Shuffle 分区时不遵循 'spark.sql.adaptive.advisoryPartitionSizeInBytes'(默认 64MB)指定的目标大小,而是根据 Spark 集群的默认并行度自适应地计算目标大小。计算出的通常小于配置的目标大小。这是为了最大限度地提高并行度,并避免在启用自适应查询执行时出现性能回归。建议在繁忙的集群上将此配置设置为 false,以使资源利用率更高效(避免产生大量小任务)。 |
3.2.0 |
spark.sql.adaptive.customCostEvaluatorClass |
(无) | 用于自适应执行的自定义成本评估器类。如果未设置,Spark 默认将使用其自身的 SimpleCostEvaluator。 |
3.2.0 |
spark.sql.adaptive.enabled |
true | 当为 true 时,启用自适应查询执行,该执行根据准确的运行时统计信息在查询执行过程中重新优化查询计划。 |
1.6.0 |
spark.sql.adaptive.forceOptimizeSkewedJoin |
false | 当为 true 时,强制启用 OptimizeSkewedJoin,即使它引入了额外的 Shuffle。 |
3.3.0 |
spark.sql.adaptive.localShuffleReader.enabled |
true | 当为 true 且 'spark.sql.adaptive.enabled' 为 true 时,Spark 尝试在不需要 Shuffle 分区的情况下使用本地 Shuffle 读取器读取 Shuffle 数据,例如,在将排序合并连接转换为广播哈希连接之后。 |
3.0.0 |
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold |
0b | 配置每个分区允许构建本地哈希映射的最大大小(以字节为单位)。如果此值不小于 spark.sql.adaptive.advisoryPartitionSizeInBytes 且所有分区大小都不大于此配置,则无论 spark.sql.join.preferSortMergeJoin 的值如何,连接选择都倾向于使用 Shuffle 哈希连接而不是排序合并连接。 |
3.2.0 |
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled |
true | 当为 true 且 'spark.sql.adaptive.enabled' 为 true 时,Spark 将优化 RebalancePartitions 中的倾斜 Shuffle 分区,并根据目标大小(由 'spark.sql.adaptive.advisoryPartitionSizeInBytes' 指定)将它们拆分为更小的分区,以避免数据倾斜。 |
3.2.0 |
spark.sql.adaptive.optimizer.excludedRules |
(无) | 配置要在自适应优化器中禁用的规则列表,规则按其名称指定并用逗号分隔。优化器将记录实际上已被排除的规则。 |
3.1.0 |
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor |
0.2 | 如果一个分区的大小小于此因子乘以 spark.sql.adaptive.advisoryPartitionSizeInBytes,则该分区在拆分期间将被合并。 |
3.3.0 |
spark.sql.adaptive.skewJoin.enabled |
true | 当为 true 且 'spark.sql.adaptive.enabled' 为 true 时,Spark 通过拆分(并在需要时复制)倾斜分区来动态处理 Shuffle 连接(排序合并和 Shuffle 哈希)中的倾斜。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor |
5.0 | 如果一个分区的大小大于此因子乘以中位数分区大小,且也大于 'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes',则该分区被认为是倾斜的。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes |
256MB | 如果一个分区的大小(以字节为单位)大于此阈值,且也大于 'spark.sql.adaptive.skewJoin.skewedPartitionFactor' 乘以中位数分区大小,则该分区被认为是倾斜的。理想情况下,此配置应设置得大于 'spark.sql.adaptive.advisoryPartitionSizeInBytes'。 |
3.0.0 |
spark.sql.allowNamedFunctionArguments |
true | 如果为 true,Spark 将为所有已实现命名参数支持的函数开启该功能。 |
3.5.0 |
spark.sql.ansi.doubleQuotedIdentifiers |
false | 当为 true 且 'spark.sql.ansi.enabled' 为 true 时,Spark SQL 将双引号 (") 括起来的字面量读取为标识符。当为 false 时,它们被读取为字符串字面量。 |
3.4.0 |
spark.sql.ansi.enabled |
true | 当为 true 时,Spark SQL 使用 ANSI 兼容的方言,而不是 Hive 兼容的方言。例如,当 SQL 运算符/函数的输入无效时,Spark 将在运行时抛出异常而不是返回 null 结果。有关此方言的完整详细信息,请参见 Spark 文档的“ANSI 兼容性”部分。某些 ANSI 方言特性可能不是直接来自 ANSI SQL 标准,但其行为符合 ANSI SQL 的风格。 |
3.0.0 |
spark.sql.ansi.enforceReservedKeywords |
false | 当为 true 且 'spark.sql.ansi.enabled' 为 true 时,Spark SQL 解析器强制执行 ANSI 保留关键字,并禁止使用保留关键字作为表、视图、函数等别名和/或标识符的 SQL 查询。 |
3.3.0 |
spark.sql.ansi.relationPrecedence |
false | 当为 true 且 'spark.sql.ansi.enabled' 为 true 时,JOIN 在组合关系时优先于逗号。例如, |
3.4.0 |
spark.sql.autoBroadcastJoinThreshold |
10MB | 配置执行连接(Join)时广播到所有工作节点的最大表大小(以字节为单位)。将此值设置为 -1 可禁用广播。 |
1.1.0 |
spark.sql.avro.compression.codec |
snappy | 写入 AVRO 文件时使用的压缩编解码器。支持的编解码器:uncompressed、deflate、snappy、bzip2、xz 和 zstandard。默认编解码器为 snappy。 |
2.4.0 |
spark.sql.avro.deflate.level |
-1 | 写入 AVRO 文件时使用的 deflate 编解码器的压缩级别。有效值必须在 1 到 9 之间(包含 1 和 9)或为 -1。默认值为 -1,对应于当前实现中的 6 级。 |
2.4.0 |
spark.sql.avro.filterPushdown.enabled |
true | 当为 true 时,启用将过滤器下推到 Avro 数据源。 |
3.1.0 |
spark.sql.avro.xz.level |
6 | 写入 AVRO 文件时使用的 xz 编解码器的压缩级别。有效值必须在 1 到 9 之间(包含)。默认值为 6。 |
4.0.0 |
spark.sql.avro.zstandard.bufferPool.enabled |
false | 如果为 true,则在写入 AVRO 文件时启用 ZSTD JNI 库的缓冲池。 |
4.0.0 |
spark.sql.avro.zstandard.level |
3 | 写入 AVRO 文件时使用的 zstandard 编解码器的压缩级别。 |
4.0.0 |
spark.sql.binaryOutputStyle |
(无) | 显示二进制数据时使用的输出样式。有效值为 'UTF-8'、'BASIC'、'BASE64'、'HEX' 和 'HEX_DISCRETE'。 |
4.0.0 |
spark.sql.broadcastTimeout |
300 | 广播连接中广播等待时间的超时时间(以秒为单位)。 |
1.3.0 |
spark.sql.bucketing.coalesceBucketsInJoin.enabled |
false | 当为 true 时,如果连接两个桶数不同的桶表,桶数较多的一侧将被合并以与另一侧具有相同的桶数。较大的桶数可被较小的桶数整除。桶合并应用于排序合并连接和 Shuffle 哈希连接。注意:合并桶表可以避免连接中不必要的 Shuffle,但它也会降低并行度,并可能导致 Shuffle 哈希连接出现 OOM。 |
3.1.0 |
spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio |
4 | 为应用桶合并,两个被合并桶的数量比率应小于或等于此值。此配置仅在 'spark.sql.bucketing.coalesceBucketsInJoin.enabled' 设置为 true 时才有效。 |
3.1.0 |
spark.sql.catalog.spark_catalog |
builtin | 将用作 Spark 内置 v1 目录(spark_catalog)v2 接口的目录实现。此目录与其标识符命名空间与 spark_catalog 共享,并且必须与其保持一致;例如,如果表可以由 spark_catalog 加载,则此目录也必须返回表元数据。为了将操作委托给 spark_catalog,实现可以扩展 'CatalogExtension'。该值应为 'builtin'(代表 Spark 的内置 V2SessionCatalog)或目录实现的完全限定类名。 |
3.0.0 |
spark.sql.cbo.enabled |
false | 设置为 true 时,启用 CBO 以估计计划统计信息。 |
2.2.0 |
spark.sql.cbo.joinReorder.dp.star.filter |
false | 将星型连接过滤器启发法应用于基于成本的连接枚举。 |
2.2.0 |
spark.sql.cbo.joinReorder.dp.threshold |
12 | 动态规划算法中允许连接的最大节点数。 |
2.2.0 |
spark.sql.cbo.joinReorder.enabled |
false | 在 CBO 中启用连接重排序。 |
2.2.0 |
spark.sql.cbo.planStats.enabled |
false | 当为 true 时,逻辑计划将从目录中获取行数和列统计信息。 |
3.0.0 |
spark.sql.cbo.starSchemaDetection |
false | 当为 true 时,启用基于星型模式检测的连接重排序。 |
2.2.0 |
spark.sql.charAsVarchar |
false | 当为 true 时,Spark 在 CREATE/REPLACE/ALTER TABLE 命令中将 CHAR 类型替换为 VARCHAR 类型,因此新创建/更新的表将不会有 CHAR 类型的列/字段。具有 CHAR 类型列/字段的现有表不受此配置的影响。 |
3.3.0 |
spark.sql.chunkBase64String.enabled |
true | 是否截断由 |
3.5.2 |
spark.sql.classic.shuffleDependency.fileCleanup.enabled |
false | 启用后,Shuffle 文件将在经典 SQL 执行结束时被清理。注意,如果再次执行相同的 DataFrame 引用,此清理可能会导致阶段重试并重新生成 Shuffle 文件。 |
4.1.0 |
spark.sql.cli.print.header |
false | 当设置为 true 时,spark-sql CLI 会在查询输出中打印列名。 |
3.2.0 |
spark.sql.columnNameOfCorruptRecord |
_corrupt_record | 用于存储解析失败的原始/未解析 JSON 和 CSV 记录的内部列名。 |
1.2.0 |
spark.sql.connect.shuffleDependency.fileCleanup.enabled |
(spark.sql.shuffleDependency.fileCleanup.enabled 的值) |
启用后,Shuffle 文件将在 Spark Connect SQL 执行结束时被清理。 |
4.1.0 |
spark.sql.csv.filterPushdown.enabled |
true | 当为 true 时,启用将过滤器下推到 CSV 数据源。 |
3.0.0 |
spark.sql.datetime.java8API.enabled |
false | 如果配置属性设置为 true,则 Java 8 API 的 java.time.Instant 和 java.time.LocalDate 类将用作 Catalyst 的 TimestampType 和 DateType 的外部类型。如果设置为 false,则 java.sql.Timestamp 和 java.sql.Date 将用于相同目的。 |
3.0.0 |
spark.sql.debug.maxToStringFields |
25 | 在调试输出中可以转换为字符串的序列类条目的最大字段数。超过限制的任何元素都将被丢弃并由“... N more fields”占位符替换。 |
3.0.0 |
spark.sql.defaultCacheStorageLevel |
MEMORY_AND_DISK |
|
4.0.0 |
spark.sql.defaultCatalog |
spark_catalog | 默认目录的名称。如果用户尚未显式设置当前目录,这将是当前目录。 |
3.0.0 |
spark.sql.error.messageFormat |
PRETTY | 当为 PRETTY 时,错误消息包含错误类、消息和查询上下文的文本表示。MINIMAL 和 STANDARD 格式是漂亮的 JSON 格式,其中 STANDARD 包含一个额外的 JSON 字段 |
3.4.0 |
spark.sql.execution.arrow.compression.codec |
none | 在 JVM 和 Python 进程之间传输数据(例如 toPandas、toArrow)时用于压缩 Arrow IPC 数据的压缩编解码器。这可以在传输大数据集时显著减少内存使用量和网络带宽。支持的编解码器:'none'(无压缩)、'zstd'(Zstandard)、'lz4'(LZ4)。注意,压缩可能会增加 CPU 开销,但可以提供大量的内存节省,尤其是对于高压缩比的数据集。 |
4.1.0 |
spark.sql.execution.arrow.compression.zstd.level |
3 | 压缩 Arrow IPC 数据时 Zstandard (zstd) 编解码器的压缩级别。此配置仅在 spark.sql.execution.arrow.compression.codec 设置为 'zstd' 时使用。负值提供超快压缩,但压缩率较低。正值提供正常到最大压缩,值越大压缩效果越好但速度较慢。默认值 3 在压缩速度和压缩率之间提供了良好的平衡。 |
4.1.0 |
spark.sql.execution.arrow.enabled |
false | (自 Spark 3.0 起已弃用,请设置 'spark.sql.execution.arrow.pyspark.enabled'。) |
2.3.0 |
spark.sql.execution.arrow.fallback.enabled |
true | (自 Spark 3.0 起已弃用,请设置 'spark.sql.execution.arrow.pyspark.fallback.enabled'。) |
2.4.0 |
spark.sql.execution.arrow.localRelationThreshold |
48MB | 将 Arrow 批次转换为 Spark DataFrame 时,如果 Arrow 批次的字节大小小于此阈值,则在驱动程序端使用本地集合。否则,Arrow 批次将被发送并在执行器中反序列化为 Spark 内部行。 |
3.4.0 |
spark.sql.execution.arrow.maxRecordsPerBatch |
10000 | 使用 Apache Arrow 时,限制内存中单个 ArrowRecordBatch 可以写入的最大记录数。如果设置为零或负数,则没有限制。另请参阅 spark.sql.execution.arrow.maxBytesPerBatch。如果两者都设置,则满足任何条件时都会创建每个批次。 |
2.3.0 |
spark.sql.execution.arrow.pyspark.enabled |
(spark.sql.execution.arrow.enabled 的值) |
当为 true 时,在 PySpark 中利用 Apache Arrow 进行列式数据传输。此优化适用于:1. pyspark.sql.DataFrame.toPandas。2. 当输入为 Pandas DataFrame 或 NumPy ndarray 时的 pyspark.sql.SparkSession.createDataFrame。不支持以下数据类型:TimestampType 的 ArrayType。 |
3.0.0 |
spark.sql.execution.arrow.pyspark.fallback.enabled |
(spark.sql.execution.arrow.fallback.enabled 的值) |
当为 true 时,如果发生错误,'spark.sql.execution.arrow.pyspark.enabled' 启用的优化将自动回退到非优化实现。 |
3.0.0 |
spark.sql.execution.arrow.pyspark.selfDestruct.enabled |
false | (实验性) 当为 true 时,在从 Arrow 转换为 Pandas 时,利用 Apache Arrow 的自我毁灭(self-destruct)和拆分块(split-blocks)选项进行 PySpark 中的列式数据传输。这以一些 CPU 时间为代价减少了内存使用。此优化适用于:设置 'spark.sql.execution.arrow.pyspark.enabled' 时的 pyspark.sql.DataFrame.toPandas。 |
3.2.0 |
spark.sql.execution.arrow.pyspark.validateSchema.enabled |
false | 当为 true 时,验证 mapInArrow、mapInPandas 和 DataSource 返回的 Arrow 批次的模式与预期模式是否一致,以确保它们兼容。 |
4.1.0 |
spark.sql.execution.arrow.sparkr.enabled |
false | 当为 true 时,在 SparkR 中利用 Apache Arrow 进行列式数据传输。此优化适用于:1. 当输入为 R DataFrame 时的 createDataFrame 2. collect 3. dapply 4. gapply。不支持以下数据类型:FloatType、BinaryType、ArrayType、StructType 和 MapType。 |
3.0.0 |
spark.sql.execution.arrow.transformWithStateInPySpark.maxStateRecordsPerBatch |
10000 | 在使用 PySpark 的 TransformWithState(包括 Python Row 和 Pandas 模式)时,限制单个内存中 ArrowRecordBatch 可写入的状态记录的最大数量。 |
4.0.0 |
spark.sql.execution.arrow.useLargeVarTypes |
false | 在使用 Apache Arrow 时,对字符串和二进制类型使用大变长向量(Large Variable Width Vectors)。普通的字符串和二进制类型在单个记录批次(record batch)中对单列有 2GiB 的限制。大变长类型移除了这一限制,但会以每个值更高的内存占用为代价。 |
3.5.0 |
spark.sql.execution.interruptOnCancel |
true | 如果设为 true,当查询被取消时,所有正在运行的任务都将被中断。 |
4.0.0 |
spark.sql.execution.pandas.inferPandasDictAsMap |
false | 如果设为 true,spark.createDataFrame 会将 Pandas DataFrame 中的 dict 推断为 MapType。如果设为 false,spark.createDataFrame 会将 Pandas DataFrame 中的 dict 推断为 StructType,这也是 PyArrow 的默认推断方式。 |
4.0.0 |
spark.sql.execution.pandas.structHandlingMode |
legacy | 创建 pandas DataFrame 时结构体(struct)类型的转换模式。当为 "legacy" 时:1. 当禁用 Arrow 优化时,转换为 Row 对象;2. 当启用 Arrow 优化时,转换为 dict,如果存在重复的嵌套字段名则抛出异常。当为 "row" 时,无论是否启用 Arrow 优化,都转换为 Row 对象。当为 "dict" 时,无论是否启用 Arrow 优化,都转换为 dict 并使用后缀键名(例如 a_0, a_1)来处理重复的嵌套字段名。 |
3.5.0 |
spark.sql.execution.pandas.udf.buffer.size |
(spark.buffer.size 的值) |
与 |
3.0.0 |
spark.sql.execution.pyspark.binaryAsBytes |
true | 如果设为 true,BinaryType 在 PySpark 中将始终映射为 bytes。如果设为 false,则恢复 4.1.0 之前的 PySpark 行为。在 4.1.0 之前,BinaryType 在没有 Arrow 优化的常规 UDF 和 UDTF、DataFrame API(Spark Classic 和 Spark Connect)以及数据源中映射为 bytearray;而在使用旧版 pandas 转换且经过 Arrow 优化的 UDF 和 UDTF 中,BinaryType 映射为 bytes。 |
4.1.0 |
spark.sql.execution.pyspark.udf.daemonKillWorkerOnFlushFailure |
(spark.python.daemon.killWorkerOnFlushFailure 的值) |
与 spark.python.daemon.killWorkerOnFlushFailure 相同,适用于带有 DataFrame 和 SQL 的 Python 执行。它可以在运行时更改。 |
4.1.0 |
spark.sql.execution.pyspark.udf.faulthandler.enabled |
(spark.python.worker.faulthandler.enabled 的值) |
与 spark.python.worker.faulthandler.enabled 相同,适用于带有 DataFrame 和 SQL 的 Python 执行。它可以在运行时更改。 |
4.0.0 |
spark.sql.execution.pyspark.udf.hideTraceback.enabled |
false | 如果设为 true,仅显示 Python UDF 的异常消息,隐藏堆栈跟踪。如果启用了此功能,simplifiedTraceback 将失效。 |
4.0.0 |
spark.sql.execution.pyspark.udf.idleTimeoutSeconds |
(spark.python.worker.idleTimeoutSeconds 的值) |
与 spark.python.worker.idleTimeoutSeconds 相同,适用于带有 DataFrame 和 SQL 的 Python 执行。它可以在运行时更改。 |
4.0.0 |
spark.sql.execution.pyspark.udf.killOnIdleTimeout |
(spark.python.worker.killOnIdleTimeout 的值) |
与 spark.python.worker.killOnIdleTimeout 相同,适用于带有 DataFrame 和 SQL 的 Python 执行。它可以在运行时更改。 |
4.1.0 |
spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled |
true | 如果设为 true,则简化 Python UDF 的回溯信息。它会在回溯中隐藏来自 PySpark 的 Python worker、(反)序列化等信息,仅显示来自 UDF 的异常消息。注意,此功能仅适用于 CPython 3.7+。 |
3.1.0 |
spark.sql.execution.pyspark.udf.tracebackDumpIntervalSeconds |
(spark.python.worker.tracebackDumpIntervalSeconds 的值) |
与 spark.python.worker.tracebackDumpIntervalSeconds 相同,适用于带有 DataFrame 和 SQL 的 Python 执行。它可以在运行时更改。 |
4.1.0 |
spark.sql.execution.python.udf.buffer.size |
(spark.buffer.size 的值) |
与 |
4.0.0 |
spark.sql.execution.python.udf.maxRecordsPerBatch |
100 | 在使用 Python UDF 时,限制可进行序列化/反序列化处理的单批次最大记录数。 |
4.0.0 |
spark.sql.execution.pythonUDF.arrow.concurrency.level |
(无) | 执行 Arrow 优化 Python UDF 的并发级别。如果 Python UDF 使用密集的 I/O 操作,此配置会很有用。 |
4.0.0 |
spark.sql.execution.pythonUDF.arrow.enabled |
false | 在常规 Python UDF 中启用 Arrow 优化。该优化仅在给定函数至少接收一个参数时才能启用。 |
3.4.0 |
spark.sql.execution.pythonUDF.pandas.intToDecimalCoercionEnabled |
false | 如果设为 true,在序列化期间将 Pandas.Series 转换为 Arrow 数组之前,先将 int 转换为 Decimal Python 对象。默认禁用,会影响性能。 |
4.1.0 |
spark.sql.execution.pythonUDTF.arrow.enabled |
false | 为 Python UDTF 启用 Arrow 优化。 |
3.5.0 |
spark.sql.execution.topKSortFallbackThreshold |
2147483632 | 在包含 SORT 后接 LIMIT 的 SQL 查询中(例如 'SELECT x FROM t ORDER BY y LIMIT m'),如果 m 低于此阈值,则执行内存中的 top-K 排序;否则执行全局排序,必要时会溢出到磁盘。 |
2.4.0 |
spark.sql.extendedExplainProviders |
(无) | 以逗号分隔的实现了 org.apache.spark.sql.ExtendedExplainGenerator 特质的类列表。如果提供,Spark 将在 explain 计划和 UI 中打印来自这些提供程序的扩展计划信息。 |
4.0.0 |
spark.sql.files.ignoreCorruptFiles |
false | 是否忽略损坏的文件。如果为 true,当遇到损坏的文件时,Spark 作业将继续运行,并且已读取的内容仍将返回。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 |
2.1.1 |
spark.sql.files.ignoreInvalidPartitionPaths |
false | 是否忽略与 <column>=<value> 不匹配的无效分区路径。启用此选项后,对于拥有 'table/invalid' 和 'table/col=1' 两个分区目录的表,将仅加载后者,而忽略无效分区。 |
4.0.0 |
spark.sql.files.ignoreMissingFiles |
false | 是否忽略丢失的文件。如果为 true,当遇到丢失的文件时,Spark 作业将继续运行,并且已读取的内容仍将返回。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 |
2.3.0 |
spark.sql.files.maxPartitionBytes |
128MB | 读取文件时打包到单个分区中的最大字节数。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 |
2.0.0 |
spark.sql.files.maxPartitionNum |
(无) | 建议的(非保证)拆分文件分区的最大数量。如果设置了此项,且初始分区数量超过此值,Spark 将重新缩放每个分区,使分区数量接近该值。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 |
3.5.0 |
spark.sql.files.maxRecordsPerFile |
0 | 写入单个文件的最大记录数。如果该值为零或负数,则没有限制。 |
2.2.0 |
spark.sql.files.minPartitionNum |
(无) | 建议的(非保证)拆分文件分区的最小数量。如果未设置,默认值为 |
3.1.0 |
spark.sql.function.concatBinaryAsString |
false | 当此选项设置为 false 且所有输入均为二进制时, |
2.3.0 |
spark.sql.function.eltOutputAsString |
false | 当此选项设置为 false 且所有输入均为二进制时, |
2.3.0 |
spark.sql.groupByAliases |
true | 如果设为 true,select 列表中的别名可以在 group by 子句中使用。如果设为 false,则在这种情况下抛出分析异常。 |
2.2.0 |
spark.sql.groupByOrdinal |
true | 如果设为 true,group by 子句中的序数数字将被视为 select 列表中的位置。如果设为 false,则忽略序数数字。 |
2.0.0 |
spark.sql.hive.convertInsertingPartitionedTable |
true | 当设置为 true,且 |
3.0.0 |
spark.sql.hive.convertInsertingUnpartitionedTable |
true | 当设置为 true,且 |
4.0.0 |
spark.sql.hive.convertMetastoreAsNullable |
false | 当设置为 true 时,当 Spark 使用数据源 API 而不是 Hive serde 来读取/写入 Parquet 或 ORC 格式的 Hive 表时,会将 nullable 属性应用于模式。此标志仅在分别启用 |
4.1.0 |
spark.sql.hive.convertMetastoreCtas |
true | 当设置为 true 时,Spark 将尝试在 CTAS 中使用内置数据源写入器而不是 Hive serde。此标志仅在分别针对 Parquet 和 ORC 格式启用 |
3.0.0 |
spark.sql.hive.convertMetastoreInsertDir |
true | 当设置为 true 时,Spark 将尝试在 INSERT OVERWRITE DIRECTORY 中使用内置数据源写入器而不是 Hive serde。此标志仅在分别针对 Parquet 和 ORC 格式启用 |
3.3.0 |
spark.sql.hive.convertMetastoreOrc |
true | 当设置为 true 时,使用内置的 ORC 读取器和写入器来处理使用 HiveQL 语法创建的 ORC 表,而不是 Hive serde。 |
2.0.0 |
spark.sql.hive.convertMetastoreParquet |
true | 当设置为 true 时,使用内置的 Parquet 读取器和写入器来处理使用 HiveQL 语法创建的 Parquet 表,而不是 Hive serde。 |
1.1.1 |
spark.sql.hive.convertMetastoreParquet.mergeSchema |
false | 当为 true 时,也会尝试合并不同 Parquet 数据文件中可能不同但兼容的 Parquet 模式。此配置仅在 "spark.sql.hive.convertMetastoreParquet" 为 true 时有效。 |
1.3.1 |
spark.sql.hive.dropPartitionByName.enabled |
false | 当为 true 时,Spark 将获取分区名称而不是分区对象来删除分区,这可以提高删除分区的性能。 |
3.4.0 |
spark.sql.hive.filesourcePartitionFileCacheSize |
262144000 | 当非零时,启用内存中分区文件元数据的缓存。所有表共享一个缓存,该缓存最多可以使用指定的字节数用于文件元数据。此配置仅在启用了 Hive 文件源分区管理时才有效。 |
2.1.1 |
spark.sql.hive.manageFilesourcePartitions |
true | 当为 true 时,也为文件源表启用元数据存储分区管理。这包括数据源表和转换后的 Hive 表。当启用分区管理时,数据源表将分区存储在 Hive 元数据存储中,并且当 |
2.1.1 |
spark.sql.hive.metastorePartitionPruning |
true | 当为 true 时,一些谓词将被下推到 Hive 元数据存储中,以便可以更早地消除不匹配的分区。 |
1.5.0 |
spark.sql.hive.metastorePartitionPruningFallbackOnException |
false | 当从元数据存储遇到 MetaException 时,是否回退到从 Hive 元数据存储获取所有分区并在 Spark 客户端侧执行分区修剪。注意,如果启用此功能且存在大量分区,Spark 查询性能可能会下降。如果禁用此功能,Spark 将改为使查询失败。 |
3.3.0 |
spark.sql.hive.metastorePartitionPruningFastFallback |
false | 启用此配置后,如果 Hive 不支持谓词,或者由于从元数据存储遇到 MetaException 而导致 Spark 回退,Spark 将改为通过首先获取分区名称,然后在客户端侧评估过滤表达式来修剪分区。注意,不支持具有 TimeZoneAwareExpression 的谓词。 |
3.3.0 |
spark.sql.hive.thriftServer.async |
true | 当设置为 true 时,Hive Thrift 服务器以异步方式执行 SQL 查询。 |
1.5.0 |
spark.sql.icu.caseMappings.enabled |
true | 启用后,我们将使用 ICU 库(而不是 JVM)在 UTF8_BINARY 排序规则下实现字符串的大小写映射。 |
4.0.0 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | 控制列式缓存的批次大小。较大的批次大小可以提高内存利用率和压缩效率,但在缓存数据时存在内存溢出 (OOM) 的风险。 |
1.1.1 |
spark.sql.inMemoryColumnarStorage.compressed |
true | 当设置为 true 时,Spark SQL 将根据数据的统计信息自动为每一列选择压缩编解码器。 |
1.0.1 |
spark.sql.inMemoryColumnarStorage.enableVectorizedReader |
true | 为列式缓存启用向量化读取器。 |
2.3.1 |
spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio |
1.2 | 当 spark.sql.inMemoryColumnarStorage.hugeVectorThreshold <= 0 或所需内存小于该阈值时,spark 保留所需内存 * 2 的空间;否则,spark 保留所需内存 * 此比率的空间,并在读取下一批行之前释放此列向量内存。 |
4.0.0 |
spark.sql.inMemoryColumnarStorage.hugeVectorThreshold |
-1b | 当所需内存大于此值时,spark 下次保留所需内存 * spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio 的内存,并在读取下一批行之前释放此列向量内存。-1 表示禁用此优化。 |
4.0.0 |
spark.sql.json.filterPushdown.enabled |
true | 当为 true 时,启用 JSON 数据源的过滤下推。 |
3.1.0 |
spark.sql.json.useUnsafeRow |
false | 当设置为 true 时,在 JSON 解析器中使用 UnsafeRow 来表示结构体结果。它可以被 JSON 选项 |
4.0.0 |
spark.sql.jsonGenerator.ignoreNullFields |
true | 在 JSON 数据源和诸如 to_json 之类的 JSON 函数中生成 JSON 对象时是否忽略 null 字段。如果为 false,则在 JSON 对象中为 null 字段生成 null。 |
3.0.0 |
spark.sql.leafNodeDefaultParallelism |
(无) | 生产数据的 Spark SQL 叶子节点(如文件扫描节点、本地数据扫描节点、范围节点等)的默认并行度。此配置的默认值为 'SparkContext#defaultParallelism'。 |
3.2.0 |
spark.sql.legacy.hive.thriftServer.useZeroBasedColumnOrdinalPosition |
false | 当设置为 true 时,Hive Thrift 服务器在 GetColumns 操作的结果中返回基于 0 的 ORDINAL_POSITION,而不是校正后的基于 1 的位置。 |
4.1.0 |
spark.sql.mapKeyDedupPolicy |
EXCEPTION | 用于内置函数 CreateMap、MapFromArrays、MapFromEntries、StringToMap、MapConcat 和 TransformKeys 中映射键(map keys)去重的策略。当为 EXCEPTION 时,如果检测到重复的映射键,查询将失败。当为 LAST_WIN 时,最后插入的映射键优先。 |
3.0.0 |
spark.sql.maven.additionalRemoteRepositories |
https://maven-central.storage-download.googleapis.com/maven2/ | 可选的额外远程 Maven 镜像仓库的逗号分隔字符串配置。这仅用于在默认 Maven Central 仓库无法访问时,在 IsolatedClientLoader 中下载 Hive jar 包。 |
3.0.0 |
spark.sql.maxBroadcastTableSize |
8589934592b | 广播连接(broadcast joins)中可以广播的最大表大小(字节)。 |
4.1.0 |
spark.sql.maxMetadataStringLength |
100 | 元数据字符串输出的最大字符数。例如 |
3.1.0 |
spark.sql.maxPlanStringLength |
2147483632 | 计划字符串(plan string)输出的最大字符数。如果计划更长,后续输出将被截断。默认设置始终生成完整计划。如果计划字符串占用了太多内存,或者在驱动程序或 UI 进程中导致 OutOfMemory 错误,请将其设置为较小的值(例如 8k)。 |
3.0.0 |
spark.sql.maxSinglePartitionBytes |
128m | 单个分区允许的最大字节数。否则,规划器将引入 shuffle 以提高并行度。 |
3.4.0 |
spark.sql.operatorPipeSyntaxEnabled |
true | 如果为 true,则启用 Apache Spark SQL 的算子管道语法。这使用算子管道标记 |> 来指示 SQL 子句之间的分离,以一种可组合的方式描述查询执行的步骤顺序。 |
4.0.0 |
spark.sql.optimizer.avoidCollapseUDFWithExpensiveExpr |
true | 是否避免折叠会导致 UDF 中昂贵表达式重复的投影(projections)。 |
4.0.0 |
spark.sql.optimizer.collapseProjectAlwaysInline |
false | 是否始终折叠两个相邻的投影并内联表达式,即使这会导致额外的重复。 |
3.3.0 |
spark.sql.optimizer.dynamicPartitionPruning.enabled |
true | 当为 true 时,当分区列用作连接键(join key)时,我们将为其生成谓词。 |
3.0.0 |
spark.sql.optimizer.enableCsvExpressionOptimization |
true | 是否在 SQL 优化器中优化 CSV 表达式。这包括从 from_csv 中修剪不必要的列。 |
3.2.0 |
spark.sql.optimizer.enableJsonExpressionOptimization |
true | 是否在 SQL 优化器中优化 JSON 表达式。这包括从 from_json 中修剪不必要的列、简化 from_json + to_json、to_json + named_struct(from_json.col1, from_json.col2, ....)。 |
3.1.0 |
spark.sql.optimizer.excludedRules |
(无) | 配置要在优化器中禁用的规则列表,这些规则通过其名称指定并以逗号分隔。不能保证此配置中的所有规则最终都会被排除,因为有些规则对于正确性是必需的。优化器将记录实际已被排除的规则。 |
2.4.0 |
spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold |
10GB | 布隆过滤器应用侧计划的聚合扫描大小的字节数阈值。布隆过滤器应用侧的聚合扫描字节大小需要超过此值才能注入布隆过滤器。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold |
10MB | 布隆过滤器创建侧计划的大小阈值。估计大小需要低于此值才能尝试注入布隆过滤器。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.enabled |
true | 当为 true 且 shuffle 连接的一侧具有选择性谓词时,我们尝试在另一侧插入布隆过滤器以减少 shuffle 数据量。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.expectedNumItems |
1000000 | 运行时布隆过滤器的预期项目数默认值。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.maxNumBits |
67108864 | 用于运行时布隆过滤器的最大位数。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.maxNumItems |
4000000 | 运行时布隆过滤器允许的最大预期项目数。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.numBits |
8388608 | 用于运行时布隆过滤器的位数默认值。 |
3.3.0 |
spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled |
true | 为基于组的行级操作启用运行时组过滤。替换数据组(例如文件、分区)的数据源在规划行级操作扫描时,可以使用提供的数据源过滤器修剪整个组。然而,这种过滤是有限的,因为并非所有表达式都可以转换为数据源过滤器,并且某些表达式只能由 Spark(例如子查询)评估。由于重写组的开销很大,Spark 可以在运行时执行查询以查找哪些记录符合行级操作的条件。关于匹配记录的信息将被传回行级操作扫描,允许数据源丢弃不需要重写的组。 |
3.4.0 |
spark.sql.optimizer.runtimeFilter.number.threshold |
10 | 单次查询注入的运行时过滤器(非 DPP)的总数。这是为了防止因过多的布隆过滤器导致驱动程序 OOM。 |
3.3.0 |
spark.sql.orc.aggregatePushdown |
false | 如果为 true,聚合将下推到 ORC 进行优化。支持 MIN、MAX 和 COUNT 作为聚合表达式。对于 MIN/MAX,支持布尔、整数、浮点和日期类型。对于 COUNT,支持所有数据类型。如果任何 ORC 文件页脚缺少统计信息,将抛出异常。 |
3.3.0 |
spark.sql.orc.columnarReaderBatchSize |
4096 | orc 向量化读取器批次中包含的行数。应仔细选择该数字以最小化开销并避免读取数据时出现 OOM。 |
2.4.0 |
spark.sql.orc.columnarWriterBatchSize |
1024 | orc 向量化写入器批次中包含的行数。应仔细选择该数字以最小化开销并避免写入数据时出现 OOM。 |
3.4.0 |
spark.sql.orc.compression.codec |
zstd | 设置写入 ORC 文件时使用的压缩编解码器。如果在表特定选项/属性中指定了 |
2.3.0 |
spark.sql.orc.enableNestedColumnVectorizedReader |
true | 为嵌套列启用向量化 orc 解码。 |
3.2.0 |
spark.sql.orc.enableVectorizedReader |
true | 启用向量化 orc 解码。 |
2.3.0 |
spark.sql.orc.filterPushdown |
true | 当为 true 时,启用 ORC 文件的过滤下推。 |
1.4.0 |
spark.sql.orc.mergeSchema |
false | 当为 true 时,Orc 数据源合并从所有数据文件收集的模式,否则模式是从随机数据文件中选取的。 |
3.0.0 |
spark.sql.orderByOrdinal |
true | 如果设为 true,序数数字将被视为 select 列表中的位置。如果设为 false,order/sort by 子句中的序数数字将被忽略。 |
2.0.0 |
spark.sql.parquet.aggregatePushdown |
false | 如果为 true,聚合将下推到 Parquet 进行优化。支持 MIN、MAX 和 COUNT 作为聚合表达式。对于 MIN/MAX,支持布尔、整数、浮点和日期类型。对于 COUNT,支持所有数据类型。如果任何 Parquet 文件页脚缺少统计信息,将抛出异常。 |
3.3.0 |
spark.sql.parquet.binaryAsString |
false | 其他一些生成 Parquet 的系统(特别是 Impala 和旧版本的 Spark SQL)在写出 Parquet 模式时不区分二进制数据和字符串。此标志告诉 Spark SQL 将二进制数据解释为字符串,以提供与这些系统的兼容性。 |
1.1.1 |
spark.sql.parquet.columnarReaderBatchSize |
4096 | parquet 向量化读取器批次中包含的行数。应仔细选择该数字以最小化开销并避免读取数据时出现 OOM。 |
2.4.0 |
spark.sql.parquet.compression.codec |
snappy | 设置写入 Parquet 文件时使用的压缩编解码器。如果在表特定选项/属性中指定了 |
1.1.1 |
spark.sql.parquet.enableNestedColumnVectorizedReader |
true | 为嵌套列(例如 struct, list, map)启用向量化 Parquet 解码。需要启用 spark.sql.parquet.enableVectorizedReader。 |
3.3.0 |
spark.sql.parquet.enableVectorizedReader |
true | 启用向量化 parquet 解码。 |
2.0.0 |
spark.sql.parquet.fieldId.read.enabled |
false | 字段 ID 是 Parquet 模式规范的原生字段。启用后,Parquet 读取器将使用所请求 Spark 模式中的字段 ID(如果存在)来查找 Parquet 字段,而不是使用列名。 |
3.3.0 |
spark.sql.parquet.fieldId.read.ignoreMissing |
false | 当 Parquet 文件没有任何字段 ID,但 Spark 读取模式使用字段 ID 进行读取时,如果启用了此标志,我们将静默返回 null,否则报错。 |
3.3.0 |
spark.sql.parquet.fieldId.write.enabled |
true | 字段 ID 是 Parquet 模式规范的原生字段。启用后,Parquet 写入器将把 Spark 模式中的字段 ID 元数据(如果存在)填充到 Parquet 模式中。 |
3.3.0 |
spark.sql.parquet.filterPushdown |
true | 设置为 true 时启用 Parquet 过滤下推优化。 |
1.2.0 |
spark.sql.parquet.inferTimestampNTZ.enabled |
true | 启用后,带有注释 isAdjustedToUTC = false 的 Parquet 时间戳列在模式推断期间被推断为 TIMESTAMP_NTZ 类型。否则,所有 Parquet 时间戳列都被推断为 TIMESTAMP_LTZ 类型。注意,Spark 在写入文件时将输出模式写入 Parquet 页脚元数据,并在读取文件时利用它。因此,此配置仅影响非 Spark 编写的 Parquet 文件的模式推断。 |
3.4.0 |
spark.sql.parquet.int96AsTimestamp |
true | 某些生成 Parquet 的系统(特别是 Impala)将时间戳存储在 INT96 中。Spark 也会将时间戳存储为 INT96,因为我们需要避免纳秒精度的丢失。此标志告诉 Spark SQL 将 INT96 数据解释为时间戳,以提供与这些系统的兼容性。 |
1.3.0 |
spark.sql.parquet.int96TimestampConversion |
false | 此选项控制在转换为时间戳时是否应对 Impala 编写的数据中的 INT96 数据应用时间戳调整。这是必要的,因为 Impala 存储 INT96 数据时所用的时区偏移量与 Hive 和 Spark 不同。 |
2.3.0 |
spark.sql.parquet.mergeSchema |
false | 当为 true 时,Parquet 数据源合并从所有数据文件收集的模式,否则模式是从摘要文件(summary file)中选取的,如果没有摘要文件可用,则从随机数据文件中选取。 |
1.5.0 |
spark.sql.parquet.outputTimestampType |
INT96 | 设置 Spark 向 Parquet 文件写入数据时使用的 Parquet 时间戳类型。INT96 是一种非标准但常用的 Parquet 时间戳类型。TIMESTAMP_MICROS 是 Parquet 中的一种标准时间戳类型,它存储自 Unix 纪元以来的微秒数。TIMESTAMP_MILLIS 也是标准的,但精度为毫秒,这意味着 Spark 必须截断其时间戳值的微秒部分。 |
2.3.0 |
spark.sql.parquet.recordLevelFilter.enabled |
false | 如果为 true,则启用 Parquet 使用下推过滤器进行的原生记录级过滤。此配置仅在启用了 'spark.sql.parquet.filterPushdown' 且未启用向量化读取器时有效。你可以通过将 'spark.sql.parquet.enableVectorizedReader' 设置为 false 来确保不使用向量化读取器。 |
2.3.0 |
spark.sql.parquet.respectSummaryFiles |
false | 当为 true 时,我们假设所有 Parquet 的部分文件(part-files)与摘要文件一致,并且在合并模式时我们将忽略它们。否则,如果为 false(默认值),我们将合并所有部分文件。这应被视为仅限专家使用的选项,在不准确理解其含义之前不应启用。 |
1.5.0 |
spark.sql.parquet.writeLegacyFormat |
false | 如果为 true,数据将以 Spark 1.4 及更早版本的方式写入。例如,十进制值将以 Apache Parquet 的定长字节数组格式写入,其他系统(如 Apache Hive 和 Apache Impala)使用该格式。如果为 false,将使用 Parquet 中较新的格式。例如,十进制将以基于 int 的格式写入。如果 Parquet 输出计划用于不支持此较新格式的系统,请将其设置为 true。 |
1.6.0 |
spark.sql.parser.quotedRegexColumnNames |
false | 当为 true 时,SELECT 语句中带引号的标识符(使用反引号)被解释为正则表达式。 |
2.3.0 |
spark.sql.pipelines.maxFlowRetryAttempts |
2 | 流(flow)可以重试的最大次数。 |
4.1.0 |
spark.sql.pivotMaxValues |
10000 | 当执行 pivot 而未指定 pivot 列的值时,这是在不报错的情况下将收集的最大(不同)值数量。 |
1.6.0 |
spark.sql.planner.pythonExecution.memory |
(无) | 指定 Spark 驱动程序中执行 Python 代码的内存分配,单位为 MiB。设置后,它会将 Python 执行的内存上限设为指定金额。如果不设置,Spark 不会限制 Python 的内存使用,由应用程序负责避免超过与其它非 JVM 进程共享的开销内存空间。注意:Windows 不支持资源限制,在 MacOS 上实际资源不受限制。 |
4.0.0 |
spark.sql.preserveCharVarcharTypeInfo |
false | 当为 true 时,Spark 不会将 CHAR/VARCHAR 类型替换为 STRING 类型(这是 Spark 3.0 及更早版本的默认行为)。这意味着强制执行 CHAR/VARCHAR 类型的长度检查,并且 CHAR 类型也会被适当填充。 |
4.0.0 |
spark.sql.pyspark.inferNestedDictAsStruct.enabled |
false | PySpark 的 SparkSession.createDataFrame 默认将嵌套 dict 推断为 map。当设置为 true 时,它将嵌套 dict 推断为 struct。 |
3.3.0 |
spark.sql.pyspark.jvmStacktrace.enabled |
false | 当为 true 时,它会在面向用户的 PySpark 异常中与 Python 堆栈跟踪一起显示 JVM 堆栈跟踪。默认情况下,它被禁用以隐藏 JVM 堆栈跟踪,仅显示对 Python 友好的异常。注意,这与日志级别设置无关。 |
3.0.0 |
spark.sql.pyspark.plotting.max_rows |
1000 | 绘图的视觉限制。如果对于基于 top-n 的图(饼图、条形图、横向条形图)设置为 1000,则前 1000 个数据点将用于绘图。对于基于采样的图(散点图、面积图、折线图),将随机采样 1000 个数据点。 |
4.0.0 |
spark.sql.pyspark.udf.profiler |
(无) | 通过启用或禁用它来配置 Python/Pandas UDF 分析器,并可选择 "perf" 或 "memory" 类型,或者取消设置该配置以禁用分析器。默认情况下它是禁用的。 |
4.0.0 |
spark.sql.pyspark.worker.logging.enabled |
false | 当设置为 true 时,此配置在执行用户定义函数 (UDF)、用户定义表函数 (UDTF) 以及 Spark SQL 中其他基于 Python 的操作的 Python 工作进程中启用全面日志记录。 |
4.1.0 |
spark.sql.readSideCharPadding |
true | 当为 true 时,除了写入侧填充外,Spark 在读取 CHAR 类型列/字段时也会应用字符串填充。此配置默认开启,以便在外部表等情况下更好地强制执行 CHAR 类型语义。 |
3.4.0 |
spark.sql.redaction.options.regex |
(?i)url | 用于确定 Spark SQL 命令的选项映射中哪些键包含敏感信息的正则表达式。名称与此正则表达式匹配的选项的值将在 explain 输出中被脱敏。此脱敏操作是在 spark.redaction.regex 定义的全局脱敏配置之上应用的。 |
2.2.2 |
spark.sql.redaction.string.regex |
(spark.redaction.string.regex 的值) |
用于确定 Spark 生成的字符串的哪些部分包含敏感信息的正则表达式。当此正则表达式匹配字符串的某一部分时,该部分字符串将被替换为虚拟值。目前用于对 SQL explain 命令的输出进行脱敏。如果未设置此配置,则使用 |
2.3.0 |
spark.sql.repl.eagerEval.enabled |
false | 是否启用即时评估(eager evaluation)。当为 true 时,只有在 REPL 支持即时评估的情况下才会显示 Dataset 的前 K 行。目前,PySpark 和 SparkR 支持即时评估。在 PySpark 中,对于 Jupyter 等笔记本,将返回 HTML 表格(由 repr_html 生成)。对于纯 Python REPL,返回的输出格式化为 dataframe.show()。在 SparkR 中,返回的输出类似于 R data.frame 的显示方式。 |
2.4.0 |
spark.sql.repl.eagerEval.maxNumRows |
20 | 即时评估返回的最大行数。此配置仅在 spark.sql.repl.eagerEval.enabled 设置为 true 时生效。此配置的有效范围是从 0 到 (Int.MaxValue - 1),因此负数或大于 (Int.MaxValue - 1) 的无效配置将被归一化为 0 和 (Int.MaxValue - 1)。 |
2.4.0 |
spark.sql.repl.eagerEval.truncate |
20 | 即时评估返回的每个单元格的最大字符数。此配置仅在 spark.sql.repl.eagerEval.enabled 设置为 true 时生效。 |
2.4.0 |
spark.sql.scripting.enabled |
true | SQL Scripting 功能正在开发中,其使用应在次功能标记下进行。SQL Scripting 允许用户编写程序化 SQL,包括控制流和错误处理。 |
4.0.0 |
spark.sql.session.localRelationCacheThreshold |
1048576 | 序列化后在驱动程序侧缓存本地关系的字节大小阈值。 |
3.5.0 |
spark.sql.session.localRelationChunkSizeBytes |
16777216 | 将 ChunkedCachedLocalRelation.data 拆分为批次时的分块大小(字节)。当达到 spark.sql.session.localRelationChunkSizeBytes 或 spark.sql.session.localRelationChunkSizeRows 时,将创建新块。受 spark.sql.session.localRelationBatchOfChunksSizeBytes 的限制,这两个配置的较小值用于确定块大小。 |
4.1.0 |
spark.sql.session.localRelationChunkSizeRows |
10000 | 将 ChunkedCachedLocalRelation.data 拆分为批次时的分块行数大小。当达到 spark.sql.session.localRelationChunkSizeBytes 或 spark.sql.session.localRelationChunkSizeRows 时,将创建新块。 |
4.1.0 |
spark.sql.session.timeZone |
(本地时区的值) | 会话本地时区的 ID,格式为基于区域的时区 ID 或时区偏移量。区域 ID 必须具有 'area/city' 的形式,例如 'America/Los_Angeles'。时区偏移量必须为 '(+|-)HH', '(+|-)HH:mm' 或 '(+|-)HH:mm:ss' 格式,例如 '-08', '+01:00' 或 '-13:33:33'。此外,'UTC' 和 'Z' 也支持作为 '+00:00' 的别名。不建议使用其他简写名称,因为它们可能产生歧义。 |
2.2.0 |
spark.sql.shuffle.orderIndependentChecksum.enableFullRetryOnMismatch |
false | 当我们检测到与消费者阶段的生产者阶段存在校验和不匹配时,是否重试消费者阶段的所有任务。 |
4.1.0 |
spark.sql.shuffle.orderIndependentChecksum.enabled |
false | 是否为 shuffle 数据计算与顺序无关的校验和。如果启用,Spark 将计算每个映射器独立于输入行顺序的校验和,并将校验和从执行器返回到驱动程序。这与启用 |
4.1.0 |
spark.sql.shuffle.partitions |
200 | 用于连接或聚合 shuffle 数据时使用的默认分区数。 |
1.1.0 |
spark.sql.shuffleDependency.fileCleanup.enabled |
false | (自 Spark 4.1 起弃用,请设置 'spark.sql.connect.shuffleDependency.fileCleanup.enabled'。) |
4.0.0 |
spark.sql.shuffleDependency.skipMigration.enabled |
false | 启用后,Spark Connect SQL 执行的 shuffle 依赖项将在执行结束时标记,并且在退役期间不会迁移。 |
4.0.0 |
spark.sql.shuffledHashJoinFactor |
3 | 如果小侧的数据大小乘以该因子后仍小于大侧,则可以选择 shuffle 哈希连接。 |
3.3.0 |
spark.sql.sources.bucketing.autoBucketedScan.enabled |
true | 当为 true 时,根据查询计划自动决定是否对输入表执行分桶扫描。如果以下情况则不使用分桶扫描:1. 查询没有利用分桶的算子(例如 join, group-by 等),或 2. 在这些算子和表扫描之间存在交换(exchange)算子。注意当 'spark.sql.sources.bucketing.enabled' 设置为 false 时,此配置不起作用。 |
3.1.0 |
spark.sql.sources.bucketing.enabled |
true | 当为 false 时,我们将把分桶表视为普通表。 |
2.0.0 |
spark.sql.sources.bucketing.maxBuckets |
100000 | 允许的最大桶数。 |
2.4.0 |
spark.sql.sources.default |
parquet | 输入/输出中使用的默认数据源。 |
1.3.0 |
spark.sql.sources.parallelPartitionDiscovery.threshold |
32 | 允许在驱动程序侧列出文件的最大路径数。如果在分区发现期间检测到的路径数超过此值,它将尝试使用另一个 Spark 分布式作业来列出文件。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 |
1.5.0 |
spark.sql.sources.partitionColumnTypeInference.enabled |
true | 当为 true 时,自动推断分区列的数据类型。 |
1.5.0 |
spark.sql.sources.partitionOverwriteMode |
STATIC | 当 INSERT OVERWRITE 一个分区数据源表时,我们目前支持 2 种模式:static 和 dynamic。在 static 模式下,Spark 在覆盖之前删除 INSERT 语句中匹配分区规范(例如 PARTITION(a=1,b))的所有分区。在 dynamic 模式下,Spark 不预先删除分区,仅覆盖运行时有数据写入的那些分区。默认情况下,我们使用 static 模式以保持与 2.3 之前的 Spark 相同的行为。注意此配置不影响 Hive serde 表,因为它们总是以 dynamic 模式覆盖。这也可以作为使用键 partitionOverwriteMode 的数据源输出选项进行设置(该选项优先于此设置),例如 dataframe.write.option("partitionOverwriteMode", "dynamic").save(path)。 |
2.3.0 |
spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled |
false | 在分区转换兼容但不相同的情况下,是否允许存储分区连接(storage-partition join)。此配置要求同时启用 spark.sql.sources.v2.bucketing.enabled 和 spark.sql.sources.v2.bucketing.pushPartValues.enabled,并禁用 spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled。 |
4.0.0 |
spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled |
false | 在连接键是源表分区键的子集的情况下,是否允许存储分区连接。在规划阶段,Spark 将仅按连接键中的那些键对分区进行分组。目前仅在 spark.sql.requireAllClusterKeysForDistribution 为 false 时启用。 |
4.0.0 |
spark.sql.sources.v2.bucketing.enabled |
true | 与 spark.sql.sources.bucketing.enabled 类似,此配置用于为 V2 数据源启用分桶。开启后,Spark 将通过 SupportsReportPartitioning 识别 V2 数据源报告的特定分布,并在必要时尝试避免 shuffle。 |
3.3.0 |
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled |
false | 在存储分区连接期间,当连接的两侧都是 KeyGroupedPartitioning 时,是否允许输入分区部分聚类(partially clustered)。在规划阶段,Spark 将根据表统计信息选择数据大小较小的一侧,对其进行分组并复制以匹配另一侧。这是对倾斜连接的优化,当某些分区分配了大量数据时,可以帮助减少数据倾斜。此配置要求同时启用 spark.sql.sources.v2.bucketing.enabled 和 spark.sql.sources.v2.bucketing.pushPartValues.enabled。 |
3.4.0 |
spark.sql.sources.v2.bucketing.partition.filter.enabled |
false | 运行存储分区连接时是否过滤分区。启用后,如果连接类型允许,则可以省略另一侧没有匹配项的分区进行扫描。此配置要求同时启用 spark.sql.sources.v2.bucketing.enabled 和 spark.sql.sources.v2.bucketing.pushPartValues.enabled。 |
4.0.0 |
spark.sql.sources.v2.bucketing.pushPartValues.enabled |
true | 当启用 spark.sql.sources.v2.bucketing.enabled 时,是否下推常见分区值。开启后,如果连接的两侧都是 KeyGroupedPartitioning 且它们共享兼容的分区键,即使它们没有完全相同的分区值,Spark 也会计算分区值的超集并将该信息下推到扫描节点,扫描节点将为任一侧缺失的分区值使用空分区。这有助于消除不必要的 shuffle。 |
3.4.0 |
spark.sql.sources.v2.bucketing.shuffle.enabled |
false | 在存储分区连接期间,是否允许仅 shuffle 一侧。当只有一侧是 KeyGroupedPartitioning 时,如果满足条件,spark 将仅 shuffle 另一侧。此优化将减少需要 shuffle 的数据量。此配置要求启用 spark.sql.sources.v2.bucketing.enabled。 |
4.0.0 |
spark.sql.sources.v2.bucketing.sorting.enabled |
false | 开启后,Spark 将通过 SupportsReportPartitioning 识别 V2 数据源报告的特定分布,并尝试在按这些列排序时尽可能避免 shuffle。此配置要求启用 spark.sql.sources.v2.bucketing.enabled。 |
4.0.0 |
spark.sql.stackTracesInDataFrameContext |
1 | 捕获的 DataFrame 查询上下文中非 Spark 堆栈跟踪的数量。 |
4.0.0 |
spark.sql.statistics.fallBackToHdfs |
false | 当为 true 时,如果表元数据中没有可用的表统计信息,它将回退到 HDFS。这在确定表是否足够小以使用广播连接时很有用。此标志仅对非分区 Hive 表有效。对于非分区数据源表,如果统计信息不可用,它将自动重新计算。对于分区数据源表和分区 Hive 表,如果统计信息不可用,则为 'spark.sql.defaultSizeInBytes'。 |
2.0.0 |
spark.sql.statistics.histogram.enabled |
false | 如果启用,在计算列统计信息时生成直方图。直方图可以提供更好的估算准确性。目前,Spark 仅支持等高直方图(equi-height histogram)。注意收集直方图需要额外的成本。例如,收集列统计信息通常只需要一次表扫描,但生成等高直方图会导致额外的表扫描。 |
2.3.0 |
spark.sql.statistics.size.autoUpdate.enabled |
false | 启用表数据更改后表大小的自动更新。注意,如果表的总文件数非常大,这可能会很昂贵并减慢数据更改命令的速度。 |
2.3.0 |
spark.sql.statistics.updatePartitionStatsInAnalyzeTable.enabled |
false | 启用此配置后,Spark 还将在 analyze table 命令(即 ANALYZE TABLE .. COMPUTE STATISTICS [NOSCAN])中更新分区统计信息。注意该命令也会变得更加昂贵。当禁用此配置时,Spark 仅更新表级统计信息。 |
4.0.0 |
spark.sql.storeAssignmentPolicy |
ANSI | 当将值插入具有不同数据类型的列时,Spark 将执行类型强制转换。目前,我们支持 3 种类型强制转换规则策略:ANSI, legacy 和 strict。使用 ANSI 策略,Spark 按照 ANSI SQL 执行类型强制转换。在实践中,其行为与 PostgreSQL 基本相同。它不允许某些不合理的类型转换,例如将 |
3.0.0 |
spark.sql.streaming.checkpointLocation |
(无) | 用于存储流查询检查点数据的默认位置。 |
2.0.0 |
spark.sql.streaming.continuous.epochBacklogQueueSize |
10000 | 队列中用于等待后期 epoch 的最大条目数。如果队列大小超过此参数,流将停止并报错。 |
3.0.0 |
spark.sql.streaming.disabledV2Writers |
禁用 StreamWriteSupport 的完全限定数据源注册类名的逗号分隔列表。向这些源的写入将回退到 V1 Sinks。 |
2.3.1 | |
spark.sql.streaming.fileSource.cleaner.numThreads |
1 | 文件源已完成文件清理器中使用的线程数。 |
3.0.0 |
spark.sql.streaming.forceDeleteTempCheckpointLocation |
false | 当为 true 时,启用临时检查点位置强制删除。 |
3.0.0 |
spark.sql.streaming.metricsEnabled |
false | 是否为活动流查询报告 Dropwizard/Codahale 指标。 |
2.0.2 |
spark.sql.streaming.multipleWatermarkPolicy |
min | 当流查询中有多个水印算子时计算全局水印值的策略。默认值为 'min',选择跨多个算子报告的最小水印。另一种替代值是 'max',选择跨多个算子的最大值。注意:此配置在来自同一检查点位置的查询重启之间不能更改。 |
2.4.0 |
spark.sql.streaming.noDataMicroBatches.enabled |
true | 流微批处理引擎是否将为有状态流查询执行无数据批次以进行即时状态管理。 |
2.4.1 |
spark.sql.streaming.numRecentProgressUpdates |
100 | 为流查询保留的进度更新数量。 |
2.1.1 |
spark.sql.streaming.realTimeMode.allowlistCheck |
true | 是否检查实时模式下使用的所有算子和接收器(sinks)是否都在允许列表中。 |
4.1.0 |
spark.sql.streaming.realTimeMode.minBatchDuration |
5000ms | 实时模式下最小长运行批次持续时间(毫秒)。 |
4.1.0 |
spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition |
false | 当为 true 时,流会话窗口在 shuffle 之前对本地分区中的会话进行排序和合并。这是为了减少需要 shuffle 的行数,但仅在批次中有大量行被分配给相同会话时才有效。 |
3.2.0 |
spark.sql.streaming.stateStore.commitValidation.enabled |
true | 当为 true 时,Spark 将验证所有 StateStore 实例已为使用 foreachBatch 的有状态流查询提交。这有助于检测 foreachBatch 中的用户定义函数(例如 show(), limit())未处理所有分区的情况,这可能导致错误的结果。该验证仅适用于没有全局聚合或限制的 foreachBatch 接收器。 |
4.1.0 |
spark.sql.streaming.stateStore.encodingFormat |
unsaferow | 有状态算子用于在状态存储中存储信息的编码格式。 |
4.0.0 |
spark.sql.streaming.stateStore.stateSchemaCheck |
true | 当为 true 时,Spark 将根据现有状态上的模式验证状态模式,如果它们不兼容,则使查询失败。 |
3.1.0 |
spark.sql.streaming.stopActiveRunOnRestart |
true | 不支持同时运行相同流查询的多次运行。如果我们发现流查询有并发的活动运行(在同一集群上的相同或不同 SparkSession 中)并且此标志为 true,我们将停止旧的流查询运行以启动新的运行。 |
3.0.0 |
spark.sql.streaming.stopTimeout |
0 | 调用流查询的 stop() 方法时等待流执行线程停止的时间(毫秒)。0 或负值表示无限期等待。 |
3.0.0 |
spark.sql.streaming.transformWithState.stateSchemaVersion |
3 | transformWithState 算子使用的状态模式版本。 |
4.0.0 |
spark.sql.thriftServer.interruptOnCancel |
(spark.sql.execution.interruptOnCancel 的值) |
当为 true 时,如果取消查询,所有正在运行的任务都将被中断。当为 false 时,所有正在运行的任务将保持直到完成。 |
3.2.0 |
spark.sql.thriftServer.queryTimeout |
0ms | 在 Thrift Server 中设置查询持续时间超时(秒)。如果超时设置为正值,正在运行的查询将在超过超时时自动取消,否则查询将继续运行直到完成。如果通过 |
3.1.0 |
spark.sql.thriftserver.scheduler.pool |
(无) | 为 JDBC 客户端会话设置公平调度程序(Fair Scheduler)池。 |
1.1.1 |
spark.sql.thriftserver.ui.retainedSessions |
200 | JDBC/ODBC Web UI 历史记录中保留的 SQL 客户端会话数。 |
1.4.0 |
spark.sql.thriftserver.ui.retainedStatements |
200 | JDBC/ODBC Web UI 历史记录中保留的 SQL 语句数。 |
1.4.0 |
spark.sql.timeTravelTimestampKey |
timestampAsOf | 读取表时指定时间旅行(time travel)时间戳的选项名称。 |
4.0.0 |
spark.sql.timeTravelVersionKey |
versionAsOf | 读取表时指定时间旅行表版本的选项名称。 |
4.0.0 |
spark.sql.timestampType |
TIMESTAMP_LTZ | 配置 Spark SQL 的默认时间戳类型,包括 SQL DDL、Cast 子句、类型字面量和数据源的模式推断。将配置设置为 TIMESTAMP_NTZ 将使用 TIMESTAMP WITHOUT TIME ZONE 作为默认类型,而设置为 TIMESTAMP_LTZ 将使用 TIMESTAMP WITH LOCAL TIME ZONE。在 3.4.0 版本发布之前,Spark 仅支持 TIMESTAMP WITH LOCAL TIME ZONE 类型。 |
3.4.0 |
spark.sql.transposeMaxValues |
500 | 当执行转置(transpose)而未指定索引列的值时,这是在不报错的情况下将转置的最大值数量。 |
4.0.0 |
spark.sql.tvf.allowMultipleTableArguments.enabled |
false | 当为 true 时,允许表值函数有多个表参数,接收这些表所有行的笛卡尔积。 |
3.5.0 |
spark.sql.ui.explainMode |
formatted | 配置 Spark SQL UI 中使用的查询解释模式。该值可以是 'simple', 'extended', 'codegen', 'cost' 或 'formatted'。默认值为 'formatted'。 |
3.1.0 |
spark.sql.variable.substitute |
true | 这启用了使用 |
2.0.0 |
静态 SQL 配置
静态 SQL 配置是跨会话、不可变的 Spark SQL 配置。它们可以通过配置文件和带有 --conf/-c 前缀的命令行选项设置最终值,或者通过设置用于创建 SparkSession 的 SparkConf 来设置。外部用户可以通过 SparkSession.conf 或 set 命令(例如 SET spark.sql.extensions;)查询静态 SQL 配置值,但不能设置/取消设置它们。
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.sql.cache.serializer |
org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer | 实现了 org.apache.spark.sql.columnar.CachedBatchSerializer 的类名。它将用于将 SQL 数据转换为可以更有效缓存的格式。底层 API 可能会发生变化,因此请谨慎使用。不能指定多个类。该类必须具有无参数构造函数。 |
3.1.0 |
spark.sql.catalog.spark_catalog.defaultDatabase |
default | 会话目录的默认数据库。 |
3.4.0 |
spark.sql.event.truncate.length |
2147483647 | SQL 长度阈值,超过该阈值将在添加到事件之前被截断。默认为不截断。如果设置为 0,则记录调用站点(callsite)。 |
3.0.0 |
spark.sql.extensions |
(无) | 用于配置 Spark Session 扩展的实现了 Function1[SparkSessionExtensions, Unit] 的类的逗号分隔列表。这些类必须具有无参数构造函数。如果指定了多个扩展,它们将按指定顺序应用。对于规则和规划器策略,它们按指定顺序应用。对于解析器,使用最后一个解析器,并且每个解析器都可以委托给其前任。对于函数名称冲突,使用最后注册的函数名称。 |
2.2.0 |
spark.sql.extensions.test.loadFromCp |
true | 确定我们是否应该使用 SparkSessionExtensionsProvider 机制从类路径加载扩展的标志。这是一个仅用于测试的标志。 |
|
spark.sql.hive.metastore.barrierPrefixes |
对于 Spark SQL 通信的每个版本的 Hive,应明确重新加载的类前缀的逗号分隔列表。例如,在通常共享的前缀中声明的 Hive UDF(例如 |
1.4.0 | |
spark.sql.hive.metastore.jars |
builtin | 用于实例化 HiveMetastoreClient 的 jar 包的位置。此属性可以是以下四个选项之一:1. "builtin" 使用 Hive 2.3.10,它在启用 |
1.4.0 |
spark.sql.hive.metastore.jars.path |
用于实例化 HiveMetastoreClient 的 jar 包的逗号分隔路径。此配置仅在 |
3.1.0 | |
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc,com.mysql.cj,org.postgresql,com.microsoft.sqlserver,oracle.jdbc | 应使用在 Spark SQL 和特定版本 Hive 之间共享的类加载器加载的类前缀的逗号分隔列表。应共享的类示例是与元数据存储通信所需的 JDBC 驱动程序。其他需要共享的类是那些与已经共享的类进行交互的类。例如,log4j 使用的自定义追加器(appenders)。 |
1.4.0 |
spark.sql.hive.metastore.version |
2.3.10 | Hive 元数据存储的版本。可用选项为 |
1.4.0 |
spark.sql.hive.thriftServer.singleSession |
false | 当设置为 true 时,Hive Thrift 服务器以单会话模式运行。所有 JDBC/ODBC 连接共享临时视图、函数注册表、SQL 配置和当前数据库。 |
1.6.0 |
spark.sql.hive.version |
2.3.10 | Spark 发行版自带的编译版本,即内置 Hive 版本。注意,这是一个只读配置,仅用于报告内置 Hive 版本。如果你希望 Spark 调用不同的元数据存储客户端,请参考 spark.sql.hive.metastore.version。 |
1.1.1 |
spark.sql.metadataCacheTTLSeconds |
-1000ms | 元数据缓存的生存时间 (TTL) 值:分区文件元数据缓存和会话目录缓存。此配置仅在具有正值 (> 0) 时生效。它还需要将 'spark.sql.catalogImplementation' 设置为 |
3.1.0 |
spark.sql.queryExecutionListeners |
(无) | 实现 QueryExecutionListener 的类名列表,这些类将自动添加到新创建的会话中。这些类应该具有无参数构造函数,或者需要 SparkConf 参数的构造函数。 |
2.3.0 |
spark.sql.sources.disabledJdbcConnProviderList |
配置禁用的 JDBC 连接提供程序列表。该列表包含以逗号分隔的 JDBC 连接提供程序的名称。 |
3.1.0 | |
spark.sql.streaming.streamingQueryListeners |
(无) | 实现 StreamingQueryListener 的类名列表,这些类将自动添加到新创建的会话中。这些类应该具有无参数构造函数,或者需要 SparkConf 参数的构造函数。 |
2.4.0 |
spark.sql.streaming.ui.enabled |
true | 当启用 Spark Web UI 时,是否为 Spark 应用程序运行结构化流 Web UI。 |
3.0.0 |
spark.sql.streaming.ui.retainedProgressUpdates |
100 | 结构化流 UI 为流查询保留的进度更新数量。 |
3.0.0 |
spark.sql.streaming.ui.retainedQueries |
100 | 结构化流 UI 保留的非活动查询数量。 |
3.0.0 |
spark.sql.ui.retainedExecutions |
1000 | Spark UI 中保留的执行次数。 |
1.5.0 |
spark.sql.warehouse.dir |
($PWD/spark-warehouse 的值) |
管理数据库和表的默认位置。 |
2.0.0 |
Spark Streaming
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.streaming.backpressure.enabled |
false | 启用或禁用 Spark Streaming 的内部背压机制(自 1.5 起)。这使 Spark Streaming 能够根据当前的批次调度延迟和处理时间来控制接收速率,从而使系统仅以其能够处理的速度接收数据。在内部,这会动态设置接收器的最大接收速率。如果设置了 spark.streaming.receiver.maxRate 和 spark.streaming.kafka.maxRatePerPartition,则此速率受这些值的上限限制(见下文)。 |
1.5.0 |
spark.streaming.backpressure.initialRate |
未设置 | 这是启用背压机制时,每个接收器在第一批次中接收数据的初始最大接收速率。 | 2.0.0 |
spark.streaming.blockInterval |
200ms | Spark Streaming 接收器接收到的数据在存储到 Spark 之前被分块为数据块的间隔。建议最小值 - 50 毫秒。有关更多详细信息,请参阅 Spark Streaming 编程指南中的性能调优部分。 | 0.8.0 |
spark.streaming.receiver.maxRate |
未设置 | 每个接收器接收数据的最大速率(每秒记录数)。实际上,每个流每秒最多消耗此数量的记录。将此配置设置为 0 或负数将不对速率进行限制。有关更多详细信息,请参阅 Spark Streaming 编程指南中的部署指南。 | 1.0.2 |
spark.streaming.receiver.writeAheadLog.enable |
false | 为接收器启用预写日志(write-ahead logs)。所有通过接收器接收的输入数据都将保存到预写日志中,这将允许其在驱动程序故障后恢复。有关更多详细信息,请参阅 Spark Streaming 编程指南中的部署指南。 | 1.2.1 |
spark.streaming.unpersist |
true | 强制 Spark Streaming 生成并持久化的 RDD 从 Spark 内存中自动解除持久化。Spark Streaming 接收到的原始输入数据也会被自动清除。将其设置为 false 将允许在流应用程序之外访问原始数据和持久化的 RDD,因为它们不会被自动清除。但这会以更高的 Spark 内存使用量为代价。 | 0.9.0 |
spark.streaming.stopGracefullyOnShutdown |
false | 如果设置为 true,Spark 将在 JVM 关闭时优雅地停止 StreamingContext,而不是立即停止。 |
1.4.0 |
spark.streaming.kafka.maxRatePerPartition |
未设置 | 在使用新的 Kafka 直连流(direct stream)API 时,从每个 Kafka 分区读取数据的最大速率(每秒记录数)。更多详细信息请参阅 Kafka 集成指南。 | 1.3.0 |
spark.streaming.kafka.minRatePerPartition |
1 | 在使用新的 Kafka 直连流 API 时,从每个 Kafka 分区读取数据的最小速率(每秒记录数)。 | 2.4.0 |
spark.streaming.ui.retainedBatches |
1000 | Spark Streaming UI 和状态 API 在垃圾回收前保留的批次数。 | 1.0.0 |
spark.streaming.driver.writeAheadLog.closeFileAfterWrite |
false | 是否在驱动器(driver)上写入预写日志(WAL)记录后关闭文件。当您想在驱动器上为元数据 WAL 使用 S3(或任何不支持刷新/flush 的文件系统)时,请将其设置为 'true'。 | 1.6.0 |
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite |
false | 是否在接收器(receivers)上写入预写日志记录后关闭文件。当您想在接收器上为数据 WAL 使用 S3(或任何不支持刷新/flush 的文件系统)时,请将其设置为 'true'。 | 1.6.0 |
SparkR (已弃用)
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.r.numRBackendThreads |
2 | RBackend 用于处理来自 SparkR 包的 RPC 调用的线程数。 | 1.4.0 |
spark.r.command |
Rscript | 在集群模式下为驱动器和工作节点执行 R 脚本的可执行文件。 | 1.5.3 |
spark.r.driver.command |
spark.r.command | 在客户端模式下为驱动器执行 R 脚本的可执行文件。在集群模式下将被忽略。 | 1.5.3 |
spark.r.shell.command |
R | 在客户端模式下为驱动器执行 sparkR shell 的可执行文件。在集群模式下将被忽略。它与环境变量 SPARKR_DRIVER_R 相同,但优先级更高。spark.r.shell.command 用于 sparkR shell,而 spark.r.driver.command 用于运行 R 脚本。 |
2.1.0 |
spark.r.backendConnectionTimeout |
6000 | R 进程与其到 RBackend 连接的连接超时时间(以秒为单位)。 | 2.1.0 |
spark.r.heartBeatInterval |
100 | 从 SparkR 后端发送到 R 进程的心跳间隔,以防止连接超时。 | 2.1.0 |
GraphX
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.graphx.pregel.checkpointInterval |
-1 | Pregel 中图和消息的检查点间隔。它用于避免在多次迭代后因血统链(lineage chains)过长而导致 stackOverflowError。默认情况下禁用检查点。 | 2.2.0 |
集群管理器
Spark 中的每个集群管理器都有额外的配置选项。可以在每种模式的页面中找到相关配置。
YARN
Kubernetes
独立模式 (Standalone Mode)
环境变量
某些 Spark 设置可以通过环境变量进行配置,这些变量会从 Spark 安装目录中的 conf/spark-env.sh 脚本(或 Windows 上的 conf/spark-env.cmd)中读取。在独立模式(Standalone mode)下,此文件可以提供机器特定的信息,例如主机名。它在运行本地 Spark 应用程序或提交脚本时也会被引用(sourced)。
请注意,安装 Spark 时默认不存在 conf/spark-env.sh。但是,您可以复制 conf/spark-env.sh.template 来创建它。请确保使该副本具有可执行权限。
以下变量可以在 spark-env.sh 中设置
| 环境变量 | 含义 |
|---|---|
JAVA_HOME |
Java 的安装位置(如果它不在您的默认 PATH 中)。 |
PYSPARK_PYTHON |
在驱动器和工作节点中用于 PySpark 的 Python 二进制可执行文件(默认是 python3(如果可用),否则为 python)。如果设置了 spark.pyspark.python 属性,则以其为准。 |
PYSPARK_DRIVER_PYTHON |
仅在驱动器中用于 PySpark 的 Python 二进制可执行文件(默认是 PYSPARK_PYTHON)。如果设置了 spark.pyspark.driver.python 属性,则以其为准。 |
SPARKR_DRIVER_R |
用于 SparkR shell 的 R 二进制可执行文件(默认是 R)。如果设置了 spark.r.shell.command 属性,则以其为准。 |
SPARK_LOCAL_IP |
要绑定的机器 IP 地址。 |
SPARK_PUBLIC_DNS |
您的 Spark 程序将向其他机器广播的主机名。 |
除了上述内容,还有用于设置 Spark 独立集群脚本 的选项,例如每台机器使用的核心数和最大内存。
由于 spark-env.sh 是一个 shell 脚本,其中一些变量可以以编程方式设置——例如,您可以通过查找特定网络接口的 IP 来计算 SPARK_LOCAL_IP。
注意:在 YARN 的 cluster 模式下运行 Spark 时,需要使用 conf/spark-defaults.conf 文件中的 spark.yarn.appMasterEnv.[EnvironmentVariableName] 属性来设置环境变量。在 spark-env.sh 中设置的环境变量不会反映在 cluster 模式下的 YARN Application Master 进程中。有关更多信息,请参见 与 YARN 相关的 Spark 属性。
配置日志记录
Spark 使用 log4j 进行日志记录。您可以通过在 conf 目录中添加 log4j2.properties 文件来配置它。首先,复制提供的模板之一:log4j2.properties.template(用于纯文本日志记录)或 log4j2-json-layout.properties.template(用于结构化日志记录)。
纯文本日志
默认日志格式是纯文本,使用 Log4j 的 Pattern Layout。
默认情况下,MDC(映射诊断上下文)信息不包含在纯文本日志中。要包含它,请更新 log4j2.properties 文件中的 PatternLayout 配置。例如,添加 %X{task_name} 以在日志中包含任务名称。此外,使用 spark.sparkContext.setLocalProperty("key", "value") 将自定义数据添加到 MDC。
结构化日志
从 4.0.0 版本开始,spark-submit 支持使用 JSON 模板布局 进行可选的结构化日志记录。此格式支持使用 Spark SQL 和 JSON 数据源高效查询日志,并包含所有 MDC 信息,从而提高了可搜索性和可调试性。
要启用结构化日志记录并包含 MDC 信息,请将配置 spark.log.structuredLogging.enabled 设置为 true(默认值为 false)。如需额外自定义,请将 log4j2-json-layout.properties.template 复制到 conf/log4j2.properties 并根据需要进行调整。
使用 Spark SQL 查询结构化日志
要以 JSON 格式查询结构化日志,请使用以下代码片段
Python
from pyspark.logger import SPARK_LOG_SCHEMA
logDf = spark.read.schema(SPARK_LOG_SCHEMA).json("path/to/logs")
Scala
import org.apache.spark.util.LogUtils.SPARK_LOG_SCHEMA
val logDf = spark.read.schema(SPARK_LOG_SCHEMA).json("path/to/logs")
注意:如果您使用的是交互式 shell(pyspark shell 或 spark-shell),则可以省略代码中的 import 语句,因为 SPARK_LOG_SCHEMA 在 shell 的上下文中已经可用。
覆盖配置目录
要指定默认“SPARK_HOME/conf”之外的另一个配置目录,可以设置 SPARK_CONF_DIR。Spark 将使用来自该目录的配置文件(spark-defaults.conf、spark-env.sh、log4j2.properties 等)。
继承 Hadoop 集群配置
如果您计划使用 Spark 读取和写入 HDFS,则应在 Spark 的类路径中包含两个 Hadoop 配置文件
hdfs-site.xml,提供 HDFS 客户端的默认行为。core-site.xml,设置默认文件系统名称。
这些配置文件的位置因 Hadoop 版本而异,但常见位置是在 /etc/hadoop/conf 内。有些工具会动态创建配置,但也提供了下载副本的机制。
要使这些文件对 Spark 可见,请将 $SPARK_HOME/conf/spark-env.sh 中的 HADOOP_CONF_DIR 设置为包含配置文件的位置。
自定义 Hadoop/Hive 配置
如果您的 Spark 应用程序正在与 Hadoop、Hive 或两者交互,则 Spark 的类路径中可能存在 Hadoop/Hive 配置文件。
多个正在运行的应用程序可能需要不同的 Hadoop/Hive 客户端配置。您可以为每个应用程序复制并修改 Spark 类路径中的 hdfs-site.xml、core-site.xml、yarn-site.xml、hive-site.xml。在运行于 YARN 上的 Spark 集群中,这些配置文件是在整个集群范围内设置的,应用程序无法安全地更改它们。
更好的选择是使用 spark.hadoop.* 形式的 Spark Hadoop 属性,以及使用 spark.hive.* 形式的 Spark Hive 属性。例如,添加配置“spark.hadoop.abc.def=xyz”表示添加 hadoop 属性“abc.def=xyz”,添加配置“spark.hive.abc=xyz”表示添加 hive 属性“hive.abc=xyz”。它们可以被视为与可以在 $SPARK_HOME/conf/spark-defaults.conf 中设置的普通 Spark 属性相同。
在某些情况下,您可能希望避免在 SparkConf 中硬编码某些配置。例如,Spark 允许您简单地创建一个空的 conf 并设置 spark/spark hadoop/spark hive 属性。
val conf = new SparkConf().set("spark.hadoop.abc.def", "xyz")
val sc = new SparkContext(conf)此外,您可以在运行时修改或添加配置
./bin/spark-submit \
--name "My app" \
--master "local[4]" \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--conf spark.hadoop.abc.def=xyz \
--conf spark.hive.abc=xyz
myApp.jar自定义资源调度与配置概述
GPU 和其他加速器已被广泛用于加速特殊工作负载,例如深度学习和信号处理。Spark 现在支持请求和调度通用资源(例如 GPU),但有一些注意事项。当前的实现要求资源具有可以由调度程序分配的地址。它要求您的集群管理器支持并正确配置这些资源。
有可用的配置用于请求驱动器资源:spark.driver.resource.{resourceName}.amount,请求执行器资源:spark.executor.resource.{resourceName}.amount,并指定每个任务的要求:spark.task.resource.{resourceName}.amount。spark.driver.resource.{resourceName}.discoveryScript 配置在 YARN、Kubernetes 和 Spark Standalone 的客户端驱动器上是必需的。spark.executor.resource.{resourceName}.discoveryScript 配置对于 YARN 和 Kubernetes 是必需的。Kubernetes 还要求 spark.driver.resource.{resourceName}.vendor 和/或 spark.executor.resource.{resourceName}.vendor。有关每个配置的更多信息,请参见上面的配置说明。
Spark 将使用指定的配置首先从集群管理器请求具有相应资源的容器。一旦获得容器,Spark 就会在该容器中启动一个执行器(Executor),该执行器将发现容器拥有哪些资源以及与每个资源关联的地址。执行器将向驱动器注册并报告该执行器可用的资源。Spark 调度程序随后可以将任务调度到每个执行器,并根据用户指定的资源要求分配特定的资源地址。用户可以使用 TaskContext.get().resources API 查看分配给任务的资源。在驱动器上,用户可以使用 SparkContext 的 resources 调用查看分配的资源。然后由用户决定使用分配的地址进行他们想要的处理,或者将它们传递给他们正在使用的 ML/AI 框架。
请参阅特定集群管理器的页面以获取关于 YARN、Kubernetes 和 Standalone Mode 的详细要求。它目前在本地模式下不可用。另请注意,不支持具有多个工作节点的本地集群模式(参见 Standalone 文档)。
阶段级调度概述
阶段级调度(Stage level scheduling)功能允许用户在阶段级别指定任务和执行器资源要求。这允许不同的阶段使用具有不同资源的执行器运行。一个主要的例子是一个 ETL 阶段仅使用 CPU 的执行器运行,而下一个阶段是需要 GPU 的 ML 阶段。阶段级调度允许用户在 ML 阶段运行时请求具有 GPU 的不同执行器,而不是在应用程序开始时获取具有 GPU 的执行器并在 ETL 阶段运行时使其闲置。这仅适用于 Scala、Java 和 Python 中的 RDD API。它在启用动态分配时可在 YARN、Kubernetes 和 Standalone 上使用。当禁用动态分配时,它允许用户在阶段级别指定不同的任务资源要求,这目前在 YARN、Kubernetes 和 Standalone 集群上得到支持。有关更多实现细节,请参见 YARN 页面、Kubernetes 页面或 Standalone 页面。
请参阅 RDD.withResources 和 ResourceProfileBuilder API 以使用此功能。当禁用动态分配时,具有不同任务资源要求的任务将与 DEFAULT_RESOURCE_PROFILE 共享执行器。而在启用动态分配时,当前的实现会为创建的每个 ResourceProfile 获取新的执行器,并且目前必须精确匹配。Spark 不会尝试将任务放入需要与执行器创建时不同的 ResourceProfile 的执行器中。未使用的执行器将通过动态分配逻辑在空闲超时后关闭。此功能的默认配置是仅允许每个阶段使用一个 ResourceProfile。如果用户将超过 1 个 ResourceProfile 关联到 RDD,Spark 默认会抛出异常。请参阅配置 spark.scheduler.resource.profileMergeConflicts 以控制该行为。当启用 spark.scheduler.resource.profileMergeConflicts 时,Spark 实现的当前合并策略是冲突 ResourceProfiles 中每个资源的最大值。Spark 将创建一个新的 ResourceProfile,其中包含每个资源的最大值。
基于推送的 Shuffle 概述
基于推送的 Shuffle(Push-based shuffle)有助于提高 Spark Shuffle 的可靠性和性能。它采取尽力而为的方法,将 Map 任务生成的 Shuffle 数据块推送到远程外部 Shuffle 服务,以便按 Shuffle 分区进行合并。Reduce 任务获取合并后的 Shuffle 分区和原始 Shuffle 数据块的组合作为其输入数据,从而将外部 Shuffle 服务的随机小磁盘读取转换为大的顺序读取。Reduce 任务更好的数据局部性也有助于最大限度地减少网络 IO。在某些场景下,例如当有合并后的输出可用时进行分区合并(partition coalesce),基于推送的 Shuffle 优先于批量获取。
基于推送的 Shuffle 提高了涉及大量磁盘 I/O 的长期运行作业/查询的性能。目前,它不太适合处理较少 Shuffle 数据量且运行快速的作业/查询。这将在未来的版本中进一步改进。
目前,基于推送的 Shuffle 仅支持带有外部 Shuffle 服务的 YARN 上的 Spark。
外部 Shuffle 服务 (服务器) 端配置选项
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.shuffle.push.server.mergedShuffleFileManagerImpl |
org.apache.spark.network.shuffle.
|
管理基于推送的 Shuffle 的 MergedShuffleFileManager 实现的类名。这充当服务器端配置,用于禁用或启用基于推送的 Shuffle。默认情况下,基于推送的 Shuffle 在服务器端处于禁用状态。要在服务器端启用基于推送的 Shuffle,请将此配置设置为 |
3.2.0 |
spark.shuffle.push.server.minChunkSizeInMergedShuffleFile |
2m |
在基于推送的 Shuffle 期间将合并的 Shuffle 文件划分为多个块时,块的最小大小。合并的 Shuffle 文件由多个小的 Shuffle 数据块组成。在单次磁盘 I/O 中获取完整的合并 Shuffle 文件会增加客户端和外部 Shuffle 服务的内存需求。相反,外部 Shuffle 服务以 设置得过高会增加客户端和外部 Shuffle 服务的内存需求。 设置得过低会不必要地增加对外部 Shuffle 服务的 RPC 请求总数。 |
3.2.0 |
spark.shuffle.push.server.mergedIndexCacheSize |
100m |
基于推送的 Shuffle 中可用于存储合并索引文件的内存缓存最大大小。此缓存是 spark.shuffle.service.index.cache.size 所配置缓存之外的额外缓存。 |
3.2.0 |
客户端配置选项
| 属性名称 | 默认值 | 含义 | 起始版本 |
|---|---|---|---|
spark.shuffle.push.enabled |
false |
设置为 true 可启用客户端上的基于推送的 Shuffle,并与服务器端标志 spark.shuffle.push.server.mergedShuffleFileManagerImpl 结合使用。 |
3.2.0 |
spark.shuffle.push.finalize.timeout |
10s |
对于给定的 Shuffle Map 阶段,在所有 Mappers 完成后,驱动器等待发送合并终结请求给远程外部 Shuffle 服务的时间(以秒为单位)。这给了外部 Shuffle 服务额外的时间来合并数据块。设置得过长可能会导致性能下降。 | 3.2.0 |
spark.shuffle.push.maxRetainedMergerLocations |
500 |
为基于推送的 Shuffle 缓存的合并位置的最大数量。目前,合并位置是外部 Shuffle 服务的主机,负责处理推送的数据块、合并它们并为以后的 Shuffle 获取提供合并后的数据块。 | 3.2.0 |
spark.shuffle.push.mergersMinThresholdRatio |
0.05 |
用于根据 Reducer 阶段的分区数计算阶段所需的最小 Shuffle 合并位置数的比率。例如,一个有 100 个分区并使用默认值 0.05 的 Reduce 阶段至少需要 5 个唯一的合并位置才能启用基于推送的 Shuffle。 | 3.2.0 |
spark.shuffle.push.mergersMinStaticThreshold |
5 |
为使阶段能够启用基于推送的 Shuffle,必须可用的 Shuffle 推送合并位置数量的静态阈值。注意,此配置与 spark.shuffle.push.mergersMinThresholdRatio 结合使用。启用基于推送的 Shuffle 所需的合并器数量是 spark.shuffle.push.mergersMinStaticThreshold 和 spark.shuffle.push.mergersMinThresholdRatio 比率数的最大值。例如:对于子阶段有 1000 个分区,spark.shuffle.push.mergersMinStaticThreshold 为 5,spark.shuffle.push.mergersMinThresholdRatio 设置为 0.05,我们需要至少 50 个合并器才能为该阶段启用基于推送的 Shuffle。 |
3.2.0 |
spark.shuffle.push.numPushThreads |
(无) | 指定数据块推送池中的线程数。这些线程协助创建连接并将数据块推送到远程外部 Shuffle 服务。默认情况下,线程池大小等于 Spark 执行器的核心数。 | 3.2.0 |
spark.shuffle.push.maxBlockSizeToPush |
1m |
推送到远程外部 Shuffle 服务的单个数据块的最大大小。大于此阈值的数据块不会被推送以进行远程合并。这些 Shuffle 数据块将以原始方式获取。 设置得过高会导致更多的数据块被推送到远程外部 Shuffle 服务,但这些数据块已经通过现有机制被高效地获取,从而导致将大数据块推送到远程外部 Shuffle 服务的额外开销。建议将 设置得过低会导致合并的数据块数量减少,并直接从 Mapper 外部 Shuffle 服务获取结果,从而导致较高的随机小读取,影响整体磁盘 I/O 性能。 |
3.2.0 |
spark.shuffle.push.maxBlockBatchSize |
3m |
要归入单个推送请求的 Shuffle 数据块批次的最大大小。默认值设置为 3m,为了使其略高于 spark.storage.memoryMapThreshold 的默认值(即 2m),因为每个数据块批次很可能被内存映射,这会带来更高的开销。 |
3.2.0 |
spark.shuffle.push.merge.finalizeThreads |
8 | 驱动器用于完成 Shuffle 合并的线程数。由于大型 Shuffle 的完成可能需要几秒钟,因此拥有多个线程有助于驱动器在启用基于推送的 Shuffle 时处理并发的 Shuffle 合并完成请求。 | 3.3.0 |
spark.shuffle.push.minShuffleSizeToWait |
500m |
只有当总 Shuffle 数据大小超过此阈值时,驱动器才会等待合并完成。如果总 Shuffle 大小较小,驱动器将立即完成 Shuffle 输出。 | 3.3.0 |
spark.shuffle.push.minCompletedPushRatio |
1.0 |
在基于推送的 Shuffle 期间,驱动器开始 Shuffle 合并终结之前必须完成推送的最小 Map 分区比例。 | 3.3.0 |