Spark 配置
- Spark 属性
- 环境变量
- 配置日志记录
- 覆盖配置目录
- 继承 Hadoop 集群配置
- 自定义 Hadoop/Hive 配置
- 自定义资源调度和配置概述
- 阶段级别调度概述
- 基于推送的 shuffle 概述
Spark 提供了三个位置来配置系统
- Spark 属性控制大多数应用程序参数,可以通过使用 SparkConf 对象或通过 Java 系统属性设置。
- 环境变量可用于设置每机器的设置,例如 IP 地址,通过每个节点上的
conf/spark-env.sh
脚本。 - 日志记录可以通过
log4j2.properties
配置。
Spark 属性
Spark 属性控制大多数应用程序设置,并为每个应用程序单独配置。这些属性可以直接在传递给 SparkContext
的 SparkConf 上设置。SparkConf
允许您配置一些常见属性(例如 master URL 和应用程序名称),以及通过 set()
方法设置任意键值对。例如,我们可以按如下方式初始化一个具有两个线程的应用程序
请注意,我们使用 local[2],这意味着两个线程——这表示“最小”并行度,这有助于检测仅在分布式环境中运行时才存在的错误。
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)
请注意,在本地模式下我们可以有不止一个线程,在 Spark Streaming 等情况下,我们实际上可能需要不止一个线程来防止任何形式的资源饥饿问题。
指定时间持续时间的属性应配置时间单位。接受以下格式
25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)
指定字节大小的属性应配置大小单位。接受以下格式
1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)
虽然没有单位的数字通常被解释为字节,但少数被解释为 KiB 或 MiB。请参阅各个配置属性的文档。在可能的情况下指定单位是可取的。
动态加载 Spark 属性
在某些情况下,您可能希望避免在 SparkConf
中硬编码某些配置。例如,如果您想使用不同的 master 或不同数量的内存运行相同的应用程序。Spark 允许您简单地创建一个空配置
val sc = new SparkContext(new SparkConf())
然后,您可以在运行时提供配置值
./bin/spark-submit \
--name "My app" \
--master "local[4]" \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
myApp.jar
Spark shell 和 spark-submit
工具支持两种动态加载配置的方式。第一种是命令行选项,例如上面所示的 --master
。spark-submit
可以使用 --conf/-c
标志接受任何 Spark 属性,但对在启动 Spark 应用程序中起作用的属性使用特殊标志。运行 ./bin/spark-submit --help
将显示这些选项的完整列表。
当通过 --conf/-c
标志指定配置时,bin/spark-submit
还会从 conf/spark-defaults.conf
读取配置选项,其中每行由一个键和由空格分隔的值组成。例如
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
此外,一个包含 Spark 配置的属性文件可以通过 --properties-file
参数传递给 bin/spark-submit
。如果设置此参数,除非提供了另一个参数 --load-spark-defaults
,否则 Spark 将不再从 conf/spark-defaults.conf
加载配置。
通过标志或在属性文件中指定的任何值都将传递给应用程序,并与通过 SparkConf 指定的值合并。直接在 SparkConf 上设置的属性优先级最高,其次是通过 --conf
标志或传递给 spark-submit
或 spark-shell
的 --properties-file
,最后是 spark-defaults.conf
文件中的选项。自早期版本的 Spark 以来,一些配置键已重命名;在这种情况下,旧的键名仍然被接受,但优先级低于任何新的键实例。
Spark 属性主要分为两类:一类与部署相关,如“spark.driver.memory”、“spark.executor.instances”,这类属性在运行时通过 SparkConf
以编程方式设置可能不受影响,或者其行为取决于您选择的集群管理器和部署模式,因此建议通过配置文件或 spark-submit
命令行选项设置;另一类主要与 Spark 运行时控制相关,如“spark.task.maxFailures”,这类属性可以通过任何一种方式设置。
查看 Spark 属性
位于 http://<driver>:4040
的应用程序 Web UI 在“Environment”选项卡中列出了 Spark 属性。这是一个有用的地方,可以检查以确保您的属性已正确设置。请注意,只有通过 spark-defaults.conf
、SparkConf
或命令行明确指定的值才会出现。对于所有其他配置属性,您可以假定使用了默认值。
可用属性
大多数控制内部设置的属性都有合理的默认值。一些最常见的设置选项是
应用程序属性
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.app.name |
(无) | 您的应用程序名称。这会显示在 UI 和日志数据中。 | 0.9.0 |
spark.driver.cores |
1 | 用于驱动程序进程的核心数,仅在集群模式下。 | 1.3.0 |
spark.driver.maxResultSize |
1g | 每个 Spark action(例如 collect)所有分区序列化结果的总大小限制,单位为字节。应至少为 1M,或 0 表示无限制。如果总大小超过此限制,作业将被中止。高限制可能导致驱动程序出现内存不足错误(取决于 spark.driver.memory 和 JVM 中对象的内存开销)。设置适当的限制可以保护驱动程序免受内存不足错误。 | 1.2.0 |
spark.driver.memory |
1g | 用于驱动程序进程的内存量,即 SparkContext 初始化的地方,格式与 JVM 内存字符串相同,带有大小单位后缀(“k”、“m”、“g”或“t”)(例如 512m 、2g )。注意:在客户端模式下,此配置不得通过应用程序中的 SparkConf 直接设置,因为驱动程序 JVM 在那时已经启动。相反,请通过 --driver-memory 命令行选项或在您的默认属性文件中设置此项。 |
1.1.1 |
spark.driver.memoryOverhead |
driverMemory * spark.driver.memoryOverheadFactor ,最小值为 spark.driver.minMemoryOverhead |
在集群模式下为每个驱动程序进程分配的非堆内存量,除非另有说明,否则以 MiB 为单位。此内存用于虚拟机开销、interned 字符串、其他本机开销等。它倾向于随着容器大小的增加而增加(通常为 6-10%)。此选项目前在 YARN 和 Kubernetes 上受支持。注意:非堆内存包括堆外内存(当 spark.memory.offHeap.enabled=true 时)和由其他驱动程序进程(例如与 PySpark 驱动程序一起的 python 进程)使用的内存以及在同一容器中运行的其他非驱动程序进程使用的内存。运行驱动程序的容器的最大内存大小由 spark.driver.memoryOverhead 和 spark.driver.memory 的总和决定。 |
2.3.0 |
spark.driver.minMemoryOverhead |
384m | 在集群模式下为每个驱动程序进程分配的非堆内存的最小量,除非另有说明,否则以 MiB 为单位,如果未定义 spark.driver.memoryOverhead 。此选项目前在 YARN 和 Kubernetes 上受支持。 |
4.0.0 |
spark.driver.memoryOverheadFactor |
0.10 | 在集群模式下,作为每个驱动程序进程的额外非堆内存分配的驱动程序内存的比例。此内存用于虚拟机开销、interned 字符串、其他本机开销等。它倾向于随着容器大小的增加而增加。此值默认为 0.10,但 Kubernetes 非 JVM 作业除外,后者默认为 0.40。这样做是因为非 JVM 任务需要更多的非 JVM 堆空间,并且此类任务通常会因“内存开销超出”错误而失败。这通过更高的默认值来预防此错误。如果直接设置了 spark.driver.memoryOverhead ,则此值将被忽略。 |
3.3.0 |
spark.driver.resource.{resourceName}.amount |
0 | 在驱动程序上使用的特定资源类型的数量。如果使用此项,您还必须指定 spark.driver.resource.{resourceName}.discoveryScript ,以便驱动程序在启动时找到资源。 |
3.0.0 |
spark.driver.resource.{resourceName}.discoveryScript |
无 | 驱动程序运行以发现特定资源类型的脚本。这应将 JSON 字符串以 ResourceInformation 类的格式写入 STDOUT。它具有名称和地址数组。对于客户端提交的驱动程序,发现脚本必须为该驱动程序分配与同一主机上的其他驱动程序不同的资源地址。 | 3.0.0 |
spark.driver.resource.{resourceName}.vendor |
无 | 用于驱动程序的资源供应商。此选项目前仅在 Kubernetes 上受支持,实际上是遵循 Kubernetes 设备插件命名约定的供应商和域。(例如,对于 Kubernetes 上的 GPU,此配置将设置为 nvidia.com 或 amd.com) | 3.0.0 |
spark.resources.discoveryPlugin |
org.apache.spark.resource.ResourceDiscoveryScriptPlugin | 以逗号分隔的实现 org.apache.spark.api.resource.ResourceDiscoveryPlugin 的类名列表,用于加载到应用程序中。这适用于高级用户,用自定义实现替换资源发现类。Spark 将尝试指定的每个类,直到其中一个返回该资源的资源信息。如果没有插件返回该资源的信息,它将最后尝试发现脚本。 | 3.0.0 |
spark.executor.memory |
1g | 每个执行器进程使用的内存量,格式与 JVM 内存字符串相同,带有大小单位后缀(“k”、“m”、“g”或“t”)(例如 512m 、2g )。 |
0.7.0 |
spark.executor.pyspark.memory |
未设置 | 分配给每个执行器中 PySpark 的内存量,除非另有说明,否则以 MiB 为单位。如果设置,执行器的 PySpark 内存将限制在此数量。如果未设置,Spark 将不限制 Python 的内存使用,并且由应用程序自行避免超出与其他非 JVM 进程共享的开销内存空间。当 PySpark 在 YARN 或 Kubernetes 中运行时,此内存将添加到执行器资源请求中。 注意:此功能依赖于 Python 的 resource 模块;因此,行为和限制是继承的。例如,Windows 不支持资源限制,并且 MacOS 上实际资源不受限制。 |
2.4.0 |
spark.executor.memoryOverhead |
executorMemory * spark.executor.memoryOverheadFactor ,最小值为 spark.executor.minMemoryOverhead |
为每个执行器进程分配的额外内存量,除非另有说明,否则以 MiB 为单位。此内存用于虚拟机开销、interned 字符串、其他本机开销等。它倾向于随着执行器大小的增加而增加(通常为 6-10%)。此选项目前在 YARN 和 Kubernetes 上受支持。 注意:额外内存包括 PySpark 执行器内存(当未配置 spark.executor.pyspark.memory 时)和同一容器中运行的其他非执行器进程使用的内存。运行执行器的容器的最大内存大小由 spark.executor.memoryOverhead 、spark.executor.memory 、spark.memory.offHeap.size 和 spark.executor.pyspark.memory 的总和决定。 |
2.3.0 |
spark.executor.minMemoryOverhead |
384m | 在集群模式下为每个执行器进程分配的非堆内存的最小量,除非另有说明,否则以 MiB 为单位,如果未定义 spark.executor.memoryOverhead 。此选项目前在 YARN 和 Kubernetes 上受支持。 |
4.0.0 |
spark.executor.memoryOverheadFactor |
0.10 | 作为每个执行器进程的额外非堆内存分配的执行器内存的比例。此内存用于虚拟机开销、interned 字符串、其他本机开销等。它倾向于随着容器大小的增加而增加。此值默认为 0.10,但 Kubernetes 非 JVM 作业除外,后者默认为 0.40。这样做是因为非 JVM 任务需要更多的非 JVM 堆空间,并且此类任务通常会因“内存开销超出”错误而失败。这通过更高的默认值来预防此错误。如果直接设置了 spark.executor.memoryOverhead ,则此值将被忽略。 |
3.3.0 |
spark.executor.resource.{resourceName}.amount |
0 | 每个执行器进程使用的特定资源类型的数量。如果使用此项,您还必须指定 spark.executor.resource.{resourceName}.discoveryScript ,以便执行器在启动时找到资源。 |
3.0.0 |
spark.executor.resource.{resourceName}.discoveryScript |
无 | 执行器运行以发现特定资源类型的脚本。这应将 JSON 字符串以 ResourceInformation 类的格式写入 STDOUT。它具有名称和地址数组。 | 3.0.0 |
spark.executor.resource.{resourceName}.vendor |
无 | 用于执行器的资源供应商。此选项目前仅在 Kubernetes 上受支持,实际上是遵循 Kubernetes 设备插件命名约定的供应商和域。(例如,对于 Kubernetes 上的 GPU,此配置将设置为 nvidia.com 或 amd.com) | 3.0.0 |
spark.extraListeners |
(无) | 以逗号分隔的实现 SparkListener 的类名列表;当初始化 SparkContext 时,将创建这些类的实例并将其注册到 Spark 的监听器总线。如果类有一个接受 SparkConf 的单参数构造函数,则将调用该构造函数;否则,将调用零参数构造函数。如果找不到有效的构造函数,则 SparkContext 创建将失败并抛出异常。 |
1.3.0 |
spark.local.dir |
/tmp | 用于 Spark “暂存”空间(包括映射输出文件和存储在磁盘上的 RDD)的目录。这应该在您系统的快速本地磁盘上。它也可以是逗号分隔的多个目录列表,这些目录位于不同的磁盘上。 注意:这将由集群管理器设置的 SPARK_LOCAL_DIRS (Standalone) 或 LOCAL_DIRS (YARN) 环境变量覆盖。 |
0.5.0 |
spark.logConf |
false | SparkContext 启动时以 INFO 级别记录有效的 SparkConf。 | 0.9.0 |
spark.master |
(无) | 要连接的集群管理器。请参阅 允许的 master URL 列表。 | 0.9.0 |
spark.submit.deployMode |
client | Spark 驱动程序部署模式,可以是“client”或“cluster”,这意味着在本地(“client”)或集群中的一个节点上远程(“cluster”)启动驱动程序。 | 1.5.0 |
spark.log.callerContext |
(无) | 在 Yarn/HDFS 上运行时,将写入 Yarn RM 日志/HDFS 审计日志的应用程序信息。其长度取决于 Hadoop 配置 hadoop.caller.context.max.size 。它应该简洁,通常最多可有 50 个字符。 |
2.2.0 |
spark.log.level |
(无) | 设置后,会在 Spark 启动时覆盖任何用户定义的日志设置,就像调用 SparkContext.setLogLevel() 一样。有效的日志级别包括:“ALL”、“DEBUG”、“ERROR”、“FATAL”、“INFO”、“OFF”、“TRACE”、“WARN”。 |
3.5.0 |
spark.driver.supervise |
false | 如果为 true,则在驱动程序以非零退出状态失败时自动重新启动。仅在 Spark standalone 模式下有效。 | 1.3.0 |
spark.driver.timeout |
0min | Spark 驱动程序的超时时间,单位为分钟。0 表示无限。对于正的时间值,如果驱动程序在超时持续时间后仍在运行,则以退出代码 124 终止。要使用,需要将 spark.plugins 设置为 org.apache.spark.deploy.DriverTimeoutPlugin 。 |
4.0.0 |
spark.driver.log.localDir |
(无) | 指定一个本地目录来写入驱动程序日志并启用驱动程序日志 UI 选项卡。 | 4.0.0 |
spark.driver.log.dfsDir |
(无) | 如果 spark.driver.log.persistToDfs.enabled 为 true,则 Spark 驱动程序日志同步到的基本目录。在此基本目录中,每个应用程序将其驱动程序日志记录到一个应用程序特定的文件中。用户可能希望将其设置为一个统一位置,如 HDFS 目录,以便驱动程序日志文件可以持久化以供以后使用。此目录应允许任何 Spark 用户读/写文件,并允许 Spark History Server 用户删除文件。此外,如果 spark.history.fs.driverlog.cleaner.enabled 为 true,并且如果它们早于通过设置 spark.history.fs.driverlog.cleaner.maxAge 配置的最大年龄,Spark History Server 将清理此目录中的旧日志。 |
3.0.0 |
spark.driver.log.persistToDfs.enabled |
false | 如果为 true,则在客户端模式下运行的 spark 应用程序会将驱动程序日志写入持久存储,该存储在 spark.driver.log.dfsDir 中配置。如果未配置 spark.driver.log.dfsDir ,则驱动程序日志将不会持久化。此外,通过将 spark.history.fs.driverlog.cleaner.enabled 设置为 true 来启用 Spark History Server 中的清理器。 |
3.0.0 |
spark.driver.log.layout |
%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex | 同步到 spark.driver.log.localDir 和 spark.driver.log.dfsDir 的驱动程序日志的布局。如果未配置,则使用 log4j2.properties 中定义的第一个 appender 的布局。如果也未配置,则驱动程序日志使用默认布局。 |
3.0.0 |
spark.driver.log.allowErasureCoding |
false | 是否允许驱动程序日志使用擦除编码。在 HDFS 上,擦除编码文件更新速度不如常规复制文件快,因此它们可能需要更长时间才能反映应用程序写入的更改。请注意,即使此设置为 true,Spark 也不会强制文件使用擦除编码,它只会使用文件系统默认值。 | 3.0.0 |
spark.decommission.enabled |
false | 当启用下线时,Spark 将尽力优雅地关闭执行器。当 spark.storage.decommission.enabled 启用时,Spark 将尝试将所有 RDD 块(由 spark.storage.decommission.rddBlocks.enabled 控制)和 shuffle 块(由 spark.storage.decommission.shuffleBlocks.enabled 控制)从下线的执行器迁移到远程执行器。启用下线后,当 spark.dynamicAllocation.enabled 启用时,Spark 也将下线执行器而不是杀死它。 |
3.1.0 |
spark.executor.decommission.killInterval |
(无) | 下线执行器将被外部(例如非 Spark)服务强制杀死的持续时间。 | 3.1.0 |
spark.executor.decommission.forceKillTimeout |
(无) | Spark 强制下线执行器退出的持续时间。在大多数情况下,此值应设置得较高,因为较低的值会阻止块迁移有足够的时间完成。 | 3.2.0 |
spark.executor.decommission.signal |
PWR | 用于触发执行器开始下线的信号。 | 3.2.0 |
spark.executor.maxNumFailures |
numExecutors * 2,最小值为 3 | 在应用程序失败之前允许的最大执行器故障次数。此配置仅在 YARN 和 Kubernetes 上生效。 | 3.5.0 |
spark.executor.failuresValidityInterval |
(无) | 在此间隔之后,执行器故障将被视为独立的,并且不会累积到尝试计数。此配置仅在 YARN 和 Kubernetes 上生效。 | 3.5.0 |
除此之外,还有以下属性可用,在某些情况下可能有用
运行时环境
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.driver.extraClassPath |
(无) | 要添加到驱动程序类路径开头的额外类路径条目。 注意:在客户端模式下,此配置不得通过应用程序中的 SparkConf 直接设置,因为驱动程序 JVM 在那时已经启动。相反,请通过 --driver-class-path 命令行选项或在您的默认属性文件中设置此项。 |
1.0.0 |
spark.driver.defaultJavaOptions |
(无) | 一个默认 JVM 选项字符串,将添加到 spark.driver.extraJavaOptions 的开头。这旨在由管理员设置。例如,GC 设置或其他日志记录。请注意,使用此选项设置最大堆大小 (-Xmx) 设置是非法的。最大堆大小设置可以在集群模式下使用 spark.driver.memory 设置,并在客户端模式下通过 --driver-memory 命令行选项设置。注意:在客户端模式下,此配置不得通过应用程序中的 SparkConf 直接设置,因为驱动程序 JVM 在那时已经启动。相反,请通过 --driver-java-options 命令行选项或在您的默认属性文件中设置此项。 |
3.0.0 |
spark.driver.extraJavaOptions |
(无) | 一个额外的 JVM 选项字符串,将传递给驱动程序。这旨在由用户设置。例如,GC 设置或其他日志记录。请注意,使用此选项设置最大堆大小 (-Xmx) 设置是非法的。最大堆大小设置可以在集群模式下使用 spark.driver.memory 设置,并在客户端模式下通过 --driver-memory 命令行选项设置。注意:在客户端模式下,此配置不得通过应用程序中的 SparkConf 直接设置,因为驱动程序 JVM 在那时已经启动。相反,请通过 --driver-java-options 命令行选项或在您的默认属性文件中设置此项。spark.driver.defaultJavaOptions 将添加到此配置的开头。 |
1.0.0 |
spark.driver.extraLibraryPath |
(无) | 设置启动驱动程序 JVM 时要使用的特殊库路径。 注意:在客户端模式下,此配置不得通过应用程序中的 SparkConf 直接设置,因为驱动程序 JVM 在那时已经启动。相反,请通过 --driver-library-path 命令行选项或在您的默认属性文件中设置此项。 |
1.0.0 |
spark.driver.userClassPathFirst |
false | (实验性)在驱动程序中加载类时,用户添加的 jar 是否优先于 Spark 自己的 jar。此功能可用于缓解 Spark 依赖项与用户依赖项之间的冲突。目前是实验性功能。仅在集群模式下使用。 | 1.3.0 |
spark.executor.extraClassPath |
(无) | 要添加到执行器类路径开头的额外类路径条目。这主要用于向后兼容旧版本的 Spark。用户通常不需要设置此选项。 | 1.0.0 |
spark.executor.defaultJavaOptions |
(无) | 一个默认 JVM 选项字符串,将添加到 spark.executor.extraJavaOptions 的开头。这旨在由管理员设置。例如,GC 设置或其他日志记录。请注意,使用此选项设置 Spark 属性或最大堆大小 (-Xmx) 设置是非法的。Spark 属性应使用 SparkConf 对象或与 spark-submit 脚本一起使用的 spark-defaults.conf 文件设置。最大堆大小设置可以使用 spark.executor.memory 设置。如果存在以下符号,它们将被插值:将替换为应用程序 ID,将替换为执行器 ID。例如,要将详细的 GC 日志记录到 /tmp 中以应用程序执行器 ID 命名的文件,请传递一个值为:-verbose:gc -Xloggc:/tmp/-.gc |
3.0.0 |
spark.executor.extraJavaOptions |
(无) | 一个额外的 JVM 选项字符串,将传递给执行器。这旨在由用户设置。例如,GC 设置或其他日志记录。请注意,使用此选项设置 Spark 属性或最大堆大小 (-Xmx) 设置是非法的。Spark 属性应使用 SparkConf 对象或与 spark-submit 脚本一起使用的 spark-defaults.conf 文件设置。最大堆大小设置可以使用 spark.executor.memory 设置。如果存在以下符号,它们将被插值:将替换为应用程序 ID,将替换为执行器 ID。例如,要将详细的 GC 日志记录到 /tmp 中以应用程序执行器 ID 命名的文件,请传递一个值为:-verbose:gc -Xloggc:/tmp/-.gc spark.executor.defaultJavaOptions 将添加到此配置的开头。 |
1.0.0 |
spark.executor.extraLibraryPath |
(无) | 设置启动执行器 JVM 时要使用的特殊库路径。 | 1.0.0 |
spark.executor.logs.rolling.maxRetainedFiles |
-1 | 设置系统将保留的最新滚动日志文件的数量。旧的日志文件将被删除。默认禁用。 | 1.1.0 |
spark.executor.logs.rolling.enableCompression |
false | 启用执行器日志压缩。如果启用,滚动执行器日志将被压缩。默认禁用。 | 2.0.2 |
spark.executor.logs.rolling.maxSize |
1024 * 1024 | 设置执行器日志将滚动的文件的最大大小(以字节为单位)。默认禁用滚动。请参阅 spark.executor.logs.rolling.maxRetainedFiles 以自动清理旧日志。 |
1.4.0 |
spark.executor.logs.rolling.strategy |
"" (禁用) | 设置执行器日志的滚动策略。默认情况下禁用。可以设置为“time”(基于时间的滚动)或“size”(基于大小的滚动)或“” (禁用)。对于“time”,使用 spark.executor.logs.rolling.time.interval 设置滚动间隔。对于“size”,使用 spark.executor.logs.rolling.maxSize 设置滚动的最大文件大小。 |
1.1.0 |
spark.executor.logs.rolling.time.interval |
daily | 设置执行器日志滚动的时间间隔。默认禁用滚动。有效值为 daily 、hourly 、minutely 或任何以秒为单位的间隔。请参阅 spark.executor.logs.rolling.maxRetainedFiles 以自动清理旧日志。 |
1.1.0 |
spark.executor.userClassPathFirst |
false | (实验性)与 spark.driver.userClassPathFirst 功能相同,但应用于执行器实例。 |
1.3.0 |
spark.executorEnv.[EnvironmentVariableName] |
(无) | 将由 EnvironmentVariableName 指定的环境变量添加到执行器进程。用户可以指定多个此类变量来设置多个环境变量。 |
0.9.0 |
spark.redaction.regex |
(?i)secret|password|token|access[.]?key | 用于决定驱动程序和执行器环境中哪些 Spark 配置属性和环境变量包含敏感信息的正则表达式。当此正则表达式匹配属性键或值时,该值将从环境 UI 和各种日志(如 YARN 和事件日志)中 redacted。 | 2.1.2 |
spark.redaction.string.regex |
(无) | 用于决定 Spark 生成的字符串的哪些部分包含敏感信息的正则表达式。当此正则表达式匹配字符串的一部分时,该字符串部分将替换为虚拟值。这目前用于 redacted SQL 解释命令的输出。 | 2.2.0 |
spark.python.profile |
false | 在 Python worker 中启用分析,分析结果将通过 sc.show_profiles() 显示,或者在驱动程序退出前显示。它也可以通过 sc.dump_profiles(path) 转储到磁盘。如果某些分析结果已手动显示,则在驱动程序退出前它们不会自动显示。默认情况下将使用 pyspark.profiler.BasicProfiler ,但这可以通过将分析器类作为参数传递给 SparkContext 构造函数来覆盖。 |
1.2.0 |
spark.python.profile.dump |
(无) | 用于在驱动程序退出前转储分析结果的目录。结果将作为每个 RDD 的单独文件转储。它们可以通过 pstats.Stats() 加载。如果指定了此项,则分析结果将不会自动显示。 |
1.2.0 |
spark.python.worker.memory |
512m | 在聚合期间每个 python worker 进程使用的内存量,格式与 JVM 内存字符串相同,带有大小单位后缀(“k”、“m”、“g”或“t”)(例如 512m 、2g )。如果聚合期间使用的内存超过此数量,它将把数据溢出到磁盘。 |
1.1.0 |
spark.python.worker.reuse |
true | 是否重用 Python worker。如果重用,它将使用固定数量的 Python worker,无需为每个任务 fork() 一个 Python 进程。如果存在大量广播,这将非常有用,因为广播无需为每个任务从 JVM 传输到 Python worker。 | 1.2.0 |
spark.files |
逗号分隔的文件列表,将被放置在每个执行器的工作目录中。允许使用 glob 模式。 | 1.0.0 | |
spark.submit.pyFiles |
以逗号分隔的 .zip、.egg 或 .py 文件列表,用于放置在 Python 应用程序的 PYTHONPATH 上。允许使用 glob 模式。 | 1.0.1 | |
spark.jars |
以逗号分隔的 jar 列表,包含在驱动程序和执行器类路径中。允许使用 glob 模式。 | 0.9.0 | |
spark.jars.packages |
以逗号分隔的 Maven 坐标列表,用于包含在驱动程序和执行器类路径中的 jar。坐标应为 groupId:artifactId:version。如果给定 spark.jars.ivySettings ,则工件将根据文件中的配置解析,否则将在本地 maven 仓库、maven central 以及通过命令行选项 --repositories 提供的任何其他远程仓库中搜索工件。有关更多详细信息,请参阅 高级依赖管理。 |
1.5.0 | |
spark.jars.excludes |
以逗号分隔的 groupId:artifactId 列表,在解析 spark.jars.packages 中提供的依赖项时排除,以避免依赖冲突。 |
1.5.0 | |
spark.jars.ivy |
指定 Ivy 用户目录的路径,用于 Ivy 本地缓存和 spark.jars.packages 中的包文件。这将覆盖 Ivy 属性 ivy.default.ivy.user.dir ,该属性默认为 ~/.ivy2。 |
1.3.0 | |
spark.jars.ivySettings |
Ivy 设置文件的路径,用于自定义使用 spark.jars.packages 指定的 jar 的解析,而不是内置默认值(例如 maven central)。通过命令行选项 --repositories 或 spark.jars.repositories 提供的额外仓库也将包括在内。这对于允许 Spark 从防火墙后面(例如通过内部工件服务器 Artifactory)解析工件非常有用。有关设置文件格式的详细信息,请参阅 Settings Files。仅支持 file:// 方案的路径。没有方案的路径假定具有 file:// 方案。在 YARN 集群模式下运行时,此文件也将本地化到远程驱动程序,用于 |
2.2.0 | |
spark.jars.repositories |
逗号分隔的额外远程仓库列表,用于搜索通过 --packages 或 spark.jars.packages 给定的 maven 坐标。 |
2.3.0 | |
spark.archives |
逗号分隔的档案列表,将提取到每个执行器的工作目录中。支持 .jar、.tar.gz、.tgz 和 .zip。您可以通过在文件名后添加 # 来指定要解压的目录名,例如 file.zip#directory 。此配置是实验性的。 |
3.1.0 | |
spark.pyspark.driver.python |
用于 PySpark 驱动程序的 Python 二进制可执行文件。(默认为 spark.pyspark.python ) |
2.1.0 | |
spark.pyspark.python |
用于 PySpark 驱动程序和执行器的 Python 二进制可执行文件。 | 2.1.0 |
Shuffle 行为
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.reducer.maxSizeInFlight |
48m | 从每个 reduce 任务同时获取的最大 map 输出大小,除非另有说明,否则以 MiB 为单位。由于每个输出都需要我们创建一个缓冲区来接收它,这表示每个 reduce 任务的固定内存开销,因此除非您有大量内存,否则请保持其较小。 | 1.4.0 |
spark.reducer.maxReqsInFlight |
Int.MaxValue | 此配置限制了在任何给定时间点获取块的远程请求数量。当集群中的主机数量增加时,可能导致大量入站连接到一个或多个节点,从而导致 worker 在负载下失败。通过限制获取请求的数量,可以缓解这种情况。 | 2.0.0 |
spark.reducer.maxBlocksInFlightPerAddress |
Int.MaxValue | 此配置限制了从给定主机端口的每个 reduce 任务中获取的远程块的数量。当在单个获取或同时从给定地址请求大量块时,这可能会导致服务执行器或 Node Manager 崩溃。当启用外部 shuffle 时,这对于减少 Node Manager 上的负载特别有用。您可以通过将其设置为较低值来缓解此问题。 | 2.2.1 |
spark.shuffle.compress |
true | 是否压缩 map 输出文件。通常是个好主意。压缩将使用 spark.io.compression.codec 。 |
0.6.0 |
spark.shuffle.file.buffer |
32k | 每个 shuffle 文件输出流的内存中缓冲区大小,除非另有说明,否则以 KiB 为单位。这些缓冲区减少了创建中间 shuffle 文件时进行的磁盘查找和系统调用次数。 | 1.4.0 |
spark.shuffle.file.merge.buffer |
32k | 每个 shuffle 文件输入流的内存中缓冲区大小,除非另有说明,否则以 KiB 为单位。这些缓冲区使用堆外缓冲区,并与 shuffle 文件中的文件数量相关。应避免过大的缓冲区。 | 4.0.0 |
spark.shuffle.unsafe.file.output.buffer |
32k | 自 Spark 4.0 起已废弃,请使用 spark.shuffle.localDisk.file.output.buffer 。 |
2.3.0 |
spark.shuffle.localDisk.file.output.buffer |
32k | 所有本地磁盘 shuffle writer 在写入每个分区后此缓冲区大小的文件系统。除非另有说明,否则以 KiB 为单位。 | 4.0.0 |
spark.shuffle.spill.diskWriteBufferSize |
1024 * 1024 | 将排序后的记录写入磁盘文件时要使用的缓冲区大小,单位为字节。 | 2.3.0 |
spark.shuffle.io.maxRetries |
3 | (仅限 Netty)如果设置为非零值,因 IO 相关异常而失败的获取会自动重试。此重试逻辑有助于在大规模 shuffle 中,面对长时间的 GC 暂停或瞬态网络连接问题时保持稳定。 | 1.2.0 |
spark.shuffle.io.numConnectionsPerPeer |
1 | (仅限 Netty)主机之间的连接被重用,以减少大型集群的连接建立。对于硬盘多而主机少的集群,这可能导致并发不足以饱和所有磁盘,因此用户可以考虑增加此值。 | 1.2.1 |
spark.shuffle.io.preferDirectBufs |
true | (仅限 Netty)堆外缓冲区用于减少 shuffle 和缓存块传输期间的垃圾回收。对于堆外内存严格受限的环境,用户可能希望关闭此选项以强制 Netty 的所有分配都在堆上。 | 1.2.0 |
spark.shuffle.io.retryWait |
5s | (仅限 Netty)两次获取重试之间的等待时间。通过重试导致的最大延迟默认为 15 秒,计算方式为 maxRetries * retryWait 。 |
1.2.1 |
spark.shuffle.io.backLog |
-1 | Shuffle 服务的接受队列长度。对于大型应用程序,可能需要增加此值,以便在服务无法跟上短时间内大量传入连接时,传入连接不会被丢弃。这需要在 shuffle 服务本身运行的任何地方进行配置,这可能在应用程序之外(请参阅下面的 spark.shuffle.service.enabled 选项)。如果设置低于 1,将回退到 Netty 的 io.netty.util.NetUtil#SOMAXCONN 定义的操作系统默认值。 |
1.1.1 |
spark.shuffle.io.connectionTimeout |
spark.network.timeout 的值 |
Shuffle 服务器和客户端之间已建立连接的超时时间,如果在至少 connectionTimeout 时间内仍有未完成的获取请求但通道上没有流量,则将其标记为空闲并关闭。 |
1.2.0 |
spark.shuffle.io.connectionCreationTimeout |
spark.shuffle.io.connectionTimeout 的值 |
Shuffle 服务器和客户端之间建立连接的超时时间。 | 3.2.0 |
spark.shuffle.service.enabled |
false | 启用外部 shuffle 服务。此服务保留执行器写入的 shuffle 文件,例如,以便可以安全地删除执行器,或者在执行器失败时可以继续 shuffle 获取。必须设置外部 shuffle 服务才能启用它。有关更多信息,请参阅 动态分配配置和设置文档。 | 1.2.0 |
spark.shuffle.service.port |
7337 | 外部 shuffle 服务将运行的端口。 | 1.2.0 |
spark.shuffle.service.name |
spark_shuffle | 客户端应与之通信的 Spark shuffle 服务的配置名称。此名称必须与 YARN NodeManager 配置 (yarn.nodemanager.aux-services ) 中用于配置 Shuffle 的名称匹配。仅当 spark.shuffle.service.enabled 设置为 true 时才生效。 |
3.2.0 |
spark.shuffle.service.index.cache.size |
100m | 缓存条目限制为指定的内存占用,除非另有说明,否则以字节为单位。 | 2.3.0 |
spark.shuffle.service.removeShuffle |
true | 当 shuffle 不再需要时,是否使用 ExternalShuffleService 删除已解除分配的执行器的 shuffle 块。如果未启用此功能,则已解除分配的执行器上的 shuffle 数据将保留在磁盘上,直到应用程序结束。 | 3.3.0 |
spark.shuffle.maxChunksBeingTransferred |
Long.MAX_VALUE | Shuffle 服务上同时允许传输的最大块数。请注意,当达到最大数量时,新的传入连接将被关闭。客户端将根据 shuffle 重试配置(请参阅 spark.shuffle.io.maxRetries 和 spark.shuffle.io.retryWait )进行重试,如果达到这些限制,任务将因获取失败而失败。 |
2.3.0 |
spark.shuffle.sort.bypassMergeThreshold |
200 | (高级)在基于排序的 shuffle 管理器中,如果没有 map 端聚合且 reduce 分区最多为这个数量时,避免合并排序数据。 | 1.1.1 |
spark.shuffle.sort.io.plugin.class |
org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO | 用于 shuffle IO 的类名。 | 3.0.0 |
spark.shuffle.spill.compress |
true | 是否压缩 shuffle 期间溢出的数据。压缩将使用 spark.io.compression.codec 。 |
0.9.0 |
spark.shuffle.accurateBlockThreshold |
100 * 1024 * 1024 | 高于此阈值的 HighlyCompressedMapStatus 中的 shuffle 块大小将被精确记录。这有助于通过避免在获取 shuffle 块时低估 shuffle 块大小来防止 OOM。 | 2.2.1 |
spark.shuffle.accurateBlockSkewedFactor |
-1.0 | 如果 shuffle 块的大小大于中值 shuffle 块大小乘以这个因子或 spark.shuffle.accurateBlockThreshold ,则该 shuffle 块被认为是倾斜的,并将在 HighlyCompressedMapStatus 中精确记录。建议将此参数设置为与 spark.sql.adaptive.skewJoin.skewedPartitionFactor 相同。默认设置为 -1.0 以禁用此功能。 |
3.3.0 |
spark.shuffle.registration.timeout |
5000 | 向外部 shuffle 服务注册的超时时间(毫秒)。 | 2.3.0 |
spark.shuffle.registration.maxAttempts |
3 | 当我们无法注册到外部 shuffle 服务时,我们将重试 maxAttempts 次。 | 2.3.0 |
spark.shuffle.reduceLocality.enabled |
true | 是否计算 reduce 任务的本地性偏好。 | 1.5.0 |
spark.shuffle.mapOutput.minSizeForBroadcast |
512k | 我们使用广播将 map 输出状态发送给执行器时的大小。 | 2.0.0 |
spark.shuffle.detectCorrupt |
true | 是否检测获取的块中的任何损坏。 | 2.2.0 |
spark.shuffle.detectCorrupt.useExtraMemory |
false | 如果启用,将通过使用额外的内存来解压缩/解密压缩/加密流的一部分,以检测早期损坏。任何抛出的 IOException 将导致任务重试一次,如果再次失败并出现相同的异常,则将抛出 FetchFailedException 以重试上一个阶段。 | 3.0.0 |
spark.shuffle.useOldFetchProtocol |
false | 在进行 shuffle 块获取时是否使用旧协议。仅当我们新的 Spark 版本作业从旧版本外部 shuffle 服务获取 shuffle 块时需要兼容性时才启用。 | 3.0.0 |
spark.shuffle.readHostLocalDisk |
true | 如果启用(并且 spark.shuffle.useOldFetchProtocol 被禁用),则从在同一主机上运行的块管理器请求的 shuffle 块将直接从磁盘读取,而不是通过网络作为远程块获取。 |
3.0.0 |
spark.files.io.connectionTimeout |
spark.network.timeout 的值 |
在 Spark RPC 环境中获取文件时,已建立连接的超时时间,如果在至少 connectionTimeout 时间内仍有未完成的文件正在下载但通道上没有流量,则将其标记为空闲并关闭。 |
1.6.0 |
spark.files.io.connectionCreationTimeout |
spark.files.io.connectionTimeout 的值 |
在 Spark RPC 环境中建立获取文件连接的超时时间。 | 3.2.0 |
spark.shuffle.checksum.enabled |
true | 是否计算 shuffle 数据的校验和。如果启用,Spark 将计算 map 输出文件中每个分区数据的校验和值,并将这些值存储在磁盘上的校验和文件中。当检测到 shuffle 数据损坏时,Spark 将尝试使用校验和文件诊断损坏的原因(例如,网络问题、磁盘问题等)。 | 3.2.0 |
spark.shuffle.checksum.algorithm |
ADLER32 | 用于计算 shuffle 校验和的算法。目前,它仅支持 JDK 的内置算法,例如 ADLER32、CRC32 和 CRC32C。 | 3.2.0 |
spark.shuffle.service.fetch.rdd.enabled |
false | 是否使用 ExternalShuffleService 来获取磁盘持久化的 RDD 块。在动态分配的情况下,如果启用此功能,则只有磁盘持久化块的执行器在 spark.dynamicAllocation.executorIdleTimeout 后被视为空闲,并将相应释放。 |
3.0.0 |
spark.shuffle.service.db.enabled |
true | 是否在 ExternalShuffleService 中使用数据库。请注意,这仅影响 standalone 模式。 | 3.0.0 |
spark.shuffle.service.db.backend |
ROCKSDB | 指定 shuffle 服务本地数据库中使用的基于磁盘的存储。设置为 ROCKSDB 或 LEVELDB(已废弃)。 | 3.4.0 |
Spark UI
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.eventLog.logBlockUpdates.enabled |
false | 如果 spark.eventLog.enabled 为 true,是否记录每个块更新的事件。*警告*:这将大大增加事件日志的大小。 |
2.3.0 |
spark.eventLog.longForm.enabled |
false | 如果为 true,则在事件日志中使用长形式的调用站点。否则使用短形式。 | 2.4.0 |
spark.eventLog.compress |
true | 如果 spark.eventLog.enabled 为 true,是否压缩记录的事件。 |
1.0.0 |
spark.eventLog.compression.codec |
zstd | 用于压缩记录事件的编解码器。默认情况下,Spark 提供四种编解码器:lz4 、lzf 、snappy 和 zstd 。您还可以使用完全限定的类名指定编解码器,例如 org.apache.spark.io.LZ4CompressionCodec 、org.apache.spark.io.LZFCompressionCodec 、org.apache.spark.io.SnappyCompressionCodec 和 org.apache.spark.io.ZStdCompressionCodec 。 |
3.0.0 |
spark.eventLog.erasureCoding.enabled |
false | 是否允许事件日志使用擦除编码,或关闭擦除编码,无论文件系统默认值如何。在 HDFS 上,擦除编码文件更新速度不如常规复制文件快,因此应用程序更新需要更长时间才能出现在历史服务器中。请注意,即使此设置为 true,Spark 也不会强制文件使用擦除编码,它只会使用文件系统默认值。 | 3.0.0 |
spark.eventLog.dir |
file:///tmp/spark-events | 如果 spark.eventLog.enabled 为 true,则 Spark 事件记录的基本目录。在此基本目录中,Spark 为每个应用程序创建一个子目录,并将特定于应用程序的事件记录到此目录中。用户可能希望将其设置为统一位置,如 HDFS 目录,以便历史文件可以被历史服务器读取。 |
1.0.0 |
spark.eventLog.enabled |
false | 是否记录 Spark 事件,这对于应用程序完成后重建 Web UI 很有用。 | 1.0.0 |
spark.eventLog.overwrite |
false | 是否覆盖任何现有文件。 | 1.0.0 |
spark.eventLog.buffer.kb |
100k | 写入输出流时使用的缓冲区大小,除非另有说明,否则以 KiB 为单位。 | 1.0.0 |
spark.eventLog.rolling.enabled |
false | 是否启用事件日志文件滚动。如果设置为 true,则将每个事件日志文件截断为配置的大小。 | 3.0.0 |
spark.eventLog.rolling.maxFileSize |
128m | 当 spark.eventLog.rolling.enabled=true 时,指定事件日志文件在滚动之前允许的最大大小。 |
3.0.0 |
spark.ui.dagGraph.retainedRootRDDs |
Int.MaxValue | Spark UI 和状态 API 在垃圾回收之前记住的 DAG 图节点数量。 | 2.1.0 |
spark.ui.groupSQLSubExecutionEnabled |
true | 当子执行属于同一根执行时,是否在 SQL UI 中将它们分组 | 3.4.0 |
spark.ui.enabled |
true | 是否为 Spark 应用程序运行 web UI。 | 1.1.1 |
spark.ui.store.path |
无 | 用于缓存实时 UI 应用程序信息的本地目录。默认情况下未设置,这意味着所有应用程序信息将保留在内存中。 | 3.4.0 |
spark.ui.killEnabled |
true | 允许从 web UI 杀死作业和阶段。 | 1.0.0 |
spark.ui.threadDumpsEnabled |
true | 是否在“Stages”和“Executor”页面中显示执行器线程转储的链接。 | 1.2.0 |
spark.ui.threadDump.flamegraphEnabled |
true | 是否渲染执行器线程转储的火焰图。 | 4.0.0 |
spark.ui.heapHistogramEnabled |
true | 是否在“Executor”页面中显示执行器堆直方图的链接。 | 3.5.0 |
spark.ui.liveUpdate.period |
100ms | 实时实体更新的频率。-1 表示在重放应用程序时“从不更新”,这意味着只会发生最后一次写入。对于实时应用程序,这避免了一些在快速处理传入任务事件时可以避免的操作。 | 2.3.0 |
spark.ui.liveUpdate.minFlushPeriod |
1s | 过时的 UI 数据刷新之前的最小时间。这可以避免在传入任务事件不频繁时 UI 过时。 | 2.4.2 |
spark.ui.port |
4040 | 应用程序仪表板的端口,显示内存和工作负载数据。 | 0.7.0 |
spark.ui.retainedJobs |
1000 | Spark UI 和状态 API 在垃圾回收之前记住的作业数量。这是一个目标最大值,在某些情况下可能会保留较少的元素。 | 1.2.0 |
spark.ui.retainedStages |
1000 | Spark UI 和状态 API 在垃圾回收之前记住的阶段数量。这是一个目标最大值,在某些情况下可能会保留较少的元素。 | 0.9.0 |
spark.ui.retainedTasks |
100000 | Spark UI 和状态 API 在垃圾回收之前记住的单个阶段中的任务数量。这是一个目标最大值,在某些情况下可能会保留较少的元素。 | 2.0.1 |
spark.ui.reverseProxy |
false | 启用将 Spark Master 作为 worker 和应用程序 UI 的反向代理运行。在此模式下,Spark master 将反向代理 worker 和应用程序 UI,以实现在无需直接访问其主机的情况下进行访问。请谨慎使用,因为 worker 和应用程序 UI 将无法直接访问,您只能通过 Spark master/代理公共 URL 访问它们。此设置影响集群中运行的所有 worker 和应用程序 UI,并且必须在所有 worker、驱动程序和 master 上设置。 | 2.1.0 |
spark.ui.reverseProxyUrl |
如果 Spark UI 应通过另一个前端反向代理提供服务,则这是通过该反向代理访问 Spark master UI 的 URL。这在运行代理进行身份验证(例如 OAuth 代理)时非常有用。URL 可以包含路径前缀,例如 http://mydomain.com/path/to/spark/ ,允许您通过相同的虚拟主机和端口为多个 Spark 集群和其他 Web 应用程序提供 UI 服务。通常,这应该是一个包含方案 (http/https)、主机和端口的绝对 URL。也可以在此处指定以“/”开头的相对 URL。在这种情况下,Spark UI 和 Spark REST API 生成的所有 URL 都将是服务器相对链接——这仍然有效,因为整个 Spark UI 通过相同的主机和端口提供服务。此设置会影响 Spark UI 中的链接生成,但前端反向代理负责
spark.ui.reverseProxy 开启时才有效。当 Spark master web UI 可直接访问时,不需要此设置。请注意,此设置的值在按“/”分割后不能包含关键字 proxy 或 history 。Spark UI 依赖这两个关键字从 URI 获取 REST API 端点。 |
2.1.0 | |
spark.ui.proxyRedirectUri |
当 Spark 在代理后面运行时,重定向的地址。这将使 Spark 修改重定向响应,使其指向代理服务器,而不是 Spark UI 自己的地址。这应该只是服务器的地址,不包含应用程序的任何前缀路径;前缀应该由代理服务器本身设置(通过添加 X-Forwarded-Context 请求头),或者通过在 Spark 应用程序的配置中设置代理基本路径。 |
3.0.0 | |
spark.ui.showConsoleProgress |
false | 在控制台中显示进度条。进度条显示运行时间超过 500ms 的阶段的进度。如果同时运行多个阶段,则会在同一行显示多个进度条。 注意:在 shell 环境中,spark.ui.showConsoleProgress 的默认值为 true。 |
1.2.1 |
spark.ui.consoleProgress.update.interval |
200 | 在控制台中更新进度条的间隔(毫秒)。 | 2.1.0 |
spark.ui.custom.executor.log.url |
(无) | 指定自定义 Spark 执行器日志 URL,以支持外部日志服务,而不是在 Spark UI 中使用集群管理器的应用程序日志 URL。Spark 将通过模式支持一些路径变量,这些变量可能因集群管理器而异。请查看您的集群管理器的文档,以了解支持哪些模式(如果有)。 请注意,此配置还会替换事件日志中的原始日志 URL,当在历史服务器上访问应用程序时,此配置也有效。新的日志 URL 必须是永久的,否则执行器日志 URL 可能会出现死链接。 目前,只有 YARN 和 K8s 集群管理器支持此配置 |
3.0.0 |
spark.ui.prometheus.enabled |
true | 在驱动程序网页的 /metrics/executors/prometheus 处公开执行器指标。 | 3.0.0 |
spark.worker.ui.retainedExecutors |
1000 | Spark UI 和状态 API 在垃圾回收之前记住的已完成执行器的数量。 | 1.5.0 |
spark.worker.ui.retainedDrivers |
1000 | Spark UI 和状态 API 在垃圾回收之前记住的已完成驱动程序的数量。 | 1.5.0 |
spark.sql.ui.retainedExecutions |
1000 | Spark UI 和状态 API 在垃圾回收之前记住的已完成执行的数量。 | 1.5.0 |
spark.streaming.ui.retainedBatches |
1000 | Spark UI 和状态 API 在垃圾回收之前记住的已完成批次数量。 | 1.0.0 |
spark.ui.retainedDeadExecutors |
100 | Spark UI 和状态 API 在垃圾回收之前记住的已死亡执行器的数量。 | 2.0.0 |
spark.ui.filters |
无 | 要应用于 Spark Web UI 的过滤器类名列表,以逗号分隔。过滤器应为标准 javax servlet Filter。 过滤器参数也可以在配置中指定,通过设置形式为 spark.<filter 的类名>.param.<参数名>=<值> 的配置条目例如 spark.ui.filters=com.test.filter1
spark.com.test.filter1.param.name1=foo
spark.com.test.filter1.param.name2=bar
|
1.0.0 |
spark.ui.requestHeaderSize |
8k | HTTP 请求头的最大允许大小,除非另有说明,否则以字节为单位。此设置也适用于 Spark 历史服务器。 | 2.2.3 |
spark.ui.timelineEnabled |
true | 是否在 UI 页面上显示事件时间线数据。 | 3.4.0 |
spark.ui.timeline.executors.maximum |
250 | 事件时间线中显示的最大执行器数量。 | 3.2.0 |
spark.ui.timeline.jobs.maximum |
500 | 事件时间线中显示的最大作业数量。 | 3.2.0 |
spark.ui.timeline.stages.maximum |
500 | 事件时间线中显示的最大阶段数量。 | 3.2.0 |
spark.ui.timeline.tasks.maximum |
1000 | 事件时间线中显示的最大任务数量。 | 1.4.0 |
spark.appStatusStore.diskStoreDir |
无 | 用于存储 SQL 执行诊断信息的本地目录。此配置仅适用于实时 UI。 | 3.4.0 |
压缩与序列化
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.broadcast.compress |
true | 是否在发送广播变量之前对其进行压缩。通常是个好主意。压缩将使用 spark.io.compression.codec 。 |
0.6.0 |
spark.checkpoint.dir |
(无) | 设置检查点的默认目录。可以通过 SparkContext.setCheckpointDir 覆盖。 | 4.0.0 |
spark.checkpoint.compress |
false | 是否压缩 RDD 检查点。通常是个好主意。压缩将使用 spark.io.compression.codec 。 |
2.2.0 |
spark.io.compression.codec |
lz4 | 用于压缩内部数据(如 RDD 分区、事件日志、广播变量和 shuffle 输出)的编解码器。默认情况下,Spark 提供四种编解码器:lz4 、lzf 、snappy 和 zstd 。您还可以使用完全限定的类名指定编解码器,例如 org.apache.spark.io.LZ4CompressionCodec 、org.apache.spark.io.LZFCompressionCodec 、org.apache.spark.io.SnappyCompressionCodec 和 org.apache.spark.io.ZStdCompressionCodec 。 |
0.8.0 |
spark.io.compression.lz4.blockSize |
32k | 使用 LZ4 压缩编解码器时,LZ4 压缩中使用的块大小。降低此块大小也会降低使用 LZ4 时的 shuffle 内存使用。默认单位为字节,除非另有说明。此配置仅适用于 spark.io.compression.codec 。 |
1.4.0 |
spark.io.compression.snappy.blockSize |
32k | 使用 Snappy 压缩编解码器时,Snappy 压缩中的块大小。降低此块大小也会降低使用 Snappy 时的 shuffle 内存使用。默认单位为字节,除非另有说明。此配置仅适用于 spark.io.compression.codec 。 |
1.4.0 |
spark.io.compression.zstd.level |
1 | Zstd 压缩编解码器的压缩级别。提高压缩级别将导致更好的压缩,但会消耗更多的 CPU 和内存。此配置仅适用于 spark.io.compression.codec 。 |
2.3.0 |
spark.io.compression.zstd.bufferSize |
32k | 使用 Zstd 压缩编解码器时,Zstd 压缩中使用的缓冲区大小(以字节为单位)。降低此大小将降低使用 Zstd 时的 shuffle 内存使用,但可能会因过多的 JNI 调用开销而增加压缩成本。此配置仅适用于 spark.io.compression.codec 。 |
2.3.0 |
spark.io.compression.zstd.bufferPool.enabled |
true | 如果为 true,则启用 ZSTD JNI 库的缓冲区池。 | 3.2.0 |
spark.io.compression.zstd.workers |
0 | 使用 Zstd 并行压缩时生成的线程大小。当值为 0 时不生成 worker,它在单线程模式下工作。当值 > 0 时,它触发异步模式,生成相应数量的线程。更多的 worker 可以提高性能,但也会增加内存成本。 | 4.0.0 |
spark.io.compression.lzf.parallel.enabled |
false | 当为 true 时,LZF 压缩将使用多线程并行压缩数据。 | 4.0.0 |
spark.kryo.classesToRegister |
(无) | 如果您使用 Kryo 序列化,请提供一个逗号分隔的自定义类名列表以向 Kryo 注册。有关更多详细信息,请参阅 调优指南。 | 1.2.0 |
spark.kryo.referenceTracking |
true | 使用 Kryo 序列化数据时是否跟踪对同一对象的引用,如果您的对象图存在循环,这是必需的;如果它们包含同一对象的多个副本,这对于提高效率很有用。如果您确定不是这种情况,可以禁用此选项以提高性能。 | 0.8.0 |
spark.kryo.registrationRequired |
false | 是否要求向 Kryo 注册。如果设置为“true”,如果序列化未注册的类,Kryo 将抛出异常。如果设置为 false(默认值),Kryo 将随每个对象写入未注册的类名。写入类名可能会导致显著的性能开销,因此启用此选项可以严格强制用户没有遗漏注册类。 | 1.1.0 |
spark.kryo.registrator |
(无) | 如果您使用 Kryo 序列化,请提供一个逗号分隔的类列表,用于向 Kryo 注册您的自定义类。此属性在您需要以自定义方式注册类(例如,指定自定义字段序列化器)时很有用。否则,spark.kryo.classesToRegister 更简单。它应该设置为扩展 KryoRegistrator 的类。有关更多详细信息,请参阅 调优指南。 |
0.5.0 |
spark.kryo.unsafe |
true | 是否使用基于 unsafe 的 Kryo 序列化器。通过使用基于 Unsafe 的 IO,可以显著提高速度。 | 2.1.0 |
spark.kryoserializer.buffer.max |
64m | Kryo 序列化缓冲区的最大允许大小,除非另有说明,否则以 MiB 为单位。此值必须大于您尝试序列化的任何对象,并且必须小于 2048m。如果您在 Kryo 内部收到“缓冲区限制超出”异常,请增加此值。 | 1.4.0 |
spark.kryoserializer.buffer |
64k | Kryo 序列化缓冲区的初始大小,除非另有说明,否则以 KiB 为单位。请注意,每个 worker 上每个核心将有一个缓冲区。此缓冲区在需要时将增长到 spark.kryoserializer.buffer.max 。 |
1.4.0 |
spark.rdd.compress |
false | 是否压缩序列化的 RDD 分区(例如,Java 和 Scala 中的 StorageLevel.MEMORY_ONLY_SER 或 Python 中的 StorageLevel.MEMORY_ONLY )。可以在牺牲一些额外 CPU 时间的情况下节省大量空间。压缩将使用 spark.io.compression.codec 。 |
0.6.0 |
spark.serializer |
org.apache.spark.serializer. JavaSerializer |
用于序列化将通过网络发送或需要以序列化形式缓存的对象的类。默认的 Java 序列化适用于任何可序列化的 Java 对象,但速度相当慢,因此当需要速度时,我们建议使用 org.apache.spark.serializer.KryoSerializer 并配置 Kryo 序列化。可以是 org.apache.spark.Serializer 的任何子类。 |
0.5.0 |
spark.serializer.objectStreamReset |
100 | 使用 org.apache.spark.serializer.JavaSerializer 进行序列化时,序列化器会缓存对象以防止写入冗余数据,但这会阻止这些对象的垃圾回收。通过调用“reset”,您可以从序列化器中刷新该信息,并允许旧对象被回收。要关闭此定期重置,请将其设置为 -1。默认情况下,它将每 100 个对象重置序列化器。 | 1.0.0 |
内存管理
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.memory.fraction |
0.6 | 用于执行和存储的(堆空间 - 300MB)的比例。此值越低,溢出和缓存数据逐出发生的频率越高。此配置的目的是为内部元数据、用户数据结构以及在稀疏、异常大的记录情况下的不精确大小估计预留内存。建议保留默认值。有关更多详细信息,包括正确调整 JVM 垃圾回收以增加此值的重要信息,请参阅此说明。 | 1.6.0 |
spark.memory.storageFraction |
0.5 | 不受逐出影响的存储内存量,表示为由 spark.memory.fraction 预留区域大小的一部分。此值越高,可用于执行的工作内存可能越少,任务可能更频繁地溢出到磁盘。建议保留默认值。有关更多详细信息,请参阅此说明。 |
1.6.0 |
spark.memory.offHeap.enabled |
false | 如果为 true,Spark 将尝试将堆外内存用于某些操作。如果启用堆外内存使用,则 spark.memory.offHeap.size 必须为正值。 |
1.6.0 |
spark.memory.offHeap.size |
0 | 可用于堆外分配的绝对内存量,除非另有说明,否则以字节为单位。此设置对堆内存使用没有影响,因此如果您的执行器的总内存消耗必须在某个硬限制内,请务必相应地缩小您的 JVM 堆大小。当 spark.memory.offHeap.enabled=true 时,此值必须设置为正值。 |
1.6.0 |
spark.storage.unrollMemoryThreshold |
1024 * 1024 | 在展开任何块之前请求的初始内存。 | 1.1.0 |
spark.storage.replication.proactive |
true | 启用 RDD 块的主动块复制。因执行器故障而丢失的缓存 RDD 块副本,如果存在任何可用的现有副本,则会进行补充。这试图将块的复制级别恢复到初始数量。 | 2.2.0 |
spark.storage.localDiskByExecutors.cacheSize |
1000 | 存储本地目录的执行器的最大数量。此大小同时应用于驱动程序和执行器端,以避免无限存储。此缓存将用于在从同一主机获取磁盘持久化 RDD 块或 shuffle 块(当设置了 spark.shuffle.readHostLocalDisk 时)的情况下避免网络开销。 |
3.0.0 |
spark.cleaner.periodicGC.interval |
30min | 控制触发垃圾回收的频率。 此上下文清理器仅在弱引用被垃圾回收时触发清理。在长时间运行且具有大型驱动程序 JVM 的应用程序中,如果驱动程序上的内存压力很小,这可能很少发生或根本不发生。完全不清理可能导致执行器在一段时间后耗尽磁盘空间。 |
1.6.0 |
spark.cleaner.referenceTracking |
true | 启用或禁用上下文清理。 | 1.0.0 |
spark.cleaner.referenceTracking.blocking |
true | 控制清理线程是否应阻塞在清理任务上(shuffle 除外,其由 spark.cleaner.referenceTracking.blocking.shuffle Spark 属性控制)。 |
1.0.0 |
spark.cleaner.referenceTracking.blocking.shuffle |
false | 控制清理线程是否应阻塞在 shuffle 清理任务上。 | 1.1.1 |
spark.cleaner.referenceTracking.cleanCheckpoints |
false | 控制引用超出范围时是否清理检查点文件。 | 1.4.0 |
执行行为
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.broadcast.blockSize |
4m | TorrentBroadcastFactory 的每个块的块大小,除非另有说明,否则以 KiB 为单位。值过大在广播期间会降低并行度(使其变慢);但是,如果值过小,BlockManager 可能会受到性能影响。 |
0.5.0 |
spark.broadcast.checksum |
true | 是否为广播启用校验和。如果启用,广播将包含校验和,这有助于检测损坏的块,但会增加计算和发送的数据量。如果网络有其他机制保证广播期间数据不会损坏,则可以禁用它。 | 2.1.1 |
spark.broadcast.UDFCompressionThreshold |
1 * 1024 * 1024 | 用户定义函数 (UDF) 和 Python RDD 命令通过广播压缩的阈值(以字节为单位),除非另有说明。 | 3.0.0 |
spark.executor.cores |
YARN 模式下为 1,Standalone 模式下为 worker 上所有可用核心。 | 每个执行器使用的核心数量。 | 1.0.0 |
spark.default.parallelism |
对于像 reduceByKey 和 join 这样的分布式 shuffle 操作,是父 RDD 中最大的分区数。对于像 parallelize 这样没有父 RDD 的操作,它取决于集群管理器。
|
用户未设置时,由 join 、reduceByKey 和 parallelize 等转换操作返回的 RDD 的默认分区数。 |
0.5.0 |
spark.executor.heartbeatInterval |
10 秒 | 每个执行器向驱动程序发送心跳的间隔。心跳让驱动程序知道执行器仍然存活,并用正在进行的任务的指标更新驱动程序。spark.executor.heartbeatInterval 应显著小于 spark.network.timeout。 | 1.1.0 |
spark.files.fetchTimeout |
60 秒 | 从驱动程序获取通过 SparkContext.addFile() 添加的文件时使用的通信超时时间。 | 1.0.0 |
spark.files.useFetchCache |
true | 如果设置为 true(默认),文件获取将使用属于同一应用程序的执行器共享的本地缓存,这可以在同一主机上运行许多执行器时提高任务启动性能。如果设置为 false,这些缓存优化将被禁用,并且所有执行器将获取自己的文件副本。为了使用位于 NFS 文件系统上的 Spark 本地目录,可以禁用此优化(有关更多详细信息,请参阅 SPARK-6313)。 | 1.2.2 |
spark.files.overwrite |
false | 是否覆盖启动时存在的任何文件。即使此选项设置为 true ,用户也无法覆盖之前通过 SparkContext.addFile 或 SparkContext.addJar 添加的文件。 |
1.0.0 |
spark.files.ignoreCorruptFiles |
false | 是否忽略损坏的文件。如果为 true,Spark 作业在遇到损坏或不存在的文件时将继续运行,并且已读取的内容仍将返回。 | 2.1.0 |
spark.files.ignoreMissingFiles |
false | 是否忽略缺失的文件。如果为 true,Spark 作业在遇到缺失的文件时将继续运行,并且已读取的内容仍将返回。 | 2.4.0 |
spark.files.maxPartitionBytes |
134217728 (128 MiB) | 读取文件时要打包到单个分区中的最大字节数。 | 2.1.0 |
spark.files.openCostInBytes |
4194304 (4 MiB) | 打开文件的估计成本,通过同时可扫描的字节数来衡量。这用于将多个文件放入一个分区时。最好高估,这样小文件分区将比大文件分区更快。 | 2.1.0 |
spark.hadoop.cloneConf |
false | 如果设置为 true,则为每个任务克隆一个新的 Hadoop Configuration 对象。应启用此选项以解决 Configuration 线程安全问题(有关更多详细信息,请参阅 SPARK-2546)。默认情况下禁用此选项,以避免对不受这些问题影响的作业造成意外的性能退化。 |
1.0.3 |
spark.hadoop.validateOutputSpecs |
true | 如果设置为 true,则验证 saveAsHadoopFile 和其他变体中使用的输出规范(例如,检查输出目录是否已存在)。可以禁用此选项以抑制由于预先存在的输出目录引起的异常。我们建议用户不要禁用此选项,除非是为了实现与 Spark 早期版本的兼容性。只需手动使用 Hadoop 的 FileSystem API 删除输出目录。此设置对于通过 Spark Streaming 的 StreamingContext 生成的作业将被忽略,因为在检查点恢复期间可能需要将数据重写到预先存在的输出目录。 | 1.0.1 |
spark.storage.memoryMapThreshold |
2m | 当从磁盘读取块时,Spark 内存映射的块大小阈值。默认单位是字节,除非另有说明。这可以防止 Spark 内存映射非常小的块。通常,内存映射对于接近或小于操作系统页面大小的块开销很高。 | 0.9.2 |
spark.storage.decommission.enabled |
false | 停用执行器时是否停用块管理器。 | 3.1.0 |
spark.storage.decommission.shuffleBlocks.enabled |
true | 块管理器停用期间是否传输 shuffle 块。需要可迁移的 shuffle 解析器(如基于排序的 shuffle)。 | 3.1.0 |
spark.storage.decommission.shuffleBlocks.maxThreads |
8 | 迁移 shuffle 文件时使用的最大线程数。 | 3.1.0 |
spark.storage.decommission.rddBlocks.enabled |
true | 块管理器停用期间是否传输 RDD 块。 | 3.1.0 |
spark.storage.decommission.fallbackStorage.path |
(无) | 块管理器停用期间备用存储的位置。例如,s3a://spark-storage/ 。如果为空,则禁用备用存储。存储应由 TTL 管理,因为 Spark 不会清理它。 |
3.1.0 |
spark.storage.decommission.fallbackStorage.cleanUp |
false | 如果为 true,Spark 在关闭期间会清理其备用存储数据。 | 3.2.0 |
spark.storage.decommission.shuffleBlocks.maxDiskSize |
(无) | 在拒绝远程 shuffle 块之前,用于存储 shuffle 块的最大磁盘空间。拒绝远程 shuffle 块意味着执行器将不会接收任何 shuffle 迁移,如果没有其他执行器可用于迁移,则 shuffle 块将丢失,除非配置了 spark.storage.decommission.fallbackStorage.path 。 |
3.2.0 |
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version |
1 | 文件输出提交器算法版本,有效的算法版本号:1 或 2。请注意,2 可能会导致正确性问题,如 MAPREDUCE-7282。 | 2.2.0 |
执行器指标
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.eventLog.logStageExecutorMetrics |
false | 是否将每个执行器的每阶段指标峰值写入事件日志。 注意: 指标是在执行器心跳中轮询(收集)并发送的,这是始终进行的;此配置仅用于确定是否将聚合指标峰值写入事件日志。 |
3.0.0 |
spark.executor.processTreeMetrics.enabled |
false | 收集执行器指标时是否收集进程树指标(来自 /proc 文件系统)。 注意: 仅当 /proc 文件系统存在时才收集进程树指标。 |
3.0.0 |
spark.executor.metrics.pollingInterval |
0 | 收集执行器指标的频率(毫秒)。 如果为 0,则在执行器心跳时(即在由 spark.executor.heartbeatInterval 指定的心跳间隔)进行轮询。如果为正数,则在此间隔进行轮询。 |
3.0.0 |
spark.eventLog.gcMetrics.youngGenerationGarbageCollectors |
Copy,PS Scavenge,ParNew,G1 Young Generation | 支持的年轻代垃圾收集器的名称。名称通常是 GarbageCollectorMXBean.getName 的返回值。内置的年轻代垃圾收集器有 Copy、PS Scavenge、ParNew、G1 Young Generation。 | 3.0.0 |
spark.eventLog.gcMetrics.oldGenerationGarbageCollectors |
MarkSweepCompact,PS MarkSweep,ConcurrentMarkSweep,G1 Old Generation | 支持的老年代垃圾收集器的名称。名称通常是 GarbageCollectorMXBean.getName 的返回值。内置的老年代垃圾收集器有 MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep、G1 Old Generation。 | 3.0.0 |
spark.executor.metrics.fileSystemSchemes |
file,hdfs | 在执行器指标中报告的文件系统方案。 | 3.1.0 |
网络
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.rpc.message.maxSize |
128 | “控制平面”通信中允许的最大消息大小(以 MiB 为单位);通常仅适用于执行器和驱动程序之间发送的 map 输出大小信息。如果您正在运行包含数千个 map 和 reduce 任务的作业并看到有关 RPC 消息大小的消息,请增加此值。 | 2.0.0 |
spark.blockManager.port |
(随机) | 所有块管理器监听的端口。这些存在于驱动程序和执行器上。 | 1.1.0 |
spark.driver.blockManager.port |
(spark.blockManager.port 的值) | 驱动程序专用的块管理器监听端口,用于不能与执行器使用相同配置的情况。 | 2.1.0 |
spark.driver.bindAddress |
(spark.driver.host 的值) | 绑定监听套接字的主机名或 IP 地址。此配置会覆盖 SPARK_LOCAL_IP 环境变量(见下文)。 它还允许将与本地地址不同的地址通告给执行器或外部系统。例如,在运行具有桥接网络的容器时,这很有用。为了使其正常工作,驱动程序使用的不同端口(RPC、块管理器和 UI)需要从容器的主机转发。 |
2.1.0 |
spark.driver.host |
(本地主机名) | 驱动程序的主机名或 IP 地址。这用于与执行器和 Standalone Master 通信。 | 0.7.0 |
spark.driver.port |
(随机) | 驱动程序监听的端口。这用于与执行器和 Standalone Master 通信。 | 0.7.0 |
spark.rpc.io.backLog |
64 | RPC 服务器的接受队列长度。对于大型应用程序,可能需要增加此值,以便在短时间内到达大量连接时不会丢弃传入连接。 | 3.0.0 |
spark.network.timeout |
120 秒 | 所有网络交互的默认超时时间。如果未配置 spark.storage.blockManagerHeartbeatTimeoutMs 、spark.shuffle.io.connectionTimeout 、spark.rpc.askTimeout 或 spark.rpc.lookupTimeout ,则将使用此配置。 |
1.3.0 |
spark.network.timeoutInterval |
60 秒 | 驱动程序检查和过期死亡执行器的间隔。 | 1.3.2 |
spark.network.io.preferDirectBufs |
true | 如果启用,共享分配器将优先使用堆外缓冲区分配。堆外缓冲区用于减少 shuffle 和缓存块传输期间的垃圾回收。对于堆外内存严格受限的环境,用户可能希望关闭此选项以强制所有分配都在堆内。 | 3.0.0 |
spark.port.maxRetries |
16 | 绑定端口时放弃前的最大重试次数。当给定端口特定值(非 0)时,每次后续重试将在重试之前将上次尝试中使用的端口增加 1。这实际上允许它尝试从指定起始端口到端口 + maxRetries 的一系列端口。 | 1.1.1 |
spark.rpc.askTimeout |
spark.network.timeout |
RPC ask 操作在超时前等待的时长。 | 1.4.0 |
spark.rpc.lookupTimeout |
120 秒 | RPC 远程端点查找操作在超时前等待的时长。 | 1.4.0 |
spark.network.maxRemoteBlockSizeFetchToMem |
200m | 当远程块的大小超过此字节阈值时,将将其获取到磁盘。这是为了避免巨大的请求占用过多内存。请注意,此配置将影响 shuffle 获取和块管理器远程块获取。对于启用外部 shuffle 服务的用户,此功能仅在外部 shuffle 服务版本至少为 2.3.0 时才能工作。 | 3.0.0 |
spark.rpc.io.connectionTimeout |
spark.network.timeout 的值 |
RPC 对等体之间已建立连接的超时时间,如果存在未完成的 RPC 请求但通道上至少在 connectionTimeout 时间内没有流量,则连接将被标记为空闲并关闭。 |
1.2.0 |
spark.rpc.io.connectionCreationTimeout |
spark.rpc.io.connectionTimeout 的值 |
RPC 对等体之间建立连接的超时时间。 | 3.2.0 |
调度
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.cores.max |
(未设置) | 在 Standalone 部署集群上运行时,为应用程序从整个集群(而不是每台机器)请求的最大 CPU 核心数。如果未设置,默认值将是 Spark Standalone 集群管理器上的 spark.deploy.defaultCores 。 |
0.6.0 |
spark.locality.wait |
3 秒 | 在放弃并在本地性较差的节点上启动任务之前,等待启动数据本地任务的时长。相同的等待时间将用于逐步通过多个本地性级别(进程本地、节点本地、机架本地,然后是任意)。也可以通过设置 spark.locality.wait.node 等来自定义每个级别的等待时间。如果您的任务运行时间长且本地性较差,您应该增加此设置,但默认值通常效果很好。 |
0.5.0 |
spark.locality.wait.node |
spark.locality.wait | 自定义节点本地性的本地性等待时间。例如,您可以将其设置为 0 以跳过节点本地性并立即搜索机架本地性(如果您的集群具有机架信息)。 | 0.8.0 |
spark.locality.wait.process |
spark.locality.wait | 自定义进程本地性的本地性等待时间。这会影响尝试访问特定执行器进程中缓存数据的任务。 | 0.8.0 |
spark.locality.wait.rack |
spark.locality.wait | 自定义机架本地性的本地性等待时间。 | 0.8.0 |
spark.scheduler.maxRegisteredResourcesWaitingTime |
30 秒 | 在调度开始之前,等待资源注册的最长时间。 | 1.1.1 |
spark.scheduler.minRegisteredResourcesRatio |
KUBERNETES 模式为 0.8;YARN 模式为 0.8;Standalone 模式为 0.0 | 在调度开始之前等待注册资源的最小比率(注册资源 / 总预期资源)(资源在 YARN 模式和 Kubernetes 模式下是执行器,在 Standalone 模式下是 CPU 核心)。指定为 0.0 到 1.0 之间的双精度浮点数。无论是否达到最小资源比率,在调度开始之前等待的最长时间由配置 spark.scheduler.maxRegisteredResourcesWaitingTime 控制。 |
1.1.1 |
spark.scheduler.mode |
FIFO | 提交到同一 SparkContext 的作业之间的调度模式。可以设置为 FAIR 以使用公平共享,而不是一个接一个地排队作业。这对于多用户服务很有用。 |
0.8.0 |
spark.scheduler.revive.interval |
1s | 调度器恢复 worker 资源以运行任务的间隔长度。 | 0.8.1 |
spark.scheduler.listenerbus.eventqueue.capacity |
10000 | 事件队列的默认容量。Spark 将首先尝试使用 spark.scheduler.listenerbus.eventqueue.queueName.capacity 指定的容量初始化事件队列。如果未配置,Spark 将使用此配置指定的默认容量。请注意,容量必须大于 0。如果监听器事件被丢弃,请考虑增加值(例如 20000)。增加此值可能会导致驱动程序使用更多内存。 |
2.3.0 |
spark.scheduler.listenerbus.eventqueue.shared.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark 监听器总线中共享事件队列的容量,该队列保存注册到监听器总线的外部监听器的事件。如果与共享队列对应的监听器事件被丢弃,请考虑增加此值。增加此值可能会导致驱动程序使用更多内存。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.appStatus.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
appStatus 事件队列的容量,该队列保存内部应用程序状态监听器的事件。如果与 appStatus 队列对应的监听器事件被丢弃,请考虑增加此值。增加此值可能会导致驱动程序使用更多内存。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.executorManagement.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark 监听器总线中 executorManagement 事件队列的容量,该队列保存内部执行器管理监听器的事件。如果与 executorManagement 队列对应的监听器事件被丢弃,请考虑增加此值。增加此值可能会导致驱动程序使用更多内存。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.eventLog.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark 监听器总线中 eventLog 队列的容量,该队列保存将事件写入 eventLogs 的事件日志监听器的事件。如果与 eventLog 队列对应的监听器事件被丢弃,请考虑增加此值。增加此值可能会导致驱动程序使用更多内存。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.streams.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark 监听器总线中 streams 队列的容量,该队列保存内部流式处理监听器的事件。如果与 streams 队列对应的监听器事件被丢弃,请考虑增加此值。增加此值可能会导致驱动程序使用更多内存。 | 3.0.0 |
spark.scheduler.resource.profileMergeConflicts |
false | 如果设置为 "true",当不同配置文件在合并到单个阶段的 RDD 中指定时,Spark 将合并 ResourceProfiles。合并时,Spark 会选择每个资源的最大值并创建一个新的 ResourceProfile。默认值为 false,如果同一阶段中的 RDD 包含多个不同的 ResourceProfiles,Spark 将抛出异常。 | 3.1.0 |
spark.scheduler.excludeOnFailure.unschedulableTaskSetTimeout |
120 秒 | 在中止因所有执行器因任务失败而被排除而无法调度的 TaskSet 之前,等待获取新执行器并调度任务的超时时间(秒)。 | 2.4.1 |
spark.standalone.submit.waitAppCompletion |
false | 如果设置为 true,当不同配置文件在合并到单个阶段的 RDD 中指定时,Spark 将合并 ResourceProfiles。合并时,Spark 会选择每个资源的最大值并创建一个新的 ResourceProfile。默认值为 false,如果同一阶段中的 RDD 包含多个不同的 ResourceProfiles,Spark 将抛出异常。 | 3.1.0 |
spark.excludeOnFailure.enabled |
false | 如果设置为 "true",则阻止 Spark 在因任务失败过多而被排除的执行器上调度任务。排除执行器和节点的算法可以通过其他 "spark.excludeOnFailure" 配置选项进一步控制。此配置将被 "spark.excludeOnFailure.application.enabled" 和 "spark.excludeOnFailure.taskAndStage.enabled" 覆盖,以指定在单个级别上启用排除。 | 2.1.0 |
spark.excludeOnFailure.application.enabled |
false | 如果设置为 "true",则启用因任务失败过多而排除整个应用程序的执行器,并阻止 Spark 在其上调度任务。此配置会覆盖 "spark.excludeOnFailure.enabled"。 | 4.0.0 |
spark.excludeOnFailure.taskAndStage.enabled |
false | 如果设置为 "true",则启用在任务集级别排除执行器,因为任务失败过多,并阻止 Spark 在其上调度任务。此配置会覆盖 "spark.excludeOnFailure.enabled"。 | 4.0.0 |
spark.excludeOnFailure.timeout |
1 小时 | (实验性)节点或执行器在整个应用程序中被排除的时长,在此之后它将无条件地从排除列表中移除,以尝试运行新任务。 | 2.1.0 |
spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor |
1 | (实验性)对于给定任务,在执行器因该任务而被排除之前,可以在一个执行器上重试的次数。 | 2.1.0 |
spark.excludeOnFailure.task.maxTaskAttemptsPerNode |
2 | (实验性)对于给定任务,在整个节点因该任务而被排除之前,可以在一个节点上重试的次数。 | 2.1.0 |
spark.excludeOnFailure.stage.maxFailedTasksPerExecutor |
2 | (实验性)在一个执行器上,在一个阶段内,必须有多少个不同的任务失败,该执行器才会被排除在该阶段。 | 2.1.0 |
spark.excludeOnFailure.stage.maxFailedExecutorsPerNode |
2 | (实验性)在给定阶段中,有多少个不同的执行器被标记为排除,整个节点才会被标记为该阶段失败。 | 2.1.0 |
spark.excludeOnFailure.application.maxFailedTasksPerExecutor |
2 | (实验性)在一个执行器上,在成功的任务集中,有多少个不同的任务必须失败,该执行器才会被排除在整个应用程序之外。被排除的执行器将在 spark.excludeOnFailure.timeout 指定的超时后自动重新添加到可用资源池中。但请注意,在动态分配的情况下,执行器可能会被标记为空闲并被集群管理器回收。 |
2.2.0 |
spark.excludeOnFailure.application.maxFailedExecutorsPerNode |
2 | (实验性)在整个应用程序中,有多少个不同的执行器必须被排除,该节点才会被排除在整个应用程序之外。被排除的节点将在 spark.excludeOnFailure.timeout 指定的超时后自动重新添加到可用资源池中。但请注意,在动态分配的情况下,节点上的执行器可能会被标记为空闲并被集群管理器回收。 |
2.2.0 |
spark.excludeOnFailure.killExcludedExecutors |
false | (实验性)如果设置为 "true",则允许 Spark 在执行器因获取失败而被排除或因整个应用程序而被排除时(由 spark.killExcludedExecutors.application.* 控制)自动杀死执行器。请注意,当整个节点被排除时,该节点上的所有执行器都将被杀死。 | 2.2.0 |
spark.excludeOnFailure.application.fetchFailure.enabled |
false | (实验性)如果设置为 "true",当发生获取失败时,Spark 将立即排除该执行器。如果启用了外部 shuffle 服务,则整个节点将被排除。 | 2.3.0 |
spark.speculation |
false | 如果设置为 "true",则执行任务的推测执行。这意味着如果一个或多个任务在某个阶段中运行缓慢,它们将被重新启动。 | 0.6.0 |
spark.speculation.interval |
100ms | Spark 检查任务以进行推测的频率。 | 0.6.0 |
spark.speculation.multiplier |
3 | 任务比中位数慢多少倍才会被考虑进行推测。 | 0.6.0 |
spark.speculation.quantile |
0.9 | 在某个阶段启用推测之前必须完成的任务比例。 | 0.6.0 |
spark.speculation.minTaskRuntime |
100ms | 任务在被考虑进行推测之前运行的最短时间。这可用于避免启动非常短的任务的推测副本。 | 3.2.0 |
spark.speculation.task.duration.threshold |
无 | 调度器尝试推测性运行任务的任务持续时间。如果提供,如果当前阶段的任务数小于或等于单个执行器上的槽位数且任务花费的时间超过阈值,则将推测性运行任务。此配置有助于推测任务很少的阶段。如果执行器槽位足够大,常规推测配置也可能适用。例如,即使未达到阈值,如果有足够的成功运行,任务也可能重新启动。槽位数根据 spark.executor.cores 和 spark.task.cpus 的配置值(最小为 1)计算。默认单位是字节,除非另有说明。 | 3.0.0 |
spark.speculation.efficiency.processRateMultiplier |
0.75 | 评估低效任务时使用的乘数。乘数越高,越多的任务可能被认为是低效的。 | 3.4.0 |
spark.speculation.efficiency.longRunTaskFactor |
2 | 只要任务持续时间超过乘数和时间阈值(spark.speculation.multiplier * successfulTaskDurations.median 或 spark.speculation.minTaskRuntime )的乘积值,无论其数据处理速率是否良好,任务都将被推测。这避免了在任务慢与数据处理速率无关时错过低效任务。 |
3.4.0 |
spark.speculation.efficiency.enabled |
true | 当设置为 true 时,Spark 将通过阶段任务指标或其持续时间评估任务处理效率,并且只需推测低效任务。当任务 1)其数据处理速率小于该阶段所有成功任务的平均数据处理速率乘以一个乘数,或者 2)其持续时间已超过 spark.speculation.efficiency.longRunTaskFactor 和时间阈值(spark.speculation.multiplier * successfulTaskDurations.median 或 spark.speculation.minTaskRuntime )的乘积值时,该任务是低效的。 |
3.4.0 |
spark.task.cpus |
1 | 为每个任务分配的核心数。 | 0.5.0 |
spark.task.resource.{resourceName}.amount |
1 | 为每个任务分配的特定资源类型的数量,请注意这可以是双精度浮点数。如果指定此项,您还必须提供执行器配置 spark.executor.resource.{resourceName}.amount 以及任何相应的发现配置,以便您的执行器使用该资源类型创建。除了整数数量外,还可以指定分数数量(例如,0.25,表示资源的 1/4)。分数数量必须小于或等于 0.5,换句话说,资源共享的最小数量是每个资源 2 个任务。此外,分数数量会向下取整以便分配资源槽位(例如,0.2222 配置,或 1/0.2222 槽位将变为每个资源 4 个任务,而不是 5 个)。 |
3.0.0 |
spark.task.maxFailures |
4 | 在放弃作业之前,任何特定任务连续失败的次数。分散在不同任务中的失败总数不会导致作业失败;特定任务必须连续失败此尝试次数。如果任何尝试成功,则该任务的失败计数将重置。应大于或等于 1。允许的重试次数 = 此值 - 1。 | 0.8.0 |
spark.task.reaper.enabled |
false | 启用对被终止/中断任务的监控。当设置为 true 时,任何被终止的任务将由执行器监控,直到该任务实际完成执行。有关如何控制此监控确切行为的详细信息,请参阅其他 spark.task.reaper.* 配置。当设置为 false(默认值)时,任务终止将使用较旧的代码路径,该路径缺乏此类监控。 |
2.0.3 |
spark.task.reaper.pollingInterval |
10 秒 | 当 spark.task.reaper.enabled = true 时,此设置控制执行器轮询被终止任务状态的频率。如果被终止的任务在轮询时仍在运行,则会记录警告,并且默认情况下会记录任务的线程转储(可以通过 spark.task.reaper.threadDump 设置禁用此线程转储,如下所述)。 |
2.0.3 |
spark.task.reaper.threadDump |
true | 当 spark.task.reaper.enabled = true 时,此设置控制在定期轮询被终止任务期间是否记录任务线程转储。将其设置为 false 以禁用线程转储的收集。 |
2.0.3 |
spark.task.reaper.killTimeout |
-1 | 当 spark.task.reaper.enabled = true 时,此设置指定一个超时时间,在此之后,如果被终止的任务尚未停止运行,执行器 JVM 将自行终止。默认值 -1 禁用此机制并阻止执行器自毁。此设置的目的是作为安全网,防止失控的不可取消任务使执行器无法使用。 |
2.0.3 |
spark.stage.maxConsecutiveAttempts |
4 | 在阶段中止之前允许的连续阶段尝试次数。 | 2.2.0 |
spark.stage.ignoreDecommissionFetchFailure |
true | 计算 spark.stage.maxConsecutiveAttempts 时,是否忽略由执行器停用引起的阶段获取失败。 |
3.4.0 |
屏障执行模式
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.barrier.sync.timeout |
365 天 | 屏障任务中每次 barrier() 调用的超时时间(秒)。如果协调器在配置的时间内没有收到所有屏障任务的同步消息,则抛出 SparkException 以使所有任务失败。默认值设置为 31536000(3600 * 24 * 365),因此 barrier() 调用将等待一年。 |
2.4.0 |
spark.scheduler.barrier.maxConcurrentTasksCheck.interval |
15 秒 | 最大并发任务检查失败与下一次检查之间等待的时间(秒)。最大并发任务检查确保集群在提交作业时可以启动比屏障阶段所需更多的并发任务。如果集群刚刚启动并且没有足够的执行器注册,检查可能会失败,因此我们等待一段时间并尝试再次执行检查。如果检查失败的次数超过作业配置的最大失败次数,则使当前作业提交失败。请注意,此配置仅适用于包含一个或多个屏障阶段的作业,我们不会对非屏障作业执行检查。 | 2.4.0 |
spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures |
40 | 在作业提交失败之前允许的最大并发任务检查失败次数。最大并发任务检查确保集群在提交作业时可以启动比屏障阶段所需更多的并发任务。如果集群刚刚启动并且没有足够的执行器注册,检查可能会失败,因此我们等待一段时间并尝试再次执行检查。如果检查失败的次数超过作业配置的最大失败次数,则使当前作业提交失败。请注意,此配置仅适用于包含一个或多个屏障阶段的作业,我们不会对非屏障作业执行检查。 | 2.4.0 |
动态分配
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.dynamicAllocation.enabled |
false | 是否使用动态资源分配,它根据工作负载动态增减为此应用程序注册的执行器数量。有关更多详细信息,请参阅此处的说明。 这需要满足以下条件之一:1) 通过 spark.shuffle.service.enabled 启用外部 shuffle 服务,或 2) 通过 spark.dynamicAllocation.shuffleTracking.enabled 启用 shuffle 跟踪,或 3) 通过 spark.decommission.enabled 和 spark.storage.decommission.shuffleBlocks.enabled 启用 shuffle 块停用,或 4)(实验性)配置 spark.shuffle.sort.io.plugin.class 以使用其 ShuffleDriverComponents 支持可靠存储的自定义 ShuffleDataIO 。以下配置也相关:spark.dynamicAllocation.minExecutors 、spark.dynamicAllocation.maxExecutors 和 spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.executorAllocationRatio |
1.2.0 |
spark.dynamicAllocation.executorIdleTimeout |
60 秒 | 如果启用了动态分配,并且执行器空闲时间超过此持续时间,则将移除该执行器。有关更多详细信息,请参阅此处的说明。 | 1.2.0 |
spark.dynamicAllocation.cachedExecutorIdleTimeout |
无限 | 如果启用了动态分配,并且具有缓存数据块的执行器空闲时间超过此持续时间,则将移除该执行器。有关更多详细信息,请参阅此处的说明。 | 1.4.0 |
spark.dynamicAllocation.initialExecutors |
spark.dynamicAllocation.minExecutors |
如果启用了动态分配,要运行的初始执行器数量。 如果设置了 --num-executors (或 spark.executor.instances )并且大于此值,则它将用作初始执行器数量。 |
1.3.0 |
spark.dynamicAllocation.maxExecutors |
无限 | 如果启用了动态分配,执行器数量的上限。 | 1.2.0 |
spark.dynamicAllocation.minExecutors |
0 | 如果启用了动态分配,执行器数量的下限。 | 1.2.0 |
spark.dynamicAllocation.executorAllocationRatio |
1 | 默认情况下,动态分配将请求足够的执行器,以根据要处理的任务数量最大化并行度。虽然这最大限度地减少了作业的延迟,但对于小型任务,此设置可能会由于执行器分配开销而浪费大量资源,因为某些执行器甚至可能不执行任何工作。此设置允许设置一个比率,该比率将用于相对于完全并行度来减少执行器的数量。默认为 1.0 以提供最大并行度。0.5 将目标执行器数量除以 2。动态分配计算的目标执行器数量仍可由 spark.dynamicAllocation.minExecutors 和 spark.dynamicAllocation.maxExecutors 设置覆盖。 |
2.4.0 |
spark.dynamicAllocation.schedulerBacklogTimeout |
1s | 如果启用了动态分配,并且积压的待处理任务超过此持续时间,将请求新的执行器。有关更多详细信息,请参阅此处的说明。 | 1.2.0 |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout |
schedulerBacklogTimeout |
与 spark.dynamicAllocation.schedulerBacklogTimeout 相同,但仅用于后续执行器请求。有关更多详细信息,请参阅此处的说明。 |
1.2.0 |
spark.dynamicAllocation.shuffleTracking.enabled |
true |
启用执行器的 shuffle 文件跟踪,这允许动态分配而无需外部 shuffle 服务。此选项将尝试保持存储活动作业的 shuffle 数据的执行器处于活动状态。 | 3.0.0 |
spark.dynamicAllocation.shuffleTracking.timeout |
无限 |
启用 shuffle 跟踪时,控制持有 shuffle 数据的执行器的超时时间。默认值意味着 Spark 将依赖于 shuffle 被垃圾回收才能释放执行器。如果由于某种原因垃圾回收未能足够快地清理 shuffle,此选项可用于控制何时使执行器超时,即使它们正在存储 shuffle 数据。 | 3.0.0 |
线程配置
根据作业和集群配置,我们可以在 Spark 中的多个位置设置线程数,以有效利用可用资源,从而获得更好的性能。在 Spark 3.0 之前,这些线程配置适用于 Spark 的所有角色,例如驱动程序、执行器、worker 和 master。从 Spark 3.0 开始,我们可以从驱动程序和执行器开始,以更细的粒度配置线程。以下表中的 RPC 模块为例。对于其他模块(如 shuffle),只需在属性名称中将“rpc”替换为“shuffle”,但 spark.{driver|executor}.rpc.netty.dispatcher.numThreads
除外,它仅适用于 RPC 模块。
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.{driver|executor}.rpc.io.serverThreads |
回退到 spark.rpc.io.serverThreads |
服务器线程池中使用的线程数 | 1.6.0 |
spark.{driver|executor}.rpc.io.clientThreads |
回退到 spark.rpc.io.clientThreads |
客户端线程池中使用的线程数 | 1.6.0 |
spark.{driver|executor}.rpc.netty.dispatcher.numThreads |
回退到 spark.rpc.netty.dispatcher.numThreads |
RPC 消息调度器线程池中使用的线程数 | 3.0.0 |
线程相关配置键的默认值是为驱动程序或执行器请求的核心数中的最小值,或者在没有该值的情况下,是 JVM 可用的核心数(硬编码上限为 8)。
Spark Connect
服务器配置
服务器配置在 Spark Connect 服务器中设置,例如,当您使用 ./sbin/start-connect-server.sh
启动 Spark Connect 服务器时。它们通常通过配置文件和命令行选项 --conf/-c
进行设置。
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.api.mode |
classic | 对于 Spark Classic 应用程序,指定是否通过运行本地 Spark Connect 服务器来自动使用 Spark Connect。该值可以是 classic 或 connect 。 |
4.0.0 |
spark.connect.grpc.binding.address |
(无) | Spark Connect 服务器绑定的地址。 | 4.0.0 |
spark.connect.grpc.binding.port |
15002 | Spark Connect 服务器绑定的端口。 | 3.4.0 |
spark.connect.grpc.port.maxRetries |
0 | gRPC 服务器绑定时的最大端口重试次数。默认情况下,它设置为 0,服务器在端口冲突时将快速失败。 | 4.0.0 |
spark.connect.grpc.interceptor.classes |
(无) | 逗号分隔的类名列表,这些类必须实现 io.grpc.ServerInterceptor 接口 |
3.4.0 |
spark.connect.grpc.arrow.maxBatchSize |
4m | 使用 Apache Arrow 时,限制从服务器端发送到客户端的单个 arrow 批处理的最大大小。目前,我们保守地使用其 70%,因为大小不准确但经过估算。 | 3.4.0 |
spark.connect.grpc.maxInboundMessageSize |
134217728 | 设置 gRPC 请求的最大入站消息大小。负载较大的请求将失败。 | 3.4.0 |
spark.connect.extensions.relation.classes |
(无) | 逗号分隔的类列表,这些类实现了特性 org.apache.spark.sql.connect.plugin.RelationPlugin 以支持 proto 中的自定义 Relation 类型。 |
3.4.0 |
spark.connect.extensions.expression.classes |
(无) | 逗号分隔的类列表,这些类实现了特性 org.apache.spark.sql.connect.plugin.ExpressionPlugin 以支持 proto 中的自定义 Expression 类型。 |
3.4.0 |
spark.connect.extensions.command.classes |
(无) | 逗号分隔的类列表,这些类实现了特性 org.apache.spark.sql.connect.plugin.CommandPlugin 以支持 proto 中的自定义 Command 类型。 |
3.4.0 |
spark.connect.ml.backend.classes |
(无) | 逗号分隔的类列表,这些类实现了特性 org.apache.spark.sql.connect.plugin.MLBackendPlugin 以用后端特定实现替换指定的 Spark ML 运算符。 |
4.0.0 |
spark.connect.jvmStacktrace.maxSize |
1024 | 当 `spark.sql.pyspark.jvmStacktrace.enabled` 为 true 时,设置要显示的最大堆栈跟踪大小。 | 3.5.0 |
spark.sql.connect.ui.retainedSessions |
200 | Spark Connect UI 历史中保留的客户端会话数。 | 3.5.0 |
spark.sql.connect.ui.retainedStatements |
200 | Spark Connect UI 历史中保留的语句数。 | 3.5.0 |
spark.sql.connect.enrichError.enabled |
true | 当为 true 时,它通过额外的 RPC 在客户端使用完整的异常消息和可选的服务器端堆栈跟踪来丰富错误。 | 4.0.0 |
spark.sql.connect.serverStacktrace.enabled |
true | 当为 true 时,它会在面向用户的 Spark 异常中设置服务器端堆栈跟踪。 | 4.0.0 |
spark.connect.grpc.maxMetadataSize |
1024 | 设置元数据字段的最大大小。例如,它限制了 `ErrorInfo` 中的元数据字段。 | 4.0.0 |
spark.connect.progress.reportInterval |
2 秒 | 向客户端报告查询进度的间隔。如果该值设置为负值,则将禁用进度报告。 | 4.0.0 |
安全性
有关如何保护不同 Spark 子系统的可用选项,请参阅安全页面。
Spark SQL
运行时 SQL 配置
运行时 SQL 配置是每个会话的、可变的 Spark SQL 配置。它们可以通过配置文件和以 --conf/-c
为前缀的命令行选项设置初始值,或者通过设置用于创建 SparkSession
的 SparkConf
来设置。此外,它们可以通过 SET 命令设置和查询,并通过 RESET 命令或在运行时通过 SparkSession.conf
的 setter 和 getter 方法重置为初始值。
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.sql.adaptive.advisoryPartitionSizeInBytes |
(spark.sql.adaptive.shuffle.targetPostShuffleInputSize 的值) |
自适应优化期间(当 spark.sql.adaptive.enabled 为 true 时)shuffle 分区的建议大小(以字节为单位)。当 Spark 合并小的 shuffle 分区或拆分倾斜的 shuffle 分区时生效。 |
3.0.0 |
spark.sql.adaptive.autoBroadcastJoinThreshold |
(无) | 配置在执行连接时将广播到所有 worker 节点的最大表大小(以字节为单位)。通过将此值设置为 -1 可以禁用广播。默认值与 spark.sql.autoBroadcastJoinThreshold 相同。请注意,此配置仅在自适应框架中使用。 |
3.2.0 |
spark.sql.adaptive.coalescePartitions.enabled |
true | 当 true 且 'spark.sql.adaptive.enabled' 为 true 时,Spark 将根据目标大小(由 'spark.sql.adaptive.advisoryPartitionSizeInBytes' 指定)合并连续的 shuffle 分区,以避免过多的小型任务。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum |
(无) | 合并之前的初始 shuffle 分区数。如果未设置,则等于 spark.sql.shuffle.partitions。此配置仅在 'spark.sql.adaptive.enabled' 和 'spark.sql.adaptive.coalescePartitions.enabled' 都为 true 时才有效。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.minPartitionSize |
1MB | 合并后 shuffle 分区的最小大小。当分区合并期间自适应计算的目标大小过小时,这很有用。 |
3.2.0 |
spark.sql.adaptive.coalescePartitions.parallelismFirst |
true | 当为 true 时,Spark 在合并连续的 shuffle 分区时不会遵守 'spark.sql.adaptive.advisoryPartitionSizeInBytes'(默认 64MB)指定的目标大小,而是根据 Spark 集群的默认并行度自适应地计算目标大小。计算出的尺寸通常小于配置的目标尺寸。这是为了在启用自适应查询执行时最大化并行度并避免性能退化。建议在繁忙的集群上将此配置设置为 false,以使资源利用更高效(任务数量不多的小任务)。 |
3.2.0 |
spark.sql.adaptive.customCostEvaluatorClass |
(无) | 用于自适应执行的自定义成本评估器类。如果未设置,Spark 将默认使用其自己的 SimpleCostEvaluator。 |
3.2.0 |
spark.sql.adaptive.enabled |
true | 当为 true 时,启用自适应查询执行,它根据准确的运行时统计信息在查询执行过程中重新优化查询计划。 |
1.6.0 |
spark.sql.adaptive.forceOptimizeSkewedJoin |
false | 当为 true 时,强制启用 OptimizeSkewedJoin,即使它会引入额外的 shuffle。 |
3.3.0 |
spark.sql.adaptive.localShuffleReader.enabled |
true | 当 true 且 'spark.sql.adaptive.enabled' 为 true 时,当不需要 shuffle 分区时,Spark 尝试使用本地 shuffle 读取器读取 shuffle 数据,例如,在将排序合并连接转换为广播哈希连接之后。 |
3.0.0 |
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold |
0b | 配置每个分区允许构建本地哈希映射的最大字节大小。如果此值不小于 spark.sql.adaptive.advisoryPartitionSizeInBytes 且所有分区大小不大于此配置,则无论 spark.sql.join.preferSortMergeJoin 的值如何,连接选择都倾向于使用 shuffled hash join 而不是 sort merge join。 |
3.2.0 |
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled |
true | 当 true 且 'spark.sql.adaptive.enabled' 为 true 时,Spark 将在 RebalancePartitions 中优化倾斜的 shuffle 分区,并根据目标大小(由 'spark.sql.adaptive.advisoryPartitionSizeInBytes' 指定)将其拆分为更小的分区,以避免数据倾斜。 |
3.2.0 |
spark.sql.adaptive.optimizer.excludedRules |
(无) | 配置要在自适应优化器中禁用的规则列表,其中规则通过其规则名称指定并用逗号分隔。优化器将记录实际已排除的规则。 |
3.1.0 |
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor |
0.2 | 如果分区的大小小于此因子乘以 spark.sql.adaptive.advisoryPartitionSizeInBytes,则该分区将在拆分期间被合并。 |
3.3.0 |
spark.sql.adaptive.skewJoin.enabled |
true | 当 true 且 'spark.sql.adaptive.enabled' 为 true 时,Spark 通过拆分(并在需要时复制)倾斜分区来动态处理 shuffle 连接(排序合并和 shuffle hash)中的倾斜。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor |
5.0 | 如果分区的大小大于此因子乘以中位数分区大小并且也大于 'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes',则该分区被视为倾斜分区。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes |
256MB | 如果分区的大小(以字节为单位)大于此阈值,并且也大于 'spark.sql.adaptive.skewJoin.skewedPartitionFactor' 乘以中位数分区大小,则该分区被视为倾斜分区。理想情况下,此配置应设置为大于 'spark.sql.adaptive.advisoryPartitionSizeInBytes'。 |
3.0.0 |
spark.sql.allowNamedFunctionArguments |
true | 如果为 true,Spark 将为所有已实现命名参数的函数启用命名参数支持。 |
3.5.0 |
spark.sql.ansi.doubleQuotedIdentifiers |
false | 当为 true 且 'spark.sql.ansi.enabled' 为 true 时,Spark SQL 将双引号 (") 引起来的字面量读取为标识符。当为 false 时,它们被读取为字符串字面量。 |
3.4.0 |
spark.sql.ansi.enabled |
true | 当为 true 时,Spark SQL 使用符合 ANSI 标准的方言而不是 Hive 方言。例如,当 SQL 运算符/函数的输入无效时,Spark 将在运行时抛出异常而不是返回 null 结果。有关此方言的完整详细信息,您可以在 Spark 文档的“ANSI Compliance”部分中找到。某些 ANSI 方言功能可能并非直接来自 ANSI SQL 标准,但它们的行为与 ANSI SQL 的风格一致。 |
3.0.0 |
spark.sql.ansi.enforceReservedKeywords |
false | 当为 true 且 'spark.sql.ansi.enabled' 为 true 时,Spark SQL 解析器强制执行 ANSI 保留关键字,并禁止使用保留关键字作为表、视图、函数等的别名和/或标识符的 SQL 查询。 |
3.3.0 |
spark.sql.ansi.relationPrecedence |
false | 当为 true 且 'spark.sql.ansi.enabled' 为 true 时,在组合关系时,JOIN 优先于逗号。例如, |
3.4.0 |
spark.sql.autoBroadcastJoinThreshold |
10MB | 配置在执行连接时将广播到所有 worker 节点的最大表大小(以字节为单位)。通过将此值设置为 -1 可以禁用广播。 |
1.1.0 |
spark.sql.avro.compression.codec |
snappy | 写入 AVRO 文件时使用的压缩编解码器。支持的编解码器有:uncompressed、deflate、snappy、bzip2、xz 和 zstandard。默认编解码器是 snappy。 |
2.4.0 |
spark.sql.avro.deflate.level |
-1 | 写入 AVRO 文件时 deflate 编解码器使用的压缩级别。有效值必须在 1 到 9(包括)或 -1 的范围内。默认值是 -1,这在当前实现中对应于 6 级。 |
2.4.0 |
spark.sql.avro.filterPushdown.enabled |
true | 当为 true 时,启用 Avro 数据源的过滤器下推。 |
3.1.0 |
spark.sql.avro.xz.level |
6 | 写入 AVRO 文件时 xz 编解码器使用的压缩级别。有效值必须在 1 到 9(包括)的范围内。默认值是 6。 |
4.0.0 |
spark.sql.avro.zstandard.bufferPool.enabled |
false | 如果为 true,在写入 AVRO 文件时启用 ZSTD JNI 库的缓冲区池 |
4.0.0 |
spark.sql.avro.zstandard.level |
3 | 写入 AVRO 文件时 zstandard 编解码器使用的压缩级别。 |
4.0.0 |
spark.sql.binaryOutputStyle |
(无) | 显示二进制数据时使用的输出样式。有效值包括 'UTF-8'、'BASIC'、'BASE64'、'HEX' 和 'HEX_DISCRETE'。 |
4.0.0 |
spark.sql.broadcastTimeout |
300 | 广播连接中广播等待时间的超时时间(秒)。 |
1.3.0 |
spark.sql.bucketing.coalesceBucketsInJoin.enabled |
false | 当为 true 时,如果连接两个具有不同桶数量的桶表,则具有较大桶数量的一方将被合并以与另一方具有相同数量的桶。较大桶数量可被较小桶数量整除。桶合并应用于排序合并连接和 shuffle hash 连接。注意:合并桶表可以避免连接中不必要的 shuffle,但它也会降低并行度,并可能导致 shuffled hash join 的 OOM。 |
3.1.0 |
spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio |
4 | 两个桶合并的数量比率应小于或等于此值,以便应用桶合并。此配置仅在 'spark.sql.bucketing.coalesceBucketsInJoin.enabled' 设置为 true 时才有效。 |
3.1.0 |
spark.sql.catalog.spark_catalog |
builtin | 一个目录实现,将用作 Spark 内置 v1 目录 spark_catalog 的 v2 接口。此目录与 spark_catalog 共享其标识符命名空间,并且必须与其保持一致;例如,如果 spark_catalog 可以加载表,则此目录也必须返回表元数据。要将操作委托给 spark_catalog,实现可以扩展 'CatalogExtension'。该值应为 'builtin'(表示 Spark 的内置 V2SessionCatalog)或目录实现的完全限定类名。 |
3.0.0 |
spark.sql.cbo.enabled |
false | 设置为 true 时,启用 CBO 以估计计划统计信息。 |
2.2.0 |
spark.sql.cbo.joinReorder.dp.star.filter |
false | 将星型连接过滤器启发式应用于基于成本的连接枚举。 |
2.2.0 |
spark.sql.cbo.joinReorder.dp.threshold |
12 | 动态规划算法中允许的最大连接节点数。 |
2.2.0 |
spark.sql.cbo.joinReorder.enabled |
false | 在 CBO 中启用连接重新排序。 |
2.2.0 |
spark.sql.cbo.planStats.enabled |
false | 当为 true 时,逻辑计划将从目录中获取行数和列统计信息。 |
3.0.0 |
spark.sql.cbo.starSchemaDetection |
false | 当为 true 时,它启用基于星型模式检测的连接重新排序。 |
2.2.0 |
spark.sql.charAsVarchar |
false | 当为 true 时,Spark 会在 CREATE/REPLACE/ALTER TABLE 命令中用 VARCHAR 类型替换 CHAR 类型,以便新创建/更新的表不会有 CHAR 类型列/字段。现有表中带有 CHAR 类型列/字段的表不受此配置的影响。 |
3.3.0 |
spark.sql.chunkBase64String.enabled |
true | 是否截断由 |
3.5.2 |
spark.sql.cli.print.header |
false | 当设置为 true 时,spark-sql CLI 在查询输出中打印列名。 |
3.2.0 |
spark.sql.columnNameOfCorruptRecord |
_corrupt_record | 用于存储无法解析的原始/未解析 JSON 和 CSV 记录的内部列的名称。 |
1.2.0 |
spark.sql.csv.filterPushdown.enabled |
true | 当为 true 时,启用 CSV 数据源的过滤器下推。 |
3.0.0 |
spark.sql.datetime.java8API.enabled |
false | 如果配置属性设置为 true,Java 8 API 的 java.time.Instant 和 java.time.LocalDate 类将用作 Catalyst 的 TimestampType 和 DateType 的外部类型。如果设置为 false,则使用 java.sql.Timestamp 和 java.sql.Date 来实现相同的目的。 |
3.0.0 |
spark.sql.debug.maxToStringFields |
25 | 在调试输出中,序列样条目可转换为字符串的最大字段数。超出限制的任何元素都将被丢弃并替换为“... N more fields”占位符。 |
3.0.0 |
spark.sql.defaultCacheStorageLevel |
MEMORY_AND_DISK |
|
4.0.0 |
spark.sql.defaultCatalog |
spark_catalog | 默认目录的名称。如果用户尚未明确设置当前目录,这将是当前目录。 |
3.0.0 |
spark.sql.error.messageFormat |
PRETTY | 当为 PRETTY 时,错误消息由错误类、消息和查询上下文的文本表示组成。MINIMAL 和 STANDARD 格式是漂亮的 JSON 格式,其中 STANDARD 包含一个额外的 JSON 字段 |
3.4.0 |
spark.sql.execution.arrow.enabled |
false | (自 Spark 3.0 起已弃用,请设置 'spark.sql.execution.arrow.pyspark.enabled'。) |
2.3.0 |
spark.sql.execution.arrow.fallback.enabled |
true | (自 Spark 3.0 起已弃用,请设置 'spark.sql.execution.arrow.pyspark.fallback.enabled'。) |
2.4.0 |
spark.sql.execution.arrow.localRelationThreshold |
48MB | 当将 Arrow 批处理转换为 Spark DataFrame 时,如果 Arrow 批处理的字节大小小于此阈值,则在驱动程序端使用本地集合。否则,Arrow 批处理将被发送并反序列化为执行器中的 Spark 内部行。 |
3.4.0 |
spark.sql.execution.arrow.maxRecordsPerBatch |
10000 | 使用 Apache Arrow 时,限制可以写入内存中单个 ArrowRecordBatch 的最大记录数。此配置对于分组 API(例如 DataFrame(.cogroup).groupby.applyInPandas)无效,因为每个组都成为一个 ArrowRecordBatch。如果设置为零或负数,则没有限制。另请参阅 spark.sql.execution.arrow.maxBytesPerBatch。如果两者都设置,则当满足其中任何一个条件时创建每个批处理。 |
2.3.0 |
spark.sql.execution.arrow.pyspark.enabled |
(spark.sql.execution.arrow.enabled 的值) |
当为 true 时,在 PySpark 中使用 Apache Arrow 进行列式数据传输。此优化适用于:1. pyspark.sql.DataFrame.toPandas。2. pyspark.sql.SparkSession.createDataFrame,当其输入是 Pandas DataFrame 或 NumPy ndarray 时。不支持以下数据类型:TimestampType 的 ArrayType。 |
3.0.0 |
spark.sql.execution.arrow.pyspark.fallback.enabled |
(spark.sql.execution.arrow.fallback.enabled 的值) |
当为 true 时,如果发生错误,'spark.sql.execution.arrow.pyspark.enabled' 启用的优化将自动回退到非优化实现。 |
3.0.0 |
spark.sql.execution.arrow.pyspark.selfDestruct.enabled |
false | (实验性)当为 true 时,在 PySpark 中从 Arrow 转换为 Pandas 时,利用 Apache Arrow 的自毁和拆分块选项进行列式数据传输。这以牺牲一些 CPU 时间为代价减少了内存使用。此优化适用于:当设置 'spark.sql.execution.arrow.pyspark.enabled' 时 pyspark.sql.DataFrame.toPandas。 |
3.2.0 |
spark.sql.execution.arrow.sparkr.enabled |
false | 当为 true 时,在 SparkR 中使用 Apache Arrow 进行列式数据传输。此优化适用于:1. createDataFrame,当其输入为 R DataFrame 时;2. collect;3. dapply;4. gapply。不支持以下数据类型:FloatType、BinaryType、ArrayType、StructType 和 MapType。 |
3.0.0 |
spark.sql.execution.arrow.transformWithStateInPandas.maxRecordsPerBatch |
10000 | 使用 TransformWithStateInPandas 时,限制可写入内存中单个 ArrowRecordBatch 的最大状态记录数。 |
4.0.0 |
spark.sql.execution.arrow.useLargeVarTypes |
false | 使用 Apache Arrow 时,对字符串和二进制类型使用大型可变宽度向量。常规字符串和二进制类型在单个记录批处理中,列的限制为 2GiB。大型可变类型消除了此限制,但每个值的内存使用量更高。 |
3.5.0 |
spark.sql.execution.interruptOnCancel |
true | 当为 true 时,如果一个取消查询,所有正在运行的任务将被中断。 |
4.0.0 |
spark.sql.execution.pandas.inferPandasDictAsMap |
false | 当为 true 时,spark.createDataFrame 会将 Pandas DataFrame 中的 dict 推断为 MapType。当为 false 时,spark.createDataFrame 会将 Pandas DataFrame 中的 dict 推断为 StructType,这是 PyArrow 的默认推断方式。 |
4.0.0 |
spark.sql.execution.pandas.structHandlingMode |
legacy | 创建 pandas DataFrame 时结构体类型的转换模式。当为 "legacy" 时,1. 当 Arrow 优化被禁用时,转换为 Row 对象;2. 当 Arrow 优化被启用时,转换为 dict,如果存在重复的嵌套字段名则抛出异常。当为 "row" 时,无论 Arrow 优化如何,都转换为 Row 对象。当为 "dict" 时,转换为 dict 并使用后缀键名(例如 a_0, a_1),如果存在重复的嵌套字段名,无论 Arrow 优化如何。 |
3.5.0 |
spark.sql.execution.pandas.udf.buffer.size |
(spark.buffer.size 的值) |
与 |
3.0.0 |
spark.sql.execution.pyspark.udf.faulthandler.enabled |
(spark.python.worker.faulthandler.enabled 的值) |
与 spark.python.worker.faulthandler.enabled 相同,用于使用 DataFrame 和 SQL 的 Python 执行。它可以在运行时更改。 |
4.0.0 |
spark.sql.execution.pyspark.udf.hideTraceback.enabled |
false | 当为 true 时,只显示 Python UDF 的异常消息,隐藏堆栈跟踪。如果启用此选项,simplifiedTraceback 将无效。 |
4.0.0 |
spark.sql.execution.pyspark.udf.idleTimeoutSeconds |
(spark.python.worker.idleTimeoutSeconds 的值) |
与 spark.python.worker.idleTimeoutSeconds 相同,用于使用 DataFrame 和 SQL 的 Python 执行。它可以在运行时更改。 |
4.0.0 |
spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled |
true | 当为 true 时,Python UDF 的回溯被简化。它在回溯中隐藏了 PySpark 的 Python worker、(反)序列化等,并且只显示 UDF 的异常消息。请注意,这仅适用于 CPython 3.7+。 |
3.1.0 |
spark.sql.execution.python.udf.buffer.size |
(spark.buffer.size 的值) |
与 |
4.0.0 |
spark.sql.execution.python.udf.maxRecordsPerBatch |
100 | 使用 Python UDF 时,限制可批量进行序列化/反序列化的最大记录数。 |
4.0.0 |
spark.sql.execution.pythonUDF.arrow.concurrency.level |
(无) | 执行 Arrow 优化 Python UDF 的并发级别。如果 Python UDF 密集使用 I/O,这会很有用。 |
4.0.0 |
spark.sql.execution.pythonUDF.arrow.enabled |
false | 在常规 Python UDF 中启用 Arrow 优化。此优化仅在给定函数至少带有一个参数时才能启用。 |
3.4.0 |
spark.sql.execution.pythonUDTF.arrow.enabled |
false | 为 Python UDTF 启用 Arrow 优化。 |
3.5.0 |
spark.sql.execution.topKSortFallbackThreshold |
2147483632 | 在包含 SORT 后跟 LIMIT 的 SQL 查询(例如 'SELECT x FROM t ORDER BY y LIMIT m')中,如果 m 低于此阈值,则在内存中执行 top-K 排序,否则执行全局排序,必要时溢出到磁盘。 |
2.4.0 |
spark.sql.extendedExplainProviders |
(无) | 逗号分隔的类列表,这些类实现了 org.apache.spark.sql.ExtendedExplainGenerator 特性。如果提供,Spark 将在 explain plan 和 UI 中打印来自这些提供程序的扩展计划信息。 |
4.0.0 |
spark.sql.files.ignoreCorruptFiles |
false | 是否忽略损坏的文件。如果为 true,Spark 作业在遇到损坏的文件时将继续运行,并且已读取的内容仍将返回。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 |
2.1.1 |
spark.sql.files.ignoreInvalidPartitionPaths |
false | 是否忽略不匹配 <column>=<value> 的无效分区路径。启用此选项时,包含两个分区目录 'table/invalid' 和 'table/col=1' 的表将仅加载后一个目录并忽略无效分区。 |
4.0.0 |
spark.sql.files.ignoreMissingFiles |
false | 是否忽略缺失的文件。如果为 true,Spark 作业在遇到缺失的文件时将继续运行,并且已读取的内容仍将返回。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 |
2.3.0 |
spark.sql.files.maxPartitionBytes |
128MB | 读取文件时要打包到单个分区中的最大字节数。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 |
2.0.0 |
spark.sql.files.maxPartitionNum |
(无) | 建议(不保证)的最大拆分文件分区数。如果设置了此值,当初始分区数超过此值时,Spark 将重新调整每个分区,使分区数接近此值。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 |
3.5.0 |
spark.sql.files.maxRecordsPerFile |
0 | 写入单个文件的最大记录数。如果此值为零或负数,则没有限制。 |
2.2.0 |
spark.sql.files.minPartitionNum |
(无) | 建议(不保证)的最小拆分文件分区数。如果未设置,默认值为 |
3.1.0 |
spark.sql.function.concatBinaryAsString |
false | 当此选项设置为 false 并且所有输入都是二进制时, |
2.3.0 |
spark.sql.function.eltOutputAsString |
false | 当此选项设置为 false 并且所有输入都是二进制时, |
2.3.0 |
spark.sql.groupByAliases |
true | 当为 true 时,select 列表中的别名可以在 group by 子句中使用。当为 false 时,在这种情况下会抛出分析异常。 |
2.2.0 |
spark.sql.groupByOrdinal |
true | 当为 true 时,group by 子句中的序号被视为 select 列表中的位置。当为 false 时,序号被忽略。 |
2.0.0 |
spark.sql.hive.convertInsertingPartitionedTable |
true | 当设置为 true,并且 |
3.0.0 |
spark.sql.hive.convertInsertingUnpartitionedTable |
true | 当设置为 true,并且 |
4.0.0 |
spark.sql.hive.convertMetastoreCtas |
true | 当设置为 true 时,Spark 将尝试在 CTAS 中使用内置数据源写入器而不是 Hive serde。此标志仅当分别针对 Parquet 和 ORC 格式启用了 |
3.0.0 |
spark.sql.hive.convertMetastoreInsertDir |
true | 当设置为 true 时,Spark 将尝试在 INSERT OVERWRITE DIRECTORY 中使用内置数据源写入器而不是 Hive serde。此标志仅当分别针对 Parquet 和 ORC 格式启用了 |
3.3.0 |
spark.sql.hive.convertMetastoreOrc |
true | 当设置为 true 时,将使用内置的 ORC 读取器和写入器来处理使用 HiveQL 语法创建的 ORC 表,而不是 Hive serde。 |
2.0.0 |
spark.sql.hive.convertMetastoreParquet |
true | 当设置为 true 时,将使用内置的 Parquet 读取器和写入器来处理使用 HiveQL 语法创建的 parquet 表,而不是 Hive serde。 |
1.1.1 |
spark.sql.hive.convertMetastoreParquet.mergeSchema |
false | 当为 true 时,还会尝试合并不同 Parquet 数据文件中可能不同但兼容的 Parquet 模式。此配置仅在 "spark.sql.hive.convertMetastoreParquet" 为 true 时有效。 |
1.3.1 |
spark.sql.hive.dropPartitionByName.enabled |
false | 当为 true 时,Spark 将获取分区名称而不是分区对象来删除分区,这可以提高删除分区的性能。 |
3.4.0 |
spark.sql.hive.filesourcePartitionFileCacheSize |
262144000 | 当非零时,启用内存中分区文件元数据的缓存。所有表共享一个缓存,该缓存最多可以使用指定字节数的文件的元数据。此配置仅在启用 hive 文件源分区管理时才有效。 |
2.1.1 |
spark.sql.hive.manageFilesourcePartitions |
true | 当为 true 时,也为文件源表启用元数据存储分区管理。这包括数据源表和转换后的 Hive 表。当启用分区管理时,数据源表将分区存储在 Hive 元数据存储中,并在查询计划期间当 spark.sql.hive.metastorePartitionPruning 设置为 true 时,使用元数据存储修剪分区。 |
2.1.1 |
spark.sql.hive.metastorePartitionPruning |
true | 当为 true 时,一些谓词将被下推到 Hive 元数据存储中,以便可以更早地消除不匹配的分区。 |
1.5.0 |
spark.sql.hive.metastorePartitionPruningFallbackOnException |
false | 当遇到来自元数据存储的 MetaException 时,是否回退到从 Hive 元数据存储获取所有分区并在 Spark 客户端执行分区裁剪。请注意,如果启用此选项且有大量分区需要列出,则 Spark 查询性能可能会下降。如果禁用此选项,Spark 将失败查询。 |
3.3.0 |
spark.sql.hive.metastorePartitionPruningFastFallback |
false | 当启用此配置时,如果谓词不受 Hive 支持或者 Spark 因遇到来自元数据存储的 MetaException 而回退,Spark 将改为通过首先获取分区名称,然后客户端评估过滤表达式来裁剪分区。请注意,不支持带有 TimeZoneAwareExpression 的谓词。 |
3.3.0 |
spark.sql.hive.thriftServer.async |
true | 当设置为 true 时,Hive Thrift 服务器将以异步方式执行 SQL 查询。 |
1.5.0 |
spark.sql.icu.caseMappings.enabled |
true | 启用时,我们使用 ICU 库(而不是 JVM)来实现 UTF8_BINARY 排序规则下字符串的大小写映射。 |
4.0.0 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | 控制列式缓存的批处理大小。更大的批处理大小可以提高内存利用率和压缩率,但在缓存数据时有 OOM 风险。 |
1.1.1 |
spark.sql.inMemoryColumnarStorage.compressed |
true | 当设置为 true 时,Spark SQL 将根据数据统计信息自动为每列选择一个压缩编解码器。 |
1.0.1 |
spark.sql.inMemoryColumnarStorage.enableVectorizedReader |
true | 为列式缓存启用向量化读取器。 |
2.3.1 |
spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio |
1.2 | 当 spark.sql.inMemoryColumnarStorage.hugeVectorThreshold <= 0 或所需内存小于 spark.sql.inMemoryColumnarStorage.hugeVectorThreshold 时,Spark 会保留所需内存 * 2 的内存;否则,Spark 会保留所需内存 * 此比率的内存,并在读取下一批行之前释放此列向量内存。 |
4.0.0 |
spark.sql.inMemoryColumnarStorage.hugeVectorThreshold |
-1b | 当所需内存大于此值时,Spark 将在下次保留所需内存 * spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio 内存,并在读取下一批行之前释放此列向量内存。-1 表示禁用此优化。 |
4.0.0 |
spark.sql.json.filterPushdown.enabled |
true | 当为 true 时,启用 JSON 数据源的过滤器下推。 |
3.1.0 |
spark.sql.json.useUnsafeRow |
false | 当设置为 true 时,在 JSON 解析器中使用 UnsafeRow 表示结构体结果。它可以被 JSON 选项 |
4.0.0 |
spark.sql.jsonGenerator.ignoreNullFields |
true | 在 JSON 数据源和诸如 to_json 等 JSON 函数中生成 JSON 对象时是否忽略 null 字段。如果为 false,则为 JSON 对象中的 null 字段生成 null。 |
3.0.0 |
spark.sql.leafNodeDefaultParallelism |
(无) | 生成数据的 Spark SQL 叶节点的默认并行度,例如文件扫描节点、本地数据扫描节点、范围节点等。此配置的默认值是 'SparkContext#defaultParallelism'。 |
3.2.0 |
spark.sql.mapKeyDedupPolicy |
EXCEPTION | 内置函数(CreateMap、MapFromArrays、MapFromEntries、StringToMap、MapConcat 和 TransformKeys)中去重 map 键的策略。当为 EXCEPTION 时,如果检测到重复的 map 键,查询将失败。当为 LAST_WIN 时,最后插入的 map 键优先。 |
3.0.0 |
spark.sql.maven.additionalRemoteRepositories |
https://maven-central.storage-download.googleapis.com/maven2/ | 以逗号分隔的字符串配置,用于可选的附加远程 Maven 镜像仓库。这仅在默认 Maven Central 仓库不可达时,用于 IsolatedClientLoader 中下载 Hive jar 包。 |
3.0.0 |
spark.sql.maxMetadataStringLength |
100 | 元数据字符串输出的最大字符数。例如,在 |
3.1.0 |
spark.sql.maxPlanStringLength |
2147483632 | 计划字符串输出的最大字符数。如果计划更长,后续输出将被截断。默认设置始终生成完整计划。如果计划字符串占用过多内存或导致驱动程序或 UI 进程中出现 OutOfMemory 错误,请将其设置为较低的值,例如 8k。 |
3.0.0 |
spark.sql.maxSinglePartitionBytes |
128m | 单个分区允许的最大字节数。否则,规划器将引入 shuffle 以提高并行度。 |
3.4.0 |
spark.sql.operatorPipeSyntaxEnabled |
true | 如果为 true,则为 Apache Spark SQL 启用操作符管道语法。这使用操作符管道标记 |> 来表示 SQL 子句之间的分隔,以一种可组合的方式描述查询执行的步骤序列。 |
4.0.0 |
spark.sql.optimizer.avoidCollapseUDFWithExpensiveExpr |
true | 是否避免折叠会在 UDF 中复制昂贵表达式的投影。 |
4.0.0 |
spark.sql.optimizer.collapseProjectAlwaysInline |
false | 是否总是折叠两个相邻的投影并内联表达式,即使这会导致额外的重复。 |
3.3.0 |
spark.sql.optimizer.dynamicPartitionPruning.enabled |
true | 当为 true 时,当分区列用作连接键时,我们将为分区列生成谓词。 |
3.0.0 |
spark.sql.optimizer.enableCsvExpressionOptimization |
true | 是否在 SQL 优化器中优化 CSV 表达式。它包括从 from_csv 中修剪不必要的列。 |
3.2.0 |
spark.sql.optimizer.enableJsonExpressionOptimization |
true | 是否在 SQL 优化器中优化 JSON 表达式。它包括从 from_json 中修剪不必要的列,简化 from_json + to_json,to_json + named_struct(from_json.col1, from_json.col2, ....)。 |
3.1.0 |
spark.sql.optimizer.excludedRules |
(无) | 配置要在优化器中禁用的规则列表,其中规则通过其规则名称指定并用逗号分隔。不保证此配置中的所有规则最终都将被排除,因为某些规则对于正确性是必需的。优化器将记录实际已排除的规则。 |
2.4.0 |
spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold |
10GB | 布隆过滤器应用侧计划的聚合扫描大小的字节大小阈值。布隆过滤器应用侧的聚合扫描字节大小需要超过此值才能注入布隆过滤器。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold |
10MB | 布隆过滤器创建侧计划的大小阈值。估计大小需要在此值以下才能尝试注入布隆过滤器。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.enabled |
true | 当为 true 且 shuffle 连接的一侧具有选择性谓词时,我们尝试在另一侧插入布隆过滤器以减少 shuffle 数据量。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.expectedNumItems |
1000000 | 运行时布隆过滤器的默认预期项目数 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.maxNumBits |
67108864 | 运行时布隆过滤器使用的最大位数 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.maxNumItems |
4000000 | 运行时布隆过滤器允许的最大预期项数 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.numBits |
8388608 | 运行时布隆过滤器默认使用的位数 |
3.3.0 |
spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled |
true | 为基于组的行级操作启用运行时组过滤。在规划行级操作扫描时,替换数据组(例如文件、分区)的数据源可以使用提供的数据源过滤器来修剪整个组。然而,这种过滤是有限的,因为并非所有表达式都可以转换为数据源过滤器,并且某些表达式只能由Spark评估(例如子查询)。由于重写组的成本很高,Spark可以在运行时执行查询以查找哪些记录符合行级操作的条件。关于匹配记录的信息将被传递回行级操作扫描,允许数据源丢弃不需要重写的组。 |
3.4.0 |
spark.sql.optimizer.runtimeFilter.number.threshold |
10 | 单个查询注入的运行时过滤器(非DPP)的总数。这是为了防止由于过多的布隆过滤器导致驱动程序OOM。 |
3.3.0 |
spark.sql.orc.aggregatePushdown |
false | 如果为true,聚合操作将被下推到ORC以进行优化。支持MIN、MAX和COUNT作为聚合表达式。对于MIN/MAX,支持布尔、整数、浮点和日期类型。对于COUNT,支持所有数据类型。如果任何ORC文件页脚中缺少统计信息,将抛出异常。 |
3.3.0 |
spark.sql.orc.columnarReaderBatchSize |
4096 | ORC向量化读取器批次中包含的行数。应仔细选择此数字,以最大程度地减少开销并避免读取数据时的OOM。 |
2.4.0 |
spark.sql.orc.columnarWriterBatchSize |
1024 | ORC向量化写入器批次中包含的行数。应仔细选择此数字,以最大程度地减少开销并避免写入数据时的OOM。 |
3.4.0 |
spark.sql.orc.compression.codec |
zstd | 设置写入ORC文件时使用的压缩编解码器。如果表特定的选项/属性中同时指定了 |
2.3.0 |
spark.sql.orc.enableNestedColumnVectorizedReader |
true | 启用嵌套列的向量化ORC解码。 |
3.2.0 |
spark.sql.orc.enableVectorizedReader |
true | 启用向量化ORC解码。 |
2.3.0 |
spark.sql.orc.filterPushdown |
true | 当为true时,为ORC文件启用过滤器下推。 |
1.4.0 |
spark.sql.orc.mergeSchema |
false | 当为true时,ORC数据源会合并从所有数据文件中收集到的Schema;否则,Schema将从随机数据文件中选取。 |
3.0.0 |
spark.sql.orderByOrdinal |
true | 当为true时,序号被视为SELECT列表中的位置。当为false时,order/sort by子句中的序号将被忽略。 |
2.0.0 |
spark.sql.parquet.aggregatePushdown |
false | 如果为true,聚合操作将被下推到Parquet以进行优化。支持MIN、MAX和COUNT作为聚合表达式。对于MIN/MAX,支持布尔、整数、浮点和日期类型。对于COUNT,支持所有数据类型。如果任何Parquet文件页脚中缺少统计信息,将抛出异常。 |
3.3.0 |
spark.sql.parquet.binaryAsString |
false | 其他一些Parquet生成系统,特别是Impala和较旧版本的Spark SQL,在写入Parquet Schema时,不区分二进制数据和字符串。此标志告诉Spark SQL将二进制数据解释为字符串,以提供与这些系统的兼容性。 |
1.1.1 |
spark.sql.parquet.columnarReaderBatchSize |
4096 | Parquet向量化读取器批次中包含的行数。应仔细选择此数字,以最大程度地减少开销并避免读取数据时的OOM。 |
2.4.0 |
spark.sql.parquet.compression.codec |
snappy | 设置写入Parquet文件时使用的压缩编解码器。如果表特定的选项/属性中同时指定了 |
1.1.1 |
spark.sql.parquet.enableNestedColumnVectorizedReader |
true | 启用嵌套列(例如,struct、list、map)的向量化Parquet解码。需要启用spark.sql.parquet.enableVectorizedReader。 |
3.3.0 |
spark.sql.parquet.enableVectorizedReader |
true | 启用向量化Parquet解码。 |
2.0.0 |
spark.sql.parquet.fieldId.read.enabled |
false | 字段ID是Parquet Schema规范的一个原生字段。启用后,Parquet读取器将使用请求的Spark Schema中的字段ID(如果存在)来查找Parquet字段,而不是使用列名。 |
3.3.0 |
spark.sql.parquet.fieldId.read.ignoreMissing |
false | 当Parquet文件没有任何字段ID但Spark读取Schema使用字段ID进行读取时,如果此标志启用,我们将静默返回null,否则报错。 |
3.3.0 |
spark.sql.parquet.fieldId.write.enabled |
true | 字段ID是Parquet Schema规范的一个原生字段。启用后,Parquet写入器将在Spark Schema中填充字段ID元数据(如果存在)到Parquet Schema。 |
3.3.0 |
spark.sql.parquet.filterPushdown |
true | 当设置为true时,启用Parquet过滤器下推优化。 |
1.2.0 |
spark.sql.parquet.inferTimestampNTZ.enabled |
true | 启用后,在Schema推断期间,带有annotation isAdjustedToUTC = false的Parquet时间戳列将被推断为TIMESTAMP_NTZ类型。否则,所有Parquet时间戳列都将被推断为TIMESTAMP_LTZ类型。请注意,Spark在文件写入时会将输出Schema写入Parquet的页脚元数据,并在文件读取时利用它。因此,此配置仅影响非Spark写入的Parquet文件的Schema推断。 |
3.4.0 |
spark.sql.parquet.int96AsTimestamp |
true | 其他一些Parquet生成系统,特别是Impala,将Timestamp存储为INT96。Spark也会将Timestamp存储为INT96,因为我们需要避免纳秒字段的精度损失。此标志告诉Spark SQL将INT96数据解释为时间戳,以提供与这些系统的兼容性。 |
1.3.0 |
spark.sql.parquet.int96TimestampConversion |
false | 这控制了在将Impala写入的INT96数据转换为时间戳时,是否应应用时间戳调整。这是必要的,因为Impala存储INT96数据时使用的时区偏移与Hive和Spark不同。 |
2.3.0 |
spark.sql.parquet.mergeSchema |
false | 当为true时,Parquet数据源会合并从所有数据文件中收集到的Schema;否则,如果存在摘要文件,则从摘要文件中选取Schema,如果没有摘要文件,则从随机数据文件中选取。 |
1.5.0 |
spark.sql.parquet.outputTimestampType |
INT96 | 设置Spark写入Parquet文件时使用的Parquet时间戳类型。INT96是Parquet中一种非标准但常用的时间戳类型。TIMESTAMP_MICROS是Parquet中的标准时间戳类型,它存储自Unix纪元以来的微秒数。TIMESTAMP_MILLIS也是标准类型,但精度为毫秒,这意味着Spark必须截断其时间戳值的微秒部分。 |
2.3.0 |
spark.sql.parquet.recordLevelFilter.enabled |
false | 如果为true,则使用下推过滤器启用Parquet的原生记录级过滤。此配置仅在启用“spark.sql.parquet.filterPushdown”且未使用向量化读取器时生效。您可以通过将“spark.sql.parquet.enableVectorizedReader”设置为false来确保不使用向量化读取器。 |
2.3.0 |
spark.sql.parquet.respectSummaryFiles |
false | 当为true时,我们假设Parquet的所有分部文件与摘要文件一致,并且在合并Schema时将忽略它们。否则,如果此项为false(默认值),我们将合并所有分部文件。这应被视为专家级选项,在完全了解其含义之前不应启用。 |
1.5.0 |
spark.sql.parquet.writeLegacyFormat |
false | 如果为true,数据将以Spark 1.4及更早版本的方式写入。例如,十进制值将以Apache Parquet的固定长度字节数组格式写入,Apache Hive和Apache Impala等其他系统使用此格式。如果为false,则将使用Parquet中的较新格式。例如,十进制将以基于int的格式写入。如果Parquet输出旨在与不支持此较新格式的系统一起使用,请将其设置为true。 |
1.6.0 |
spark.sql.parser.quotedRegexColumnNames |
false | 当为true时,SELECT语句中带引号的标识符(使用反引号)将被解释为正则表达式。 |
2.3.0 |
spark.sql.pivotMaxValues |
10000 | 当进行透视而不指定透视列的值时,这是将收集到的(不重复的)值的最大数量,超过此数量将报错。 |
1.6.0 |
spark.sql.planner.pythonExecution.memory |
(无) | 指定Spark驱动程序中执行Python代码的内存分配,单位为MiB。设置后,它将Python执行的内存限制为指定量。如果未设置,Spark将不限制Python的内存使用,并且由应用程序负责避免超出与其他非JVM进程共享的开销内存空间。注意:Windows不支持资源限制,MacOS上实际资源不受限制。 |
4.0.0 |
spark.sql.preserveCharVarcharTypeInfo |
false | 当为true时,Spark不会用STRING类型替换CHAR/VARCHAR类型,这是Spark 3.0及更早版本的默认行为。这意味着CHAR/VARCHAR类型的长度检查将强制执行,并且CHAR类型也将正确填充。 |
4.0.0 |
spark.sql.pyspark.inferNestedDictAsStruct.enabled |
false | PySpark的SparkSession.createDataFrame默认将嵌套字典推断为map。当设置为true时,它将嵌套字典推断为struct。 |
3.3.0 |
spark.sql.pyspark.jvmStacktrace.enabled |
false | 当为true时,它会在面向用户的PySpark异常中显示JVM堆栈跟踪以及Python堆栈跟踪。默认情况下,为了隐藏JVM堆栈跟踪并仅显示Python友好的异常,此功能是禁用的。请注意,这与日志级别设置无关。 |
3.0.0 |
spark.sql.pyspark.plotting.max_rows |
1000 | 图表的视觉限制。如果为基于top-n的图表(饼图、条形图、横向条形图)设置为1000,则将使用前1000个数据点进行绘图。对于基于采样的图表(散点图、面积图、折线图),将随机采样1000个数据点。 |
4.0.0 |
spark.sql.pyspark.udf.profiler |
(无) | 通过启用或禁用并选择“perf”或“memory”类型来配置Python/Pandas UDF分析器,或取消设置此配置以禁用分析器。此功能默认禁用。 |
4.0.0 |
spark.sql.readSideCharPadding |
true | 当为true时,Spark在读取CHAR类型列/字段时应用字符串填充,除了写入端的填充。此配置默认为true,以在外部表等情况下更好地强制执行CHAR类型语义。 |
3.4.0 |
spark.sql.redaction.options.regex |
(?i)url | 用于决定Spark SQL命令选项映射中哪些键包含敏感信息的正则表达式。名称与此正则表达式匹配的选项值将在解释输出中被删除。此删除在由spark.redaction.regex定义的全局删除配置之上应用。 |
2.2.2 |
spark.sql.redaction.string.regex |
(spark.redaction.string.regex 的值) |
用于决定Spark生成的字符串中哪些部分包含敏感信息的正则表达式。当此正则表达式匹配字符串部分时,该字符串部分将被替换为虚拟值。此功能目前用于删除SQL解释命令的输出。如果未设置此配置,则使用 |
2.3.0 |
spark.sql.repl.eagerEval.enabled |
false | 是否启用即时求值。当为true时,仅当REPL支持即时求值时,才会显示Dataset的前K行。目前,PySpark和SparkR支持即时求值。在PySpark中,对于Jupyter等笔记本,将返回HTML表格(由repr_html生成)。对于纯Python REPL,返回的输出将格式化为dataframe.show()。在SparkR中,返回的输出将类似于R data.frame的显示方式。 |
2.4.0 |
spark.sql.repl.eagerEval.maxNumRows |
20 | 即时求值返回的最大行数。此设置仅在spark.sql.repl.eagerEval.enabled设置为true时生效。此配置的有效范围是0到(Int.MaxValue - 1),因此负值和大于(Int.MaxValue - 1)的无效配置将被规范化为0和(Int.MaxValue - 1)。 |
2.4.0 |
spark.sql.repl.eagerEval.truncate |
20 | 即时求值返回的每个单元格的最大字符数。此设置仅在spark.sql.repl.eagerEval.enabled设置为true时生效。 |
2.4.0 |
spark.sql.scripting.enabled |
false | SQL脚本功能正在开发中,其使用应在此功能标志下进行。SQL脚本允许用户编写包括控制流和错误处理的过程式SQL。 |
4.0.0 |
spark.sql.session.localRelationCacheThreshold |
67108864 | 本地关系序列化后在驱动程序端缓存的大小(字节)阈值。 |
3.5.0 |
spark.sql.session.timeZone |
(本地时区的值) | 会话本地时区的ID,格式为基于区域的时区ID或时区偏移量。区域ID必须是“area/city”的形式,例如“America/Los_Angeles”。时区偏移量必须是“(+|-)HH”、“(+|-)HH:mm”或“(+|-)HH:mm:ss”的格式,例如“-08”、“+01:00”或“-13:33:33”。此外,“UTC”和“Z”支持作为“+00:00”的别名。不建议使用其他短名称,因为它们可能含义模糊。 |
2.2.0 |
spark.sql.shuffle.partitions |
200 | 在进行连接或聚合操作时,用于数据混洗的默认分区数。注意:对于结构化流,此配置在从同一检查点位置重新启动查询之间不能更改。 |
1.1.0 |
spark.sql.shuffleDependency.fileCleanup.enabled |
false | 启用后,混洗文件将在Spark Connect SQL执行结束时清理。 |
4.0.0 |
spark.sql.shuffleDependency.skipMigration.enabled |
false | 启用后,Spark Connect SQL执行的混洗依赖项将在执行结束时标记,并且在退役期间不会迁移。 |
4.0.0 |
spark.sql.shuffledHashJoinFactor |
3 | 如果小方的数据大小乘以这个因子仍然小于大方,则可以选择混洗哈希连接。 |
3.3.0 |
spark.sql.sources.bucketing.autoBucketedScan.enabled |
true | 当为true时,根据查询计划自动决定是否对输入表执行分桶扫描。如果满足以下条件,则不使用分桶扫描:1. 查询没有利用分桶的运算符(例如join、group-by等),或2. 这些运算符和表扫描之间存在交换运算符。请注意,当“spark.sql.sources.bucketing.enabled”设置为false时,此配置不生效。 |
3.1.0 |
spark.sql.sources.bucketing.enabled |
true | 当为false时,我们将分桶表视为普通表 |
2.0.0 |
spark.sql.sources.bucketing.maxBuckets |
100000 | 允许的最大分桶数。 |
2.4.0 |
spark.sql.sources.default |
parquet | 输入/输出中使用的默认数据源。 |
1.3.0 |
spark.sql.sources.parallelPartitionDiscovery.threshold |
32 | 在驱动程序端允许列出文件的最大路径数。如果在分区发现期间检测到的路径数超过此值,它将尝试使用另一个Spark分布式作业列出文件。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。 |
1.5.0 |
spark.sql.sources.partitionColumnTypeInference.enabled |
true | 当为true时,自动推断分区列的数据类型。 |
1.5.0 |
spark.sql.sources.partitionOverwriteMode |
STATIC | 当INSERT OVERWRITE分区数据源表时,我们目前支持两种模式:静态(static)和动态(dynamic)。在静态模式下,Spark在覆盖之前,会删除INSERT语句中所有匹配分区规范(例如PARTITION(a=1,b))的分区。在动态模式下,Spark不会预先删除分区,而只在运行时覆盖那些有数据写入的分区。默认情况下,我们使用静态模式以保持Spark 2.3之前的相同行为。请注意,此配置不影响Hive serde表,因为它们始终以动态模式覆盖。这也可以作为数据源的输出选项,使用键partitionOverwriteMode设置(此设置优先),例如dataframe.write.option("partitionOverwriteMode", "dynamic").save(path)。 |
2.3.0 |
spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled |
false | 是否允许在分区转换兼容但不完全相同的情况下进行存储分区连接。此配置要求spark.sql.sources.v2.bucketing.enabled和spark.sql.sources.v2.bucketing.pushPartValues.enabled都启用,并且spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled禁用。 |
4.0.0 |
spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled |
false | 是否允许在连接键是源表分区键子集的情况下进行存储分区连接。在规划时,Spark将仅根据连接键对分区进行分组。目前仅在spark.sql.requireAllClusterKeysForDistribution为false时启用此功能。 |
4.0.0 |
spark.sql.sources.v2.bucketing.enabled |
false | 与spark.sql.sources.bucketing.enabled类似,此配置用于为V2数据源启用分桶。启用后,Spark将通过SupportsReportPartitioning识别V2数据源报告的特定分布,并在必要时尝试避免混洗。 |
3.3.0 |
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled |
false | 在存储分区连接期间,当连接双方都为KeyGroupedPartitioning时,是否允许输入分区部分集群化。在规划时,Spark将根据表统计信息选择数据量较少的一方,对其进行分组和复制以匹配另一方。这是对倾斜连接的优化,当某些分区分配大量数据时,有助于减少数据倾斜。此配置要求spark.sql.sources.v2.bucketing.enabled和spark.sql.sources.v2.bucketing.pushPartValues.enabled都启用。 |
3.4.0 |
spark.sql.sources.v2.bucketing.partition.filter.enabled |
false | 运行存储分区连接时是否过滤分区。启用后,如果连接类型允许,可以省略另一侧不匹配的分区进行扫描。此配置要求spark.sql.sources.v2.bucketing.enabled和spark.sql.sources.v2.bucketing.pushPartValues.enabled都启用。 |
4.0.0 |
spark.sql.sources.v2.bucketing.pushPartValues.enabled |
true | 当spark.sql.sources.v2.bucketing.enabled启用时,是否下推常用分区值。启用后,如果连接的双方都是KeyGroupedPartitioning,并且它们共享兼容的分区键,即使它们没有完全相同的分区值,Spark也将计算分区值的超集并将该信息下推到扫描节点,扫描节点将为任一侧缺失的分区值使用空分区。这有助于消除不必要的混洗。 |
3.4.0 |
spark.sql.sources.v2.bucketing.shuffle.enabled |
false | 在存储分区连接期间,是否允许仅混洗一方。当只有一方是KeyGroupedPartitioning时,如果满足条件,Spark将只混洗另一方。此优化将减少需要混洗的数据量。此配置要求spark.sql.sources.v2.bucketing.enabled启用。 |
4.0.0 |
spark.sql.sources.v2.bucketing.sorting.enabled |
false | 启用后,Spark将通过SupportsReportPartitioning识别V2数据源报告的特定分布,并尝试在按这些列排序时尽可能避免混洗。此配置要求spark.sql.sources.v2.bucketing.enabled启用。 |
4.0.0 |
spark.sql.stackTracesInDataFrameContext |
1 | 捕获的DataFrame查询上下文中非Spark堆栈跟踪的数量。 |
4.0.0 |
spark.sql.statistics.fallBackToHdfs |
false | 当为true时,如果表统计信息无法从表元数据中获取,它将回退到HDFS。这对于确定表是否足够小以使用广播连接非常有用。此标志仅对非分区Hive表有效。对于非分区数据源表,如果表统计信息不可用,它将自动重新计算。对于分区数据源和分区Hive表,如果表统计信息不可用,则为“spark.sql.defaultSizeInBytes”。 |
2.0.0 |
spark.sql.statistics.histogram.enabled |
false | 如果启用,则在计算列统计信息时生成直方图。直方图可以提供更好的估计精度。目前,Spark只支持等高直方图。请注意,收集直方图会产生额外成本。例如,收集列统计信息通常只需要一次表扫描,但生成等高直方图将导致额外的表扫描。 |
2.3.0 |
spark.sql.statistics.size.autoUpdate.enabled |
false | 启用表数据更改后自动更新表大小。请注意,如果表的总文件数非常大,这可能会很昂贵并减慢数据更改命令。 |
2.3.0 |
spark.sql.statistics.updatePartitionStatsInAnalyzeTable.enabled |
false | 当此配置启用时,Spark还将在analyze table命令(即ANALYZE TABLE .. COMPUTE STATISTICS [NOSCAN])中更新分区统计信息。请注意,该命令也将变得更加昂贵。当此配置禁用时,Spark将仅更新表级别统计信息。 |
4.0.0 |
spark.sql.storeAssignmentPolicy |
ANSI | 当将值插入到具有不同数据类型的列时,Spark将执行类型强制转换。目前,我们支持3种类型强制转换规则策略:ANSI、legacy和strict。使用ANSI策略,Spark按照ANSI SQL执行类型强制转换。实际上,其行为与PostgreSQL基本相同。它不允许某些不合理的类型转换,例如将 |
3.0.0 |
spark.sql.streaming.checkpointLocation |
(无) | 用于存储流式查询检查点数据的默认位置。 |
2.0.0 |
spark.sql.streaming.continuous.epochBacklogQueueSize |
10000 | 队列中存储以等待延迟时期的最大条目数。如果此参数超出队列大小,流将停止并报错。 |
3.0.0 |
spark.sql.streaming.disabledV2Writers |
一个逗号分隔的完全限定数据源注册类名列表,其中StreamWriteSupport被禁用。写入这些源将回退到V1 Sinks。 |
2.3.1 | |
spark.sql.streaming.fileSource.cleaner.numThreads |
1 | 文件源已完成文件清理器中使用的线程数。 |
3.0.0 |
spark.sql.streaming.forceDeleteTempCheckpointLocation |
false | 当为true时,启用临时检查点位置强制删除。 |
3.0.0 |
spark.sql.streaming.metricsEnabled |
false | 是否为活动流查询报告Dropwizard/Codahale指标。 |
2.0.2 |
spark.sql.streaming.multipleWatermarkPolicy |
min | 当流式查询中存在多个水印运算符时,计算全局水印值的策略。默认值为“min”,它选择多个运算符报告的最小水印。另一个可选值为“max”,它选择多个运算符中的最大值。注意:此配置在从同一检查点位置重新启动查询之间不能更改。 |
2.4.0 |
spark.sql.streaming.noDataMicroBatches.enabled |
true | 流式微批处理引擎是否会执行无数据的批处理,以便对有状态流查询进行急切状态管理。 |
2.4.1 |
spark.sql.streaming.numRecentProgressUpdates |
100 | 为流式查询保留的进度更新数 |
2.1.1 |
spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition |
false | 当为true时,流式会话窗口在混洗之前在本地分区中排序和合并会话。这是为了减少混洗的行数,但仅在批处理中有大量行分配给同一会话时才有利。 |
3.2.0 |
spark.sql.streaming.stateStore.encodingFormat |
unsaferow | 有状态运算符用于在状态存储中存储信息的编码格式 |
4.0.0 |
spark.sql.streaming.stateStore.stateSchemaCheck |
true | 当为true时,Spark将根据现有状态的Schema验证状态Schema,如果Schema不兼容,则查询将失败。 |
3.1.0 |
spark.sql.streaming.stopActiveRunOnRestart |
true | 不支持同时运行同一个流查询的多个实例。如果发现同一个流查询(在同一集群上相同或不同的SparkSession中)存在并发活动实例,并且此标志为true,我们将停止旧的流查询实例以启动新的实例。 |
3.0.0 |
spark.sql.streaming.stopTimeout |
0 | 调用流查询的stop()方法时,等待流执行线程停止的毫秒数。0或负值表示无限期等待。 |
3.0.0 |
spark.sql.streaming.transformWithState.stateSchemaVersion |
3 | transformWithState运算符使用的状态Schema版本 |
4.0.0 |
spark.sql.thriftServer.interruptOnCancel |
(spark.sql.execution.interruptOnCancel 的值) |
当为true时,如果取消一个查询,所有正在运行的任务都将被中断。当为false时,所有正在运行的任务将继续直到完成。 |
3.2.0 |
spark.sql.thriftServer.queryTimeout |
0ms | 在Thrift Server中设置查询持续时间超时(秒)。如果超时设置为正值,则当超时超出时,正在运行的查询将自动取消,否则查询将继续运行直至完成。如果通过 |
3.1.0 |
spark.sql.thriftserver.scheduler.pool |
(无) | 为JDBC客户端会话设置公平调度器池。 |
1.1.1 |
spark.sql.thriftserver.ui.retainedSessions |
200 | JDBC/ODBC Web UI历史中保留的SQL客户端会话数。 |
1.4.0 |
spark.sql.thriftserver.ui.retainedStatements |
200 | JDBC/ODBC Web UI历史中保留的SQL语句数。 |
1.4.0 |
spark.sql.timeTravelTimestampKey |
timestampAsOf | 读取表时指定时间旅行时间戳的选项名称。 |
4.0.0 |
spark.sql.timeTravelVersionKey |
versionAsOf | 读取表时指定时间旅行表版本的选项名称。 |
4.0.0 |
spark.sql.timestampType |
TIMESTAMP_LTZ | 配置Spark SQL的默认时间戳类型,包括SQL DDL、Cast子句、类型字面量和数据源的Schema推断。将配置设置为TIMESTAMP_NTZ将使用TIMESTAMP WITHOUT TIME ZONE作为默认类型,而设置为TIMESTAMP_LTZ将使用TIMESTAMP WITH LOCAL TIME ZONE。在3.4.0版本之前,Spark仅支持TIMESTAMP WITH LOCAL TIME ZONE类型。 |
3.4.0 |
spark.sql.transposeMaxValues |
500 | 当进行转置而不指定索引列的值时,这是将转置的最大值数量,超过此数量将报错。 |
4.0.0 |
spark.sql.tvf.allowMultipleTableArguments.enabled |
false | 当为true时,允许表值函数有多个表参数,接收这些表所有行的笛卡尔积。 |
3.5.0 |
spark.sql.ui.explainMode |
formatted | 配置Spark SQL UI中使用的查询解释模式。值可以是“simple”、“extended”、“codegen”、“cost”或“formatted”。默认值为“formatted”。 |
3.1.0 |
spark.sql.variable.substitute |
true | 这将启用使用 |
2.0.0 |
静态 SQL 配置
静态SQL配置是跨会话、不可变的Spark SQL配置。它们可以通过配置文件和带--conf/-c
前缀的命令行选项设置最终值,或者通过设置用于创建SparkSession
的SparkConf
来设置。外部用户可以通过SparkSession.conf
或通过set命令查询静态SQL配置值,例如SET spark.sql.extensions;
,但无法设置/取消设置它们。
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.sql.cache.serializer |
org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer | 实现org.apache.spark.sql.columnar.CachedBatchSerializer接口的类名。它将用于将SQL数据转换为可以更有效缓存的格式。底层API可能会更改,因此请谨慎使用。不能指定多个类。该类必须有一个无参数构造函数。 |
3.1.0 |
spark.sql.catalog.spark_catalog.defaultDatabase |
default | 会话目录的默认数据库。 |
3.4.0 |
spark.sql.event.truncate.length |
2147483647 | SQL长度超过此阈值时,将在添加到事件之前截断。默认为不截断。如果设置为0,将记录调用站点。 |
3.0.0 |
spark.sql.extensions |
(无) | 一个逗号分隔的类列表,这些类实现了Function1[SparkSessionExtensions, Unit],用于配置Spark Session扩展。这些类必须具有无参数构造函数。如果指定了多个扩展,则按指定顺序应用。对于规则和规划器策略,它们按指定顺序应用。对于解析器,使用最后一个解析器,并且每个解析器都可以委托给其前任。对于函数名称冲突的情况,使用最后注册的函数名称。 |
2.2.0 |
spark.sql.extensions.test.loadFromCp |
true | 确定我们是否应该使用SparkSessionExtensionsProvider机制从类路径加载扩展的标志。这仅是测试标志。 |
|
spark.sql.hive.metastore.barrierPrefixes |
一个逗号分隔的类前缀列表,对于Spark SQL与之通信的每个Hive版本,这些类应被明确重新加载。例如,声明在通常共享的前缀(即 |
1.4.0 | |
spark.sql.hive.metastore.jars |
builtin | 用于实例化HiveMetastoreClient的jar包位置。此属性可以是以下四种选项之一:1. “builtin” 使用Hive 2.3.10,该版本在启用 |
1.4.0 |
spark.sql.hive.metastore.jars.path |
用于实例化HiveMetastoreClient的jar包的逗号分隔路径。此配置仅在 |
3.1.0 | |
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc | 一个逗号分隔的类前缀列表,这些类应使用Spark SQL和特定版本的Hive共享的类加载器加载。应共享的类的一个示例是与Metastore通信所需的JDBC驱动程序。需要共享的其他类是那些与已共享类交互的类。例如,log4j使用的自定义追加器。 |
1.4.0 |
spark.sql.hive.metastore.version |
2.3.10 | Hive Metastore的版本。可用选项包括 |
1.4.0 |
spark.sql.hive.thriftServer.singleSession |
false | 当设置为true时,Hive Thrift服务器以单会话模式运行。所有JDBC/ODBC连接共享临时视图、函数注册表、SQL配置和当前数据库。 |
1.6.0 |
spark.sql.hive.version |
2.3.10 | Spark发行版捆绑的已编译的(即内置的)Hive版本。请注意,这是一个只读配置,仅用于报告内置的Hive版本。如果您希望Spark调用不同的Metastore客户端,请参阅spark.sql.hive.metastore.version。 |
1.1.1 |
spark.sql.metadataCacheTTLSeconds |
-1000ms | 元数据缓存(分区文件元数据缓存和会话目录缓存)的生存时间(TTL)值。此配置仅在值为正(> 0)时生效。它还需要将“spark.sql.catalogImplementation”设置为 |
3.1.0 |
spark.sql.queryExecutionListeners |
(无) | 实现QueryExecutionListener接口的类名列表,这些类将自动添加到新创建的会话中。这些类应具有无参数构造函数,或需要SparkConf参数的构造函数。 |
2.3.0 |
spark.sql.sources.disabledJdbcConnProviderList |
配置一个被禁用的JDBC连接提供者列表。列表包含以逗号分隔的JDBC连接提供者的名称。 |
3.1.0 | |
spark.sql.streaming.streamingQueryListeners |
(无) | 实现StreamingQueryListener接口的类名列表,这些类将自动添加到新创建的会话中。这些类应具有无参数构造函数,或需要SparkConf参数的构造函数。 |
2.4.0 |
spark.sql.streaming.ui.enabled |
true | 当Spark Web UI启用时,是否为Spark应用程序运行结构化流式Web UI。 |
3.0.0 |
spark.sql.streaming.ui.retainedProgressUpdates |
100 | 为结构化流式UI保留的流式查询进度更新数。 |
3.0.0 |
spark.sql.streaming.ui.retainedQueries |
100 | 为结构化流式UI保留的非活动查询数。 |
3.0.0 |
spark.sql.ui.retainedExecutions |
1000 | Spark UI中保留的执行次数。 |
1.5.0 |
spark.sql.warehouse.dir |
($PWD/spark-warehouse 的值) |
受管数据库和表的默认位置。 |
2.0.0 |
Spark Streaming
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.streaming.backpressure.enabled |
false | 启用或禁用Spark Streaming的内部背压机制(自1.5版起)。这使得Spark Streaming能够根据当前的批处理调度延迟和处理时间来控制接收速率,从而使系统以其处理速度接收数据。在内部,这动态设置了接收器的最大接收速率。此速率受spark.streaming.receiver.maxRate 和spark.streaming.kafka.maxRatePerPartition (如果设置)的值上限限制(见下文)。 |
1.5.0 |
spark.streaming.backpressure.initialRate |
未设置 | 当启用背压机制时,这是每个接收器在第一批数据中接收数据的初始最大接收速率。 | 2.0.0 |
spark.streaming.blockInterval |
200ms | Spark Streaming接收器接收到的数据在存储到Spark之前分块的时间间隔。建议最小值为50毫秒。有关更多详细信息,请参阅Spark Streaming编程指南中的并行度(数据接收)部分。 | 0.8.0 |
spark.streaming.receiver.maxRate |
未设置 | 每个接收器接收数据的最大速率(每秒记录数)。实际上,每个流每秒最多消耗此数量的记录。将此配置设置为0或负数将不对速率施加限制。有关更多详细信息,请参阅Spark Streaming编程指南中的部署指南。 | 1.0.2 |
spark.streaming.receiver.writeAheadLog.enable |
false | 为接收器启用预写日志。所有通过接收器接收到的输入数据都将保存到预写日志中,以便在驱动程序故障后恢复。有关更多详细信息,请参阅Spark Streaming编程指南中的部署指南。 | 1.2.1 |
spark.streaming.unpersist |
true | 强制Spark Streaming生成并持久化的RDD自动从Spark内存中解除持久化。Spark Streaming接收到的原始输入数据也会自动清除。将此设置为false将允许原始数据和持久化RDD在流式应用程序外部可访问,因为它们不会自动清除。但这会以Spark内存使用量增加为代价。 | 0.9.0 |
spark.streaming.stopGracefullyOnShutdown |
false | 如果为true ,Spark会在JVM关闭时优雅地关闭StreamingContext ,而不是立即关闭。 |
1.4.0 |
spark.streaming.kafka.maxRatePerPartition |
未设置 | 使用新的Kafka direct stream API时,从每个Kafka分区读取数据的最大速率(每秒记录数)。有关更多详细信息,请参阅Kafka集成指南。 | 1.3.0 |
spark.streaming.kafka.minRatePerPartition |
1 | 使用新的Kafka direct stream API时,从每个Kafka分区读取数据的最小速率(每秒记录数)。 | 2.4.0 |
spark.streaming.ui.retainedBatches |
1000 | Spark Streaming UI和状态API在进行垃圾回收之前记住的批次数量。 | 1.0.0 |
spark.streaming.driver.writeAheadLog.closeFileAfterWrite |
false | 在驱动程序上写入预写日志记录后是否关闭文件。当您想将S3(或任何不支持刷新的文件系统)用于驱动程序上的元数据WAL时,请将其设置为“true”。 | 1.6.0 |
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite |
false | 在接收器上写入预写日志记录后是否关闭文件。当您想将S3(或任何不支持刷新的文件系统)用于接收器上的数据WAL时,请将其设置为“true”。 | 1.6.0 |
SparkR (已废弃)
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.r.numRBackendThreads |
2 | RBackend用于处理SparkR包RPC调用的线程数。 | 1.4.0 |
spark.r.command |
Rscript | 在集群模式下用于在驱动程序和工作节点上执行R脚本的可执行文件。 | 1.5.3 |
spark.r.driver.command |
spark.r.command | 在客户端模式下用于在驱动程序上执行R脚本的可执行文件。在集群模式下被忽略。 | 1.5.3 |
spark.r.shell.command |
R | 在客户端模式下用于在驱动程序上执行sparkR shell的可执行文件。在集群模式下被忽略。它与环境变量SPARKR_DRIVER_R 相同,但优先级高于它。spark.r.shell.command 用于sparkR shell,而spark.r.driver.command 用于运行R脚本。 |
2.1.0 |
spark.r.backendConnectionTimeout |
6000 | R进程与其到RBackend连接的连接超时时间(秒)。 | 2.1.0 |
spark.r.heartBeatInterval |
100 | SparkR后端向R进程发送心跳的时间间隔,以防止连接超时。 | 2.1.0 |
GraphX
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.graphx.pregel.checkpointInterval |
-1 | Pregel中图和消息的检查点间隔。它用于避免由于大量迭代后长的血缘链导致的stackOverflowError。检查点默认禁用。 | 2.2.0 |
集群管理器
Spark中的每个集群管理器都有额外的配置选项。配置可以在每个模式的页面上找到。
YARN
Kubernetes
Standalone 模式
环境变量
某些Spark设置可以通过环境变量配置,这些变量从Spark安装目录中的conf/spark-env.sh
脚本(Windows上是conf/spark-env.cmd
)读取。在独立模式下,此文件可以提供机器特定信息,例如主机名。在运行本地Spark应用程序或提交脚本时,也会 sourcing 此文件。
请注意,Spark安装时默认不存在conf/spark-env.sh
。但是,您可以复制conf/spark-env.sh.template
来创建它。请确保将复制的文件设置为可执行。
以下变量可在spark-env.sh
中设置
环境变量 | 含义 |
---|---|
JAVA_HOME |
Java安装位置(如果不在您的默认PATH 中)。 |
PYSPARK_PYTHON |
PySpark在驱动程序和工作节点中使用的Python可执行二进制文件(如果可用,默认为python3 ,否则为python )。如果设置了属性spark.pyspark.python ,则其优先级更高。 |
PYSPARK_DRIVER_PYTHON |
PySpark仅在驱动程序中使用的Python可执行二进制文件(默认为PYSPARK_PYTHON )。如果设置了属性spark.pyspark.driver.python ,则其优先级更高。 |
SPARKR_DRIVER_R |
SparkR shell使用的R可执行二进制文件(默认为R )。如果设置了属性spark.r.shell.command ,则其优先级更高。 |
SPARK_LOCAL_IP |
要绑定的机器的IP地址。 |
SPARK_PUBLIC_DNS |
您的Spark程序将向其他机器通告的主机名。 |
除上述之外,还有用于设置Spark独立集群脚本的选项,例如每台机器使用的核心数和最大内存。
由于spark-env.sh
是一个shell脚本,其中一些可以以编程方式设置——例如,您可以通过查找特定网络接口的IP来计算SPARK_LOCAL_IP
。
注意:在cluster
模式下在YARN上运行Spark时,环境变量需要使用conf/spark-defaults.conf
文件中的spark.yarn.appMasterEnv.[EnvironmentVariableName]
属性来设置。在cluster
模式下,spark-env.sh
中设置的环境变量将不会反映在YARN Application Master进程中。有关更多信息,请参阅YARN相关Spark属性。
配置日志记录
Spark使用log4j进行日志记录。您可以通过在conf
目录中添加log4j2.properties
文件来配置它。要开始使用,请复制提供的模板之一:log4j2.properties.template
(用于纯文本日志记录)或log4j2-json-layout.properties.template
(用于结构化日志记录)。
纯文本日志记录
默认的日志记录格式是纯文本,使用Log4j的Pattern Layout。
默认情况下,纯文本日志不包含MDC(映射诊断上下文)信息。要包含它,请更新log4j2.properties
文件中的PatternLayout
配置。例如,添加%X{task_name}
以在日志中包含任务名称。此外,使用spark.sparkContext.setLocalProperty("key", "value")
将自定义数据添加到MDC。
结构化日志记录
从版本4.0.0开始,spark-submit
支持使用JSON Template Layout进行可选的结构化日志记录。此格式允许使用JSON数据源高效查询日志,并包含所有MDC信息,以提高可搜索性和调试能力。
要启用结构化日志记录并包含MDC信息,请将配置spark.log.structuredLogging.enabled
设置为true
(默认为false
)。如需额外自定义,请将log4j2-json-layout.properties.template
复制到conf/log4j2.properties
并根据需要进行调整。
使用 Spark SQL 查询结构化日志
要以JSON格式查询结构化日志,请使用以下代码片段:
Python
from pyspark.logger import SPARK_LOG_SCHEMA
logDf = spark.read.schema(SPARK_LOG_SCHEMA).json("path/to/logs")
Scala
import org.apache.spark.util.LogUtils.SPARK_LOG_SCHEMA
val logDf = spark.read.schema(SPARK_LOG_SCHEMA).json("path/to/logs")
注意:如果您使用交互式shell(pyspark shell或spark-shell),则可以省略代码中的导入语句,因为SPARK_LOG_SCHEMA已在shell上下文中可用。
覆盖配置目录
要指定不同于默认“SPARK_HOME/conf”的配置目录,您可以设置SPARK_CONF_DIR。Spark将使用此目录中的配置文件(spark-defaults.conf, spark-env.sh, log4j2.properties等)。
继承 Hadoop 集群配置
如果您计划使用Spark从HDFS读取和写入数据,则应将以下两个Hadoop配置文件包含在Spark的类路径中:
hdfs-site.xml
,提供HDFS客户端的默认行为。core-site.xml
,设置默认文件系统名称。
这些配置文件的位置在不同Hadoop版本中有所不同,但常见位置是/etc/hadoop/conf
内部。一些工具会动态创建配置,但提供下载副本的机制。
要使这些文件对Spark可见,请在$SPARK_HOME/conf/spark-env.sh
中将HADOOP_CONF_DIR
设置为包含配置文件的位置。
自定义 Hadoop/Hive 配置
如果您的Spark应用程序与Hadoop、Hive或两者都交互,则Spark的类路径中可能存在Hadoop/Hive配置文件。
多个正在运行的应用程序可能需要不同的Hadoop/Hive客户端配置。您可以为每个应用程序复制和修改Spark类路径中的hdfs-site.xml
、core-site.xml
、yarn-site.xml
、hive-site.xml
。在YARN上运行的Spark集群中,这些配置文件是集群范围设置的,应用程序无法安全地更改。
更好的选择是使用spark.hadoop.*
形式的spark hadoop属性,并使用spark.hive.*
形式的spark hive属性。例如,添加配置“spark.hadoop.abc.def=xyz”表示添加hadoop属性“abc.def=xyz”,添加配置“spark.hive.abc=xyz”表示添加hive属性“hive.abc=xyz”。它们可以被视为与普通spark属性相同,可以在$SPARK_HOME/conf/spark-defaults.conf
中设置。
在某些情况下,您可能希望避免在SparkConf
中硬编码某些配置。例如,Spark允许您简单地创建一个空配置并设置spark/spark hadoop/spark hive属性。
val conf = new SparkConf().set("spark.hadoop.abc.def", "xyz")
val sc = new SparkContext(conf)
此外,您可以在运行时修改或添加配置。
./bin/spark-submit \
--name "My app" \
--master "local[4]" \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--conf spark.hadoop.abc.def=xyz \
--conf spark.hive.abc=xyz
myApp.jar
自定义资源调度和配置概述
GPU和其他加速器已广泛用于加速特殊工作负载,例如深度学习和信号处理。Spark现在支持请求和调度通用资源,例如GPU,但有一些注意事项。当前实现要求资源具有可由调度程序分配的地址。它要求您的集群管理器支持并正确配置资源。
有可用的配置用于请求驱动程序资源:spark.driver.resource.{resourceName}.amount
,请求执行器资源:spark.executor.resource.{resourceName}.amount
,并指定每个任务的要求:spark.task.resource.{resourceName}.amount
。在YARN、Kubernetes和Spark Standalone上的客户端驱动程序中,需要spark.driver.resource.{resourceName}.discoveryScript
配置。对于YARN和Kubernetes,需要spark.executor.resource.{resourceName}.discoveryScript
配置。Kubernetes还需要spark.driver.resource.{resourceName}.vendor
和/或spark.executor.resource.{resourceName}.vendor
。有关每个配置的更多信息,请参阅上面的配置说明。
Spark将使用指定的配置首先从集群管理器请求具有相应资源的容器。一旦获得容器,Spark将在该容器中启动一个执行器,该执行器将发现容器拥有的资源以及与每个资源关联的地址。执行器将向驱动程序注册并报告可供该执行器使用的资源。然后,Spark调度器可以根据用户指定的资源需求将任务调度到每个执行器并分配特定的资源地址。用户可以使用TaskContext.get().resources
API查看分配给任务的资源。在驱动程序上,用户可以使用SparkContext的resources
调用查看分配的资源。然后由用户使用分配的地址进行他们想要的处理或将这些地址传递给他们正在使用的ML/AI框架。
有关YARN、Kubernetes和Standalone模式的要求和详细信息,请参阅您的集群管理器特定页面:YARN、Kubernetes和Standalone模式。目前,本地模式不支持此功能。另请注意,不支持具有多个工作节点的local-cluster模式(请参阅Standalone文档)。
阶段级别调度概述
阶段级调度功能允许用户在阶段级别指定任务和执行器资源需求。这允许不同阶段使用具有不同资源的执行器运行。一个典型的例子是,一个ETL阶段仅使用CPU执行器运行,而下一个阶段是需要GPU的ML阶段。阶段级调度允许用户在ML阶段运行时请求具有GPU的不同执行器,而不是在应用程序启动时就获取具有GPU的执行器,然后在ETL阶段运行时它们处于空闲状态。此功能仅适用于Scala、Java和Python中的RDD API。当启用动态分配时,它在YARN、Kubernetes和Standalone上可用。当禁用动态分配时,它允许用户在阶段级别指定不同的任务资源需求,并且目前在YARN、Kubernetes和Standalone集群上支持此功能。有关更多实现细节,请参阅YARN页面或Kubernetes页面或Standalone页面。
请参阅RDD.withResources
和ResourceProfileBuilder
API以使用此功能。当动态分配被禁用时,具有不同任务资源需求的任务将与DEFAULT_RESOURCE_PROFILE
共享执行器。而当动态分配被启用时,当前实现为每个创建的ResourceProfile
获取新的执行器,并且目前必须完全匹配。Spark不会尝试将任务适配到与执行器创建时具有不同ResourceProfile的执行器中。未使用的执行器将使用动态分配逻辑进行空闲超时。此功能的默认配置是每个阶段只允许一个ResourceProfile。如果用户将超过1个ResourceProfile关联到RDD,Spark默认会抛出异常。请参阅配置spark.scheduler.resource.profileMergeConflicts
来控制此行为。当spark.scheduler.resource.profileMergeConflicts
启用时,Spark实现的当前合并策略是冲突ResourceProfiles中每个资源的最大值。Spark将创建一个包含每个资源最大值的新ResourceProfile。
基于推送的 shuffle 概述
基于推送的混洗有助于提高Spark混洗的可靠性和性能。它采取尽力而为的方法,将map任务生成的混洗块推送到远程外部混洗服务,以便按混洗分区进行合并。Reduce任务将合并后的混洗分区和原始混洗块的组合作为其输入数据,从而将外部混洗服务的小随机磁盘读取转换为大顺序读取。Reduce任务更好地数据局部性的可能性进一步有助于最小化网络I/O。在某些情况下,例如合并输出可用时的分区合并,基于推送的混洗优先于批处理获取。
基于推送的混洗改善了长时间运行作业/查询的性能,这些作业/查询在混洗过程中涉及大量磁盘I/O。目前它不太适合快速运行且处理较少混洗数据的作业/查询。这将在未来的版本中进一步改进。
目前,基于推送的混洗仅支持带外部混洗服务的Spark on YARN。
外部 Shuffle 服务(服务器)端配置选项
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.shuffle.push.server.mergedShuffleFileManagerImpl |
org.apache.spark.network.shuffle.
|
实现MergedShuffleFileManager 的类名,用于管理基于推送的混洗。这作为服务器端配置,用于禁用或启用基于推送的混洗。默认情况下,基于推送的混洗在服务器端是禁用的。要在服务器端启用基于推送的混洗,请将此配置设置为 |
3.2.0 |
spark.shuffle.push.server.minChunkSizeInMergedShuffleFile |
2m |
在基于推送的混洗过程中,将合并的混洗文件分割成多个块时,每个块的最小大小。合并的混洗文件由多个小的混洗块组成。在单个磁盘I/O中获取完整的合并混洗文件会增加客户端和外部混洗服务的内存需求。相反,外部混洗服务以 设置过高会增加客户端和外部混洗服务的内存需求。 设置过低会不必要地增加对外部混洗服务的RPC请求总数。 |
3.2.0 |
spark.shuffle.push.server.mergedIndexCacheSize |
100m |
内存中可用于存储合并索引文件的基于推送的混洗缓存的最大大小。此缓存是spark.shuffle.service.index.cache.size 配置的缓存之外的附加缓存。 |
3.2.0 |
客户端配置选项
属性名称 | 默认值 | 含义 | 始于版本 |
---|---|---|---|
spark.shuffle.push.enabled |
false |
设置为true可在客户端启用基于推送的混洗,并与服务器端标志spark.shuffle.push.server.mergedShuffleFileManagerImpl 协同工作。 |
3.2.0 |
spark.shuffle.push.finalize.timeout |
10 秒 |
在给定混洗map阶段的所有mapper完成后,驱动程序等待(秒)发送合并最终请求到远程外部混洗服务的时间。这为外部混洗服务提供了额外的时间来合并块。设置过长可能导致性能下降。 | 3.2.0 |
spark.shuffle.push.maxRetainedMergerLocations |
500 |
基于推送的混洗缓存的最大合并器位置数。目前,合并器位置是负责处理推送块、合并它们并为后续混洗获取提供合并块的外部混洗服务的主机。 | 3.2.0 |
spark.shuffle.push.mergersMinThresholdRatio |
0.05 |
用于根据reducer阶段的分区数计算阶段所需的最小混洗合并器位置数的比率。例如,一个有100个分区且使用默认值0.05的reduce阶段,需要至少5个唯一的合并器位置才能启用基于推送的混洗。 | 3.2.0 |
spark.shuffle.push.mergersMinStaticThreshold |
5 |
静态阈值,表示需要可用的混洗推送合并器位置数,才能为某个阶段启用基于推送的混洗。请注意,此配置与spark.shuffle.push.mergersMinThresholdRatio 协同工作。需要最大值中的spark.shuffle.push.mergersMinStaticThreshold 和spark.shuffle.push.mergersMinThresholdRatio 比率的合并器数量才能为某个阶段启用基于推送的混洗。例如:对于子阶段的1000个分区,如果spark.shuffle.push.mergersMinStaticThreshold为5,spark.shuffle.push.mergersMinThresholdRatio设置为0.05,则需要至少50个合并器才能为该阶段启用基于推送的混洗。 |
3.2.0 |
spark.shuffle.push.numPushThreads |
(无) | 指定块推送器池中的线程数。这些线程协助创建连接并将块推送到远程外部混洗服务。默认情况下,线程池大小等于Spark执行器核心数。 | 3.2.0 |
spark.shuffle.push.maxBlockSizeToPush |
1m |
单个块推送到远程外部混洗服务的最大大小。大于此阈值的块不会被推送到远程合并。这些混洗块将以原始方式获取。 设置过高会导致更多块被推送到远程外部混洗服务,但这些块已经通过现有机制高效获取,从而增加了将大块推送到远程外部混洗服务的额外开销。建议将 设置过低会导致更少数量的块被合并并直接从mapper外部混洗服务获取,从而导致更多的小随机读取,影响整体磁盘I/O性能。 |
3.2.0 |
spark.shuffle.push.maxBlockBatchSize |
3m |
要分组为单个推送请求的混洗块批处理的最大大小。默认设置为3m ,以便略高于spark.storage.memoryMapThreshold 的默认值2m ,因为每个批处理块很可能进行内存映射,这会带来更高的开销。 |
3.2.0 |
spark.shuffle.push.merge.finalizeThreads |
8 | 驱动程序用于最终确定混洗合并的线程数。由于大型混洗可能需要几秒钟才能最终确定,因此在启用基于推送的混洗时,拥有多个线程有助于驱动程序处理并发混洗合并最终确定请求。 | 3.3.0 |
spark.shuffle.push.minShuffleSizeToWait |
500m |
只有当混洗数据总大小超过此阈值时,驱动程序才会等待合并完成。如果混洗总大小较小,驱动程序将立即完成混洗输出。 | 3.3.0 |
spark.shuffle.push.minCompletedPushRatio |
1.0 |
在基于推送的混洗期间,驱动程序开始混洗合并最终确定之前,需要完成推送的最小map分区比例。 | 3.3.0 |