Spark 配置
- Spark 属性
- 环境变量
- 配置日志
- 覆盖配置目录
- 继承 Hadoop 集群配置
- 自定义 Hadoop/Hive 配置
- 自定义资源调度和配置概述
- Stage 级别调度概述
- 基于推送的 Shuffle 概述
Spark 提供了三个配置系统的位置
- Spark 属性控制大多数应用程序参数,可以通过使用 SparkConf 对象或通过 Java 系统属性进行设置。
- 环境变量可用于设置每台机器的设置,例如 IP 地址,通过每个节点上的
conf/spark-env.sh
脚本。 - 日志记录可以通过
log4j2.properties
进行配置。
Spark 属性
Spark 属性控制着大多数应用程序设置,并为每个应用程序单独配置。这些属性可以直接在传递给您的 SparkContext
的 SparkConf 上设置。SparkConf
允许您配置一些常用属性(例如,master URL 和应用程序名称),以及通过 set()
方法配置任意键值对。例如,我们可以初始化一个具有两个线程的应用程序,如下所示:
请注意,我们使用 local[2] 运行,这意味着两个线程 - 代表“最小”并行性,这有助于检测仅在分布式上下文中运行时存在的错误。
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)
请注意,我们可以在 local 模式下拥有超过 1 个线程,并且在像 Spark Streaming 这样的情况下,我们实际上可能需要超过 1 个线程来防止任何类型的饥饿问题。
指定时间持续时间的属性应配置时间单位。接受以下格式:
25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)
指定字节大小的属性应配置大小单位。接受以下格式:
1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)
虽然没有单位的数字通常被解释为字节,但少数数字被解释为 KiB 或 MiB。 请参阅各个配置属性的文档。 尽可能指定单位是可取的。
动态加载 Spark 属性
在某些情况下,您可能希望避免在 SparkConf
中硬编码某些配置。 例如,如果您想使用不同的 master 或不同的内存量运行相同的应用程序。 Spark 允许您简单地创建一个空的 conf
val sc = new SparkContext(new SparkConf())
然后,您可以在运行时提供配置值
./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
Spark shell 和 spark-submit
工具支持两种动态加载配置的方法。 第一种是命令行选项,例如 --master
,如上所示。 spark-submit
可以使用 --conf/-c
标志接受任何 Spark 属性,但对于在启动 Spark 应用程序中起作用的属性,则使用特殊标志。 运行 ./bin/spark-submit --help
将显示这些选项的完整列表。
bin/spark-submit
也会从 conf/spark-defaults.conf
读取配置选项,其中每行由空格分隔的键和值组成。 例如
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
指定为标志或属性文件中的任何值都将传递给应用程序,并与通过 SparkConf 指定的那些值合并。 直接在 SparkConf 上设置的属性具有最高优先级,然后是传递给 spark-submit
或 spark-shell
的标志,然后是 spark-defaults.conf
文件中的选项。 自早期版本的 Spark 以来,一些配置键已重命名; 在这种情况下,仍然接受较旧的键名称,但优先级低于任何较新键的实例。
Spark 属性主要可以分为两类:一类与部署相关,例如“spark.driver.memory”,“spark.executor.instances”,当在运行时通过 SparkConf
以编程方式设置时,此类属性可能不受影响,或者该行为取决于您选择的集群管理器和部署模式,因此建议通过配置文件或 spark-submit
命令行选项进行设置;另一种主要与 Spark 运行时控制相关,例如“spark.task.maxFailures”,此类属性可以通过任一方式进行设置。
查看 Spark 属性
位于 http://<driver>:4040
的应用程序 Web UI 在“Environment”选项卡中列出了 Spark 属性。 这是一个有用的地方,可以检查以确保您的属性已正确设置。 请注意,只会显示通过 spark-defaults.conf
、SparkConf
或命令行显式指定的值。 对于所有其他配置属性,您可以假设使用默认值。
可用属性
大多数控制内部设置的属性都有合理的默认值。 一些最常见的设置选项是:
应用程序属性
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.app.name |
(无) | 您的应用程序的名称。 这将出现在 UI 和日志数据中。 | 0.9.0 |
spark.driver.cores |
1 | 驱动程序进程要使用的内核数,仅在集群模式下。 | 1.3.0 |
spark.driver.maxResultSize |
1g | 每个 Spark 操作(例如 collect)的所有分区序列化结果总大小的限制(以字节为单位)。 至少应为 1M,或者 0 表示无限制。 如果总大小超过此限制,作业将被中止。 具有较高限制可能会导致驱动程序中出现内存不足错误(取决于 spark.driver.memory 和 JVM 中对象的内存开销)。 设置适当的限制可以保护驱动程序免受内存不足错误的影响。 | 1.2.0 |
spark.driver.memory |
1g | 用于驱动程序进程的内存量,即初始化 SparkContext 的位置,格式与 JVM 内存字符串相同,带有大小单位后缀(“k”、“m”、“g”或“t”)(例如 512m 、2g )。注意:在客户端模式下,此配置不得直接通过应用程序中的 SparkConf 设置,因为驱动程序 JVM 已经在此时启动。 相反,请通过 --driver-memory 命令行选项或在您的默认属性文件中设置此值。 |
1.1.1 |
spark.driver.memoryOverhead |
driverMemory * spark.driver.memoryOverheadFactor ,最小值为 384 |
每个驱动程序进程在集群模式下分配的非堆内存量,以 MiB 为单位,除非另有说明。 这是用于 VM 开销、字符串驻留、其他本机开销等事物的内存。 这往往会随着容器大小的增加而增长(通常为 6-10%)。 此选项当前在 YARN、Mesos 和 Kubernetes 上受支持。 注意:非堆内存包括堆外内存(当 spark.memory.offHeap.enabled=true 时)和其他驱动程序进程使用的内存(例如,与 PySpark 驱动程序一起使用的 python 进程)以及同一容器中运行的其他非驱动程序进程使用的内存。 运行驱动程序的容器的最大内存大小由 spark.driver.memoryOverhead 和 spark.driver.memory 的总和确定。 |
2.3.0 |
spark.driver.memoryOverheadFactor |
0.10 | 要分配为每个驱动程序进程在集群模式下额外非堆内存的驱动程序内存的分数。 这是用于 VM 开销、字符串驻留、其他本机开销等事物的内存。 这往往会随着容器大小的增加而增长。 此值默认为 0.10,Kubernetes 非 JVM 作业除外,默认为 0.40。 这样做是因为非 JVM 任务需要更多的非 JVM 堆空间,并且此类任务通常会因“超出内存开销”错误而失败。 这会通过更高的默认值来抢先解决此错误。 如果直接设置了 spark.driver.memoryOverhead ,则忽略此值。 |
3.3.0 |
spark.driver.resource.{resourceName}.amount |
0 | 要在驱动程序上使用的特定资源类型的数量。 如果使用此选项,您还必须指定 spark.driver.resource.{resourceName}.discoveryScript ,以便驱动程序在启动时找到该资源。 |
3.0.0 |
spark.driver.resource.{resourceName}.discoveryScript |
无 | 驱动程序运行以发现特定资源类型的脚本。此脚本应将 ResourceInformation 类的 JSON 字符串格式写入 STDOUT。该类具有名称和地址数组。对于客户端提交的驱动程序,发现脚本必须为此驱动程序分配与其他主机上的驱动程序不同的资源地址。 | 3.0.0 |
spark.driver.resource.{resourceName}.vendor |
无 | 用于驱动程序的资源的供应商。此选项当前仅在 Kubernetes 上受支持,并且实际上是遵循 Kubernetes 设备插件命名约定的供应商和域。(例如,对于 Kubernetes 上的 GPU,此配置将设置为 nvidia.com 或 amd.com) | 3.0.0 |
spark.resources.discoveryPlugin |
org.apache.spark.resource.ResourceDiscoveryScriptPlugin | 实现 org.apache.spark.api.resource.ResourceDiscoveryPlugin 的类名称的逗号分隔列表,用于加载到应用程序中。这适用于高级用户,他们可以用自定义实现替换资源发现类。Spark 将尝试指定的每个类,直到其中一个类返回该资源的资源信息。如果没有任何插件返回该资源的信息,它将最后尝试发现脚本。 | 3.0.0 |
spark.executor.memory |
1g | 每个 executor 进程使用的内存量,格式与 JVM 内存字符串相同,带有大小单位后缀(“k”、“m”、“g”或“t”)(例如512m 、2g )。 |
0.7.0 |
spark.executor.pyspark.memory |
未设置 | 分配给每个 executor 中 PySpark 的内存量,单位为 MiB,除非另有说明。如果设置,executor 的 PySpark 内存将被限制为此数量。如果未设置,Spark 将不限制 Python 的内存使用,应用程序有责任避免超过与其他非 JVM 进程共享的开销内存空间。当 PySpark 在 YARN 或 Kubernetes 中运行时,此内存会添加到 executor 资源请求中。 注意:此功能依赖于 Python 的 `resource` 模块;因此,行为和限制是被继承的。例如,Windows 不支持资源限制,并且在 MacOS 上实际资源不受限制。 |
2.4.0 |
spark.executor.memoryOverhead |
executorMemory * spark.executor.memoryOverheadFactor ,最小值为 384 |
每个 executor 进程分配的额外内存量,单位为 MiB,除非另有说明。这是用于 VM 开销、字符串驻留、其他本机开销等的内存。它通常随着 executor 大小的增加而增加(通常为 6-10%)。此选项当前在 YARN 和 Kubernetes 上受支持。 注意:额外的内存包括 PySpark executor 内存(当未配置 spark.executor.pyspark.memory 时)以及运行在同一容器中的其他非 executor 进程使用的内存。运行 executor 的容器的最大内存大小由 spark.executor.memoryOverhead 、spark.executor.memory 、spark.memory.offHeap.size 和 spark.executor.pyspark.memory 的总和决定。 |
2.3.0 |
spark.executor.memoryOverheadFactor |
0.10 | 分配为每个 executor 进程的额外非堆内存的 executor 内存的分数。这是用于 VM 开销、字符串驻留、其他本机开销等的内存。它通常随着容器大小的增加而增加。此值默认为 0.10,但 Kubernetes 非 JVM 作业除外,后者默认为 0.40。这是因为非 JVM 任务需要更多的非 JVM 堆空间,并且此类任务通常因“超出内存开销”错误而失败。这使用更高的默认值抢占此错误。如果直接设置了 spark.executor.memoryOverhead ,则会忽略此值。 |
3.3.0 |
spark.executor.resource.{resourceName}.amount |
0 | 每个 executor 进程要使用的特定资源类型的数量。如果使用此选项,还必须指定 spark.executor.resource.{resourceName}.discoveryScript ,以便 executor 在启动时找到资源。 |
3.0.0 |
spark.executor.resource.{resourceName}.discoveryScript |
无 | executor 运行以发现特定资源类型的脚本。此脚本应将 ResourceInformation 类的 JSON 字符串格式写入 STDOUT。该类具有名称和地址数组。 | 3.0.0 |
spark.executor.resource.{resourceName}.vendor |
无 | 用于 executors 的资源的供应商。此选项当前仅在 Kubernetes 上受支持,并且实际上是遵循 Kubernetes 设备插件命名约定的供应商和域。(例如,对于 Kubernetes 上的 GPU,此配置将设置为 nvidia.com 或 amd.com) | 3.0.0 |
spark.extraListeners |
(无) | 实现 SparkListener 的类的逗号分隔列表;在初始化 SparkContext 时,将创建这些类的实例并将其注册到 Spark 的监听器总线。如果一个类有一个接受 SparkConf 的单参数构造函数,将调用该构造函数;否则,将调用零参数构造函数。如果找不到有效的构造函数,则 SparkContext 创建将失败并出现异常。 |
1.3.0 |
spark.local.dir |
/tmp | 用于 Spark 中“暂存”空间的目录,包括 map 输出文件和存储在磁盘上的 RDD。这应该位于您系统中的快速本地磁盘上。它也可以是不同磁盘上的多个目录的逗号分隔列表。 注意: 这将被集群管理器设置的 SPARK_LOCAL_DIRS(独立)、MESOS_SANDBOX(Mesos)或 LOCAL_DIRS(YARN)环境变量覆盖。 |
0.5.0 |
spark.logConf |
false | 在启动 SparkContext 时,将有效的 SparkConf 作为 INFO 记录。 | 0.9.0 |
spark.master |
(无) | 要连接的集群管理器。请参阅 允许的主 URL 列表。 | 0.9.0 |
spark.submit.deployMode |
client | Spark 驱动程序程序的部署模式,可以是“client”或“cluster”,这意味着在本地(“client”)或远程(“cluster”)在集群中的一个节点上启动驱动程序程序。 | 1.5.0 |
spark.log.callerContext |
(无) | 在 Yarn/HDFS 上运行时,将写入 Yarn RM 日志/HDFS 审计日志的应用程序信息。其长度取决于 Hadoop 配置 hadoop.caller.context.max.size 。它应该简洁,通常最多可以有 50 个字符。 |
2.2.0 |
spark.log.level |
(无) | 设置后,将覆盖任何用户定义的日志设置,就像在 Spark 启动时调用 SparkContext.setLogLevel() 一样。有效的日志级别包括:“ALL”、“DEBUG”、“ERROR”、“FATAL”、“INFO”、“OFF”、“TRACE”、“WARN”。 |
3.5.0 |
spark.driver.supervise |
false | 如果为 true,则在驱动程序以非零退出状态失败时自动重新启动驱动程序。仅在 Spark 独立模式或 Mesos 集群部署模式下有效。 | 1.3.0 |
spark.driver.log.dfsDir |
(无) | 如果 spark.driver.log.persistToDfs.enabled 为 true,则 Spark 驱动程序日志同步的基本目录。在此基本目录中,每个应用程序都将驱动程序日志记录到特定于应用程序的文件中。用户可能希望将其设置为统一位置(如 HDFS 目录),以便可以持久保存驱动程序日志文件以供以后使用。此目录应允许任何 Spark 用户读取/写入文件,并允许 Spark History Server 用户删除文件。此外,如果 spark.history.fs.driverlog.cleaner.enabled 为 true,并且它们早于通过设置 spark.history.fs.driverlog.cleaner.maxAge 配置的最大期限,则此目录中的较旧日志将由 Spark History Server 清理。 |
3.0.0 |
spark.driver.log.persistToDfs.enabled |
false | 如果为 true,则以客户端模式运行的 spark 应用程序会将驱动程序日志写入持久存储,该持久存储在 spark.driver.log.dfsDir 中配置。如果未配置 spark.driver.log.dfsDir ,则驱动程序日志将不会持久保存。此外,通过在 Spark History Server 中将 spark.history.fs.driverlog.cleaner.enabled 设置为 true 来启用清理器。 |
3.0.0 |
spark.driver.log.layout |
%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex | 同步到 spark.driver.log.dfsDir 的驱动程序日志的布局。如果未配置此项,它将使用 log4j2.properties 中定义的第一个 appender 的布局。如果也未配置,则驱动程序日志使用默认布局。 |
3.0.0 |
spark.driver.log.allowErasureCoding |
false | 是否允许驱动程序日志使用擦除编码。在 HDFS 上,擦除编码文件不会像常规复制文件那样快速更新,因此它们可能需要更长的时间才能反映应用程序写入的更改。请注意,即使此项为 true,Spark 仍然不会强制文件使用擦除编码,它只会使用文件系统默认值。 | 3.0.0 |
spark.decommission.enabled |
false | 当启用停用时,Spark 将尽力优雅地关闭 executor。当 spark.storage.decommission.enabled 启用时,Spark 将尝试将所有 RDD 块(由 spark.storage.decommission.rddBlocks.enabled 控制)和 shuffle 块(由 spark.storage.decommission.shuffleBlocks.enabled 控制)从停用的 executor 迁移到远程 executor。启用停用后,当启用 spark.dynamicAllocation.enabled 时,Spark 也会停用 executor 而不是杀死它。 |
3.1.0 |
spark.executor.decommission.killInterval |
(无) | 停用的 executor 被外部(例如非 spark)服务强制杀死的时间间隔。 | 3.1.0 |
spark.executor.decommission.forceKillTimeout |
(无) | Spark 强制停用 executor 退出的持续时间。在大多数情况下,应将此值设置为较高值,因为低值会阻止块迁移有足够的时间完成。 | 3.2.0 |
spark.executor.decommission.signal |
PWR | 用于触发 executor 开始停用的信号。 | 3.2.0 |
spark.executor.maxNumFailures |
numExecutors * 2,最小值为 3 | 在应用程序失败之前的 executor 故障的最大数量。此配置仅在 YARN 上或 Kubernetes 上 `spark.kubernetes.allocation.pods.allocator` 设置为 'direct' 时生效。 | 3.5.0 |
spark.executor.failuresValidityInterval |
(无) | executor 故障将被视为独立并且不累积到尝试计数中的时间间隔。此配置仅在 YARN 上或 Kubernetes 上 `spark.kubernetes.allocation.pods.allocator` 设置为 'direct' 时生效。 | 3.5.0 |
除此之外,以下属性也可用,并且在某些情况下可能很有用
运行时环境
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.driver.extraClassPath |
(无) | 要添加到驱动程序类路径前面的额外类路径条目。 注意:在客户端模式下,不得直接在应用程序中通过 SparkConf 设置此配置,因为驱动程序 JVM 此时已经启动。请改为通过 --driver-class-path 命令行选项或在默认属性文件中设置此配置。 |
1.0.0 |
spark.driver.defaultJavaOptions |
(无) | 要添加到 spark.driver.extraJavaOptions 前面的默认 JVM 选项字符串。 此选项旨在由管理员设置。 例如,GC 设置或其他日志记录。 请注意,使用此选项设置最大堆大小 (-Xmx) 设置是非法的。 最大堆大小设置可以通过集群模式下的 spark.driver.memory 和客户端模式下的 --driver-memory 命令行选项来设置。注意: 在客户端模式下,不应直接在应用程序中通过 SparkConf 设置此配置,因为驱动程序 JVM 此时已经启动。 请改为通过 --driver-java-options 命令行选项或在默认属性文件中进行设置。 |
3.0.0 |
spark.driver.extraJavaOptions |
(无) | 要传递给驱动程序的额外 JVM 选项字符串。 此选项旨在由用户设置。 例如,GC 设置或其他日志记录。 请注意,使用此选项设置最大堆大小 (-Xmx) 设置是非法的。 最大堆大小设置可以通过集群模式下的 spark.driver.memory 和客户端模式下的 --driver-memory 命令行选项来设置。注意: 在客户端模式下,不应直接在应用程序中通过 SparkConf 设置此配置,因为驱动程序 JVM 此时已经启动。 请改为通过 --driver-java-options 命令行选项或在默认属性文件中进行设置。spark.driver.defaultJavaOptions 将被添加到此配置的前面。 |
1.0.0 |
spark.driver.extraLibraryPath |
(无) | 设置一个特殊的库路径,在启动驱动程序 JVM 时使用。 注意: 在客户端模式下,不应直接在应用程序中通过 SparkConf 设置此配置,因为驱动程序 JVM 此时已经启动。 请改为通过 --driver-library-path 命令行选项或在默认属性文件中进行设置。 |
1.0.0 |
spark.driver.userClassPathFirst |
false | (实验性) 在驱动程序中加载类时,是否优先考虑用户添加的 JAR 而不是 Spark 自己的 JAR。 此功能可用于缓解 Spark 的依赖项和用户依赖项之间的冲突。 它目前是一个实验性功能。 仅在集群模式下使用。 | 1.3.0 |
spark.executor.extraClassPath |
(无) | 要添加到 executor 类路径前面的额外类路径条目。 此选项主要用于与旧版本的 Spark 向后兼容。 用户通常不需要设置此选项。 | 1.0.0 |
spark.executor.defaultJavaOptions |
(无) | 要添加到 spark.executor.extraJavaOptions 前面的默认 JVM 选项字符串。 此选项旨在由管理员设置。 例如,GC 设置或其他日志记录。 请注意,使用此选项设置 Spark 属性或最大堆大小 (-Xmx) 设置是非法的。 Spark 属性应使用 SparkConf 对象或与 spark-submit 脚本一起使用的 spark-defaults.conf 文件进行设置。 最大堆大小设置可以使用 spark.executor.memory 进行设置。 如果存在以下符号,将会进行插值: 将被应用程序 ID 替换, 将被 executor ID 替换。 例如,要启用详细的 gc 日志记录到 /tmp 中以 executor ID 命名的文件中,传递一个 'value' 为:-verbose:gc -Xloggc:/tmp/-.gc |
3.0.0 |
spark.executor.extraJavaOptions |
(无) | 要传递给 executor 的额外 JVM 选项字符串。 此选项旨在由用户设置。 例如,GC 设置或其他日志记录。 请注意,使用此选项设置 Spark 属性或最大堆大小 (-Xmx) 设置是非法的。 Spark 属性应使用 SparkConf 对象或与 spark-submit 脚本一起使用的 spark-defaults.conf 文件进行设置。 最大堆大小设置可以使用 spark.executor.memory 进行设置。 如果存在以下符号,将会进行插值: 将被应用程序 ID 替换, 将被 executor ID 替换。 例如,要启用详细的 gc 日志记录到 /tmp 中以 executor ID 命名的文件中,传递一个 'value' 为:-verbose:gc -Xloggc:/tmp/-.gc spark.executor.defaultJavaOptions 将被添加到此配置的前面。 |
1.0.0 |
spark.executor.extraLibraryPath |
(无) | 设置一个特殊的库路径,在启动 executor JVM 时使用。 | 1.0.0 |
spark.executor.logs.rolling.maxRetainedFiles |
-1 | 设置系统将保留的最新滚动日志文件的数量。 较旧的日志文件将被删除。 默认情况下禁用。 | 1.1.0 |
spark.executor.logs.rolling.enableCompression |
false | 启用 executor 日志压缩。 如果启用,则将压缩滚动的 executor 日志。 默认情况下禁用。 | 2.0.2 |
spark.executor.logs.rolling.maxSize |
1024 * 1024 | 设置文件最大大小(以字节为单位),executor 日志将按该大小进行滚动。 默认情况下禁用滚动。 有关自动清除旧日志,请参阅 spark.executor.logs.rolling.maxRetainedFiles 。 |
1.4.0 |
spark.executor.logs.rolling.strategy |
(无) | 设置 executor 日志滚动的策略。 默认情况下禁用。 可以设置为 "time"(基于时间的滚动)或 "size"(基于大小的滚动)。 对于 "time",使用 spark.executor.logs.rolling.time.interval 设置滚动间隔。 对于 "size",使用 spark.executor.logs.rolling.maxSize 设置滚动的最大文件大小。 |
1.1.0 |
spark.executor.logs.rolling.time.interval |
daily | 设置 executor 日志滚动的时间间隔。 默认情况下禁用滚动。 有效值为 daily 、hourly 、minutely 或任何以秒为单位的间隔。 有关自动清除旧日志,请参阅 spark.executor.logs.rolling.maxRetainedFiles 。 |
1.1.0 |
spark.executor.userClassPathFirst |
false | (实验性) 与 spark.driver.userClassPathFirst 相同的功能,但应用于 executor 实例。 |
1.3.0 |
spark.executorEnv.[EnvironmentVariableName] |
(无) | 将 EnvironmentVariableName 指定的环境变量添加到 Executor 进程。 用户可以指定多个这些变量来设置多个环境变量。 |
0.9.0 |
spark.redaction.regex |
(?i)secret|password|token|access[.]key | 用于确定驱动程序和 executor 环境中的哪些 Spark 配置属性和环境变量包含敏感信息的正则表达式。 当此正则表达式与属性键或值匹配时,该值将从环境 UI 和各种日志(如 YARN 和事件日志)中删除。 | 2.1.2 |
spark.redaction.string.regex |
(无) | 用于确定 Spark 生成的字符串的哪些部分包含敏感信息的正则表达式。 当此正则表达式与字符串部分匹配时,该字符串部分将被虚拟值替换。 目前,它用于编辑 SQL explain 命令的输出。 | 2.2.0 |
spark.python.profile |
false | 在 Python worker 中启用分析,分析结果将通过 sc.show_profiles() 显示,或者在驱动程序退出之前显示。 也可以通过 sc.dump_profiles(path) 转储到磁盘。 如果某些分析结果已手动显示,则它们不会在驱动程序退出之前自动显示。 默认情况下,将使用 pyspark.profiler.BasicProfiler ,但可以通过将分析器类作为参数传递给 SparkContext 构造函数来覆盖它。 |
1.2.0 |
spark.python.profile.dump |
(无) | 用于在驱动程序退出之前转储分析结果的目录。 结果将作为每个 RDD 的单独文件转储。 它们可以由 pstats.Stats() 加载。 如果指定了此项,则不会自动显示分析结果。 |
1.2.0 |
spark.python.worker.memory |
512m | 在聚合期间每个 Python worker 进程使用的内存量,格式与 JVM 内存字符串相同,带有大小单位后缀("k"、"m"、"g" 或 "t")(例如 512m 、2g )。 如果聚合期间使用的内存量超过此数量,它会将数据溢出到磁盘。 |
1.1.0 |
spark.python.worker.reuse |
true | 是否重用 Python worker。 如果是,它将使用固定数量的 Python worker,不需要为每个任务 fork() 一个 Python 进程。 如果存在大量的广播,这将非常有用,那么广播不需要为每个任务从 JVM 传输到 Python worker。 | 1.2.0 |
spark.files |
要放置在每个 executor 工作目录中的以逗号分隔的文件列表。 允许使用 Globs。 | 1.0.0 | |
spark.submit.pyFiles |
要放置在 Python 应用程序的 PYTHONPATH 上的以逗号分隔的 .zip、.egg 或 .py 文件列表。 允许使用 Globs。 | 1.0.1 | |
spark.jars |
要包含在驱动程序和 executor 类路径上的以逗号分隔的 JAR 列表。 允许使用 Globs。 | 0.9.0 | |
spark.jars.packages |
要包含在驱动程序和 executor 类路径上的以逗号分隔的 JAR 的 Maven 坐标列表。 坐标应为 groupId:artifactId:version。 如果给出了 spark.jars.ivySettings ,则将根据文件中的配置解析 artifacts,否则将在本地 maven 仓库中搜索 artifacts,然后在 maven central 中搜索,最后在命令行选项 --repositories 给出的任何其他远程仓库中搜索。 有关更多详细信息,请参阅 高级依赖管理。 |
1.5.0 | |
spark.jars.excludes |
以逗号分隔的 groupId:artifactId 列表,用于在解析 spark.jars.packages 中提供的依赖项时排除,以避免依赖项冲突。 |
1.5.0 | |
spark.jars.ivy |
指定 Ivy 用户目录的路径,用于来自 spark.jars.packages 的本地 Ivy 缓存和包文件。 这将覆盖 Ivy 属性 ivy.default.ivy.user.dir ,该属性默认为 ~/.ivy2。 |
1.3.0 | |
spark.jars.ivySettings |
Ivy 设置文件的路径,用于自定义使用 spark.jars.packages 指定的 JAR 的解析,而不是内置的默认值,例如 maven central。 命令行选项 --repositories 或 spark.jars.repositories 给出的其他存储库也将被包括在内。 可用于允许 Spark 从防火墙后面解析 artifacts,例如通过内部 artifact 服务器(如 Artifactory)。 有关设置文件格式的详细信息,请参见 设置文件。 仅支持具有 file:// 方案的路径。 没有方案的路径被假定为具有 file:// 方案。在 YARN 集群模式下运行时,此文件也将被本地化到远程驱动程序,以便在 |
2.2.0 | |
spark.jars.repositories |
以逗号分隔的其他远程存储库列表,用于搜索使用 --packages 或 spark.jars.packages 给出的 maven 坐标。 |
2.3.0 | |
spark.archives |
要提取到每个 executor 工作目录中的以逗号分隔的归档文件列表。 支持 .jar、.tar.gz、.tgz 和 .zip。 您可以通过在要解压缩的文件名后添加 # 来指定要解压缩的目录名,例如 file.zip#directory 。 此配置是实验性的。 |
3.1.0 | |
spark.pyspark.driver.python |
用于驱动程序中 PySpark 的 Python 二进制可执行文件。(默认为 spark.pyspark.python ) |
2.1.0 | |
spark.pyspark.python |
用于驱动程序和 executor 中 PySpark 的 Python 二进制可执行文件。 | 2.1.0 |
Shuffle 行为
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.reducer.maxSizeInFlight |
48m | 要从每个 reduce 任务同时获取的 map 输出的最大大小,以 MiB 为单位,除非另有说明。 由于每个输出都需要我们创建一个缓冲区来接收它,因此这表示每个 reduce 任务的固定内存开销,因此除非您有大量内存,否则请保持较小的值。 | 1.4.0 |
spark.reducer.maxReqsInFlight |
Int.MaxValue | 此配置限制了在任何给定时间点获取块的远程请求数量。 当集群中的主机数量增加时,可能会导致大量入站连接到一个或多个节点,从而导致 worker 在负载下失败。 通过允许限制 fetch 请求的数量,可以缓解这种情况。 | 2.0.0 |
spark.reducer.maxBlocksInFlightPerAddress |
Int.MaxValue | 此配置限制了每个 reduce 任务从给定主机端口获取的远程块的数量。 当从单个 fetch 或同时从给定地址请求大量块时,可能会导致服务 executor 或 Node Manager 崩溃。 启用外部 shuffle 时,这对于减少 Node Manager 上的负载尤其有用。 您可以通过将其设置为较低的值来缓解此问题。 | 2.2.1 |
spark.shuffle.compress |
true | 是否压缩 map 输出文件。 通常是一个好主意。 压缩将使用 spark.io.compression.codec 。 |
0.6.0 |
spark.shuffle.file.buffer |
32k | 每个 shuffle 文件输出流的内存缓冲区大小,单位为 KiB,除非另有说明。 这些缓冲区减少了创建中间 shuffle 文件时发生的磁盘寻道和系统调用次数。 | 1.4.0 |
spark.shuffle.unsafe.file.output.buffer |
32k | 在不安全的 shuffle writer 中写入每个分区后,此缓冲区的文件系统大小。 单位为 KiB,除非另有说明。 | 2.3.0 |
spark.shuffle.spill.diskWriteBufferSize |
1024 * 1024 | 将排序后的记录写入磁盘文件时使用的缓冲区大小,单位为字节。 | 2.3.0 |
spark.shuffle.io.maxRetries |
3 | (仅 Netty) 如果由于 IO 相关异常导致获取失败,并且此值设置为非零值,则会自动重试。 此重试逻辑有助于稳定大型 shuffle,以应对长时间的 GC 暂停或瞬时网络连接问题。 | 1.2.0 |
spark.shuffle.io.numConnectionsPerPeer |
1 | (仅 Netty) 主机之间的连接会被重用,以减少大型集群的连接建立。 对于拥有大量硬盘驱动器和少量主机的集群,这可能会导致并发不足以饱和所有磁盘,因此用户可以考虑增加此值。 | 1.2.1 |
spark.shuffle.io.preferDirectBufs |
true | (仅 Netty) 使用堆外缓冲区来减少 shuffle 和缓存块传输期间的垃圾回收。 对于堆外内存受到严格限制的环境,用户可能希望关闭此选项,以强制 Netty 的所有分配都在堆上进行。 | 1.2.0 |
spark.shuffle.io.retryWait |
5s | (仅 Netty) 每次获取重试之间的等待时间。 默认情况下,重试导致的最大延迟为 15 秒,计算方式为 maxRetries * retryWait 。 |
1.2.1 |
spark.shuffle.io.backLog |
-1 | Shuffle 服务的接受队列的长度。 对于大型应用程序,可能需要增加此值,以便在服务无法跟上短时间内到达的大量连接时,不会丢弃传入的连接。 这需要在运行 shuffle 服务本身的任何地方进行配置,这可能在应用程序之外(请参阅下面的 spark.shuffle.service.enabled 选项)。 如果设置低于 1,将回退到 Netty 的 io.netty.util.NetUtil#SOMAXCONN 定义的操作系统默认值。 |
1.1.1 |
spark.shuffle.io.connectionTimeout |
spark.network.timeout 的值 |
如果 shuffle 服务器和客户端之间已建立的连接在至少 `connectionTimeout` 的时间内没有流量,但仍然存在未完成的获取请求,则该连接将被标记为空闲并关闭的超时时间。 | 1.2.0 |
spark.shuffle.io.connectionCreationTimeout |
spark.shuffle.io.connectionTimeout 的值 |
在 shuffle 服务器和客户端之间建立连接的超时时间。 | 3.2.0 |
spark.shuffle.service.enabled |
false | 启用外部 shuffle 服务。 此服务保留 executors 写入的 shuffle 文件,例如,以便可以安全地删除 executors,或者在 executor 失败的情况下可以继续进行 shuffle 获取。 必须设置外部 shuffle 服务才能启用它。 有关更多信息,请参见 动态分配配置和设置文档。 | 1.2.0 |
spark.shuffle.service.port |
7337 | 外部 shuffle 服务将运行的端口。 | 1.2.0 |
spark.shuffle.service.name |
spark_shuffle | 客户端应与之通信的 Spark shuffle 服务的配置名称。 这必须与用于在 YARN NodeManager 配置 (yarn.nodemanager.aux-services ) 中配置 Shuffle 的名称匹配。 只有当 spark.shuffle.service.enabled 设置为 true 时才生效。 |
3.2.0 |
spark.shuffle.service.index.cache.size |
100m | 缓存条目限制为指定的内存占用量,单位为字节,除非另有说明。 | 2.3.0 |
spark.shuffle.service.removeShuffle |
false | 当 shuffle 不再需要时,是否使用 ExternalShuffleService 删除已解除分配的 executors 的 shuffle 块。 如果未启用此功能,则已解除分配的 executors 上的 shuffle 数据将保留在磁盘上,直到应用程序结束。 | 3.3.0 |
spark.shuffle.maxChunksBeingTransferred |
Long.MAX_VALUE | Shuffle 服务上允许同时传输的最大块数。 请注意,当达到最大数量时,新的传入连接将被关闭。 客户端将根据 shuffle 重试配置进行重试(请参阅 spark.shuffle.io.maxRetries 和 spark.shuffle.io.retryWait ),如果达到这些限制,则任务将因获取失败而失败。 |
2.3.0 |
spark.shuffle.sort.bypassMergeThreshold |
200 | (高级) 在基于排序的 shuffle manager 中,如果不存在 map 端聚合,并且最多存在此数量的 reduce 分区,则避免合并排序数据。 | 1.1.1 |
spark.shuffle.sort.io.plugin.class |
org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO | 用于 shuffle IO 的类的名称。 | 3.0.0 |
spark.shuffle.spill.compress |
true | 是否压缩 shuffle 期间溢出的数据。 压缩将使用 spark.io.compression.codec 。 |
0.9.0 |
spark.shuffle.accurateBlockThreshold |
100 * 1024 * 1024 | 高于此阈值的 shuffle 块大小(以字节为单位)将在 HighlyCompressedMapStatus 中准确记录。 这有助于避免 OOM,方法是避免在获取 shuffle 块时低估 shuffle 块大小。 | 2.2.1 |
spark.shuffle.registration.timeout |
5000 | 注册到外部 shuffle 服务的超时时间(以毫秒为单位)。 | 2.3.0 |
spark.shuffle.registration.maxAttempts |
3 | 当我们未能注册到外部 shuffle 服务时,我们将重试 maxAttempts 次。 | 2.3.0 |
spark.shuffle.reduceLocality.enabled |
true | 是否计算 reduce 任务的本地性首选项。 | 1.5.0 |
spark.shuffle.mapOutput.minSizeForBroadcast |
512k | 我们使用 Broadcast 将 map 输出状态发送到 executors 的大小。 | 2.0.0 |
spark.shuffle.detectCorrupt |
true | 是否检测获取的块中的任何损坏。 | 2.2.0 |
spark.shuffle.detectCorrupt.useExtraMemory |
false | 如果启用,则压缩/加密流的一部分将通过使用额外的内存来解压缩/解密,以检测早期损坏。 任何抛出的 IOException 将导致任务重试一次,如果再次因相同异常而失败,则将抛出 FetchFailedException 以重试先前的阶段。 | 3.0.0 |
spark.shuffle.useOldFetchProtocol |
false | 在执行 shuffle 块获取时是否使用旧协议。 只有当我们需要新的 Spark 版本作业从旧版本外部 shuffle 服务中获取 shuffle 块时,才启用它。 | 3.0.0 |
spark.shuffle.readHostLocalDisk |
true | 如果启用 (并且 spark.shuffle.useOldFetchProtocol 已禁用),则从运行在同一主机上的那些块管理器请求的 shuffle 块将直接从磁盘读取,而不是作为远程块通过网络获取。 |
3.0.0 |
spark.files.io.connectionTimeout |
spark.network.timeout 的值 |
Spark RPC 环境中用于获取文件的已建立连接的超时时间,如果至少在 `connectionTimeout` 的时间内没有流量,但仍然存在正在下载的未完成文件,则该连接将被标记为空闲并关闭。 | 1.6.0 |
spark.files.io.connectionCreationTimeout |
spark.files.io.connectionTimeout 的值 |
在 Spark RPC 环境中建立连接以获取文件的超时时间。 | 3.2.0 |
spark.shuffle.checksum.enabled |
true | 是否计算 shuffle 数据的校验和。 如果启用,Spark 将计算 map 输出文件中每个分区数据的校验和值,并将这些值存储在磁盘上的校验和文件中。 当检测到 shuffle 数据损坏时,Spark 将尝试通过使用校验和文件来诊断损坏的原因(例如,网络问题、磁盘问题等)。 | 3.2.0 |
spark.shuffle.checksum.algorithm |
ADLER32 | 用于计算 shuffle 校验和的算法。 目前,它仅支持 JDK 的内置算法,例如 ADLER32、CRC32。 | 3.2.0 |
spark.shuffle.service.fetch.rdd.enabled |
false | 是否使用 ExternalShuffleService 获取磁盘持久化的 RDD 块。 在动态分配的情况下,如果启用此功能,则在 spark.dynamicAllocation.executorIdleTimeout 后,仅具有磁盘持久化块的 executors 将被视为空闲,并会相应地释放。 |
3.0.0 |
spark.shuffle.service.db.enabled |
true | 是否在 ExternalShuffleService 中使用 db。 请注意,这仅影响独立模式。 | 3.0.0 |
spark.shuffle.service.db.backend |
LEVELDB | 指定 shuffle 服务本地数据库中使用的基于磁盘的存储。 设置为 LEVELDB 或 ROCKSDB。 | 3.4.0 |
Spark UI
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.eventLog.logBlockUpdates.enabled |
false | 如果 spark.eventLog.enabled 为 true,是否为每个块更新记录事件。 *警告*:这将大大增加事件日志的大小。 |
2.3.0 |
spark.eventLog.longForm.enabled |
false | 如果为 true,则在事件日志中使用调用站点的长格式。 否则,使用短格式。 | 2.4.0 |
spark.eventLog.compress |
false | 如果 spark.eventLog.enabled 为 true,是否压缩记录的事件。 |
1.0.0 |
spark.eventLog.compression.codec |
zstd | 用于压缩记录的事件的编解码器。 默认情况下,Spark 提供四个编解码器:lz4 、lzf 、snappy 和 zstd 。 您还可以使用完全限定的类名来指定编解码器,例如 org.apache.spark.io.LZ4CompressionCodec 、org.apache.spark.io.LZFCompressionCodec 、org.apache.spark.io.SnappyCompressionCodec 和 org.apache.spark.io.ZStdCompressionCodec 。 |
3.0.0 |
spark.eventLog.erasureCoding.enabled |
false | 是否允许事件日志使用纠删码,或者关闭纠删码,无论文件系统默认值如何。 在 HDFS 上,纠删码文件不会像常规复制文件那样快速更新,因此应用程序更新需要更长的时间才能出现在 History Server 中。 请注意,即使这是 true,Spark 仍然不会强制文件使用纠删码,它只会使用文件系统默认值。 | 3.0.0 |
spark.eventLog.dir |
file:///tmp/spark-events | 如果 spark.eventLog.enabled 为 true,则 Spark 事件的记录的基本目录。 在此基本目录中,Spark 为每个应用程序创建一个子目录,并将特定于该应用程序的事件记录在此目录中。 用户可能希望将其设置为统一位置,例如 HDFS 目录,以便历史服务器可以读取历史文件。 |
1.0.0 |
spark.eventLog.enabled |
false | 是否记录 Spark 事件,这对于在应用程序完成后重建 Web UI 非常有用。 | 1.0.0 |
spark.eventLog.overwrite |
false | 是否覆盖任何现有文件。 | 1.0.0 |
spark.eventLog.buffer.kb |
100k | 写入输出流时使用的缓冲区大小,单位为 KiB,除非另有说明。 | 1.0.0 |
spark.eventLog.rolling.enabled |
false | 是否启用事件日志文件滚动。 如果设置为 true,则会将每个事件日志文件截断到配置的大小。 | 3.0.0 |
spark.eventLog.rolling.maxFileSize |
128m | 当 spark.eventLog.rolling.enabled=true 时,指定事件日志文件在滚动之前允许的最大大小。 |
3.0.0 |
spark.ui.dagGraph.retainedRootRDDs |
Int.MaxValue | 在垃圾回收之前,Spark UI 和状态 API 记住的 DAG 图节点数。 | 2.1.0 |
spark.ui.enabled |
true | 是否为 Spark 应用程序运行 Web UI。 | 1.1.1 |
spark.ui.store.path |
无 | 用于缓存 live UI 的应用程序信息的本地目录。 默认情况下,未设置此选项,这意味着所有应用程序信息将保留在内存中。 | 3.4.0 |
spark.ui.killEnabled |
true | 允许从 Web UI 停止作业和阶段。 | 1.0.0 |
spark.ui.liveUpdate.period |
100ms | 更新实时实体的频率。-1 表示在重放应用程序时“从不更新”,这意味着只会发生最后一次写入。对于实时应用程序,这避免了一些在我们快速处理传入的任务事件时可以省略的操作。 | 2.3.0 |
spark.ui.liveUpdate.minFlushPeriod |
1s | 刷新陈旧 UI 数据之前经过的最短时间。这可以避免在传入的任务事件不频繁触发时 UI 出现陈旧。 | 2.4.2 |
spark.ui.port |
4040 | 应用程序仪表板的端口,显示内存和工作负载数据。 | 0.7.0 |
spark.ui.retainedJobs |
1000 | Spark UI 和状态 API 在垃圾回收之前记住的作业数量。这是一个目标最大值,在某些情况下可能会保留更少的元素。 | 1.2.0 |
spark.ui.retainedStages |
1000 | Spark UI 和状态 API 在垃圾回收之前记住的阶段数量。这是一个目标最大值,在某些情况下可能会保留更少的元素。 | 0.9.0 |
spark.ui.retainedTasks |
100000 | Spark UI 和状态 API 在垃圾回收之前记住的单个阶段中的任务数量。这是一个目标最大值,在某些情况下可能会保留更少的元素。 | 2.0.1 |
spark.ui.reverseProxy |
false | 启用运行 Spark Master 作为 worker 和 application UI 的反向代理。在此模式下,Spark master 将反向代理 worker 和 application UI,以便在不需要直接访问其主机的情况下进行访问。请谨慎使用,因为 worker 和 application UI 将无法直接访问,您只能通过 spark master/proxy 公共 URL 访问它们。此设置会影响集群中运行的所有 worker 和 application UI,必须在所有 worker、driver 和 master 上设置。 | 2.1.0 |
spark.ui.reverseProxyUrl |
如果 Spark UI 应该通过另一个前端反向代理提供服务,则这是通过该反向代理访问 Spark master UI 的 URL。 这在为身份验证运行代理(例如 OAuth 代理)时非常有用。 该 URL 可以包含路径前缀,例如http://mydomain.com/path/to/spark/ ,允许您通过同一虚拟主机和端口为多个 Spark 集群和其他 Web 应用程序提供 UI 服务。 通常,这应该是一个包含方案(http/https)、主机和端口的绝对 URL。 可以在此处指定以“/”开头的相对 URL。 在这种情况下,Spark UI 和 Spark REST API 生成的所有 URL 都将是服务器相关的链接——这仍然有效,因为整个 Spark UI 都是通过同一主机和端口提供的服务。该设置会影响 Spark UI 中的链接生成,但前端反向代理负责
spark.ui.reverseProxy 设置为启用时才有效。 当 Spark master Web UI 可以直接访问时,不需要此设置。请注意,设置的值不能在被 "/" 分割后包含关键字 `proxy` 或 `history`。 Spark UI 依赖于这两个关键字从 URI 获取 REST API 端点。 |
2.1.0 | |
spark.ui.proxyRedirectUri |
当 Spark 在代理后面运行时,重定向的地址。 这将使 Spark 修改重定向响应,使其指向代理服务器,而不是 Spark UI 自己的地址。 这应该只是服务器的地址,没有任何应用程序的前缀路径; 前缀应该由代理服务器本身设置(通过添加 X-Forwarded-Context 请求标头),或者通过在 Spark 应用程序的配置中设置代理基本地址来设置。 |
3.0.0 | |
spark.ui.showConsoleProgress |
false | 在控制台中显示进度条。 进度条显示运行时间超过 500 毫秒的阶段的进度。 如果多个阶段同时运行,则将在同一行上显示多个进度条。 注意: 在 shell 环境中, spark.ui.showConsoleProgress 的默认值为 true。 |
1.2.1 |
spark.ui.custom.executor.log.url |
(无) | 指定自定义的 Spark 执行器日志 URL,以支持外部日志服务,而不是在 Spark UI 中使用集群管理器的应用程序日志 URL。 Spark 将通过模式支持一些路径变量,这些模式可能因集群管理器而异。 请查看您的集群管理器的文档,以了解支持哪些模式(如果有)。 请注意,此配置还会替换事件日志中的原始日志 URL,这在访问历史服务器上的应用程序时也有效。 新的日志 URL 必须是永久的,否则您可能会有执行器日志 URL 的死链接。 目前,只有 YARN 和 K8s 集群管理器支持此配置 |
3.0.0 |
spark.worker.ui.retainedExecutors |
1000 | Spark UI 和状态 API 在垃圾回收之前记住的已完成执行器的数量。 | 1.5.0 |
spark.worker.ui.retainedDrivers |
1000 | Spark UI 和状态 API 在垃圾回收之前记住的已完成 driver 的数量。 | 1.5.0 |
spark.sql.ui.retainedExecutions |
1000 | Spark UI 和状态 API 在垃圾回收之前记住的已完成执行的数量。 | 1.5.0 |
spark.streaming.ui.retainedBatches |
1000 | Spark UI 和状态 API 在垃圾回收之前记住的已完成批处理的数量。 | 1.0.0 |
spark.ui.retainedDeadExecutors |
100 | Spark UI 和状态 API 在垃圾回收之前记住的已死亡执行器的数量。 | 2.0.0 |
spark.ui.filters |
无 | 要应用于 Spark Web UI 的过滤器类名称的逗号分隔列表。 该过滤器应该是标准的 javax servlet Filter。 过滤器参数也可以在配置中指定,通过设置 spark.<过滤器类名>.param.<参数名>=<值> 形式的配置条目例如 spark.ui.filters=com.test.filter1
spark.com.test.filter1.param.name1=foo
spark.com.test.filter1.param.name2=bar
|
1.0.0 |
spark.ui.requestHeaderSize |
8k | HTTP 请求标头的最大允许大小,以字节为单位,除非另有说明。 此设置也适用于 Spark History Server。 | 2.2.3 |
spark.ui.timelineEnabled |
true | 是否在 UI 页面上显示事件时间线数据。 | 3.4.0 |
spark.ui.timeline.executors.maximum |
250 | 事件时间线中显示的最大执行器数量。 | 3.2.0 |
spark.ui.timeline.jobs.maximum |
500 | 事件时间线中显示的最大作业数量。 | 3.2.0 |
spark.ui.timeline.stages.maximum |
500 | 事件时间线中显示的最大阶段数量。 | 3.2.0 |
spark.ui.timeline.tasks.maximum |
1000 | 事件时间线中显示的最大任务数量。 | 1.4.0 |
spark.appStatusStore.diskStoreDir |
无 | 本地目录,用于存储 SQL 执行的诊断信息。 此配置仅适用于实时 UI。 | 3.4.0 |
压缩和序列化
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.broadcast.compress |
true | 是否在发送广播变量之前对其进行压缩。 通常,这是一个好主意。 压缩将使用spark.io.compression.codec 。 |
0.6.0 |
spark.checkpoint.compress |
false | 是否压缩 RDD 检查点。 通常,这是一个好主意。 压缩将使用spark.io.compression.codec 。 |
2.2.0 |
spark.io.compression.codec |
lz4 | 用于压缩内部数据(例如 RDD 分区、事件日志、广播变量和 shuffle 输出)的编解码器。 默认情况下,Spark 提供四种编解码器:lz4 、lzf 、snappy 和 zstd 。 您还可以使用完全限定的类名来指定编解码器,例如org.apache.spark.io.LZ4CompressionCodec 、org.apache.spark.io.LZFCompressionCodec 、org.apache.spark.io.SnappyCompressionCodec 和 org.apache.spark.io.ZStdCompressionCodec 。 |
0.8.0 |
spark.io.compression.lz4.blockSize |
32k | LZ4 压缩中使用的块大小,在使用 LZ4 压缩编解码器的情况下。 降低此块大小也会降低使用 LZ4 时的 shuffle 内存使用量。 默认单位是字节,除非另有说明。 此配置仅适用于 `spark.io.compression.codec`。 | 1.4.0 |
spark.io.compression.snappy.blockSize |
32k | Snappy 压缩中的块大小,在使用 Snappy 压缩编解码器的情况下。 降低此块大小也会降低使用 Snappy 时的 shuffle 内存使用量。 默认单位是字节,除非另有说明。 此配置仅适用于 `spark.io.compression.codec`。 | 1.4.0 |
spark.io.compression.zstd.level |
1 | Zstd 压缩编解码器的压缩级别。 提高压缩级别将以更多的 CPU 和内存为代价获得更好的压缩效果。 此配置仅适用于 `spark.io.compression.codec`。 | 2.3.0 |
spark.io.compression.zstd.bufferSize |
32k | Zstd 压缩中使用的缓冲区大小(以字节为单位),在使用 Zstd 压缩编解码器的情况下。 降低此大小将降低使用 Zstd 时的 shuffle 内存使用量,但由于过多的 JNI 调用开销,可能会增加压缩成本。 此配置仅适用于 `spark.io.compression.codec`。 | 2.3.0 |
spark.io.compression.zstd.bufferPool.enabled |
true | 如果为 true,则启用 ZSTD JNI 库的缓冲区池。 | 3.2.0 |
spark.kryo.classesToRegister |
(无) | 如果使用 Kryo 序列化,请提供要使用 Kryo 注册的自定义类名称的逗号分隔列表。 有关更多详细信息,请参阅优化指南。 | 1.2.0 |
spark.kryo.referenceTracking |
true | 在使用 Kryo 序列化数据时是否跟踪对同一对象的引用,如果您的对象图具有循环,则这是必要的,如果它们包含同一对象的多个副本,则对于效率而言是有用的。 如果您知道情况并非如此,则可以禁用此功能以提高性能。 | 0.8.0 |
spark.kryo.registrationRequired |
false | 是否需要使用 Kryo 注册。 如果设置为“true”,则 Kryo 会在序列化未注册的类时抛出异常。 如果设置为 false(默认值),Kryo 将随每个对象一起写入未注册的类名。 写入类名可能会导致严重的性能开销,因此启用此选项可以严格强制用户没有省略注册的类。 | 1.1.0 |
spark.kryo.registrator |
(无) | 如果使用 Kryo 序列化,请提供一个用逗号分隔的类列表,这些类使用 Kryo 注册您的自定义类。 如果您需要以自定义方式注册类,例如,指定自定义字段序列化程序,则此属性很有用。 否则,spark.kryo.classesToRegister 更简单。 它应该设置为扩展 KryoRegistrator 的类。 有关更多详细信息,请参阅优化指南。 |
0.5.0 |
spark.kryo.unsafe |
true | 是否使用基于 unsafe 的 Kryo 序列化程序。 通过使用基于 Unsafe 的 IO,可以显着加快速度。 | 2.1.0 |
spark.kryoserializer.buffer.max |
64m | Kryo 序列化缓冲区的最大允许大小,以 MiB 为单位,除非另有说明。 这必须大于您尝试序列化的任何对象,并且必须小于 2048m。 如果在 Kryo 中遇到“缓冲区限制超出”异常,请增加此值。 | 1.4.0 |
spark.kryoserializer.buffer |
64k | Kryo 序列化缓冲区的初始大小,以 KiB 为单位,除非另有说明。 请注意,每个 worker 上的每个核心都会有一个缓冲区。 如果需要,此缓冲区将增长到 spark.kryoserializer.buffer.max 。 |
1.4.0 |
spark.rdd.compress |
false | 是否压缩序列化的RDD分区(例如,Java 和 Scala 中的 StorageLevel.MEMORY_ONLY_SER 或 Python 中的 StorageLevel.MEMORY_ONLY )。 可以在节省大量空间的同时,付出一些额外的CPU时间。 压缩将使用 spark.io.compression.codec 。 |
0.6.0 |
spark.serializer |
org.apache.spark.serializer. JavaSerializer |
用于序列化将通过网络发送或需要以序列化形式缓存的对象的类。 默认的 Java 序列化适用于任何 Serializable Java 对象,但速度很慢,因此我们建议在需要速度时使用 org.apache.spark.serializer.KryoSerializer 并配置 Kryo 序列化。 可以是 org.apache.spark.Serializer 的任何子类。 |
0.5.0 |
spark.serializer.objectStreamReset |
100 | 当使用 org.apache.spark.serializer.JavaSerializer 进行序列化时,序列化器会缓存对象以防止写入冗余数据,但是这会阻止这些对象的垃圾回收。 通过调用 'reset',您可以从序列化器中刷新该信息,并允许收集旧对象。 要关闭此定期重置,请将其设置为 -1。 默认情况下,它将每 100 个对象重置一次序列化器。 | 1.0.0 |
内存管理
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.memory.fraction |
0.6 | 用于执行和存储的(堆空间 - 300MB)的比例。 此值越低,发生溢出和缓存数据驱逐的频率就越高。 此配置的目的是为内部元数据、用户数据结构以及稀疏的、异常大的记录的情况下不精确的大小估计预留内存。 建议将此值保留为默认值。 有关更多详细信息,包括在增加此值时正确调整 JVM 垃圾回收的重要信息,请参见此描述。 | 1.6.0 |
spark.memory.storageFraction |
0.5 | 免于驱逐的存储内存量,表示为 spark.memory.fraction 设置的区域大小的比例。 此值越高,可用于执行的工作内存可能越少,并且任务可能更频繁地溢出到磁盘。 建议将此值保留为默认值。 有关更多详细信息,请参见此描述。 |
1.6.0 |
spark.memory.offHeap.enabled |
false | 如果为 true,Spark 将尝试使用堆外内存进行某些操作。 如果启用了堆外内存使用,则 spark.memory.offHeap.size 必须为正数。 |
1.6.0 |
spark.memory.offHeap.size |
0 | 可用于堆外分配的绝对内存量,以字节为单位,除非另有说明。 此设置对堆内存使用没有影响,因此如果执行器的总内存消耗必须在某些硬限制内,请务必相应地缩小 JVM 堆大小。 当 spark.memory.offHeap.enabled=true 时,必须将其设置为正值。 |
1.6.0 |
spark.storage.unrollMemoryThreshold |
1024 * 1024 | 在展开任何块之前要请求的初始内存。 | 1.1.0 |
spark.storage.replication.proactive |
true | 为 RDD 块启用主动块复制。 如果存在任何可用的现有副本,则会补充由于执行器故障而丢失的缓存 RDD 块副本。 这试图将块的复制级别设置为初始数量。 | 2.2.0 |
spark.storage.localDiskByExecutors.cacheSize |
1000 | 存储本地目录的执行器的最大数量。 此大小既适用于驱动程序端,也适用于执行器端,以避免具有无限制的存储。 此缓存将用于避免网络,以防从同一主机提取磁盘持久化的RDD块或shuffle块(当设置了spark.shuffle.readHostLocalDisk 时)。 |
3.0.0 |
spark.cleaner.periodicGC.interval |
30分钟 | 控制触发垃圾回收的频率。 此上下文清理器仅在弱引用被垃圾回收时触发清理。 在具有大型驱动程序 JVM 的长时间运行的应用程序中,如果驱动程序上的内存压力很小,则这可能很少发生或根本不发生。 根本不清理可能会导致执行器在一段时间后耗尽磁盘空间。 |
1.6.0 |
spark.cleaner.referenceTracking |
true | 启用或禁用上下文清理。 | 1.0.0 |
spark.cleaner.referenceTracking.blocking |
true | 控制清理线程是否应阻止清理任务(除了由 spark.cleaner.referenceTracking.blocking.shuffle Spark 属性控制的 shuffle)。 |
1.0.0 |
spark.cleaner.referenceTracking.blocking.shuffle |
false | 控制清理线程是否应阻止 shuffle 清理任务。 | 1.1.1 |
spark.cleaner.referenceTracking.cleanCheckpoints |
false | 控制是否在引用超出范围时清理检查点文件。 | 1.4.0 |
执行行为
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.broadcast.blockSize |
4m | TorrentBroadcastFactory 的每个块的大小,以 KiB 为单位,除非另有说明。 值太大将降低广播期间的并行性(使其变慢); 但是,如果它太小,BlockManager 可能会受到性能影响。 |
0.5.0 |
spark.broadcast.checksum |
true | 是否启用广播的校验和。 如果启用,广播将包含校验和,这可以帮助检测损坏的块,但代价是计算和发送更多的数据。 如果网络具有其他机制来保证数据在广播期间不会损坏,则可以禁用它。 | 2.1.1 |
spark.broadcast.UDFCompressionThreshold |
1 * 1024 * 1024 | 用户定义的函数(UDF)和 Python RDD 命令通过广播进行压缩的阈值,以字节为单位,除非另有说明。 | 3.0.0 |
spark.executor.cores |
在 YARN 模式下为 1,在独立和 Mesos 粗粒度模式下为 worker 上的所有可用内核。 | 每个执行器要使用的内核数。 在独立和 Mesos 粗粒度模式下,有关更多详细信息,请参见此描述。 | 1.0.0 |
spark.default.parallelism |
对于像 reduceByKey 和 join 这样的分布式 shuffle 操作,父 RDD 中分区的最大数量。 对于像没有父 RDD 的 parallelize 这样的操作,它取决于集群管理器
|
当用户未设置时,由像 join ,reduceByKey 和 parallelize 这样的转换返回的 RDD 中的默认分区数。 |
0.5.0 |
spark.executor.heartbeatInterval |
10秒 | 每个执行器向驱动程序发送心跳的间隔。 心跳让驱动程序知道执行器仍然存活,并使用正在进行的任务的指标来更新它。 spark.executor.heartbeatInterval 应该明显小于 spark.network.timeout | 1.1.0 |
spark.files.fetchTimeout |
60秒 | 从驱动程序获取通过 SparkContext.addFile() 添加的文件时使用的通信超时。 | 1.0.0 |
spark.files.useFetchCache |
true | 如果设置为 true(默认值),则文件获取将使用一个本地缓存,该缓存由属于同一应用程序的执行器共享,这可以提高在同一主机上运行许多执行器时的任务启动性能。 如果设置为 false,这些缓存优化将被禁用,并且所有执行器都将获取它们自己的文件副本。 可以禁用此优化,以便使用位于 NFS 文件系统上的 Spark 本地目录(有关更多详细信息,请参见SPARK-6313)。 | 1.2.2 |
spark.files.overwrite |
false | 是否覆盖启动时存在的任何文件。 用户甚至在将此选项设置为 true 之前,都无法覆盖由 SparkContext.addFile 或 SparkContext.addJar 添加的文件。 |
1.0.0 |
spark.files.ignoreCorruptFiles |
false | 是否忽略损坏的文件。 如果为 true,则在遇到损坏或不存在的文件时,Spark 作业将继续运行,并且仍然会返回已读取的内容。 | 2.1.0 |
spark.files.ignoreMissingFiles |
false | 是否忽略缺少的文件。 如果为 true,则在遇到缺少的文件时,Spark 作业将继续运行,并且仍然会返回已读取的内容。 | 2.4.0 |
spark.files.maxPartitionBytes |
134217728 (128 MiB) | 读取文件时要打包到单个分区中的最大字节数。 | 2.1.0 |
spark.files.openCostInBytes |
4194304 (4 MiB) | 打开文件的估计成本,以可以同时扫描的字节数来衡量。 这在将多个文件放入分区时使用。 最好高估,那么具有小文件的分区将比具有较大文件的分区更快。 | 2.1.0 |
spark.hadoop.cloneConf |
false | 如果设置为 true,则为每个任务克隆一个新的 Hadoop Configuration 对象。 应该启用此选项以解决 Configuration 线程安全问题(有关更多详细信息,请参见SPARK-2546)。 默认情况下禁用此选项,以避免不受这些问题影响的作业出现意外的性能下降。 |
1.0.3 |
spark.hadoop.validateOutputSpecs |
true | 如果设置为 true,则验证在 saveAsHadoopFile 和其他变体中使用的输出规范(例如,检查输出目录是否已存在)。 可以禁用此选项以消除由于预先存在的输出目录而导致的异常。 我们建议用户不要禁用此选项,除非尝试与以前版本的 Spark 兼容。 只需使用 Hadoop 的 FileSystem API 手动删除输出目录。 对于通过 Spark Streaming 的 StreamingContext 生成的作业,将忽略此设置,因为在检查点恢复期间可能需要将数据重写到预先存在的输出目录。 | 1.0.1 |
spark.storage.memoryMapThreshold |
2m | 从磁盘读取块时,Spark 内存映射之上的块大小。 默认单位是字节,除非另有说明。 这可以防止 Spark 内存映射非常小的块。 通常,对于接近或低于操作系统页面大小的块,内存映射具有很高的开销。 | 0.9.2 |
spark.storage.decommission.enabled |
false | 停用执行器时是否停用块管理器。 | 3.1.0 |
spark.storage.decommission.shuffleBlocks.enabled |
true | 在块管理器停用期间是否传输 shuffle 块。 需要可迁移的 shuffle 解析器(如基于排序的 shuffle)。 | 3.1.0 |
spark.storage.decommission.shuffleBlocks.maxThreads |
8 | 迁移 shuffle 文件时使用的最大线程数。 | 3.1.0 |
spark.storage.decommission.rddBlocks.enabled |
true | 在块管理器停用期间是否传输 RDD 块。 | 3.1.0 |
spark.storage.decommission.fallbackStorage.path |
(无) | 块管理器停用期间回退存储的位置。 例如,s3a://spark-storage/ 。 如果为空,则禁用回退存储。 该存储应由 TTL 管理,因为 Spark 不会清理它。 |
3.1.0 |
spark.storage.decommission.fallbackStorage.cleanUp |
false | 如果为 true,Spark 在关闭期间清理其回退存储数据。 | 3.2.0 |
spark.storage.decommission.shuffleBlocks.maxDiskSize |
(无) | 在拒绝远程 shuffle 块之前用于存储 shuffle 块的最大磁盘空间。 拒绝远程 shuffle 块意味着执行器不会收到任何 shuffle 迁移,并且如果没有其他执行器可用于迁移,则 shuffle 块将丢失,除非配置了 spark.storage.decommission.fallbackStorage.path 。 |
3.2.0 |
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version |
1 | 文件输出提交器算法版本,有效的算法版本号:1 或 2。 请注意,2 可能会导致像 MAPREDUCE-7282 这样的正确性问题。 | 2.2.0 |
Executor 指标
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.eventLog.logStageExecutorMetrics |
false | 是否将每个 stage 的 executor 指标峰值(针对每个 executor)写入事件日志。 注意: 这些指标是在 executor 心跳中轮询(收集)和发送的,并且始终如此;此配置仅用于确定是否将聚合的指标峰值写入事件日志。 |
3.0.0 |
spark.executor.processTreeMetrics.enabled |
false | 是否在收集 executor 指标时收集进程树指标(来自 /proc 文件系统)。 注意: 只有存在 /proc 文件系统时才会收集进程树指标。 |
3.0.0 |
spark.executor.metrics.pollingInterval |
0 | 收集 executor 指标的频率(以毫秒为单位)。 如果为 0,则轮询在 executor 心跳时完成(因此以心跳间隔,由 spark.executor.heartbeatInterval 指定)。 如果为正数,则以该间隔完成轮询。 |
3.0.0 |
spark.eventLog.gcMetrics.youngGenerationGarbageCollectors |
Copy,PS Scavenge,ParNew,G1 Young Generation | 受支持的年轻代垃圾收集器的名称。 名称通常是 GarbageCollectorMXBean.getName 的返回值。 内置的年轻代垃圾收集器是 Copy,PS Scavenge,ParNew,G1 Young Generation。 | 3.0.0 |
spark.eventLog.gcMetrics.oldGenerationGarbageCollectors |
MarkSweepCompact,PS MarkSweep,ConcurrentMarkSweep,G1 Old Generation | 受支持的老年代垃圾收集器的名称。 名称通常是 GarbageCollectorMXBean.getName 的返回值。 内置的老年代垃圾收集器是 MarkSweepCompact,PS MarkSweep,ConcurrentMarkSweep,G1 Old Generation。 | 3.0.0 |
spark.executor.metrics.fileSystemSchemes |
file,hdfs | 要在 executor 指标中报告的文件系统方案。 | 3.1.0 |
网络
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.rpc.message.maxSize |
128 | “控制平面”通信中允许的最大消息大小(以 MiB 为单位);通常仅适用于 executor 和 driver 之间发送的 map 输出大小信息。 如果您运行的任务包含数千个 map 和 reduce 任务,并且看到有关 RPC 消息大小的消息,请增加此值。 | 2.0.0 |
spark.blockManager.port |
(随机) | 所有块管理器监听的端口。 它们存在于 driver 和 executor 上。 | 1.1.0 |
spark.driver.blockManager.port |
(spark.blockManager.port 的值) | 块管理器的 driver 特定端口,用于监听,以便在无法使用与 executor 相同的配置的情况下使用。 | 2.1.0 |
spark.driver.bindAddress |
(spark.driver.host 的值) | 绑定监听套接字的主机名或 IP 地址。 此配置将覆盖 SPARK_LOCAL_IP 环境变量(见下文)。 它还允许将与本地地址不同的地址公布给 executor 或外部系统。 例如,在运行具有桥接网络的容器时,这很有用。 为了使其正常工作,driver 使用的不同端口(RPC、块管理器和 UI)需要从容器的主机转发。 |
2.1.0 |
spark.driver.host |
(本地主机名) | driver 的主机名或 IP 地址。 用于与 executor 和 standalone Master 通信。 | 0.7.0 |
spark.driver.port |
(随机) | driver 监听的端口。 用于与 executor 和 standalone Master 通信。 | 0.7.0 |
spark.rpc.io.backLog |
64 | RPC 服务器的接受队列的长度。 对于大型应用程序,可能需要增加此值,以便在短时间内到达大量连接时不会丢弃传入的连接。 | 3.0.0 |
spark.network.timeout |
120s | 所有网络交互的默认超时。 如果未配置 spark.storage.blockManagerHeartbeatTimeoutMs 、spark.shuffle.io.connectionTimeout 、spark.rpc.askTimeout 或 spark.rpc.lookupTimeout ,则将使用此配置来代替它们。 |
1.3.0 |
spark.network.timeoutInterval |
60秒 | driver 检查和过期失效 executor 的间隔。 | 1.3.2 |
spark.network.io.preferDirectBufs |
true | 如果启用,则共享分配器首选堆外缓冲区分配。 堆外缓冲区用于减少 shuffle 和缓存块传输期间的垃圾回收。 对于堆外内存受到严格限制的环境,用户可能希望关闭此选项,以强制所有分配都在堆上。 | 3.0.0 |
spark.port.maxRetries |
16 | 放弃之前绑定到端口的最大重试次数。 当端口被赋予特定值(非 0)时,每次后续重试都将在重试之前将先前尝试中使用的端口递增 1。 这实际上允许它尝试一系列端口,从指定的起始端口到端口 + maxRetries。 | 1.1.1 |
spark.rpc.askTimeout |
spark.network.timeout |
RPC ask 操作等待超时的时间。 | 1.4.0 |
spark.rpc.lookupTimeout |
120s | RPC 远程端点查找操作等待超时的时间。 | 1.4.0 |
spark.network.maxRemoteBlockSizeFetchToMem |
200m | 当块的大小超过此阈值(以字节为单位)时,将远程块提取到磁盘。 这是为了避免巨大的请求占用太多内存。 请注意,此配置会影响 shuffle 提取和块管理器远程块提取。 对于启用外部 shuffle 服务的用户,此功能只有在外部 shuffle 服务至少为 2.3.0 时才能工作。 | 3.0.0 |
spark.rpc.io.connectionTimeout |
spark.network.timeout 的值 |
如果 RPC 对等方之间建立的连接在至少 `connectionTimeout` 的时间内没有流量,但存在未完成的 RPC 请求,则将这些连接标记为空闲并关闭的超时时间。 | 1.2.0 |
spark.rpc.io.connectionCreationTimeout |
spark.rpc.io.connectionTimeout 的值 |
建立 RPC 对等方之间连接的超时时间。 | 3.2.0 |
调度
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.cores.max |
(未设置) | 在 standalone 部署集群或 Mesos 集群的“粗粒度”共享模式上运行时,从整个集群(而不是从每台机器)为应用程序请求的最大 CPU 核心数。 如果未设置,则默认值为 Spark 的 standalone 集群管理器上的 spark.deploy.defaultCores ,或者 Mesos 上的无限(所有可用核心)。 |
0.6.0 |
spark.locality.wait |
3s | 放弃之前,等待启动数据本地任务的时间,然后在不太本地的节点上启动它。 相同的等待时间将用于逐步遍历多个位置级别(进程本地、节点本地、机架本地,然后是任何)。 也可以通过设置 spark.locality.wait.node 等来为每个级别自定义等待时间。 如果您的任务很长并且位置性很差,则应增加此设置,但默认设置通常效果良好。 |
0.5.0 |
spark.locality.wait.node |
spark.locality.wait | 自定义节点本地性的位置等待时间。 例如,您可以将其设置为 0 以跳过节点本地性,并立即搜索机架本地性(如果您的集群具有机架信息)。 | 0.8.0 |
spark.locality.wait.process |
spark.locality.wait | 自定义进程本地性的位置等待时间。 这会影响尝试访问特定 executor 进程中缓存数据的任务。 | 0.8.0 |
spark.locality.wait.rack |
spark.locality.wait | 自定义机架本地性的位置等待时间。 | 0.8.0 |
spark.scheduler.maxRegisteredResourcesWaitingTime |
30s | 开始调度之前等待资源注册的最长时间。 | 1.1.1 |
spark.scheduler.minRegisteredResourcesRatio |
KUBERNETES 模式下为 0.8;YARN 模式下为 0.8;standalone 模式和 Mesos 粗粒度模式下为 0.0 | 开始调度之前等待的注册资源(注册资源/预期总资源)的最小比率(资源是 yarn 模式和 Kubernetes 模式下的 executor,standalone 模式和 Mesos 粗粒度模式下的 CPU 核心数 ['spark.cores.max' 值是 Mesos 粗粒度模式的预期总资源])。 指定为 0.0 到 1.0 之间的双精度值。 无论是否达到最小资源比率,开始调度之前等待的最长时间都由配置 spark.scheduler.maxRegisteredResourcesWaitingTime 控制。 |
1.1.1 |
spark.scheduler.mode |
FIFO | 提交到同一 SparkContext 的作业之间的调度模式。 可以设置为 FAIR 以使用公平共享而不是一个接一个地排队作业。 适用于多用户服务。 |
0.8.0 |
spark.scheduler.revive.interval |
1s | 调度程序恢复 worker 资源以运行任务的间隔长度。 | 0.8.1 |
spark.scheduler.listenerbus.eventqueue.capacity |
10000 | 事件队列的默认容量。 Spark 将尝试首先使用 `spark.scheduler.listenerbus.eventqueue.queueName.capacity` 指定的容量初始化事件队列。 如果未配置,Spark 将使用此配置指定的默认容量。 请注意,容量必须大于 0。 如果丢弃了侦听器事件,请考虑增加该值(例如 20000)。 增加此值可能会导致 driver 使用更多内存。 | 2.3.0 |
spark.scheduler.listenerbus.eventqueue.shared.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark 侦听器总线中共享事件队列的容量,该队列保存注册到侦听器总线的外部侦听器的事件。 如果丢弃了与共享队列对应的侦听器事件,请考虑增加该值。 增加此值可能会导致 driver 使用更多内存。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.appStatus.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
appStatus 事件队列的容量,该队列保存内部应用程序状态侦听器的事件。 如果丢弃了与 appStatus 队列对应的侦听器事件,请考虑增加该值。 增加此值可能会导致 driver 使用更多内存。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.executorManagement.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark 侦听器总线中 executorManagement 事件队列的容量,该队列保存内部 executor 管理侦听器的事件。 如果丢弃了与 executorManagement 队列对应的侦听器事件,请考虑增加该值。 增加此值可能会导致 driver 使用更多内存。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.eventLog.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark 侦听器总线中 eventLog 队列的容量,该队列保存将事件写入 eventLogs 的事件日志记录侦听器的事件。 如果丢弃了与 eventLog 队列对应的侦听器事件,请考虑增加该值。 增加此值可能会导致 driver 使用更多内存。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.streams.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark 侦听器总线中 streams 队列的容量,该队列保存内部流式传输侦听器的事件。 如果丢弃了与 streams 队列对应的侦听器事件,请考虑增加该值。 增加此值可能会导致 driver 使用更多内存。 | 3.0.0 |
spark.scheduler.resource.profileMergeConflicts |
false | 如果设置为“true”,则当在合并到单个 stage 中的 RDD 中指定了不同的 ResourceProfile 时,Spark 将合并 ResourceProfile。 当它们合并时,Spark 选择每个资源的最大值并创建一个新的 ResourceProfile。 默认值 false 会导致如果发现多个不同的 ResourceProfile 进入同一 stage,Spark 会抛出异常。 | 3.1.0 |
spark.scheduler.excludeOnFailure.unschedulableTaskSetTimeout |
120s | 在获取新 executor 并调度任务之前等待的超时时间(以秒为单位),如果在所有 executor 都因任务失败而被排除的情况下,TaskSet 无法调度,则会中止 TaskSet。 | 2.4.1 |
spark.standalone.submit.waitAppCompletion |
false | 如果设置为 true,则当在合并到单个 stage 中的 RDD 中指定了不同的 ResourceProfile 时,Spark 将合并 ResourceProfile。 当它们合并时,Spark 选择每个资源的最大值并创建一个新的 ResourceProfile。 默认值 false 会导致如果发现多个不同的 ResourceProfile 进入同一 stage,Spark 会抛出异常。 | 3.1.0 |
spark.excludeOnFailure.enabled |
false | 如果设置为“true”,则阻止 Spark 在因任务失败次数过多而被排除的 executor 上调度任务。 用于排除 executor 和节点的算法可以通过其他“spark.excludeOnFailure”配置选项进一步控制。 | 2.1.0 |
spark.excludeOnFailure.timeout |
1h | (实验性) 节点或 executor 在整个应用程序中被排除的时间,然后无条件地从排除列表中删除,以尝试运行新任务。 | 2.1.0 |
spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor |
1 | (实验性) 对于给定的任务,它可以在一个 executor 上重试多少次,然后该 executor 将被排除在该任务之外。 | 2.1.0 |
spark.excludeOnFailure.task.maxTaskAttemptsPerNode |
2 | (实验性) 对于给定的任务,它可以在一个节点上重试多少次,然后整个节点将从该任务中排除。 | 2.1.0 |
spark.excludeOnFailure.stage.maxFailedTasksPerExecutor |
2 | (实验性) 在一个 stage 内,一个 executor 上有多少个不同的 task 失败后,该 executor 会被排除出该 stage。 | 2.1.0 |
spark.excludeOnFailure.stage.maxFailedExecutorsPerNode |
2 | (实验性) 在一个给定的 stage 中,有多少个不同的 executor 被标记为排除后,整个节点会被标记为该 stage 失败。 | 2.1.0 |
spark.excludeOnFailure.application.maxFailedTasksPerExecutor |
2 | (实验性) 在成功的 task sets 中,一个 executor 上有多少个不同的 task 失败后,该 executor 会被排除出整个应用程序。被排除的 executor 将在 spark.excludeOnFailure.timeout 指定的超时后自动添加回可用资源池。请注意,使用动态分配时,executor 可能会被标记为空闲并被集群管理器回收。 |
2.2.0 |
spark.excludeOnFailure.application.maxFailedExecutorsPerNode |
2 | (实验性) 有多少个不同的 executor 必须被排除出整个应用程序,节点才会被排除出整个应用程序。被排除的节点将在 spark.excludeOnFailure.timeout 指定的超时后自动添加回可用资源池。请注意,使用动态分配时,节点上的 executor 可能会被标记为空闲并被集群管理器回收。 |
2.2.0 |
spark.excludeOnFailure.killExcludedExecutors |
false | (实验性) 如果设置为 "true",则允许 Spark 在 executor 因 fetch 失败而被排除或因整个应用程序而被排除时自动杀死 executor,如 spark.killExcludedExecutors.application.* 所控制。请注意,当整个节点被排除时,该节点上的所有 executor 都会被杀死。 | 2.2.0 |
spark.excludeOnFailure.application.fetchFailure.enabled |
false | (实验性) 如果设置为 "true",Spark 将在发生 fetch 失败时立即排除该 executor。如果启用了外部 shuffle 服务,则整个节点将被排除。 | 2.3.0 |
spark.speculation |
false | 如果设置为 "true",则执行任务的推测执行。这意味着如果一个 stage 中一个或多个任务运行缓慢,它们将被重新启动。 | 0.6.0 |
spark.speculation.interval |
100ms | Spark 检查推测任务的频率。 | 0.6.0 |
spark.speculation.multiplier |
1.5 | 任务比中位数慢多少倍才会被考虑用于推测。 | 0.6.0 |
spark.speculation.quantile |
0.75 | 在特定 stage 启用推测之前必须完成的任务的比例。 | 0.6.0 |
spark.speculation.minTaskRuntime |
100ms | 任务运行的最短时间,之后才会被考虑用于推测。这可以用来避免启动非常短的任务的推测副本。 | 3.2.0 |
spark.speculation.task.duration.threshold |
无 | 任务持续时间,超过该时间后,调度程序将尝试对任务进行推测运行。如果提供,则当当前 stage 包含的任务数量小于或等于单个 executor 上的 slot 数量,并且任务花费的时间长于阈值时,任务将被推测运行。此配置有助于推测任务很少的 stage。如果 executor slot 足够大,则也可能应用常规推测配置。例如,即使尚未达到阈值,如果存在足够的成功运行,任务也可能会被重新启动。 slot 的数量基于 spark.executor.cores 和 spark.task.cpus 的 conf 值计算,最小值为 1。默认单位为字节,除非另有说明。 | 3.0.0 |
spark.speculation.efficiency.processRateMultiplier |
0.75 | 一个乘数,用于评估低效任务。乘数越高,越多的任务可能被认为是低效的。 | 3.4.0 |
spark.speculation.efficiency.longRunTaskFactor |
2 | 无论任务的数据处理速率是否良好,只要任务的持续时间超过因子乘以时间阈值(spark.speculation.multiplier * successfulTaskDurations.median 或 spark.speculation.minTaskRuntime )的值,任务都将被推测。 这避免了在任务缓慢与数据处理速率无关时错过低效的任务。 |
3.4.0 |
spark.speculation.efficiency.enabled |
true | 当设置为 true 时,spark 将通过 stage 任务指标或其持续时间来评估任务处理的效率,并且只需要推测低效的任务。当 1) 它的数据处理速率小于 stage 中所有成功任务的平均数据处理速率乘以一个乘数 或 2) 它的持续时间超过了 spark.speculation.efficiency.longRunTaskFactor 乘以时间阈值(spark.speculation.multiplier * successfulTaskDurations.median 或 spark.speculation.minTaskRuntime )的值时,任务是低效的。 |
3.4.0 |
spark.task.cpus |
1 | 为每个任务分配的核心数。 | 0.5.0 |
spark.task.resource.{resourceName}.amount |
1 | 为每个任务分配的特定资源类型的数量,请注意,这可以是 double 类型。如果指定了此项,您还必须提供 executor 配置 spark.executor.resource.{resourceName}.amount 和任何相应的发现配置,以便使用该资源类型创建您的 executor。除了整数数量外,还可以指定小数数量(例如,0.25,表示 1/4 的资源)。小数数量必须小于或等于 0.5,换句话说,最小的资源共享量是每个资源 2 个任务。此外,小数数量会被向下取整,以便分配资源 slot(例如,0.2222 配置,或 1/0.2222 slot 将变为 4 个任务/资源,而不是 5 个)。 |
3.0.0 |
spark.task.maxFailures |
4 | 在放弃作业之前,任何特定任务的连续失败次数。不同任务之间分布的总失败次数不会导致作业失败;特定的任务必须连续失败这么多次尝试。如果任何尝试成功,任务的失败计数将被重置。应该大于或等于 1。允许的重试次数 = 此值 - 1。 | 0.8.0 |
spark.task.reaper.enabled |
false | 启用对被杀死/中断的任务的监控。当设置为 true 时,任何被杀死的任务都将被 executor 监控,直到该任务实际完成执行。有关如何控制此监控的确切行为的详细信息,请参阅其他 spark.task.reaper.* 配置。当设置为 false(默认值)时,任务杀死将使用缺少此类监控的旧代码路径。 |
2.0.3 |
spark.task.reaper.pollingInterval |
10秒 | 当 spark.task.reaper.enabled = true 时,此设置控制 executor 轮询被杀死任务状态的频率。如果在轮询时被杀死的任务仍在运行,则将记录警告,并且默认情况下,将记录任务的线程转储(可以通过 spark.task.reaper.threadDump 设置禁用此线程转储,该设置在下面有文档记录)。 |
2.0.3 |
spark.task.reaper.threadDump |
true | 当 spark.task.reaper.enabled = true 时,此设置控制在定期轮询被杀死任务期间是否记录任务线程转储。将其设置为 false 以禁用线程转储的收集。 |
2.0.3 |
spark.task.reaper.killTimeout |
-1 | 当 spark.task.reaper.enabled = true 时,此设置指定一个超时时间,在此超时时间之后,如果被杀死的任务尚未停止运行,则 executor JVM 将自行终止。默认值 -1 禁用此机制并阻止 executor 自毁。此设置的目的是充当安全网,以防止失控的不可取消的任务使 executor 无法使用。 |
2.0.3 |
spark.stage.maxConsecutiveAttempts |
4 | 在 stage 被中止之前允许的连续 stage 尝试次数。 | 2.2.0 |
spark.stage.ignoreDecommissionFetchFailure |
false | 在计算 spark.stage.maxConsecutiveAttempts 时,是否忽略由 executor 退役引起的 stage fetch 失败。 |
3.4.0 |
Barrier 执行模式
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.barrier.sync.timeout |
365d | barrier 任务每次调用 barrier() 的超时时间(秒)。如果协调器未在配置的时间内收到来自 barrier 任务的所有同步消息,则抛出 SparkException 以使所有任务失败。默认值设置为 31536000(3600 * 24 * 365),因此 barrier() 调用将等待一年。 |
2.4.0 |
spark.scheduler.barrier.maxConcurrentTasksCheck.interval |
15s | 在最大并发任务检查失败到下一次检查之间等待的时间(秒)。最大并发任务检查确保集群可以启动比在提交的作业的 barrier stage 中所需的更多的并发任务。如果集群刚刚启动并且没有注册足够的 executor,则检查可能会失败,因此我们等待一段时间并尝试再次执行检查。如果检查失败次数超过作业配置的最大失败次数,则使当前作业提交失败。请注意,此配置仅适用于包含一个或多个 barrier stage 的作业,我们不会对非 barrier 作业执行检查。 | 2.4.0 |
spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures |
40 | 在作业提交失败之前允许的最大并发任务检查失败次数。最大并发任务检查确保集群可以启动比在提交的作业的 barrier stage 中所需的更多的并发任务。如果集群刚刚启动并且没有注册足够的 executor,则检查可能会失败,因此我们等待一段时间并尝试再次执行检查。如果检查失败次数超过作业配置的最大失败次数,则使当前作业提交失败。请注意,此配置仅适用于包含一个或多个 barrier stage 的作业,我们不会对非 barrier 作业执行检查。 | 2.4.0 |
动态分配
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.dynamicAllocation.enabled |
false | 是否使用动态资源分配,它根据工作负载来增加和减少注册到此应用程序的 executor 的数量。有关更多详细信息,请参阅此处的描述。 这需要满足以下条件之一:1) 通过 spark.shuffle.service.enabled 启用外部 shuffle 服务,或 2) 通过 spark.dynamicAllocation.shuffleTracking.enabled 启用 shuffle 跟踪,或 3) 通过 spark.decommission.enabled 和 spark.storage.decommission.shuffleBlocks.enabled 启用 shuffle 块退役,或 4) (实验性) 配置 spark.shuffle.sort.io.plugin.class 以使用自定义 ShuffleDataIO ,其 ShuffleDriverComponents 支持可靠的存储。以下配置也相关:spark.dynamicAllocation.minExecutors 、spark.dynamicAllocation.maxExecutors 和 spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.executorAllocationRatio |
1.2.0 |
spark.dynamicAllocation.executorIdleTimeout |
60秒 | 如果启用了动态分配并且 executor 空闲时间超过此持续时间,则将删除该 executor。有关更多详细信息,请参阅此描述。 | 1.2.0 |
spark.dynamicAllocation.cachedExecutorIdleTimeout |
infinity | 如果启用了动态分配并且具有缓存的数据块的 executor 空闲时间超过此持续时间,则将删除该 executor。有关更多详细信息,请参阅此描述。 | 1.4.0 |
spark.dynamicAllocation.initialExecutors |
spark.dynamicAllocation.minExecutors |
如果启用了动态分配,则要运行的 executor 的初始数量。 如果设置了 `--num-executors` (或 `spark.executor.instances`) 并且大于此值,则它将用作 executor 的初始数量。 |
1.3.0 |
spark.dynamicAllocation.maxExecutors |
infinity | 如果启用了动态分配,则 executor 数量的上限。 | 1.2.0 |
spark.dynamicAllocation.minExecutors |
0 | 如果启用了动态分配,则 executor 数量的下限。 | 1.2.0 |
spark.dynamicAllocation.executorAllocationRatio |
1 | 默认情况下,动态分配会请求足够的执行器,以根据要处理的任务数量最大化并行性。虽然这可以最大限度地减少作业的延迟,但对于小任务来说,这种设置可能会由于执行器分配开销而浪费大量资源,因为某些执行器可能甚至不做任何工作。此设置允许设置一个比率,该比率将用于相对于完全并行性减少执行器的数量。默认为 1.0,以提供最大并行性。0.5 将目标执行器数量除以 2。动态分配计算出的目标执行器数量仍然可以被 spark.dynamicAllocation.minExecutors 和 spark.dynamicAllocation.maxExecutors 设置覆盖 |
2.4.0 |
spark.dynamicAllocation.schedulerBacklogTimeout |
1s | 如果启用了动态分配,并且有待处理的任务积压超过此持续时间,则将请求新的执行器。有关更多详细信息,请参阅此描述。 | 1.2.0 |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout |
schedulerBacklogTimeout |
与 spark.dynamicAllocation.schedulerBacklogTimeout 相同,但仅用于后续执行器请求。有关更多详细信息,请参阅此描述。 |
1.2.0 |
spark.dynamicAllocation.shuffleTracking.enabled |
true |
启用执行器的 shuffle 文件跟踪,从而允许在不需要外部 shuffle 服务的情况下进行动态分配。此选项将尝试保持存储活动作业 shuffle 数据的执行器的活动状态。 | 3.0.0 |
spark.dynamicAllocation.shuffleTracking.timeout |
infinity |
启用 shuffle 跟踪后,控制保存 shuffle 数据的执行器的超时时间。默认值意味着 Spark 将依赖垃圾回收来释放执行器。如果由于某种原因垃圾回收没有足够快地清理 shuffle 数据,则可以使用此选项来控制何时使执行器超时,即使它们正在存储 shuffle 数据。 | 3.0.0 |
线程配置
根据作业和集群配置,我们可以在 Spark 中的多个位置设置线程数,以有效地利用可用资源并获得更好的性能。在 Spark 3.0 之前,这些线程配置适用于 Spark 的所有角色,例如驱动程序、执行程序、worker 和 master。从 Spark 3.0 开始,我们可以从驱动程序和执行程序开始,以更精细的粒度配置线程。以下表以 RPC 模块为例。对于其他模块,例如 shuffle,只需在属性名称中将“rpc”替换为“shuffle”,除了 spark.{driver|executor}.rpc.netty.dispatcher.numThreads
,它仅适用于 RPC 模块。
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.{driver|executor}.rpc.io.serverThreads |
回退到 spark.rpc.io.serverThreads |
服务器线程池中使用的线程数 | 1.6.0 |
spark.{driver|executor}.rpc.io.clientThreads |
回退到 spark.rpc.io.clientThreads |
客户端线程池中使用的线程数 | 1.6.0 |
spark.{driver|executor}.rpc.netty.dispatcher.numThreads |
回退到 spark.rpc.netty.dispatcher.numThreads |
RPC 消息分发器线程池中使用的线程数 | 3.0.0 |
与线程相关的配置键的默认值是为驱动程序或执行器请求的核心数的最小值,或者,在没有该值的情况下,JVM 可用的核心数(硬编码上限为 8)。
Spark Connect
服务器配置
服务器配置在 Spark Connect 服务器中设置,例如,当您使用 ./sbin/start-connect-server.sh
启动 Spark Connect 服务器时。它们通常通过配置文件和带有 --conf/-c
的命令行选项设置。
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.connect.grpc.binding.port |
15002 | Spark Connect 服务器绑定的端口。 | 3.4.0 |
spark.connect.grpc.interceptor.classes |
(无) | 必须实现 io.grpc.ServerInterceptor 接口的类名的逗号分隔列表 |
3.4.0 |
spark.connect.grpc.arrow.maxBatchSize |
4m | 使用 Apache Arrow 时,限制可以从服务器端发送到客户端的一个 Arrow 批处理的最大大小。目前,我们保守地使用它的 70%,因为大小不准确而是估计值。 | 3.4.0 |
spark.connect.grpc.maxInboundMessageSize |
134217728 | 设置 gRPC 请求的最大入站消息大小。有效负载较大的请求将失败。 | 3.4.0 |
spark.connect.extensions.relation.classes |
(无) | 实现 trait org.apache.spark.sql.connect.plugin.RelationPlugin 以支持 proto 中的自定义 Relation 类型的类名的逗号分隔列表。 |
3.4.0 |
spark.connect.extensions.expression.classes |
(无) | 实现 trait org.apache.spark.sql.connect.plugin.ExpressionPlugin 以支持 proto 中的自定义 Expression 类型的类名的逗号分隔列表。 |
3.4.0 |
spark.connect.extensions.command.classes |
(无) | 实现 trait org.apache.spark.sql.connect.plugin.CommandPlugin 以支持 proto 中的自定义 Command 类型的类名的逗号分隔列表。 |
3.4.0 |
安全
请参阅安全性页面,了解有关如何保护不同 Spark 子系统的可用选项。
Spark SQL
运行时 SQL 配置
运行时 SQL 配置是每个会话、可变的 Spark SQL 配置。它们可以通过配置文件和带有前缀 --conf/-c
的命令行选项设置初始值,或者通过设置用于创建 SparkSession
的 SparkConf
。此外,它们可以通过 SET 命令设置和查询,并通过 RESET 命令恢复到其初始值,或者通过运行时 SparkSession.conf
的 setter 和 getter 方法来设置和查询。
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.sql.adaptive.advisoryPartitionSizeInBytes |
(spark.sql.adaptive.shuffle.targetPostShuffleInputSize 的值) |
自适应优化期间 shuffle 分区的建议大小(以字节为单位)(当 spark.sql.adaptive.enabled 为 true 时)。当 Spark 合并小的 shuffle 分区或拆分倾斜的 shuffle 分区时,它会生效。 |
3.0.0 |
spark.sql.adaptive.autoBroadcastJoinThreshold |
(无) | 配置在执行连接时将广播到所有 worker 节点的表的最大大小(以字节为单位)。通过将此值设置为 -1,可以禁用广播。默认值与 spark.sql.autoBroadcastJoinThreshold 相同。请注意,此配置仅在自适应框架中使用。 |
3.2.0 |
spark.sql.adaptive.coalescePartitions.enabled |
true | 当为 true 且 'spark.sql.adaptive.enabled' 为 true 时,Spark 将根据目标大小(由 'spark.sql.adaptive.advisoryPartitionSizeInBytes' 指定)合并连续的 shuffle 分区,以避免太多的小任务。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum |
(无) | 合并之前的 shuffle 分区的初始数量。如果未设置,则等于 spark.sql.shuffle.partitions。此配置仅在 'spark.sql.adaptive.enabled' 和 'spark.sql.adaptive.coalescePartitions.enabled' 都为 true 时才有效。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.minPartitionSize |
1MB | 合并后 shuffle 分区的最小大小。当自适应计算的目标大小在分区合并期间太小时,这很有用。 |
3.2.0 |
spark.sql.adaptive.coalescePartitions.parallelismFirst |
true | 为 true 时,Spark 在合并连续的 shuffle 分区时,不会考虑 'spark.sql.adaptive.advisoryPartitionSizeInBytes' (默认 64MB) 指定的目标大小,而是根据 Spark 集群的默认并行度自适应地计算目标大小。计算出的大小通常小于配置的目标大小。这是为了最大限度地提高并行性,并避免在启用自适应查询执行时出现性能下降。建议将此配置设置为 false 并考虑配置的目标大小。 |
3.2.0 |
spark.sql.adaptive.customCostEvaluatorClass |
(无) | 用于自适应执行的自定义成本评估器类。如果未设置,Spark 默认将使用其自己的 SimpleCostEvaluator。 |
3.2.0 |
spark.sql.adaptive.enabled |
true | 为 true 时,启用自适应查询执行,它会根据准确的运行时统计信息,在查询执行过程中重新优化查询计划。 |
1.6.0 |
spark.sql.adaptive.forceOptimizeSkewedJoin |
false | 为 true 时,强制启用 OptimizeSkewedJoin,即使它引入了额外的 shuffle。 |
3.3.0 |
spark.sql.adaptive.localShuffleReader.enabled |
true | 当为 true 且 'spark.sql.adaptive.enabled' 为 true 时,当不需要 shuffle 分区时,Spark 尝试使用本地 shuffle 读取器来读取 shuffle 数据,例如,在将 sort-merge 连接转换为 broadcast-hash 连接之后。 |
3.0.0 |
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold |
0b | 配置允许构建本地哈希映射的每个分区的最大大小(以字节为单位)。如果此值不小于 spark.sql.adaptive.advisoryPartitionSizeInBytes 并且所有分区大小都不大于此配置,则无论 spark.sql.join.preferSortMergeJoin 的值如何,连接选择都更倾向于使用 shuffled hash join 而不是 sort merge join。 |
3.2.0 |
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled |
true | 当为 true 且 'spark.sql.adaptive.enabled' 为 true 时,Spark 将优化 RebalancePartitions 中的倾斜 shuffle 分区,并根据目标大小(由 'spark.sql.adaptive.advisoryPartitionSizeInBytes' 指定)将它们拆分为更小的分区,以避免数据倾斜。 |
3.2.0 |
spark.sql.adaptive.optimizer.excludedRules |
(无) | 配置要禁用在自适应优化器中的规则列表,其中规则由其规则名称指定,并以逗号分隔。优化器将记录确实已被排除的规则。 |
3.1.0 |
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor |
0.2 | 如果分区的大小小于此因子乘以 spark.sql.adaptive.advisoryPartitionSizeInBytes,则在拆分期间将合并该分区。 |
3.3.0 |
spark.sql.adaptive.skewJoin.enabled |
true | 当为 true 且 'spark.sql.adaptive.enabled' 为 true 时,Spark 通过拆分(并在需要时复制)倾斜分区来动态处理 shuffled join(sort-merge 和 shuffled hash)中的倾斜。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor |
5.0 | 如果分区的大小大于此因子乘以中值分区大小,并且大于 'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes',则该分区被认为是倾斜的 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes |
256MB | 如果分区的大小(以字节为单位)大于此阈值,并且大于 'spark.sql.adaptive.skewJoin.skewedPartitionFactor' 乘以中值分区大小,则该分区被认为是倾斜的。理想情况下,此配置应设置为大于 'spark.sql.adaptive.advisoryPartitionSizeInBytes'。 |
3.0.0 |
spark.sql.allowNamedFunctionArguments |
true | 如果为 true,Spark 将开启对所有已实现命名参数的函数的命名参数的支持。 |
3.5.0 |
spark.sql.ansi.doubleQuotedIdentifiers |
false | 当为 true 且 'spark.sql.ansi.enabled' 为 true 时,Spark SQL 将双引号 (") 引起来的字面量读取为标识符。当为 false 时,它们被读取为字符串字面量。 |
3.4.0 |
spark.sql.ansi.enabled |
false | 如果为 true,Spark SQL 将使用符合 ANSI 标准的方言,而不是 Hive 兼容方言。 例如,当 SQL 运算符/函数的输入无效时,Spark 将在运行时抛出异常,而不是返回 null 结果。有关此方言的完整详细信息,您可以在 Spark 文档的“ANSI 兼容性”部分中找到。 一些 ANSI 方言功能可能并非直接来自 ANSI SQL 标准,但它们的行为符合 ANSI SQL 的风格。 |
3.0.0 |
spark.sql.ansi.enforceReservedKeywords |
false | 如果为 true 且 'spark.sql.ansi.enabled' 为 true,Spark SQL 解析器将强制执行 ANSI 保留关键字,并禁止使用保留关键字作为别名和/或表、视图、函数等的标识符的 SQL 查询。 |
3.3.0 |
spark.sql.ansi.relationPrecedence |
false | 如果为 true 且 'spark.sql.ansi.enabled' 为 true,则在组合关系时,JOIN 优先于逗号。 例如, |
3.4.0 |
spark.sql.autoBroadcastJoinThreshold |
10MB | 配置在执行连接时将广播到所有 worker 节点的表的最大大小(以字节为单位)。 将此值设置为 -1 可以禁用广播。 请注意,目前统计信息仅支持 Hive Metastore 表,其中已运行命令 |
1.1.0 |
spark.sql.avro.compression.codec |
snappy | 写入 AVRO 文件时使用的压缩编解码器。 支持的编解码器:uncompressed、deflate、snappy、bzip2、xz 和 zstandard。 默认编解码器为 snappy。 |
2.4.0 |
spark.sql.avro.deflate.level |
-1 | 用于写入 AVRO 文件的 deflate 编解码器的压缩级别。有效值必须在 1 到 9(含)或 -1 的范围内。默认值为 -1,对应于当前实现中的 6 级。 |
2.4.0 |
spark.sql.avro.filterPushdown.enabled |
true | 如果为 true,则启用将过滤器下推到 Avro 数据源。 |
3.1.0 |
spark.sql.broadcastTimeout |
300 | 广播连接中广播等待时间的超时时间(以秒为单位)。 |
1.3.0 |
spark.sql.bucketing.coalesceBucketsInJoin.enabled |
false | 如果为 true,如果连接了具有不同 bucket 数量的两个 bucketed 表,则 bucket 数量较大的那一方将被合并为与另一方具有相同的 bucket 数量。 bucket 数量较大的可以被 bucket 数量较小的那一方整除。 Bucket 合并应用于 sort-merge 连接和 shuffled hash 连接。 注意:合并 bucketed 表可以避免连接中不必要的 shuffle,但它也会降低并行性,并可能导致 shuffled hash 连接的 OOM。 |
3.1.0 |
spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio |
4 | 要应用的 bucket 合并,被合并的两个 bucket 的数量之比应小于或等于此值。 此配置仅在 'spark.sql.bucketing.coalesceBucketsInJoin.enabled' 设置为 true 时才有效。 |
3.1.0 |
spark.sql.catalog.spark_catalog |
(无) | 一个 catalog 实现,将用作 Spark 内置 v1 catalog 的 v2 接口:spark_catalog。 此 catalog 与 spark_catalog 共享其标识符命名空间,并且必须与它保持一致; 例如,如果一个表可以被 spark_catalog 加载,则此 catalog 也必须返回该表的元数据。 要将操作委派给 spark_catalog,实现可以扩展 'CatalogExtension'。 |
3.0.0 |
spark.sql.cbo.enabled |
false | 设置为 true 时,启用 CBO 来估计计划统计信息。 |
2.2.0 |
spark.sql.cbo.joinReorder.dp.star.filter |
false | 将星型连接过滤器启发式应用于基于成本的连接枚举。 |
2.2.0 |
spark.sql.cbo.joinReorder.dp.threshold |
12 | 动态编程算法中允许的最大连接节点数。 |
2.2.0 |
spark.sql.cbo.joinReorder.enabled |
false | 在 CBO 中启用连接重排序。 |
2.2.0 |
spark.sql.cbo.planStats.enabled |
false | 如果为 true,则逻辑计划将从 catalog 中获取行数和列统计信息。 |
3.0.0 |
spark.sql.cbo.starSchemaDetection |
false | 如果为 true,则启用基于星型模式检测的连接重排序。 |
2.2.0 |
spark.sql.charAsVarchar |
false | 如果为 true,Spark 会在 CREATE/REPLACE/ALTER TABLE 命令中使用 VARCHAR 类型替换 CHAR 类型,以便新创建/更新的表不会有 CHAR 类型的列/字段。 具有 CHAR 类型列/字段的现有表不受此配置的影响。 |
3.3.0 |
spark.sql.chunkBase64String.enabled |
true | 是否截断由 |
3.5.2 |
spark.sql.cli.print.header |
false | 设置为 true 时,spark-sql CLI 会打印查询输出中的列名。 |
3.2.0 |
spark.sql.columnNameOfCorruptRecord |
_corrupt_record | 用于存储未能解析的原始/未解析的 JSON 和 CSV 记录的内部列的名称。 |
1.2.0 |
spark.sql.csv.filterPushdown.enabled |
true | 如果为 true,则启用将过滤器下推到 CSV 数据源。 |
3.0.0 |
spark.sql.datetime.java8API.enabled |
false | 如果将配置属性设置为 true,则 Java 8 API 的 java.time.Instant 和 java.time.LocalDate 类将用作 Catalyst 的 TimestampType 和 DateType 的外部类型。 如果设置为 false,则 java.sql.Timestamp 和 java.sql.Date 用于相同的目的。 |
3.0.0 |
spark.sql.debug.maxToStringFields |
25 | 可以在调试输出中转换为字符串的类序列条目的最大字段数。 超过限制的任何元素都将被删除,并替换为“... N 更多字段”占位符。 |
3.0.0 |
spark.sql.defaultCatalog |
spark_catalog | 默认 catalog 的名称。 如果用户尚未明确设置当前 catalog,这将是当前的 catalog。 |
3.0.0 |
spark.sql.error.messageFormat |
PRETTY | 当为 PRETTY 时,错误消息由错误类、消息和查询上下文的文本表示组成。 MINIMAL 和 STANDARD 格式是漂亮的 JSON 格式,其中 STANDARD 包括一个额外的 JSON 字段 |
3.4.0 |
spark.sql.execution.arrow.enabled |
false | (自 Spark 3.0 起已弃用,请设置 'spark.sql.execution.arrow.pyspark.enabled'。) |
2.3.0 |
spark.sql.execution.arrow.fallback.enabled |
true | (自 Spark 3.0 起已弃用,请设置 'spark.sql.execution.arrow.pyspark.fallback.enabled'。) |
2.4.0 |
spark.sql.execution.arrow.localRelationThreshold |
48MB | 将 Arrow 批处理转换为 Spark DataFrame 时,如果 Arrow 批处理的字节大小小于此阈值,则在驱动程序端使用本地集合。 否则,Arrow 批处理将被发送并反序列化为执行程序中的 Spark 内部行。 |
3.4.0 |
spark.sql.execution.arrow.maxRecordsPerBatch |
10000 | 使用 Apache Arrow 时,限制可以写入内存中单个 ArrowRecordBatch 的最大记录数。 此配置对于分组 API(例如 DataFrame(.cogroup).groupby.applyInPandas)无效,因为每个组都会成为每个 ArrowRecordBatch。 如果设置为零或负数,则没有限制。 |
2.3.0 |
spark.sql.execution.arrow.pyspark.enabled |
(spark.sql.execution.arrow.enabled 的值) |
如果为 true,则在 PySpark 中使用 Apache Arrow 进行列式数据传输。 此优化适用于:1. pyspark.sql.DataFrame.toPandas。 2. pyspark.sql.SparkSession.createDataFrame,当其输入是 Pandas DataFrame 或 NumPy ndarray 时。 以下数据类型不受支持:TimestampType 的 ArrayType。 |
3.0.0 |
spark.sql.execution.arrow.pyspark.fallback.enabled |
(spark.sql.execution.arrow.fallback.enabled 的值) |
如果为 true,如果发生错误,则由 'spark.sql.execution.arrow.pyspark.enabled' 启用的优化将自动回退到非优化实现。 |
3.0.0 |
spark.sql.execution.arrow.pyspark.selfDestruct.enabled |
false | (实验性)如果为 true,则在从 Arrow 转换为 Pandas 时,使用 Apache Arrow 的 self-destruct 和 split-blocks 选项在 PySpark 中进行列式数据传输。 这以一定的 CPU 时间为代价减少了内存使用量。 此优化适用于:当设置 'spark.sql.execution.arrow.pyspark.enabled' 时,pyspark.sql.DataFrame.toPandas。 |
3.2.0 |
spark.sql.execution.arrow.sparkr.enabled |
false | 如果为 true,则在 SparkR 中使用 Apache Arrow 进行列式数据传输。 此优化适用于:1. 当输入是 R DataFrame 时创建 DataFrame 2. collect 3. dapply 4. gapply。 以下数据类型不受支持:FloatType、BinaryType、ArrayType、StructType 和 MapType。 |
3.0.0 |
spark.sql.execution.pandas.structHandlingMode |
legacy | 创建 pandas DataFrame 时 struct 类型的转换模式。 当为 "legacy" 时,1. 当禁用 Arrow 优化时,转换为 Row 对象,2. 当启用 Arrow 优化时,转换为 dict 或如果存在重复的嵌套字段名称则引发异常。 当为 "row" 时,无论 Arrow 优化如何,都转换为 Row 对象。 当为 "dict" 时,转换为 dict 并使用带后缀的键名(例如,a_0, a_1),如果存在重复的嵌套字段名称,无论 Arrow 优化如何。 |
3.5.0 |
spark.sql.execution.pandas.udf.buffer.size |
(spark.buffer.size 的值) |
与 |
3.0.0 |
spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled |
true | 如果为 true,则简化来自 Python UDF 的回溯。 它在回溯中隐藏了来自 PySpark 的 Python worker、(反)序列化等,并且仅显示来自 UDF 的异常消息。 请注意,这仅适用于 CPython 3.7+。 |
3.1.0 |
spark.sql.execution.pythonUDF.arrow.enabled |
false | 在常规 Python UDF 中启用 Arrow 优化。 只有当给定的函数至少接受一个参数时,才能启用此优化。 |
3.4.0 |
spark.sql.execution.pythonUDTF.arrow.enabled |
false | 为 Python UDTF 启用 Arrow 优化。 |
3.5.0 |
spark.sql.execution.topKSortFallbackThreshold |
2147483632 | 在带有 SORT 后跟 LIMIT 的 SQL 查询中,例如“SELECT x FROM t ORDER BY y LIMIT m”,如果 m 低于此阈值,则在内存中执行 top-K 排序,否则执行全局排序,如果需要则溢出到磁盘。 |
2.4.0 |
spark.sql.files.ignoreCorruptFiles |
false | 是否忽略损坏的文件。 如果为 true,则 Spark 作业将在遇到损坏的文件时继续运行,并且仍将返回已读取的内容。 此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。 |
2.1.1 |
spark.sql.files.ignoreMissingFiles |
false | 是否忽略丢失的文件。 如果为 true,则 Spark 作业将在遇到丢失的文件时继续运行,并且仍将返回已读取的内容。 此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。 |
2.3.0 |
spark.sql.files.maxPartitionBytes |
128MB | 读取文件时,打包到单个分区中的最大字节数。 此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。 |
2.0.0 |
spark.sql.files.maxPartitionNum |
(无) | 建议的(不保证的)最大拆分文件分区数。 如果设置了该值,如果初始分区数超过该值,Spark 将重新缩放每个分区,以使分区数接近该值。 此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。 |
3.5.0 |
spark.sql.files.maxRecordsPerFile |
0 | 写入单个文件的最大记录数。 如果此值为零或负数,则没有限制。 |
2.2.0 |
spark.sql.files.minPartitionNum |
(无) | 建议的(不保证的)最小拆分文件分区数。 如果未设置,则默认值为 |
3.1.0 |
spark.sql.function.concatBinaryAsString |
false | 如果此选项设置为 false 且所有输入均为二进制,则 |
2.3.0 |
spark.sql.function.eltOutputAsString |
false | 如果此选项设置为 false 且所有输入均为二进制,则 |
2.3.0 |
spark.sql.groupByAliases |
true | 如果为 true,则 select 列表中的别名可以在 group by 子句中使用。如果为 false,则会抛出分析异常。 |
2.2.0 |
spark.sql.groupByOrdinal |
true | 如果为 true,则 group by 子句中的序号将被视为 select 列表中的位置。如果为 false,则序号将被忽略。 |
2.0.0 |
spark.sql.hive.convertInsertingPartitionedTable |
true | 如果设置为 true,并且 |
3.0.0 |
spark.sql.hive.convertMetastoreCtas |
true | 如果设置为 true,Spark 将尝试在 CTAS 中使用内置的数据源写入器而不是 Hive serde。只有当分别针对 Parquet 和 ORC 格式启用 |
3.0.0 |
spark.sql.hive.convertMetastoreInsertDir |
true | 如果设置为 true,Spark 将尝试在 INSERT OVERWRITE DIRECTORY 中使用内置的数据源写入器而不是 Hive serde。只有当分别针对 Parquet 和 ORC 格式启用 |
3.3.0 |
spark.sql.hive.convertMetastoreOrc |
true | 如果设置为 true,则内置的 ORC 读取器和写入器将用于处理使用 HiveQL 语法创建的 ORC 表,而不是 Hive serde。 |
2.0.0 |
spark.sql.hive.convertMetastoreParquet |
true | 如果设置为 true,则内置的 Parquet 读取器和写入器将用于处理使用 HiveQL 语法创建的 Parquet 表,而不是 Hive serde。 |
1.1.1 |
spark.sql.hive.convertMetastoreParquet.mergeSchema |
false | 如果为 true,还会尝试合并不同 Parquet 数据文件中可能不同但兼容的 Parquet 模式。此配置仅当 "spark.sql.hive.convertMetastoreParquet" 为 true 时才有效。 |
1.3.1 |
spark.sql.hive.dropPartitionByName.enabled |
false | 如果为 true,Spark 将获取分区名称而不是分区对象来删除分区,这可以提高删除分区的性能。 |
3.4.0 |
spark.sql.hive.filesourcePartitionFileCacheSize |
262144000 | 如果非零,则启用内存中分区文件元数据的缓存。所有表共享一个缓存,该缓存最多可以使用指定的字节数用于文件元数据。此配置仅在启用 Hive 文件源分区管理时有效。 |
2.1.1 |
spark.sql.hive.manageFilesourcePartitions |
true | 如果为 true,则也为文件源表启用 Metastore 分区管理。这包括数据源表和转换后的 Hive 表。启用分区管理后,数据源表将分区存储在 Hive Metastore 中,并在 spark.sql.hive.metastorePartitionPruning 设置为 true 时,使用 Metastore 在查询规划期间修剪分区。 |
2.1.1 |
spark.sql.hive.metastorePartitionPruning |
true | 如果为 true,某些谓词将被下推到 Hive Metastore 中,以便可以更早地消除不匹配的分区。 |
1.5.0 |
spark.sql.hive.metastorePartitionPruningFallbackOnException |
false | 当遇到来自 Metastore 的 MetaException 时,是否回退到从 Hive Metastore 获取所有分区并在 Spark 客户端执行分区修剪。请注意,如果启用此选项并且要列出的分区很多,则 Spark 查询性能可能会下降。如果禁用此选项,Spark 将使查询失败。 |
3.3.0 |
spark.sql.hive.metastorePartitionPruningFastFallback |
false | 启用此配置后,如果 Hive 不支持谓词,或者 Spark 由于遇到来自 Metastore 的 MetaException 而回退,Spark 将通过首先获取分区名称,然后在客户端评估过滤器表达式来修剪分区。请注意,不支持带有 TimeZoneAwareExpression 的谓词。 |
3.3.0 |
spark.sql.hive.thriftServer.async |
true | 如果设置为 true,Hive Thrift 服务器以异步方式执行 SQL 查询。 |
1.5.0 |
spark.sql.hive.verifyPartitionPath |
false | 如果为 true,则在读取存储在 HDFS 中的数据时,检查表根目录下的所有分区路径。此配置将在未来的版本中被弃用,并由 spark.files.ignoreMissingFiles 替换。 |
1.4.0 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | 控制用于列式缓存的批处理大小。更大的批处理大小可以提高内存利用率和压缩率,但在缓存数据时可能会导致 OOM。 |
1.1.1 |
spark.sql.inMemoryColumnarStorage.compressed |
true | 如果设置为 true,Spark SQL 将根据数据的统计信息自动为每一列选择一种压缩编解码器。 |
1.0.1 |
spark.sql.inMemoryColumnarStorage.enableVectorizedReader |
true | 为列式缓存启用向量化读取器。 |
2.3.1 |
spark.sql.json.filterPushdown.enabled |
true | 如果为 true,则启用将过滤器下推到 JSON 数据源。 |
3.1.0 |
spark.sql.jsonGenerator.ignoreNullFields |
true | 在 JSON 数据源和 JSON 函数(如 to_json)中生成 JSON 对象时,是否忽略空字段。如果为 false,则为 JSON 对象中的空字段生成 null。 |
3.0.0 |
spark.sql.leafNodeDefaultParallelism |
(无) | 生成数据的 Spark SQL 叶节点的默认并行度,例如文件扫描节点、本地数据扫描节点、范围节点等。此配置的默认值为 'SparkContext#defaultParallelism'。 |
3.2.0 |
spark.sql.mapKeyDedupPolicy |
EXCEPTION | 在内置函数中对映射键进行重复数据删除的策略:CreateMap, MapFromArrays, MapFromEntries, StringToMap, MapConcat 和 TransformKeys。当 EXCEPTION 时,如果检测到重复的映射键,则查询失败。当 LAST_WIN 时,最后插入的映射键优先。 |
3.0.0 |
spark.sql.maven.additionalRemoteRepositories |
https://maven-central.storage-download.googleapis.com/maven2/ | 可选的额外远程 Maven 镜像仓库的逗号分隔字符串配置。如果默认的 Maven Central 仓库无法访问,则仅用于在 IsolatedClientLoader 中下载 Hive jar。 |
3.0.0 |
spark.sql.maxMetadataStringLength |
100 | 元数据字符串输出的最大字符数。例如, |
3.1.0 |
spark.sql.maxPlanStringLength |
2147483632 | 计划字符串输出的最大字符数。如果计划更长,则进一步的输出将被截断。默认设置始终生成完整计划。如果计划字符串占用过多内存或导致驱动程序或 UI 进程中出现 OutOfMemory 错误,请将其设置为较低的值,例如 8k。 |
3.0.0 |
spark.sql.maxSinglePartitionBytes |
9223372036854775807b | 单个分区允许的最大字节数。否则,规划器将引入 shuffle 以提高并行性。 |
3.4.0 |
spark.sql.optimizer.collapseProjectAlwaysInline |
false | 是否始终折叠两个相邻的投影并内联表达式,即使这会导致额外的重复。 |
3.3.0 |
spark.sql.optimizer.dynamicPartitionPruning.enabled |
true | 如果为 true,当分区列用作连接键时,我们将为分区列生成谓词。 |
3.0.0 |
spark.sql.optimizer.enableCsvExpressionOptimization |
true | 是否在 SQL 优化器中优化 CSV 表达式。它包括从 from_csv 中修剪不必要的列。 |
3.2.0 |
spark.sql.optimizer.enableJsonExpressionOptimization |
true | 是否在 SQL 优化器中优化 JSON 表达式。它包括从 from_json 中修剪不必要的列,简化 from_json + to_json,to_json + named_struct(from_json.col1, from_json.col2, ....)。 |
3.1.0 |
spark.sql.optimizer.excludedRules |
(无) | 配置要在优化器中禁用的规则列表,其中规则由其规则名称指定,并用逗号分隔。不能保证此配置中的所有规则最终都会被排除,因为某些规则对于正确性是必需的。优化器将记录实际已被排除的规则。 |
2.4.0 |
spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold |
10GB | Bloom 过滤器应用端计划的聚合扫描大小的字节大小阈值。Bloom 过滤器应用端的聚合扫描字节大小需要超过此值才能注入 Bloom 过滤器。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold |
10MB | Bloom 过滤器创建端计划的大小阈值。估计大小需要低于此值才能尝试注入 Bloom 过滤器。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.enabled |
true | 如果为 true,并且 shuffle 连接的一侧具有选择性谓词,我们将尝试在另一侧插入 Bloom 过滤器以减少 shuffle 数据量。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.expectedNumItems |
1000000 | 运行时 Bloom 过滤器的默认预期项目数 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.maxNumBits |
67108864 | 运行时 Bloom 过滤器使用的最大位数 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.maxNumItems |
4000000 | 运行时 Bloom 过滤器允许的最大预期项目数 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.numBits |
8388608 | 运行时 Bloom 过滤器使用的默认位数 |
3.3.0 |
spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled |
true | 为基于组的行级操作启用运行时组过滤。替换数据组(例如,文件,分区)的数据源可以在计划行级操作扫描时使用提供的数据源过滤器修剪整个组。但是,这种过滤是有限的,因为并非所有表达式都可以转换为数据源过滤器,并且某些表达式只能由 Spark 评估(例如,子查询)。由于重写组的成本很高,因此 Spark 可以在运行时执行查询,以查找哪些记录与行级操作的条件匹配。有关匹配记录的信息将传递回行级操作扫描,从而允许数据源丢弃不必重写的组。 |
3.4.0 |
spark.sql.optimizer.runtimeFilter.number.threshold |
10 | 单个查询注入的运行时过滤器(非 DPP)的总数。这是为了防止驱动程序因 Bloom 过滤器过多而导致 OOM。 |
3.3.0 |
spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled |
false | 如果为 true,并且 shuffle 连接的一侧具有选择性谓词,我们将尝试在另一侧插入半连接以减少 shuffle 数据量。 |
3.3.0 |
spark.sql.orc.aggregatePushdown |
false | 如果为 true,聚合将被下推到 ORC 以进行优化。 支持 MIN、MAX 和 COUNT 作为聚合表达式。 对于 MIN/MAX,支持 boolean、integer、float 和 date 类型。 对于 COUNT,支持所有数据类型。 如果任何 ORC 文件页脚中缺少统计信息,则会抛出异常。 |
3.3.0 |
spark.sql.orc.columnarReaderBatchSize |
4096 | 要在 orc 向量化读取器批处理中包含的行数。 应该仔细选择此数字,以最大程度地减少开销并避免在读取数据时出现 OOM。 |
2.4.0 |
spark.sql.orc.columnarWriterBatchSize |
1024 | 要在 orc 向量化写入器批处理中包含的行数。 应该仔细选择此数字,以最大程度地减少开销并避免在写入数据时出现 OOM。 |
3.4.0 |
spark.sql.orc.compression.codec |
snappy | 设置写入 ORC 文件时使用的压缩编解码器。如果在表特定的选项/属性中指定了 |
2.3.0 |
spark.sql.orc.enableNestedColumnVectorizedReader |
true | 为嵌套列启用向量化 orc 解码。 |
3.2.0 |
spark.sql.orc.enableVectorizedReader |
true | 启用向量化 orc 解码。 |
2.3.0 |
spark.sql.orc.filterPushdown |
true | 如果为 true,则为 ORC 文件启用过滤器下推。 |
1.4.0 |
spark.sql.orc.mergeSchema |
false | 如果为 true,Orc 数据源合并从所有数据文件收集的模式,否则从随机数据文件中选择模式。 |
3.0.0 |
spark.sql.orderByOrdinal |
true | 如果为 true,则序号被视为 select 列表中的位置。 如果为 false,则 order/sort by 子句中的序号将被忽略。 |
2.0.0 |
spark.sql.parquet.aggregatePushdown |
false | 如果为 true,则聚合将被下推到 Parquet 以进行优化。支持 MIN、MAX 和 COUNT 作为聚合表达式。 对于 MIN/MAX,支持 boolean、integer、float 和 date 类型。 对于 COUNT,支持所有数据类型。 如果任何 Parquet 文件页脚中缺少统计信息,则会抛出异常。 |
3.3.0 |
spark.sql.parquet.binaryAsString |
false | 一些其他生成 Parquet 的系统,特别是 Impala 和 Spark SQL 的旧版本,在写出 Parquet 模式时,不会区分二进制数据和字符串。 此标志告诉 Spark SQL 将二进制数据解释为字符串,以提供与这些系统的兼容性。 |
1.1.1 |
spark.sql.parquet.columnarReaderBatchSize |
4096 | parquet 向量化读取器批处理中包含的行数。应仔细选择该数字,以最大限度地减少开销并避免读取数据时出现 OOM。 |
2.4.0 |
spark.sql.parquet.compression.codec |
snappy | 设置写入 Parquet 文件时使用的压缩编解码器。 如果在特定于表的选项/属性中指定了 |
1.1.1 |
spark.sql.parquet.enableNestedColumnVectorizedReader |
true | 为嵌套列(例如,struct、list、map)启用向量化 Parquet 解码。 需要启用 spark.sql.parquet.enableVectorizedReader。 |
3.3.0 |
spark.sql.parquet.enableVectorizedReader |
true | 启用向量化 parquet 解码。 |
2.0.0 |
spark.sql.parquet.fieldId.read.enabled |
false | Field ID 是 Parquet 模式规范的本机字段。 启用后,Parquet 读取器将使用请求的 Spark 模式中的字段 ID(如果存在)来查找 Parquet 字段,而不是使用列名。 |
3.3.0 |
spark.sql.parquet.fieldId.read.ignoreMissing |
false | 当 Parquet 文件没有任何字段 ID,但 Spark 读取模式正在使用字段 ID 进行读取时,如果启用此标志,我们将静默地返回 null,否则会报错。 |
3.3.0 |
spark.sql.parquet.fieldId.write.enabled |
true | Field ID 是 Parquet 模式规范的本机字段。 启用后,Parquet 写入器会将 Spark 模式中的字段 ID 元数据(如果存在)填充到 Parquet 模式。 |
3.3.0 |
spark.sql.parquet.filterPushdown |
true | 设置为 true 时,启用 Parquet 过滤器下推优化。 |
1.2.0 |
spark.sql.parquet.inferTimestampNTZ.enabled |
true | 启用后,在模式推断期间,带有注释 isAdjustedToUTC = false 的 Parquet 时间戳列将被推断为 TIMESTAMP_NTZ 类型。 否则,所有 Parquet 时间戳列都将被推断为 TIMESTAMP_LTZ 类型。 请注意,Spark 在文件写入时将输出模式写入 Parquet 的页脚元数据,并在文件读取时利用它。 因此,此配置仅影响非 Spark 写入的 Parquet 文件的模式推断。 |
3.4.0 |
spark.sql.parquet.int96AsTimestamp |
true | 一些生成 Parquet 的系统,特别是 Impala,将 Timestamp 存储到 INT96 中。 Spark 也会将 Timestamp 存储为 INT96,因为我们需要避免纳秒字段的精度损失。 此标志告诉 Spark SQL 将 INT96 数据解释为时间戳,以提供与这些系统的兼容性。 |
1.3.0 |
spark.sql.parquet.int96TimestampConversion |
false | 这控制在转换为时间戳时,是否应将时间戳调整应用于 INT96 数据,以用于 Impala 写入的数据。 这是必要的,因为 Impala 存储的 INT96 数据具有与 Hive 和 Spark 不同的时区偏移量。 |
2.3.0 |
spark.sql.parquet.mergeSchema |
false | 如果为 true,Parquet 数据源会合并从所有数据文件收集的模式,否则,如果没有任何摘要文件,则从摘要文件或随机数据文件中选择模式。 |
1.5.0 |
spark.sql.parquet.outputTimestampType |
INT96 | 设置 Spark 将数据写入 Parquet 文件时使用的 Parquet 时间戳类型。 INT96 是 Parquet 中非标准但常用的时间戳类型。 TIMESTAMP_MICROS 是 Parquet 中的标准时间戳类型,它存储自 Unix epoch 以来的微秒数。 TIMESTAMP_MILLIS 也是标准类型,但精度为毫秒,这意味着 Spark 必须截断其时间戳值的微秒部分。 |
2.3.0 |
spark.sql.parquet.recordLevelFilter.enabled |
false | 如果为 true,则启用 Parquet 的本机记录级过滤,该过滤使用下推的过滤器。 仅当启用“spark.sql.parquet.filterPushdown”且未使用向量化读取器时,此配置才有效。 您可以通过将“spark.sql.parquet.enableVectorizedReader”设置为 false 来确保不使用向量化读取器。 |
2.3.0 |
spark.sql.parquet.respectSummaryFiles |
false | 如果为 true,我们假设 Parquet 的所有 part-file 都与摘要文件一致,并且我们在合并模式时将忽略它们。 否则,如果为 false(默认值),我们将合并所有 part-file。 这应被视为仅供专家使用的选项,在确切了解其含义之前不应启用。 |
1.5.0 |
spark.sql.parquet.writeLegacyFormat |
false | 如果为 true,数据将以 Spark 1.4 及更早版本的方式写入。 例如,十进制值将以 Apache Parquet 的固定长度字节数组格式写入,其他系统(例如 Apache Hive 和 Apache Impala)也使用该格式。 如果为 false,则将使用 Parquet 中较新的格式。 例如,十进制数将以基于整数的格式写入。 如果 Parquet 输出旨在与不支持此较新格式的系统一起使用,请设置为 true。 |
1.6.0 |
spark.sql.parser.quotedRegexColumnNames |
false | 如果为 true,则 SELECT 语句中带引号的标识符(使用反引号)被解释为正则表达式。 |
2.3.0 |
spark.sql.pivotMaxValues |
10000 | 在执行数据透视而不指定数据透视列的值时,这是在没有错误的情况下将收集的最大(不同)值的数量。 |
1.6.0 |
spark.sql.pyspark.inferNestedDictAsStruct.enabled |
false | 默认情况下,PySpark 的 SparkSession.createDataFrame 将嵌套字典推断为 map。 当设置为 true 时,它将嵌套字典推断为 struct。 |
3.3.0 |
spark.sql.pyspark.jvmStacktrace.enabled |
false | 如果为 true,则在用户友好的 PySpark 异常中,将同时显示 JVM 堆栈跟踪和 Python 堆栈跟踪。 默认情况下,它处于禁用状态,以隐藏 JVM 堆栈跟踪,仅显示 Python 友好的异常。 请注意,这与日志级别设置无关。 |
3.0.0 |
spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled |
false | 默认情况下,PySpark 的 SparkSession.createDataFrame 从数组中的所有值推断数组的元素类型。 如果此配置设置为 true,它将恢复仅从第一个数组元素推断类型的旧行为。 |
3.4.0 |
spark.sql.readSideCharPadding |
true | 如果为 true,除了写入端填充之外,Spark 在读取 CHAR 类型列/字段时还会应用字符串填充。 默认情况下,此配置为 true,以便在外部表等情况下更好地强制执行 CHAR 类型语义。 |
3.4.0 |
spark.sql.redaction.options.regex |
(?i)url | 用于确定 Spark SQL 命令的选项映射中哪些键包含敏感信息的正则表达式。 与此正则表达式匹配的选项名称的值将在 explain 输出中被编辑。 此编辑是在由 spark.redaction.regex 定义的全局编辑配置之上应用的。 |
2.2.2 |
spark.sql.redaction.string.regex |
(spark.redaction.string.regex 的值) |
用于确定 Spark 生成的字符串的哪些部分包含敏感信息的正则表达式。 当此正则表达式与字符串的一部分匹配时,该字符串部分将替换为虚拟值。 这当前用于编辑 SQL explain 命令的输出。 如果未设置此配置,则使用 |
2.3.0 |
spark.sql.repl.eagerEval.enabled |
false | 是否启用预先计算。 如果为 true,则当且仅当 REPL 支持预先计算时,才会显示 Dataset 的前 K 行。 目前,PySpark 和 SparkR 支持预先计算。 在 PySpark 中,对于像 Jupyter 这样的笔记本,将返回 HTML 表格(由 repr_html 生成)。 对于普通 Python REPL,返回的输出格式类似于 dataframe.show()。 在 SparkR 中,显示的返回输出类似于 R data.frame。 |
2.4.0 |
spark.sql.repl.eagerEval.maxNumRows |
20 | 预先计算返回的最大行数。 仅当 spark.sql.repl.eagerEval.enabled 设置为 true 时,此配置才生效。 此配置的有效范围是 0 到 (Int.MaxValue - 1),因此无效配置(如负数和大于 (Int.MaxValue - 1))将被归一化为 0 和 (Int.MaxValue - 1)。 |
2.4.0 |
spark.sql.repl.eagerEval.truncate |
20 | 预先计算返回的每个单元格的最大字符数。 仅当 spark.sql.repl.eagerEval.enabled 设置为 true 时,此配置才生效。 |
2.4.0 |
spark.sql.session.localRelationCacheThreshold |
67108864 | 序列化后要在驱动程序端缓存的本地关系的字节大小阈值。 |
3.5.0 |
spark.sql.session.timeZone |
(本地时区的值) | 会话本地时区的 ID,格式可以是基于区域的时区 ID 或时区偏移量。 区域 ID 必须采用“area/city”的形式,例如“America/Los_Angeles”。 时区偏移量必须采用“(+|-)HH”、“(+|-)HH:mm”或“(+|-)HH:mm:ss”格式,例如“-08”、“+01:00”或“-13:33:33”。 “UTC”和“Z”也支持作为“+00:00”的别名。 不建议使用其他短名称,因为它们可能不明确。 |
2.2.0 |
spark.sql.shuffle.partitions |
200 | 在对连接或聚合的数据进行洗牌时使用的默认分区数。 注意:对于结构化流式处理,从同一检查点位置重新启动查询后,无法更改此配置。 |
1.1.0 |
spark.sql.shuffledHashJoinFactor |
3 | 如果小端的数据大小乘以该因子仍然小于大端,则可以选择 shuffle hash join。 |
3.3.0 |
spark.sql.sources.bucketing.autoBucketedScan.enabled |
true | 如果为 true,则决定是否根据查询计划自动对输入表执行 bucketed 扫描。 如果 1. 查询没有利用 bucketing 的运算符(例如 join、group-by 等),或者 2. 在这些运算符和表扫描之间存在交换运算符,则不要使用 bucketed 扫描。 请注意,当 'spark.sql.sources.bucketing.enabled' 设置为 false 时,此配置不起作用。 |
3.1.0 |
spark.sql.sources.bucketing.enabled |
true | 如果为 false,我们将把 bucketed 表视为普通表 |
2.0.0 |
spark.sql.sources.bucketing.maxBuckets |
100000 | 允许的最大桶数。 |
2.4.0 |
spark.sql.sources.default |
parquet | 在输入/输出中使用的默认数据源。 |
1.3.0 |
spark.sql.sources.parallelPartitionDiscovery.threshold |
32 | 允许在驱动程序端列出文件的最大路径数。 如果在分区发现期间检测到的路径数超过此值,它将尝试使用另一个 Spark 分布式作业列出文件。 此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。 |
1.5.0 |
spark.sql.sources.partitionColumnTypeInference.enabled |
true | 如果为 true,则自动推断分区列的数据类型。 |
1.5.0 |
spark.sql.sources.partitionOverwriteMode |
STATIC | 当 INSERT OVERWRITE 分区数据源表时,我们当前支持 2 种模式:静态和动态。 在静态模式下,Spark 在覆盖之前删除与 INSERT 语句中的分区规范(例如 PARTITION(a=1,b))匹配的所有分区。 在动态模式下,Spark 不会提前删除分区,而仅覆盖在运行时写入数据的那些分区。 默认情况下,我们使用静态模式来保持 Spark 2.3 之前的相同行为。 请注意,此配置不影响 Hive serde 表,因为它们始终以动态模式覆盖。 也可以使用键 partitionOverwriteMode 将其设置为数据源的输出选项(该选项优先于此设置),例如 dataframe.write.option("partitionOverwriteMode", "dynamic").save(path)。 |
2.3.0 |
spark.sql.sources.v2.bucketing.enabled |
false | 类似于 spark.sql.sources.bucketing.enabled,此配置用于为 V2 数据源启用分桶。启用后,Spark 将识别 V2 数据源通过 SupportsReportPartitioning 报告的特定分布,并尝试在必要时避免 shuffle。 |
3.3.0 |
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled |
false | 在存储分区的连接期间,当连接的两侧都是 KeyGroupedPartitioning 时,是否允许输入分区部分聚集。在规划时,Spark 将根据表统计信息选择数据量较小的一侧,对其进行分组和复制以匹配另一侧。这是对倾斜连接的优化,可以帮助减少某些分区分配大量数据时的数据倾斜。此配置需要同时启用 spark.sql.sources.v2.bucketing.enabled 和 spark.sql.sources.v2.bucketing.pushPartValues.enabled。 |
3.4.0 |
spark.sql.sources.v2.bucketing.pushPartValues.enabled |
false | 当启用 spark.sql.sources.v2.bucketing.enabled 时,是否下推公共分区值。启用后,如果连接的两侧都是 KeyGroupedPartitioning 并且它们共享兼容的分区键,即使它们没有完全相同的分区值,Spark 将计算分区值的超集并将该信息下推到扫描节点,扫描节点将对任一侧缺少的分区值使用空分区。这有助于消除不必要的 shuffle。 |
3.4.0 |
spark.sql.statistics.fallBackToHdfs |
false | 如果表统计信息无法从表元数据中获得,则设置为 true 时,将回退到 HDFS。 这在确定表是否足够小以使用广播连接时非常有用。 此标志仅对非分区 Hive 表有效。 对于非分区数据源表,如果表统计信息不可用,它将自动重新计算。 对于分区数据源和分区 Hive 表,如果表统计信息不可用,则为“spark.sql.defaultSizeInBytes”。 |
2.0.0 |
spark.sql.statistics.histogram.enabled |
false | 如果启用,则在计算列统计信息时生成直方图。 直方图可以提供更好的估计准确性。 目前,Spark 仅支持等高直方图。 请注意,收集直方图需要额外的成本。 例如,收集列统计信息通常只需要一次表扫描,但生成等高直方图将导致额外的表扫描。 |
2.3.0 |
spark.sql.statistics.size.autoUpdate.enabled |
false | 启用对表大小的自动更新,一旦表的数据发生更改。 请注意,如果表的总文件数非常大,这可能会很昂贵并降低数据更改命令的速度。 |
2.3.0 |
spark.sql.storeAssignmentPolicy |
ANSI | 当将值插入到具有不同数据类型的列中时,Spark 将执行类型强制转换。 目前,我们支持 3 种类型强制转换规则的策略:ANSI、legacy 和 strict。 使用 ANSI 策略,Spark 按照 ANSI SQL 执行类型强制转换。 在实践中,该行为与 PostgreSQL 大致相同。 它不允许某些不合理的类型转换,例如将 |
3.0.0 |
spark.sql.streaming.checkpointLocation |
(无) | 用于存储流式查询的检查点数据的默认位置。 |
2.0.0 |
spark.sql.streaming.continuous.epochBacklogQueueSize |
10000 | 队列中存储的等待延迟 epoch 的最大条目数。如果队列的大小超过此参数,流将停止并出现错误。 |
3.0.0 |
spark.sql.streaming.disabledV2Writers |
禁用 StreamWriteSupport 的完全限定数据源注册类名称的逗号分隔列表。写入这些源将回退到 V1 Sinks。 |
2.3.1 | |
spark.sql.streaming.fileSource.cleaner.numThreads |
1 | 文件源已完成文件清理器中使用的线程数。 |
3.0.0 |
spark.sql.streaming.forceDeleteTempCheckpointLocation |
false | 如果为 true,则启用临时检查点位置强制删除。 |
3.0.0 |
spark.sql.streaming.metricsEnabled |
false | 是否为活动流式查询报告 Dropwizard/Codahale 指标。 |
2.0.2 |
spark.sql.streaming.multipleWatermarkPolicy |
min | 当流式查询中有多个水印运算符时,用于计算全局水印值的策略。默认值为“min”,它选择多个运算符报告的最小水印。其他替代值为“max”,它选择多个运算符中的最大值。注意:无法在从同一检查点位置重新启动查询之间更改此配置。 |
2.4.0 |
spark.sql.streaming.noDataMicroBatches.enabled |
true | 流式微批处理引擎是否将执行没有数据的批处理,以进行有状态流式查询的即时状态管理。 |
2.4.1 |
spark.sql.streaming.numRecentProgressUpdates |
100 | 要为流式查询保留的进度更新的数量 |
2.1.1 |
spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition |
false | 如果为 true,则流式会话窗口在 shuffle 之前对本地分区中的会话进行排序和合并。 这是为了减少要 shuffle 的行,但仅在批处理中有大量行被分配给同一会话时才有利。 |
3.2.0 |
spark.sql.streaming.stateStore.stateSchemaCheck |
true | 如果为 true,Spark 将根据现有状态下的模式验证状态模式,如果它不兼容,则查询将失败。 |
3.1.0 |
spark.sql.streaming.stopActiveRunOnRestart |
true | 不支持同时运行同一流式查询的多个运行。如果我们发现流式查询存在并发活动运行(在同一群集上的相同或不同 SparkSession 中),并且此标志为 true,我们将停止旧的流式查询运行以启动新的运行。 |
3.0.0 |
spark.sql.streaming.stopTimeout |
0 | 调用流式查询的 stop() 方法时,等待流式执行线程停止的毫秒数。 0 或负值无限期等待。 |
3.0.0 |
spark.sql.thriftServer.interruptOnCancel |
true | 如果为 true,如果取消一个查询,所有正在运行的任务都将被中断。如果为 false,所有正在运行的任务将保持运行直到完成。 |
3.2.0 |
spark.sql.thriftServer.queryTimeout |
0ms | 在 Thrift Server 中设置查询持续时间超时(以秒为单位)。 如果超时设置为正值,则当超过超时时,运行中的查询将自动取消,否则查询将继续运行直到完成。 如果通过 |
3.1.0 |
spark.sql.thriftserver.scheduler.pool |
(无) | 为 JDBC 客户端会话设置一个 Fair Scheduler 池。 |
1.1.1 |
spark.sql.thriftserver.ui.retainedSessions |
200 | JDBC/ODBC Web UI 历史记录中保留的 SQL 客户端会话数。 |
1.4.0 |
spark.sql.thriftserver.ui.retainedStatements |
200 | JDBC/ODBC Web UI 历史记录中保留的 SQL 语句数。 |
1.4.0 |
spark.sql.timestampType |
TIMESTAMP_LTZ | 配置 Spark SQL 的默认时间戳类型,包括 SQL DDL、Cast 子句、类型字面量和数据源的模式推断。将配置设置为 TIMESTAMP_NTZ 将使用 TIMESTAMP WITHOUT TIME ZONE 作为默认类型,而将其设置为 TIMESTAMP_LTZ 将使用 TIMESTAMP WITH LOCAL TIME ZONE。在 3.4.0 版本之前,Spark 仅支持 TIMESTAMP WITH LOCAL TIME ZONE 类型。 |
3.4.0 |
spark.sql.tvf.allowMultipleTableArguments.enabled |
false | 如果为 true,则允许表值函数的多个表参数,接收这些表的所有行的笛卡尔积。 |
3.5.0 |
spark.sql.ui.explainMode |
formatted | 配置 Spark SQL UI 中使用的查询解释模式。 该值可以是“simple”、“extended”、“codegen”、“cost”或“formatted”。 默认值为“formatted”。 |
3.1.0 |
spark.sql.variable.substitute |
true | 这允许使用类似 |
2.0.0 |
静态 SQL 配置
静态 SQL 配置是跨会话、不可变的 Spark SQL 配置。 它们可以通过配置文件和带有 --conf/-c
前缀的命令行选项或通过设置用于创建 SparkSession
的 SparkConf
来设置最终值。 外部用户可以通过 SparkSession.conf
或通过 set 命令查询静态 sql 配置值,例如 SET spark.sql.extensions;
,但不能设置/取消设置它们。
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.sql.cache.serializer |
org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer | 实现 org.apache.spark.sql.columnar.CachedBatchSerializer 的类的名称。 它将用于将 SQL 数据转换为可以更有效地缓存的格式。 底层 API 可能会发生变化,因此请谨慎使用。 无法指定多个类。 该类必须具有一个无参数构造函数。 |
3.1.0 |
spark.sql.catalog.spark_catalog.defaultDatabase |
default | 会话目录的默认数据库。 |
3.4.0 |
spark.sql.event.truncate.length |
2147483647 | SQL 长度的阈值,超过此阈值将在添加到事件之前将其截断。 默认为不截断。 如果设置为 0,则将改为记录调用站点。 |
3.0.0 |
spark.sql.extensions |
(无) | 实现 Function1[SparkSessionExtensions, Unit] 的类(用于配置 Spark Session 扩展)的逗号分隔列表。 这些类必须具有无参数构造函数。 如果指定了多个扩展,则它们按指定的顺序应用。 对于规则和规划器策略,它们按指定的顺序应用。 对于解析器,使用最后一个解析器,并且每个解析器都可以委托给其前任。 对于函数名称冲突,使用最后注册的函数名称。 |
2.2.0 |
spark.sql.hive.metastore.barrierPrefixes |
应为 Spark SQL 与之通信的每个 Hive 版本显式重新加载的类前缀的逗号分隔列表。 例如,通常在共享的前缀(即 |
1.4.0 | |
spark.sql.hive.metastore.jars |
builtin | 应用于实例化 HiveMetastoreClient 的 jar 的位置。 此属性可以是以下四个选项之一: 1. "builtin" 使用 Hive 2.3.9,该版本在使用 |
1.4.0 |
spark.sql.hive.metastore.jars.path |
用于实例化 HiveMetastoreClient 的 jar 包的逗号分隔路径。此配置仅在 |
3.1.0 | |
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc | 应该使用 Spark SQL 和特定版本的 Hive 之间共享的类加载器加载的类前缀的逗号分隔列表。应该共享的类的一个例子是连接到 Metastore 所需的 JDBC 驱动程序。其他需要共享的类是那些与已经共享的类交互的类。例如,log4j 使用的自定义附加器。 |
1.4.0 |
spark.sql.hive.metastore.version |
2.3.9 | Hive Metastore 的版本。可用选项为 |
1.4.0 |
spark.sql.hive.thriftServer.singleSession |
false | 如果设置为 true,则 Hive Thrift 服务器以单会话模式运行。所有 JDBC/ODBC 连接共享临时视图、函数注册表、SQL 配置和当前数据库。 |
1.6.0 |
spark.sql.hive.version |
2.3.9 | Spark 发行版捆绑的已编译的,即内置的 Hive 版本。请注意,这是一个只读配置,仅用于报告内置的 Hive 版本。如果要 Spark 调用不同的 Metastore 客户端,请参阅 spark.sql.hive.metastore.version。 |
1.1.1 |
spark.sql.metadataCacheTTLSeconds |
-1000ms | 元数据缓存的生存时间 (TTL) 值:分区文件元数据缓存和会话目录缓存。 此配置仅在此值具有正值(> 0)时才有效。 它还需要将“spark.sql.catalogImplementation”设置为“hive”,将“spark.sql.hive.filesourcePartitionFileCacheSize”设置为 > 0,并将“spark.sql.hive.manageFilesourcePartitions”设置为“true”才能应用于分区文件元数据缓存。 |
3.1.0 |
spark.sql.queryExecutionListeners |
(无) | 实现 QueryExecutionListener 的类名称的列表,这些类将自动添加到新创建的会话中。 这些类应该具有无参数构造函数,或者具有需要 SparkConf 参数的构造函数。 |
2.3.0 |
spark.sql.sources.disabledJdbcConnProviderList |
配置禁用 JDBC 连接提供程序的列表。 该列表包含以逗号分隔的 JDBC 连接提供程序的名称。 |
3.1.0 | |
spark.sql.streaming.streamingQueryListeners |
(无) | 实现 StreamingQueryListener 的类名称的列表,这些类将自动添加到新创建的会话中。 这些类应该具有无参数构造函数,或者具有需要 SparkConf 参数的构造函数。 |
2.4.0 |
spark.sql.streaming.ui.enabled |
true | 启用 Spark Web UI 时是否为 Spark 应用程序运行结构化流式 Web UI。 |
3.0.0 |
spark.sql.streaming.ui.retainedProgressUpdates |
100 | 为结构化流式 UI 保留的流式查询的进度更新数。 |
3.0.0 |
spark.sql.streaming.ui.retainedQueries |
100 | 为结构化流式 UI 保留的非活动查询数。 |
3.0.0 |
spark.sql.ui.retainedExecutions |
1000 | 在 Spark UI 中保留的执行次数。 |
1.5.0 |
spark.sql.warehouse.dir |
($PWD/spark-warehouse 的值) |
托管数据库和表的默认位置。 |
2.0.0 |
Spark Streaming
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.streaming.backpressure.enabled |
false | 启用或禁用 Spark Streaming 的内部反压机制(自 1.5 起)。 这使 Spark Streaming 能够根据当前的批处理调度延迟和处理时间来控制接收速率,以便系统仅以系统可以处理的速度接收。 在内部,这会动态设置接收器的最大接收速率。 如果设置了 spark.streaming.receiver.maxRate 和 spark.streaming.kafka.maxRatePerPartition ,则此速率受这些值的上限限制(参见下文)。 |
1.5.0 |
spark.streaming.backpressure.initialRate |
未设置 | 这是启用反压机制时,每个接收器接收第一个批处理数据时的初始最大接收速率。 | 2.0.0 |
spark.streaming.blockInterval |
200ms | Spark Streaming 接收器接收的数据在存储到 Spark 之前被分块成数据块的间隔。 建议最小值 - 50 毫秒。 有关更多详细信息,请参阅 Spark Streaming 编程指南中的 性能调整 部分。 | 0.8.0 |
spark.streaming.receiver.maxRate |
未设置 | 每个接收器将接收数据的最大速率(每秒记录数)。 实际上,每个流每秒最多消耗此数量的记录。 将此配置设置为 0 或负数将不对速率进行限制。 有关模式的更多详细信息,请参阅 Spark Streaming 编程指南中的 部署指南。 | 1.0.2 |
spark.streaming.receiver.writeAheadLog.enable |
false | 为接收器启用预写日志。 通过接收器接收的所有输入数据都将保存到预写日志中,这使其可以在驱动程序发生故障后进行恢复。 有关更多详细信息,请参阅 Spark Streaming 编程指南中的 部署指南。 | 1.2.1 |
spark.streaming.unpersist |
true | 强制 Spark Streaming 生成和持久化的 RDD 自动从 Spark 的内存中取消持久化。 Spark Streaming 接收的原始输入数据也会自动清除。 将此设置为 false 将允许在流式应用程序外部访问原始数据和持久化的 RDD,因为它们不会自动清除。 但这会导致 Spark 中更高的内存使用率。 | 0.9.0 |
spark.streaming.stopGracefullyOnShutdown |
false | 如果为 true ,则 Spark 会在 JVM 关闭时优雅地关闭 StreamingContext ,而不是立即关闭。 |
1.4.0 |
spark.streaming.kafka.maxRatePerPartition |
未设置 | 使用新的 Kafka 直接流 API 时,将从每个 Kafka 分区读取数据的最大速率(每秒记录数)。 有关更多详细信息,请参阅 Kafka 集成指南。 | 1.3.0 |
spark.streaming.kafka.minRatePerPartition |
1 | 使用新的 Kafka 直接流 API 时,将从每个 Kafka 分区读取数据的最小速率(每秒记录数)。 | 2.4.0 |
spark.streaming.ui.retainedBatches |
1000 | Spark Streaming UI 和状态 API 在垃圾回收之前记住多少批次。 | 1.0.0 |
spark.streaming.driver.writeAheadLog.closeFileAfterWrite |
false | 是否在驱动程序上写入预写日志记录后关闭文件。 如果您想使用 S3(或任何不支持刷新的文件系统)作为驱动程序上的元数据 WAL,请将其设置为“true”。 | 1.6.0 |
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite |
false | 是否在接收器上写入预写日志记录后关闭文件。 如果您想使用 S3(或任何不支持刷新的文件系统)作为接收器上的数据 WAL,请将其设置为“true”。 | 1.6.0 |
SparkR
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.r.numRBackendThreads |
2 | RBackend 用于处理来自 SparkR 包的 RPC 调用的线程数。 | 1.4.0 |
spark.r.command |
Rscript | 用于在集群模式下为驱动程序和工作程序执行 R 脚本的可执行文件。 | 1.5.3 |
spark.r.driver.command |
spark.r.command | 用于在客户端模式下为驱动程序执行 R 脚本的可执行文件。 在集群模式下忽略。 | 1.5.3 |
spark.r.shell.command |
R | 用于在客户端模式下为驱动程序执行 sparkR shell 的可执行文件。 在集群模式下忽略。 它与环境变量 SPARKR_DRIVER_R 相同,但优先于它。 spark.r.shell.command 用于 sparkR shell,而 spark.r.driver.command 用于运行 R 脚本。 |
2.1.0 |
spark.r.backendConnectionTimeout |
6000 | R 进程在其与 RBackend 的连接上设置的连接超时(以秒为单位)。 | 2.1.0 |
spark.r.heartBeatInterval |
100 | 从 SparkR 后端发送到 R 进程以防止连接超时的心跳间隔。 | 2.1.0 |
GraphX
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.graphx.pregel.checkpointInterval |
-1 | Pregel 中图形和消息的检查点间隔。 它用于避免由于大量迭代后长谱系链导致的 stackOverflowError。 默认情况下禁用检查点。 | 2.2.0 |
部署
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.deploy.recoveryMode |
NONE | 恢复模式设置,用于在以集群模式运行并失败并重新启动时,恢复提交的 Spark 作业。 这仅适用于使用独立模式或 Mesos 运行的集群模式。 | 0.8.1 |
spark.deploy.zookeeper.url |
无 | 当 `spark.deploy.recoveryMode` 设置为 ZOOKEEPER 时,此配置用于设置要连接的 zookeeper URL。 | 0.8.1 |
spark.deploy.zookeeper.dir |
无 | 当 `spark.deploy.recoveryMode` 设置为 ZOOKEEPER 时,此配置用于设置 zookeeper 目录以存储恢复状态。 | 0.8.1 |
集群管理器
Spark 中的每个集群管理器都有其他配置选项。 可以在每种模式的页面上找到配置
YARN
Mesos
Kubernetes
Standalone 模式
环境变量
某些 Spark 设置可以通过环境变量进行配置,这些变量是从 Spark 安装目录中的 conf/spark-env.sh
脚本(或 Windows 上的 conf/spark-env.cmd
)读取的。 在独立和 Mesos 模式下,此文件可以提供机器特定的信息,例如主机名。 运行本地 Spark 应用程序或提交脚本时也会对其进行源化。
请注意,默认情况下,安装 Spark 时不存在 conf/spark-env.sh
。 但是,您可以复制 conf/spark-env.sh.template
来创建它。 确保您使副本可执行。
以下变量可以在 spark-env.sh
中设置
环境变量 | 含义 |
---|---|
JAVA_HOME |
安装 Java 的位置(如果它不在您的默认 PATH 上)。 |
PYSPARK_PYTHON |
用于驱动程序和工作程序中 PySpark 的 Python 二进制可执行文件(如果可用,默认为 python3 ,否则为 python )。 如果设置了属性 spark.pyspark.python ,则优先 |
PYSPARK_DRIVER_PYTHON |
仅用于驱动程序中 PySpark 的 Python 二进制可执行文件(默认为 PYSPARK_PYTHON )。 如果设置了属性 spark.pyspark.driver.python ,则优先 |
SPARKR_DRIVER_R |
用于 SparkR shell 的 R 二进制可执行文件(默认为 R )。 如果设置了属性 spark.r.shell.command ,则优先 |
SPARK_LOCAL_IP |
要绑定到的机器的 IP 地址。 |
SPARK_PUBLIC_DNS |
您的 Spark 程序将向其他机器宣传的主机名。 |
除了上述内容外,还可以选择设置 Spark 独立集群脚本,例如每台机器上要使用的核心数和最大内存。
由于 spark-env.sh
是一个 shell 脚本,因此其中一些可以以编程方式设置 - 例如,您可以通过查找特定网络接口的 IP 来计算 SPARK_LOCAL_IP
。
注意:当在 YARN 上以 cluster
模式运行 Spark 时,需要使用 spark.yarn.appMasterEnv.[EnvironmentVariableName]
属性在你的 conf/spark-defaults.conf
文件中设置环境变量。在 spark-env.sh
中设置的环境变量不会反映在 cluster
模式下的 YARN Application Master 进程中。更多信息请参阅 YARN 相关的 Spark 属性。
配置日志
Spark 使用 log4j 进行日志记录。你可以通过在 conf
目录中添加一个 log4j2.properties
文件来配置它。一种开始的方法是复制位于该目录中的现有 log4j2.properties.template
。
默认情况下,Spark 会向 MDC (Mapped Diagnostic Context) 添加 1 条记录:mdc.taskName
,它显示类似于 task 1.0 in stage 0.0
的内容。你可以将 %X{mdc.taskName}
添加到你的 patternLayout 中以便在日志中打印它。此外,你可以使用 spark.sparkContext.setLocalProperty(s"mdc.$name", "value")
将用户特定数据添加到 MDC 中。MDC 中的键将是字符串“mdc.$name”。
覆盖配置目录
要指定与默认 “SPARK_HOME/conf” 不同的配置目录,你可以设置 SPARK_CONF_DIR。Spark 将使用来自此目录的配置文件(spark-defaults.conf、spark-env.sh、log4j2.properties 等)。
继承 Hadoop 集群配置
如果你计划使用 Spark 从 HDFS 读取和写入数据,则应将两个 Hadoop 配置文件包含在 Spark 的类路径中。
hdfs-site.xml
,它为 HDFS 客户端提供默认行为。core-site.xml
,它设置默认的文件系统名称。
这些配置文件的位置因 Hadoop 版本而异,但一个常见的位置是在 /etc/hadoop/conf
内部。某些工具会动态创建配置,但提供下载其副本的机制。
要使这些文件对 Spark 可见,请在 $SPARK_HOME/conf/spark-env.sh
中将 HADOOP_CONF_DIR
设置为包含配置文件的位置。
自定义 Hadoop/Hive 配置
如果你的 Spark 应用程序与 Hadoop、Hive 或两者交互,则 Spark 的类路径中可能存在 Hadoop/Hive 配置文件。
多个正在运行的应用程序可能需要不同的 Hadoop/Hive 客户端配置。你可以复制和修改 Spark 类路径中的 hdfs-site.xml
、core-site.xml
、yarn-site.xml
、hive-site.xml
用于每个应用程序。在 YARN 上运行的 Spark 集群中,这些配置文件是集群范围设置的,应用程序无法安全地更改它们。
更好的选择是使用 spark.hadoop.*
形式的 spark hadoop 属性,以及使用 spark.hive.*
形式的 spark hive 属性。例如,添加配置“spark.hadoop.abc.def=xyz”表示添加 hadoop 属性“abc.def=xyz”,而添加配置“spark.hive.abc=xyz”表示添加 hive 属性“hive.abc=xyz”。 它们可以被认为与可以在 $SPARK_HOME/conf/spark-defaults.conf
中设置的普通 spark 属性相同。
在某些情况下,你可能希望避免在 SparkConf
中硬编码某些配置。 例如,Spark 允许你简单地创建一个空配置并设置 spark/spark hadoop/spark hive 属性。
val conf = new SparkConf().set("spark.hadoop.abc.def", "xyz")
val sc = new SparkContext(conf)
此外,你可以在运行时修改或添加配置
./bin/spark-submit \
--name "My app" \
--master local[4] \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--conf spark.hadoop.abc.def=xyz \
--conf spark.hive.abc=xyz
myApp.jar
自定义资源调度和配置概述
GPU 和其他加速器已被广泛用于加速特殊工作负载,例如深度学习和信号处理。 现在,Spark 支持请求和调度通用资源,例如 GPU,但有一些注意事项。 当前的实现要求资源具有可以由调度程序分配的地址。 它需要你的集群管理器支持这些资源并进行正确配置。
有一些配置可用于请求驱动程序的资源:spark.driver.resource.{resourceName}.amount
,请求 executor 的资源:spark.executor.resource.{resourceName}.amount
,并指定每个任务的要求:spark.task.resource.{resourceName}.amount
。 在 YARN、Kubernetes 和 Spark Standalone 上的客户端驱动程序上需要 spark.driver.resource.{resourceName}.discoveryScript
配置。 YARN 和 Kubernetes 需要 spark.executor.resource.{resourceName}.discoveryScript
配置。 Kubernetes 还需要 spark.driver.resource.{resourceName}.vendor
和/或 spark.executor.resource.{resourceName}.vendor
。 有关每个配置的更多信息,请参见上面的配置说明。
Spark 将使用指定的配置首先从集群管理器请求具有相应资源的容器。 一旦获得容器,Spark 将在该容器中启动一个 Executor,该 Executor 将发现该容器具有哪些资源以及与每个资源关联的地址。 Executor 将注册到 Driver 并报告回该 Executor 可用的资源。 然后,Spark 调度程序可以将任务调度到每个 Executor,并根据用户指定的资源要求分配特定的资源地址。 用户可以使用 TaskContext.get().resources
API 查看分配给任务的资源。 在驱动程序上,用户可以使用 SparkContext 的 resources
调用来查看分配的资源。 然后,由用户决定使用 assignedaddresses 来进行他们想要的处理,或者将它们传递到他们正在使用的 ML/AI 框架中。
有关每个的要求和详细信息,请参见集群管理器特定页面 - YARN,Kubernetes 和 独立模式。 当前 Mesos 或本地模式不可用。 并且还请注意,不支持具有多个 worker 的 local-cluster 模式(请参阅独立文档)。
Stage 级别调度概述
阶段级调度功能允许用户在阶段级指定任务和 executor 的资源需求。 这允许不同的阶段运行具有不同资源的 executors。 一个典型的例子是,一个 ETL 阶段仅与具有 CPU 的 executors 一起运行,下一个阶段是需要 GPU 的 ML 阶段。 阶段级调度允许用户在 ML 阶段运行时请求具有 GPU 的不同 executors,而不必在应用程序启动时就获取具有 GPU 的 executors,然后在 ETL 阶段运行时使它们处于空闲状态。 这仅适用于 Scala、Java 和 Python 中的 RDD API。 当启用动态分配时,它在 YARN、Kubernetes 和独立模式下可用。 当禁用动态分配时,它允许用户在阶段级指定不同的任务资源需求,并且目前 YARN,Kubernetes 和独立集群都支持该功能。 有关更多实现详细信息,请参见 YARN 页面或 Kubernetes 页面或 独立 页面。
有关使用此功能的详细信息,请参见 RDD.withResources
和 ResourceProfileBuilder
API。 当禁用动态分配时,具有不同任务资源需求的任务将与 DEFAULT_RESOURCE_PROFILE
共享 executors。 虽然启用动态分配时,当前实现为创建的每个 ResourceProfile
获取新的 executors,并且当前必须完全匹配。 Spark 不会尝试将任务放入需要与创建 executor 的 ResourceProfile 不同的 executor 中。 不使用的 executors 将通过动态分配逻辑空闲超时。 此功能的默认配置是仅允许每个阶段一个 ResourceProfile。 如果用户将多个 ResourceProfile 关联到 RDD,则 Spark 默认情况下将引发异常。 请参见配置 spark.scheduler.resource.profileMergeConflicts
来控制该行为。 当启用 spark.scheduler.resource.profileMergeConflicts
时,Spark 实现的当前合并策略是冲突的 ResourceProfile 中每个资源的最大值。 Spark 将创建一个新的 ResourceProfile,其中包含每个资源的最大值。
基于推送的 Shuffle 概述
基于推送的 shuffle 有助于提高 spark shuffle 的可靠性和性能。 它采用尽力而为的方法,将 map 任务生成的 shuffle blocks 推送到远程外部 shuffle 服务,以便每个 shuffle 分区进行合并。 Reduce 任务获取合并的 shuffle 分区和原始 shuffle blocks 的组合作为其输入数据,从而将外部 shuffle 服务的小型随机磁盘读取转换为大型顺序读取。 Reduce 任务更好的数据局部性的可能性还有助于最大限度地减少网络 IO。 对于某些场景,例如当合并输出可用时进行分区合并,基于推送的 shuffle 优先于批量获取。
基于推送的 shuffle 提高了长时间运行的作业/查询的性能,这些作业/查询涉及 shuffle 期间的大量磁盘 I/O。 目前,它不太适合于快速运行处理少量 shuffle 数据的作业/查询。 这将在未来的版本中得到进一步改进。
当前,仅在使用外部 shuffle 服务的 YARN 上的 Spark 支持基于推送的 shuffle。
外部 Shuffle 服务(服务器端)配置选项
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.shuffle.push.server.mergedShuffleFileManagerImpl |
org.apache.spark.network.shuffle.
|
管理基于推送的 shuffle 的 MergedShuffleFileManager 的实现的类名。 这充当服务器端配置以禁用或启用基于推送的 shuffle。 默认情况下,在服务器端禁用基于推送的 shuffle。要在服务器端启用基于推送的 shuffle,请将此配置设置为 |
3.2.0 |
spark.shuffle.push.server.minChunkSizeInMergedShuffleFile |
2m |
在基于推送的 shuffle 期间,将合并的 shuffle 文件划分为多个 chunk 时,chunk 的最小大小。 合并的 shuffle 文件由多个小的 shuffle blocks 组成。 在单个磁盘 I/O 中获取完整的合并 shuffle 文件会增加客户端和外部 shuffle 服务的内存需求。 相反,外部 shuffle 服务以 如果设置得过高,会增加客户端和外部 Shuffle 服务的内存需求。 如果设置得过低,会不必要地增加对外部 Shuffle 服务的 RPC 请求总数。 |
3.2.0 |
spark.shuffle.push.server.mergedIndexCacheSize |
100m |
用于在基于推送的 shuffle 中存储合并后的索引文件的最大内存缓存大小。此缓存是 spark.shuffle.service.index.cache.size 配置的缓存的补充。 |
3.2.0 |
客户端配置选项
属性名称 | 默认值 | 含义 | 自从版本 |
---|---|---|---|
spark.shuffle.push.enabled |
false |
设置为 true 以在客户端启用基于推送的 shuffle,并与服务器端标志 spark.shuffle.push.server.mergedShuffleFileManagerImpl 配合使用。 |
3.2.0 |
spark.shuffle.push.finalize.timeout |
10秒 |
在给定的 shuffle map 阶段的所有 mapper 完成后,driver 等待多少秒,然后向远程外部 shuffle 服务发送合并完成请求。 这让外部 shuffle 服务有更多时间来合并块。设置过长可能会导致性能下降。 | 3.2.0 |
spark.shuffle.push.maxRetainedMergerLocations |
500 |
为基于推送的 shuffle 缓存的最大合并器位置数。 目前,合并器位置是负责处理推送的块、合并它们并为后续 shuffle fetch 提供合并后的块的外部 shuffle 服务的主机。 | 3.2.0 |
spark.shuffle.push.mergersMinThresholdRatio |
0.05 |
用于根据 reducer 阶段的分区数量计算阶段所需的最小 shuffle 合并器位置数量的比率。 例如,一个有 100 个分区的 reduce 阶段,使用默认值 0.05,至少需要 5 个唯一的合并器位置才能启用基于推送的 shuffle。 | 3.2.0 |
spark.shuffle.push.mergersMinStaticThreshold |
5 |
为阶段启用基于推送的 shuffle 所需的 shuffle 推送合并器位置数量的静态阈值。 请注意,此配置与 spark.shuffle.push.mergersMinThresholdRatio 配合使用。 需要 spark.shuffle.push.mergersMinStaticThreshold 的最大值和 spark.shuffle.push.mergersMinThresholdRatio 比率的合并器数量才能为阶段启用基于推送的 shuffle。 例如:对于子阶段的 1000 个分区,spark.shuffle.push.mergersMinStaticThreshold 设置为 5,spark.shuffle.push.mergersMinThresholdRatio 设置为 0.05,我们需要至少 50 个合并器才能为该阶段启用基于推送的 shuffle。 |
3.2.0 |
spark.shuffle.push.numPushThreads |
(无) | 指定块推送器池中的线程数。 这些线程有助于创建连接并将块推送到远程外部 shuffle 服务。 默认情况下,线程池大小等于 spark executor core 的数量。 | 3.2.0 |
spark.shuffle.push.maxBlockSizeToPush |
1m |
要推送到远程外部 shuffle 服务的单个块的最大大小。 大于此阈值的块不会被推送以进行远程合并。 这些 shuffle 块将以原始方式获取。 如果设置得过高,会导致更多块被推送到远程外部 shuffle 服务,但这些块已经可以通过现有机制有效地获取,从而导致将大型块推送到远程外部 shuffle 服务的额外开销。 建议将 如果设置得过低,会导致更少的块被合并,并且直接从 mapper 外部 shuffle 服务获取,从而导致更高的少量随机读取,从而影响整体磁盘 I/O 性能。 |
3.2.0 |
spark.shuffle.push.maxBlockBatchSize |
3m |
要分组到单个推送请求中的 shuffle 块的最大批量大小。 默认设置为 3m ,为了保持其略高于 spark.storage.memoryMapThreshold 默认值 2m ,因为它很可能每个块批次都会进行内存映射,从而产生更高的开销。 |
3.2.0 |
spark.shuffle.push.merge.finalizeThreads |
8 | Driver 用于完成 shuffle 合并的线程数。 由于完成大型 shuffle 可能需要几秒钟,因此当启用基于推送的 shuffle 时,拥有多个线程有助于 driver 处理并发的 shuffle 合并完成请求。 | 3.3.0 |
spark.shuffle.push.minShuffleSizeToWait |
500m |
只有当 shuffle 数据总大小大于此阈值时,Driver 才会等待合并完成。 如果 shuffle 总大小较小,则 Driver 将立即完成 shuffle 输出。 | 3.3.0 |
spark.shuffle.push.minCompletedPushRatio |
1.0 |
在基于推送的 shuffle 期间,Driver 开始 shuffle 合并完成之前应完成推送的最小 map 分区比例。 | 3.3.0 |