Spark 配置

Spark 提供三个位置来配置系统

Spark 属性

Spark 属性控制大多数应用程序设置,并为每个应用程序单独配置。这些属性可以直接在传递给 SparkContextSparkConf 上设置。 SparkConf 允许您配置一些常用属性(例如主 URL 和应用程序名称),以及通过 set() 方法配置任意键值对。例如,我们可以使用两个线程初始化应用程序,如下所示

请注意,我们使用 local[2] 运行,这意味着两个线程 - 代表“最小”并行度,这有助于检测仅在分布式环境中运行时存在的错误。

val conf = new SparkConf()
             .setMaster("local[2]")
             .setAppName("CountingSheep")
val sc = new SparkContext(conf)

请注意,我们可以在本地模式下拥有多个线程,并且在 Spark Streaming 等情况下,我们实际上可能需要多个线程来防止任何类型的饥饿问题。

指定某个时间段的属性应使用时间单位进行配置。接受以下格式

25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)

指定字节大小的属性应使用大小单位进行配置。接受以下格式

1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)

虽然没有单位的数字通常被解释为字节,但一些数字被解释为 KiB 或 MiB。请参阅各个配置属性的文档。在可能的情况下,指定单位是可取的。

动态加载 Spark 属性

在某些情况下,您可能希望避免在 SparkConf 中硬编码某些配置。例如,如果您希望使用不同的主节点或不同的内存量运行相同的应用程序。Spark 允许您简单地创建一个空的配置

val sc = new SparkContext(new SparkConf())

然后,您可以在运行时提供配置值

./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

Spark shell 和 spark-submit 工具支持两种方法来动态加载配置。第一种是命令行选项,例如 --master,如上所示。 spark-submit 可以使用 --conf/-c 标志接受任何 Spark 属性,但使用特殊标志来表示在启动 Spark 应用程序中起作用的属性。运行 ./bin/spark-submit --help 将显示这些选项的完整列表。

bin/spark-submit 还将从 conf/spark-defaults.conf 中读取配置选项,其中每一行都包含一个由空格分隔的键和值。例如

spark.master            spark://5.6.7.8:7077
spark.executor.memory   4g
spark.eventLog.enabled  true
spark.serializer        org.apache.spark.serializer.KryoSerializer

作为标志或在属性文件中指定的任何值都将传递给应用程序,并与通过 SparkConf 指定的值合并。直接在 SparkConf 上设置的属性具有最高优先级,然后是传递给 spark-submitspark-shell 的标志,然后是 spark-defaults.conf 文件中的选项。一些配置键在 Spark 的早期版本中已被重命名;在这种情况下,仍然接受旧的键名,但优先级低于任何新键的实例。

Spark 属性主要可以分为两种:一种与部署相关,例如“spark.driver.memory”、“spark.executor.instances”,这种属性在运行时通过 SparkConf 以编程方式设置时可能不会受到影响,或者其行为取决于您选择的集群管理器和部署模式,因此建议通过配置文件或 spark-submit 命令行选项进行设置;另一种主要与 Spark 运行时控制相关,例如“spark.task.maxFailures”,这种属性可以通过两种方式设置。

查看 Spark 属性

位于 http://<driver>:4040 的应用程序 Web UI 在“环境”选项卡中列出了 Spark 属性。这是一个有用的地方,可以用来检查您的属性是否已正确设置。请注意,只有通过 spark-defaults.confSparkConf 或命令行显式指定的值才会出现。对于所有其他配置属性,您可以假设使用默认值。

可用属性

控制内部设置的大多数属性都有合理的默认值。一些最常见的设置选项是

应用程序属性

属性名称默认值含义自版本
spark.app.name (无) 应用程序的名称。这将显示在 UI 和日志数据中。 0.9.0
spark.driver.cores 1 用于驱动程序进程的内核数,仅在集群模式下。 1.3.0
spark.driver.maxResultSize 1g 每个 Spark 操作(例如 collect)中所有分区序列化结果的总大小限制(以字节为单位)。应至少为 1M,或 0 表示无限制。如果总大小超过此限制,作业将被中止。设置较高的限制可能会导致驱动程序出现内存不足错误(取决于 spark.driver.memory 和 JVM 中对象的内存开销)。设置适当的限制可以保护驱动程序免受内存不足错误。 1.2.0
spark.driver.memory 1g 用于驱动程序进程的内存量,即 SparkContext 初始化的位置,与 JVM 内存字符串格式相同,带有大小单位后缀(“k”、“m”、“g”或“t”)(例如 512m2g)。
注意:在客户端模式下,此配置不能直接通过 SparkConf 在您的应用程序中设置,因为驱动程序 JVM 已在此时启动。相反,请通过 --driver-memory 命令行选项或在您的默认属性文件中设置此选项。
1.1.1
spark.driver.memoryOverhead driverMemory * spark.driver.memoryOverheadFactor,最小值为 384 在集群模式下,每个驱动程序进程要分配的非堆内存量(以 MiB 为单位,除非另有说明)。这是用于处理 VM 开销、内部字符串、其他本机开销等内容的内存。这往往会随着容器大小的增加而增加(通常为 6-10%)。此选项目前在 YARN、Mesos 和 Kubernetes 上受支持。注意:非堆内存包括堆外内存(当 spark.memory.offHeap.enabled=true 时)以及其他驱动程序进程使用的内存(例如与 PySpark 驱动程序一起使用的 python 进程)以及在同一容器中运行的其他非驱动程序进程使用的内存。运行驱动程序的容器的最大内存大小由 spark.driver.memoryOverheadspark.driver.memory 的总和决定。 2.3.0
spark.driver.memoryOverheadFactor 0.10 在集群模式下,每个驱动程序进程要分配为额外非堆内存的驱动程序内存的比例。这是用于处理 VM 开销、内部字符串、其他本机开销等内容的内存。这往往会随着容器大小的增加而增加。此值默认为 0.10,除了 Kubernetes 非 JVM 作业,其默认为 0.40。这样做是因为非 JVM 任务需要更多非 JVM 堆空间,并且此类任务通常会因“内存开销超过”错误而失败。这通过更高的默认值来抢占此错误。如果直接设置 spark.driver.memoryOverhead,则会忽略此值。 3.3.0
spark.driver.resource.{resourceName}.amount 0 在驱动程序上要使用的特定资源类型的数量。如果使用此选项,您还必须指定 spark.driver.resource.{resourceName}.discoveryScript,以便驱动程序在启动时找到资源。 3.0.0
spark.driver.resource.{resourceName}.discoveryScript 驱动程序要运行的脚本,用于发现特定资源类型。这应该将 ResourceInformation 类格式的 JSON 字符串写入 STDOUT。它具有名称和地址数组。对于客户端提交的驱动程序,发现脚本必须为该驱动程序分配与同一主机上的其他驱动程序不同的资源地址。 3.0.0
spark.driver.resource.{resourceName}.vendor 要用于驱动程序的资源的供应商。此选项目前仅在 Kubernetes 上受支持,实际上是遵循 Kubernetes 设备插件命名约定的供应商和域。(例如,对于 Kubernetes 上的 GPU,此配置将设置为 nvidia.com 或 amd.com) 3.0.0
spark.resources.discoveryPlugin org.apache.spark.resource.ResourceDiscoveryScriptPlugin 实现 org.apache.spark.api.resource.ResourceDiscoveryPlugin 的类名列表,以逗号分隔,加载到应用程序中。这适用于高级用户,他们可以使用自定义实现替换资源发现类。Spark 将尝试指定的所有类,直到其中一个类返回该资源的资源信息。如果所有插件都没有返回该资源的信息,则它会最后尝试发现脚本。 3.0.0
spark.executor.memory 1g 每个执行器进程使用的内存量,以与 JVM 内存字符串相同的格式,带有一个大小单位后缀(“k”、“m”、“g”或“t”)(例如 512m2g)。 0.7.0
spark.executor.pyspark.memory 未设置 分配给每个执行器中 PySpark 的内存量,以 MiB 为单位,除非另有说明。如果设置,则执行器的 PySpark 内存将限制为此数量。如果未设置,Spark 不会限制 Python 的内存使用,应用程序应避免超过与其他非 JVM 进程共享的开销内存空间。当 PySpark 在 YARN 或 Kubernetes 中运行时,此内存将添加到执行器资源请求中。
注意:此功能依赖于 Python 的 `resource` 模块;因此,行为和限制将被继承。例如,Windows 不支持资源限制,MacOS 上不会限制实际资源。
2.4.0
spark.executor.memoryOverhead executorMemory * spark.executor.memoryOverheadFactor,最小值为 384 每个执行器进程要分配的额外内存量,以 MiB 为单位,除非另有说明。这是用于考虑 VM 开销、内部字符串、其他本机开销等的内存。这往往会随着执行器大小的增加而增加(通常为 6-10%)。此选项目前在 YARN 和 Kubernetes 上受支持。
注意:额外内存包括 PySpark 执行器内存(当 spark.executor.pyspark.memory 未配置时)以及在同一容器中运行的其他非执行器进程使用的内存。运行执行器的容器的最大内存大小由 spark.executor.memoryOverheadspark.executor.memoryspark.memory.offHeap.sizespark.executor.pyspark.memory 的总和决定。
2.3.0
spark.executor.memoryOverheadFactor 0.10 每个执行器进程分配为额外非堆内存的执行器内存的比例。这是用于考虑 VM 开销、内部字符串、其他本机开销等的内存。这往往会随着容器大小的增加而增加。此值默认为 0.10,除了 Kubernetes 非 JVM 作业,其默认为 0.40。这是因为非 JVM 任务需要更多非 JVM 堆空间,并且此类任务通常会因“内存开销超出”错误而失败。这通过更高的默认值来防止此错误。如果直接设置 spark.executor.memoryOverhead,则会忽略此值。 3.3.0
spark.executor.resource.{resourceName}.amount 0 每个执行器进程使用的特定资源类型的数量。如果使用此项,还必须指定 spark.executor.resource.{resourceName}.discoveryScript,以便执行器在启动时找到资源。 3.0.0
spark.executor.resource.{resourceName}.discoveryScript 执行器运行以发现特定资源类型的脚本。这应该将 ResourceInformation 类的格式的 JSON 字符串写入 STDOUT。它有一个名称和一个地址数组。 3.0.0
spark.executor.resource.{resourceName}.vendor 用于执行器的资源的供应商。此选项目前仅在 Kubernetes 上受支持,实际上是遵循 Kubernetes 设备插件命名约定的供应商和域。(例如,对于 Kubernetes 上的 GPU,此配置将设置为 nvidia.com 或 amd.com) 3.0.0
spark.extraListeners (无) 实现 SparkListener 的类的逗号分隔列表;在初始化 SparkContext 时,将创建这些类的实例并注册到 Spark 的监听器总线。如果一个类有一个接受 SparkConf 的单参数构造函数,则将调用该构造函数;否则,将调用零参数构造函数。如果找不到有效的构造函数,SparkContext 创建将失败并抛出异常。 1.3.0
spark.local.dir /tmp 用于 Spark 中“临时”空间的目录,包括映射输出文件和存储在磁盘上的 RDD。这应该位于系统中的快速本地磁盘上。它也可以是不同磁盘上多个目录的逗号分隔列表。
注意:这将被集群管理器设置的 SPARK_LOCAL_DIRS(独立)、MESOS_SANDBOX(Mesos)或 LOCAL_DIRS(YARN)环境变量覆盖。
0.5.0
spark.logConf false 在启动 SparkContext 时,将有效的 SparkConf 作为 INFO 记录。 0.9.0
spark.master (无) 要连接的集群管理器。请参阅 允许的主机 URL 列表 0.9.0
spark.submit.deployMode client Spark 驱动程序的部署模式,可以是“client”或“cluster”,这意味着在本地(“client”)或集群内部的某个节点上远程(“cluster”)启动驱动程序。 1.5.0
spark.log.callerContext (无) 在 Yarn/HDFS 上运行时,将写入 Yarn RM 日志/HDFS 审计日志的应用程序信息。其长度取决于 Hadoop 配置 hadoop.caller.context.max.size。它应该简明扼要,通常最多可以包含 50 个字符。 2.2.0
spark.log.level (无) 如果设置,则会覆盖任何用户定义的日志设置,就像在 Spark 启动时调用 SparkContext.setLogLevel() 一样。有效的日志级别包括:“ALL”、“DEBUG”、“ERROR”、“FATAL”、“INFO”、“OFF”、“TRACE”、“WARN”。 3.5.0
spark.driver.supervise false 如果为 true,则在驱动程序以非零退出状态失败时自动重启驱动程序。仅在 Spark 独立模式或 Mesos 集群部署模式下有效。 1.3.0
spark.driver.log.dfsDir (无) 同步 Spark 驱动程序日志的基目录,如果 spark.driver.log.persistToDfs.enabled 为 true。在此基目录中,每个应用程序将驱动程序日志记录到特定于应用程序的文件中。用户可能希望将其设置为统一的位置,例如 HDFS 目录,以便可以持久化驱动程序日志文件以供以后使用。此目录应允许任何 Spark 用户读取/写入文件,并允许 Spark History Server 用户删除文件。此外,如果 spark.history.fs.driverlog.cleaner.enabled 为 true,并且如果它们比通过设置 spark.history.fs.driverlog.cleaner.maxAge 配置的最大年龄更旧,则 Spark History Server 会清理此目录中的旧日志。 3.0.0
spark.driver.log.persistToDfs.enabled false 如果为 true,则在客户端模式下运行的 spark 应用程序将驱动程序日志写入持久化存储,该存储在 spark.driver.log.dfsDir 中配置。如果未配置 spark.driver.log.dfsDir,则不会持久化驱动程序日志。此外,通过在 Spark History Server 中将 spark.history.fs.driverlog.cleaner.enabled 设置为 true 来启用清理程序。 3.0.0
spark.driver.log.layout %d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex 同步到 spark.driver.log.dfsDir 的驱动程序日志的布局。如果未配置此项,它将使用 log4j2.properties 中定义的第一个追加器的布局。如果该项也未配置,则驱动程序日志将使用默认布局。 3.0.0
spark.driver.log.allowErasureCoding false 是否允许驱动程序日志使用擦除编码。在 HDFS 上,擦除编码文件不会像常规复制文件那样快速更新,因此它们可能需要更长时间才能反映应用程序写入的更改。请注意,即使为 true,Spark 仍然不会强制文件使用擦除编码,它只会使用文件系统默认值。 3.0.0
spark.decommission.enabled false 启用停用时,Spark 将尽力优雅地关闭执行器。当 spark.storage.decommission.enabled 启用时,Spark 将尝试将所有 RDD 块(由 spark.storage.decommission.rddBlocks.enabled 控制)和混洗块(由 spark.storage.decommission.shuffleBlocks.enabled 控制)从停用的执行器迁移到远程执行器。启用停用后,Spark 还将在 spark.dynamicAllocation.enabled 启用时停用执行器,而不是将其杀死。 3.1.0
spark.executor.decommission.killInterval (无) 停用的执行器将被外部(例如非 Spark)服务强制杀死后的持续时间。 3.1.0
spark.executor.decommission.forceKillTimeout (无) Spark 强制停用的执行器退出后的持续时间。在大多数情况下,应将其设置为较高的值,因为较低的值会阻止块迁移有足够的时间完成。 3.2.0
spark.executor.decommission.signal PWR 用于触发执行器开始停用的信号。 3.2.0
spark.executor.maxNumFailures numExecutors * 2,最小值为 3 在应用程序失败之前允许的最大执行器失败次数。此配置仅在 YARN 或 Kubernetes 上有效,前提是 `spark.kubernetes.allocation.pods.allocator` 设置为 'direct'。 3.5.0
spark.executor.failuresValidityInterval (无) 执行器失败将被视为独立且不会累积到尝试次数后的间隔。此配置仅在 YARN 或 Kubernetes 上有效,前提是 `spark.kubernetes.allocation.pods.allocator` 设置为 'direct'。 3.5.0

除了这些之外,以下属性也可用,并且在某些情况下可能有用

运行时环境

属性名称默认值含义自版本
spark.driver.extraClassPath (无) 要添加到驱动程序类路径开头的额外类路径条目。
注意:在客户端模式下,此配置不能通过应用程序中的 SparkConf 直接设置,因为驱动程序 JVM 已在此之前启动。相反,请通过 --driver-class-path 命令行选项或在默认属性文件中设置此项。
1.0.0
spark.driver.defaultJavaOptions (无) 要添加到 spark.driver.extraJavaOptions 开头的默认 JVM 选项字符串。这旨在由管理员设置。例如,GC 设置或其他日志记录。请注意,使用此选项设置最大堆大小(-Xmx)设置是非法的。最大堆大小设置可以通过集群模式中的 spark.driver.memory 和客户端模式中的 --driver-memory 命令行选项设置。
注意:在客户端模式下,此配置不能通过应用程序中的 SparkConf 直接设置,因为驱动程序 JVM 已在此之前启动。相反,请通过 --driver-java-options 命令行选项或在默认属性文件中设置此项。
3.0.0
spark.driver.extraJavaOptions (无) 要传递给驱动程序的额外 JVM 选项字符串。这旨在由用户设置。例如,GC 设置或其他日志记录。请注意,使用此选项设置最大堆大小(-Xmx)设置是非法的。最大堆大小设置可以通过集群模式中的 spark.driver.memory 和客户端模式中的 --driver-memory 命令行选项设置。
注意: 在客户端模式下,此配置不能直接通过 SparkConf 在应用程序中设置,因为驱动程序 JVM 此时已经启动。相反,请通过 --driver-java-options 命令行选项或在默认属性文件中设置。spark.driver.defaultJavaOptions 将被添加到此配置之前。
1.0.0
spark.driver.extraLibraryPath (无) 设置启动驱动程序 JVM 时要使用的特殊库路径。
注意: 在客户端模式下,此配置不能直接通过 SparkConf 在应用程序中设置,因为驱动程序 JVM 此时已经启动。相反,请通过 --driver-library-path 命令行选项或在默认属性文件中设置。
1.0.0
spark.driver.userClassPathFirst false (实验性)在驱动程序中加载类时,是否将用户添加的 jar 优先于 Spark 自身的 jar。此功能可用于缓解 Spark 依赖项和用户依赖项之间的冲突。目前,它是一个实验性功能。仅在集群模式下使用。 1.3.0
spark.executor.extraClassPath (无) 要添加到执行器类路径开头的额外类路径条目。这主要用于与旧版 Spark 的向后兼容性。用户通常不需要设置此选项。 1.0.0
spark.executor.defaultJavaOptions (无) 要添加到 spark.executor.extraJavaOptions 开头的默认 JVM 选项字符串。这旨在由管理员设置。例如,GC 设置或其他日志记录。请注意,使用此选项设置 Spark 属性或最大堆大小(-Xmx)设置是非法的。Spark 属性应使用 SparkConf 对象或与 spark-submit 脚本一起使用的 spark-defaults.conf 文件进行设置。最大堆大小设置可以使用 spark.executor.memory 进行设置。如果存在,以下符号将被插值: 将被应用程序 ID 替换,而 将被执行器 ID 替换。例如,要将详细的 gc 日志记录到名为 /tmp 中的应用程序的执行器 ID 的文件中,请传递一个值为:-verbose:gc -Xloggc:/tmp/-.gc 的 'value'。 3.0.0
spark.executor.extraJavaOptions (无) 要传递给执行器的额外 JVM 选项字符串。这旨在由用户设置。例如,GC 设置或其他日志记录。请注意,使用此选项设置 Spark 属性或最大堆大小(-Xmx)设置是非法的。Spark 属性应使用 SparkConf 对象或与 spark-submit 脚本一起使用的 spark-defaults.conf 文件进行设置。最大堆大小设置可以使用 spark.executor.memory 进行设置。如果存在,以下符号将被插值: 将被应用程序 ID 替换,而 将被执行器 ID 替换。例如,要将详细的 gc 日志记录到名为 /tmp 中的应用程序的执行器 ID 的文件中,请传递一个值为:-verbose:gc -Xloggc:/tmp/-.gc 的 'value'。spark.executor.defaultJavaOptions 将被添加到此配置之前。 1.0.0
spark.executor.extraLibraryPath (无) 设置启动执行器 JVM 时要使用的特殊库路径。 1.0.0
spark.executor.logs.rolling.maxRetainedFiles -1 设置系统将保留的最新滚动日志文件的数量。较旧的日志文件将被删除。默认情况下禁用。 1.1.0
spark.executor.logs.rolling.enableCompression false 启用执行器日志压缩。如果启用,则滚动执行器日志将被压缩。默认情况下禁用。 2.0.2
spark.executor.logs.rolling.maxSize 1024 * 1024 设置执行器日志将被滚动的文件的最大大小(以字节为单位)。默认情况下禁用滚动。有关自动清理旧日志的信息,请参见 spark.executor.logs.rolling.maxRetainedFiles 1.4.0
spark.executor.logs.rolling.strategy (无) 设置执行器日志滚动的策略。默认情况下禁用。它可以设置为 "time"(基于时间的滚动)或 "size"(基于大小的滚动)。对于 "time",使用 spark.executor.logs.rolling.time.interval 设置滚动间隔。对于 "size",使用 spark.executor.logs.rolling.maxSize 设置滚动后的最大文件大小。 1.1.0
spark.executor.logs.rolling.time.interval daily 设置执行器日志将被滚动的的时间间隔。默认情况下禁用滚动。有效值为 dailyhourlyminutely 或任何以秒为单位的间隔。有关自动清理旧日志的信息,请参见 spark.executor.logs.rolling.maxRetainedFiles 1.1.0
spark.executor.userClassPathFirst false (实验性)与 spark.driver.userClassPathFirst 相同的功能,但应用于执行器实例。 1.3.0
spark.executorEnv.[EnvironmentVariableName] (无) EnvironmentVariableName 指定的环境变量添加到执行器进程。用户可以指定多个这样的变量来设置多个环境变量。 0.9.0
spark.redaction.regex (?i)secret|password|token|access[.]key 用于确定驱动程序和执行器环境中的哪些 Spark 配置属性和环境变量包含敏感信息的正则表达式。当此正则表达式匹配属性键或值时,该值将从环境 UI 和各种日志(如 YARN 和事件日志)中删除。 2.1.2
spark.redaction.string.regex (无) 用于确定 Spark 生成的字符串的哪些部分包含敏感信息的正则表达式。当此正则表达式匹配字符串的一部分时,该字符串部分将被替换为一个虚拟值。目前,这用于删除 SQL explain 命令的输出。 2.2.0
spark.python.profile false 在 Python 工作程序中启用分析,分析结果将通过 sc.show_profiles() 显示,或者在驱动程序退出之前显示。它也可以通过 sc.dump_profiles(path) 导出到磁盘。如果一些分析结果已手动显示,则它们不会在驱动程序退出之前自动显示。默认情况下,将使用 pyspark.profiler.BasicProfiler,但可以通过将分析器类作为参数传递给 SparkContext 构造函数来覆盖它。 1.2.0
spark.python.profile.dump (无) 用于在驱动程序退出之前导出分析结果的目录。结果将作为每个 RDD 的单独文件导出。它们可以通过 pstats.Stats() 加载。如果指定了此选项,则分析结果不会自动显示。 1.2.0
spark.python.worker.memory 512m 在聚合期间每个 Python 工作程序进程使用的内存量,与 JVM 内存字符串格式相同,带有一个大小单位后缀("k"、"m"、"g" 或 "t")(例如 512m2g)。如果聚合期间使用的内存超过此数量,它将把数据溢出到磁盘。 1.1.0
spark.python.worker.reuse true 是否重用 Python 工作程序。如果是,它将使用固定数量的 Python 工作程序,不需要为每个任务 fork() 一个 Python 进程。如果存在大量广播,这将非常有用,因为广播不需要为每个任务从 JVM 传输到 Python 工作程序。 1.2.0
spark.files 要放置在每个执行器工作目录中的文件的逗号分隔列表。允许使用通配符。 1.0.0
spark.submit.pyFiles 要放置在 Python 应用程序的 PYTHONPATH 上的 .zip、.egg 或 .py 文件的逗号分隔列表。允许使用通配符。 1.0.1
spark.jars 要包含在驱动程序和执行器类路径上的 jar 的逗号分隔列表。允许使用通配符。 0.9.0
spark.jars.packages 要包含在驱动程序和执行器类路径上的 jar 的 Maven 坐标的逗号分隔列表。坐标应为 groupId:artifactId:version。如果给出了 spark.jars.ivySettings,则将根据文件中的配置解析工件,否则将从本地 Maven 仓库中搜索工件,然后是 Maven 中央仓库,最后是命令行选项 --repositories 给出的任何其他远程仓库。有关更多详细信息,请参见 高级依赖项管理 1.5.0
spark.jars.excludes 要排除的 groupId:artifactId 的逗号分隔列表,以便在解析 spark.jars.packages 中提供的依赖项时避免依赖项冲突。 1.5.0
spark.jars.ivy 用于指定 Ivy 用户目录的路径,用于本地 Ivy 缓存和来自 spark.jars.packages 的包文件。这将覆盖 Ivy 属性 ivy.default.ivy.user.dir,该属性默认为 ~/.ivy2。 1.3.0
spark.jars.ivySettings 用于自定义使用 spark.jars.packages 指定的 jar 的解析的 Ivy 设置文件的路径,而不是内置的默认设置,例如 Maven 中央仓库。命令行选项 --repositoriesspark.jars.repositories 给出的其他仓库也将被包含在内。这对于允许 Spark 从防火墙后面解析工件很有用,例如通过内部工件服务器(如 Artifactory)。有关设置文件格式的详细信息,请参见 设置文件。仅支持具有 file:// 方案的路径。没有方案的路径被假定为具有 file:// 方案。

在 YARN 集群模式下运行时,此文件也将被本地化到远程驱动程序,以便在 SparkContext#addJar 中进行依赖项解析。

2.2.0
spark.jars.repositories 用于搜索使用 --packagesspark.jars.packages 给出的 Maven 坐标的额外远程仓库的逗号分隔列表。 2.3.0
spark.archives 要解压缩到每个执行器工作目录中的存档的逗号分隔列表。支持 .jar、.tar.gz、.tgz 和 .zip。您可以通过在文件名后添加 # 来指定要解压缩的目录名,例如 file.zip#directory。此配置是实验性的。 3.1.0
spark.pyspark.driver.python 用于 PySpark 驱动程序的 Python 二进制可执行文件。(默认值为 spark.pyspark.python 2.1.0
spark.pyspark.python 用于 PySpark 驱动程序和执行器的 Python 二进制可执行文件。 2.1.0

混洗行为

属性名称默认值含义自版本
spark.reducer.maxSizeInFlight 48m 从每个 reduce 任务同时获取的映射输出的最大大小(以 MiB 为单位,除非另有说明)。由于每个输出都需要我们创建一个缓冲区来接收它,因此这表示每个 reduce 任务的固定内存开销,因此请保持较小,除非您有大量的内存。 1.4.0
spark.reducer.maxReqsInFlight Int.MaxValue 此配置限制了在任何给定时间点获取块的远程请求数量。当集群中的主机数量增加时,它可能导致一个或多个节点的入站连接数量非常大,导致工作程序在负载下失败。通过允许它限制获取请求的数量,可以缓解这种情况。 2.0.0
spark.reducer.maxBlocksInFlightPerAddress Int.MaxValue 此配置限制了从给定主机端口获取的每个 reduce 任务的远程块的数量。当从给定地址获取大量块或同时获取大量块时,这可能会导致服务执行器或节点管理器崩溃。当启用外部混洗时,这对于减少节点管理器的负载特别有用。您可以通过将其设置为较低的值来缓解此问题。 2.2.1
spark.shuffle.compress true 是否压缩映射输出文件。通常是一个好主意。压缩将使用 spark.io.compression.codec 0.6.0
spark.shuffle.file.buffer 32k 每个混洗文件输出流的内存缓冲区的大小(以 KiB 为单位,除非另有说明)。这些缓冲区减少了创建中间混洗文件时进行的磁盘查找和系统调用的次数。 1.4.0
spark.shuffle.unsafe.file.output.buffer 32k 在每个分区写入不安全混洗写入器后,此缓冲区大小的文件系统。以 KiB 为单位,除非另有说明。 2.3.0
spark.shuffle.spill.diskWriteBufferSize 1024 * 1024 将排序后的记录写入磁盘文件时使用的缓冲区大小(以字节为单位)。 2.3.0
spark.shuffle.io.maxRetries 3 (仅限 Netty)如果将其设置为非零值,则由于与 IO 相关的异常而失败的获取将自动重试。此重试逻辑有助于在长时间的 GC 暂停或瞬态网络连接问题的情况下稳定大型混洗。 1.2.0
spark.shuffle.io.numConnectionsPerPeer 1 (仅限 Netty)主机之间的连接将被重用,以减少大型集群的连接累积。对于具有许多硬盘和少数主机的集群,这可能会导致并发性不足以使所有磁盘饱和,因此用户可以考虑增加此值。 1.2.1
spark.shuffle.io.preferDirectBufs true (仅限 Netty)使用堆外缓冲区来减少混洗和缓存块传输期间的垃圾回收。对于堆外内存严格限制的环境,用户可能希望将其关闭,以强制 Netty 的所有分配都在堆内进行。 1.2.0
spark.shuffle.io.retryWait 5s (仅限 Netty) 尝试获取数据时,两次尝试之间的等待时间。默认情况下,重试导致的最大延迟为 15 秒,计算公式为 maxRetries * retryWait 1.2.1
spark.shuffle.io.backLog -1 shuffle 服务的接受队列长度。对于大型应用程序,可能需要增加此值,以便在短时间内大量连接到达时,如果服务无法跟上连接速度,则不会丢弃传入连接。这需要在 shuffle 服务本身运行的任何地方进行配置,这可能在应用程序之外(请参阅下面的 spark.shuffle.service.enabled 选项)。如果设置为小于 1,则将回退到 Netty 的 io.netty.util.NetUtil#SOMAXCONN 定义的操作系统默认值。 1.1.1
spark.shuffle.io.connectionTimeout spark.network.timeout 的值 shuffle 服务器和客户端之间已建立连接的超时时间,如果在至少 connectionTimeout 时间内没有通道流量,但仍有未完成的获取请求,则将这些连接标记为空闲并关闭。 1.2.0
spark.shuffle.io.connectionCreationTimeout spark.shuffle.io.connectionTimeout 的值 shuffle 服务器和客户端之间建立连接的超时时间。 3.2.0
spark.shuffle.service.enabled false 启用外部 shuffle 服务。此服务保留执行器写入的 shuffle 文件,例如,以便可以安全地删除执行器,或者以便在执行器发生故障时可以继续进行 shuffle 获取。必须设置外部 shuffle 服务才能启用它。有关更多信息,请参阅 动态分配配置和设置文档 1.2.0
spark.shuffle.service.port 7337 外部 shuffle 服务将运行的端口。 1.2.0
spark.shuffle.service.name spark_shuffle 客户端应与其通信的 Spark shuffle 服务的配置名称。这必须与用于在 YARN NodeManager 配置中配置 Shuffle 的名称匹配(yarn.nodemanager.aux-services)。仅在 spark.shuffle.service.enabled 设置为 true 时生效。 3.2.0
spark.shuffle.service.index.cache.size 100m 缓存条目限制为指定的内存占用,以字节为单位,除非另有说明。 2.3.0
spark.shuffle.service.removeShuffle false 是否使用 ExternalShuffleService 删除已取消分配执行器的 shuffle 块,当 shuffle 不再需要时。如果没有启用此功能,则已取消分配的执行器上的 shuffle 数据将保留在磁盘上,直到应用程序结束。 3.3.0
spark.shuffle.maxChunksBeingTransferred Long.MAX_VALUE 允许在 shuffle 服务上同时传输的最大块数。请注意,当达到最大数量时,新的传入连接将被关闭。客户端将根据 shuffle 重试配置(请参阅 spark.shuffle.io.maxRetriesspark.shuffle.io.retryWait)进行重试,如果达到这些限制,则任务将因获取失败而失败。 2.3.0
spark.shuffle.sort.bypassMergeThreshold 200 (高级) 在基于排序的 shuffle 管理器中,如果不存在 map 端聚合并且最多有这么多 reduce 分区,则避免对数据进行合并排序。 1.1.1
spark.shuffle.sort.io.plugin.class org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO 用于 shuffle IO 的类的名称。 3.0.0
spark.shuffle.spill.compress true 是否压缩 shuffle 期间溢出的数据。压缩将使用 spark.io.compression.codec 0.9.0
spark.shuffle.accurateBlockThreshold 100 * 1024 * 1024 HighlyCompressedMapStatus 中 shuffle 块大小的阈值(以字节为单位),超过此阈值,shuffle 块的大小将被准确记录。这有助于通过避免在获取 shuffle 块时低估 shuffle 块大小来防止 OOM。 2.2.1
spark.shuffle.registration.timeout 5000 向外部 shuffle 服务注册的超时时间(以毫秒为单位)。 2.3.0
spark.shuffle.registration.maxAttempts 3 如果我们无法向外部 shuffle 服务注册,我们将尝试注册最大次数。 2.3.0
spark.shuffle.reduceLocality.enabled true 是否计算 reduce 任务的局部性偏好。 1.5.0
spark.shuffle.mapOutput.minSizeForBroadcast 512k 使用 Broadcast 将 map 输出状态发送到执行器的阈值大小。 2.0.0
spark.shuffle.detectCorrupt true 是否检测获取的块中的任何损坏。 2.2.0
spark.shuffle.detectCorrupt.useExtraMemory false 如果启用,则通过使用额外的内存来解压缩/解密压缩/加密流的一部分,以尽早检测损坏。任何抛出的 IOException 都将导致任务重试一次,如果它再次使用相同的异常失败,则将抛出 FetchFailedException 以重试上一个阶段。 3.0.0
spark.shuffle.useOldFetchProtocol false 是否在执行 shuffle 块获取时使用旧协议。仅在我们需要新 Spark 版本作业从旧版本外部 shuffle 服务获取 shuffle 块的兼容性场景中启用它。 3.0.0
spark.shuffle.readHostLocalDisk true 如果启用(并且 spark.shuffle.useOldFetchProtocol 已禁用),则从运行在同一主机上的那些块管理器请求的 shuffle 块将直接从磁盘读取,而不是作为远程块通过网络获取。 3.0.0
spark.files.io.connectionTimeout spark.network.timeout 的值 在 Spark RPC 环境中获取文件的已建立连接的超时时间,如果在至少 connectionTimeout 时间内没有通道流量,但仍有未完成的文件正在下载,则将这些连接标记为空闲并关闭。 1.6.0
spark.files.io.connectionCreationTimeout spark.files.io.connectionTimeout 的值 在 Spark RPC 环境中获取文件建立连接的超时时间。 3.2.0
spark.shuffle.checksum.enabled true 是否计算 shuffle 数据的校验和。如果启用,Spark 将为 map 输出文件中的每个分区数据计算校验和值,并将这些值存储在磁盘上的校验和文件中。当检测到 shuffle 数据损坏时,Spark 将尝试使用校验和文件诊断损坏的原因(例如,网络问题、磁盘问题等)。 3.2.0
spark.shuffle.checksum.algorithm ADLER32 用于计算 shuffle 校验和的算法。目前,它只支持 JDK 的内置算法,例如 ADLER32、CRC32。 3.2.0
spark.shuffle.service.fetch.rdd.enabled false 是否使用 ExternalShuffleService 获取磁盘持久化 RDD 块。在动态分配的情况下,如果启用此功能,则只有磁盘持久化块的执行器将在 spark.dynamicAllocation.executorIdleTimeout 后被视为空闲,并将相应地释放。 3.0.0
spark.shuffle.service.db.enabled true 是否在 ExternalShuffleService 中使用数据库。请注意,这仅影响独立模式。 3.0.0
spark.shuffle.service.db.backend LEVELDB 指定 shuffle 服务本地数据库中使用的基于磁盘的存储。设置为 LEVELDB 或 ROCKSDB。 3.4.0

Spark UI

属性名称默认值含义自版本
spark.eventLog.logBlockUpdates.enabled false 如果 spark.eventLog.enabled 为 true,是否为每个块更新记录事件。*警告:*这将大大增加事件日志的大小。 2.3.0
spark.eventLog.longForm.enabled false 如果为 true,则在事件日志中使用调用站点的长格式。否则使用短格式。 2.4.0
spark.eventLog.compress false 如果 spark.eventLog.enabled 为 true,是否压缩记录的事件。 1.0.0
spark.eventLog.compression.codec zstd 压缩记录事件的编解码器。默认情况下,Spark 提供四个编解码器:lz4lzfsnappyzstd。您还可以使用完全限定的类名来指定编解码器,例如 org.apache.spark.io.LZ4CompressionCodecorg.apache.spark.io.LZFCompressionCodecorg.apache.spark.io.SnappyCompressionCodecorg.apache.spark.io.ZStdCompressionCodec 3.0.0
spark.eventLog.erasureCoding.enabled false 是否允许事件日志使用擦除编码,或者关闭擦除编码,无论文件系统默认值如何。在 HDFS 上,擦除编码文件不会像常规复制文件那样快速更新,因此应用程序更新将在历史服务器中出现的时间更长。请注意,即使为 true,Spark 仍然不会强制文件使用擦除编码,它只会使用文件系统默认值。 3.0.0
spark.eventLog.dir file:///tmp/spark-events 如果 spark.eventLog.enabled 为 true,则记录 Spark 事件的基目录。在此基目录中,Spark 为每个应用程序创建一个子目录,并在该目录中记录特定于应用程序的事件。用户可能希望将其设置为统一的位置(如 HDFS 目录),以便历史服务器可以读取历史文件。 1.0.0
spark.eventLog.enabled false 是否记录 Spark 事件,这对于在应用程序完成后重建 Web UI 很有用。 1.0.0
spark.eventLog.overwrite false 是否覆盖任何现有文件。 1.0.0
spark.eventLog.buffer.kb 100k 写入输出流时使用的缓冲区大小,以 KiB 为单位,除非另有说明。 1.0.0
spark.eventLog.rolling.enabled false 是否启用滚动事件日志文件。如果设置为 true,它会将每个事件日志文件缩减到配置的大小。 3.0.0
spark.eventLog.rolling.maxFileSize 128m spark.eventLog.rolling.enabled=true 时,指定事件日志文件在滚动之前可以达到的最大大小。 3.0.0
spark.ui.dagGraph.retainedRootRDDs Int.MaxValue Spark UI 和状态 API 在进行垃圾回收之前记住的 DAG 图节点数量。 2.1.0
spark.ui.enabled true 是否为 Spark 应用程序运行 Web UI。 1.1.1
spark.ui.store.path 用于缓存实时 UI 的应用程序信息的本地目录。默认情况下,此选项未设置,这意味着所有应用程序信息都将保存在内存中。 3.4.0
spark.ui.killEnabled true 允许从 Web UI 中杀死作业和阶段。 1.0.0
spark.ui.liveUpdate.period 100ms 更新实时实体的频率。-1 表示在重播应用程序时“从不更新”,这意味着只会发生最后一次写入。对于实时应用程序,这避免了一些操作,这些操作在快速处理传入任务事件时可以省略。 2.3.0
spark.ui.liveUpdate.minFlushPeriod 1s 过时的 UI 数据刷新之前经过的最短时间。这避免了在不频繁触发传入任务事件时 UI 过时。 2.4.2
spark.ui.port 4040 应用程序仪表板的端口,该仪表板显示内存和工作负载数据。 0.7.0
spark.ui.retainedJobs 1000 Spark UI 和状态 API 在进行垃圾回收之前记住的作业数量。这是一个目标最大值,在某些情况下,可能保留更少的元素。 1.2.0
spark.ui.retainedStages 1000 Spark UI 和状态 API 在进行垃圾回收之前记住的阶段数量。这是一个目标最大值,在某些情况下,可能保留更少的元素。 0.9.0
spark.ui.retainedTasks 100000 Spark UI 和状态 API 在进行垃圾回收之前记住的一个阶段中的任务数量。这是一个目标最大值,在某些情况下,可能保留更少的元素。 2.0.1
spark.ui.reverseProxy false 启用将 Spark Master 作为工作器和应用程序 UI 的反向代理运行。在此模式下,Spark Master 将反向代理工作器和应用程序 UI,以允许访问,而无需直接访问其主机。谨慎使用,因为工作器和应用程序 UI 将无法直接访问,您只能通过 Spark Master/代理公共 URL 访问它们。此设置会影响集群中运行的所有工作器和应用程序 UI,并且必须在所有工作器、驱动程序和主节点上设置。 2.1.0
spark.ui.reverseProxyUrl 如果 Spark UI 应该通过另一个前端反向代理提供服务,那么这是通过该反向代理访问 Spark 主 UI 的 URL。当运行代理进行身份验证时,例如 OAuth 代理,这很有用。URL 可能包含路径前缀,例如 http://mydomain.com/path/to/spark/,允许您通过同一个虚拟主机和端口为多个 Spark 集群和其他 Web 应用程序提供 UI 服务。通常,这应该是一个包含方案(http/https)、主机和端口的绝对 URL。可以在这里指定以“/”开头的相对 URL。在这种情况下,Spark UI 和 Spark REST API 生成的所有 URL 将是服务器相对链接 - 这仍然有效,因为整个 Spark UI 是通过同一个主机和端口提供的。
此设置会影响 Spark UI 中的链接生成,但前端反向代理负责
  • 在转发请求之前剥离路径前缀,
  • 重写直接指向 Spark 主节点的重定向,
  • 将从 http://mydomain.com/path/to/sparkhttp://mydomain.com/path/to/spark/ 的访问重定向(路径前缀后的尾部斜杠);否则,主页面上的相对链接无法正常工作。
此设置会影响集群中运行的所有工作器和应用程序 UI,并且必须在所有工作器、驱动程序和主节点上设置相同的值。它仅在 spark.ui.reverseProxy 启用时有效。当 Spark 主节点 Web UI 可直接访问时,不需要此设置。
请注意,设置的值在按“/”分割后不能包含关键字“proxy”或“history”。Spark UI 依赖这两个关键字从 URI 获取 REST API 端点。
2.1.0
spark.ui.proxyRedirectUri 当 Spark 在代理后面运行时,重定向的地址。这将使 Spark 修改重定向响应,使其指向代理服务器,而不是 Spark UI 自己的地址。这应该只是服务器的地址,没有任何应用程序的前缀路径;前缀应该由代理服务器本身设置(通过添加 X-Forwarded-Context 请求头),或者通过在 Spark 应用程序的配置中设置代理基础。 3.0.0
spark.ui.showConsoleProgress false 在控制台中显示进度条。进度条显示运行时间超过 500 毫秒的阶段的进度。如果多个阶段同时运行,则会在同一行上显示多个进度条。
注意:在 shell 环境中,spark.ui.showConsoleProgress 的默认值为 true。
1.2.1
spark.ui.custom.executor.log.url (无) 指定自定义 Spark 执行器日志 URL,以支持外部日志服务,而不是在 Spark UI 中使用集群管理器的应用程序日志 URL。Spark 将通过模式支持一些路径变量,这些模式可能因集群管理器而异。请查看您的集群管理器的文档,以查看支持哪些模式(如果有)。

请注意,此配置还会替换事件日志中的原始日志 URL,这在访问历史服务器上的应用程序时也会生效。新的日志 URL 必须是永久性的,否则您可能会遇到执行器日志 URL 的死链接。

目前,只有 YARN 模式支持此配置

3.0.0
spark.worker.ui.retainedExecutors 1000 Spark UI 和状态 API 在垃圾回收之前记住多少个已完成的执行器。 1.5.0
spark.worker.ui.retainedDrivers 1000 Spark UI 和状态 API 在垃圾回收之前记住多少个已完成的驱动程序。 1.5.0
spark.sql.ui.retainedExecutions 1000 Spark UI 和状态 API 在垃圾回收之前记住多少个已完成的执行。 1.5.0
spark.streaming.ui.retainedBatches 1000 Spark UI 和状态 API 在垃圾回收之前记住多少个已完成的批次。 1.0.0
spark.ui.retainedDeadExecutors 100 Spark UI 和状态 API 在垃圾回收之前记住多少个已死亡的执行器。 2.0.0
spark.ui.filters 要应用于 Spark Web UI 的过滤器类名的逗号分隔列表。过滤器应该是标准的 javax servlet Filter
过滤器参数也可以在配置中指定,方法是设置以下形式的配置项:spark.<class name of filter>.param.<param name>=<value>
例如
spark.ui.filters=com.test.filter1
spark.com.test.filter1.param.name1=foo
spark.com.test.filter1.param.name2=bar
1.0.0
spark.ui.requestHeaderSize 8k HTTP 请求头的最大允许大小(以字节为单位,除非另有说明)。此设置也适用于 Spark 历史服务器。 2.2.3
spark.ui.timelineEnabled true 是否在 UI 页面上显示事件时间线数据。 3.4.0
spark.ui.timeline.executors.maximum 250 事件时间线中显示的执行器最大数量。 3.2.0
spark.ui.timeline.jobs.maximum 500 事件时间线中显示的作业最大数量。 3.2.0
spark.ui.timeline.stages.maximum 500 事件时间线中显示的阶段最大数量。 3.2.0
spark.ui.timeline.tasks.maximum 1000 事件时间线中显示的任务最大数量。 1.4.0
spark.appStatusStore.diskStoreDir 存储 SQL 执行诊断信息的本地目录。此配置仅适用于实时 UI。 3.4.0

压缩和序列化

属性名称默认值含义自版本
spark.broadcast.compress true 是否在发送广播变量之前压缩它们。通常是一个好主意。压缩将使用 spark.io.compression.codec 0.6.0
spark.checkpoint.compress false 是否压缩 RDD 检查点。通常是一个好主意。压缩将使用 spark.io.compression.codec 2.2.0
spark.io.compression.codec lz4 用于压缩内部数据的编解码器,例如 RDD 分区、事件日志、广播变量和混洗输出。默认情况下,Spark 提供四种编解码器:lz4lzfsnappyzstd。您也可以使用完全限定的类名来指定编解码器,例如 org.apache.spark.io.LZ4CompressionCodecorg.apache.spark.io.LZFCompressionCodecorg.apache.spark.io.SnappyCompressionCodecorg.apache.spark.io.ZStdCompressionCodec 0.8.0
spark.io.compression.lz4.blockSize 32k LZ4 压缩中使用的块大小,在使用 LZ4 压缩编解码器的情况下。降低此块大小也会降低使用 LZ4 时混洗内存的使用量。默认单位为字节,除非另有说明。此配置仅适用于 spark.io.compression.codec 1.4.0
spark.io.compression.snappy.blockSize 32k Snappy 压缩中的块大小,在使用 Snappy 压缩编解码器的情况下。降低此块大小也会降低使用 Snappy 时混洗内存的使用量。默认单位为字节,除非另有说明。此配置仅适用于 spark.io.compression.codec 1.4.0
spark.io.compression.zstd.level 1 Zstd 压缩编解码器的压缩级别。提高压缩级别将导致更好的压缩,但会以更高的 CPU 和内存使用量为代价。此配置仅适用于 spark.io.compression.codec 2.3.0
spark.io.compression.zstd.bufferSize 32k Zstd 压缩中使用的缓冲区大小(以字节为单位),在使用 Zstd 压缩编解码器的情况下。降低此大小将降低使用 Zstd 时混洗内存的使用量,但可能会增加压缩成本,因为 JNI 调用开销过大。此配置仅适用于 spark.io.compression.codec 2.3.0
spark.io.compression.zstd.bufferPool.enabled true 如果为 true,则启用 ZSTD JNI 库的缓冲区池。 3.2.0
spark.kryo.classesToRegister (无) 如果您使用 Kryo 序列化,请提供要与 Kryo 注册的自定义类名的逗号分隔列表。有关更多详细信息,请参阅 调整指南 1.2.0
spark.kryo.referenceTracking true 是否在使用 Kryo 序列化数据时跟踪对同一对象的引用,这在您的对象图具有循环时是必需的,并且如果它们包含同一对象的多个副本,则对效率很有用。如果知道情况并非如此,可以禁用它以提高性能。 0.8.0
spark.kryo.registrationRequired false 是否要求与 Kryo 注册。如果设置为“true”,则 Kryo 在序列化未注册的类时会抛出异常。如果设置为 false(默认值),则 Kryo 会将未注册的类名与每个对象一起写入。写入类名会导致明显的性能开销,因此启用此选项可以严格地强制用户没有遗漏注册的类。 1.1.0
spark.kryo.registrator (无) 如果您使用 Kryo 序列化,请提供将您的自定义类与 Kryo 注册的类的逗号分隔列表。如果您需要以自定义方式注册您的类,例如指定自定义字段序列化器,则此属性很有用。否则,spark.kryo.classesToRegister 更简单。它应该设置为扩展 KryoRegistrator 的类。有关更多详细信息,请参阅 调整指南 0.5.0
spark.kryo.unsafe true 是否使用基于 unsafe 的 Kryo 序列化器。通过使用基于 Unsafe 的 IO,速度可以大大提高。 2.1.0
spark.kryoserializer.buffer.max 64m Kryo 序列化缓冲区的最大允许大小(以 MiB 为单位,除非另有说明)。这必须大于您尝试序列化的任何对象,并且必须小于 2048m。如果您在 Kryo 内部收到“缓冲区限制超出”异常,请增加此值。 1.4.0
spark.kryoserializer.buffer 64k Kryo 序列化缓冲区的初始大小(以 KiB 为单位,除非另有说明)。请注意,每个工作器上每个内核将有一个缓冲区。如果需要,此缓冲区将增长到 spark.kryoserializer.buffer.max 1.4.0
spark.rdd.compress false 是否压缩序列化后的 RDD 分区(例如,对于 Java 和 Scala 中的 StorageLevel.MEMORY_ONLY_SER 或 Python 中的 StorageLevel.MEMORY_ONLY)。可以在一定程度上节省空间,但会增加一些 CPU 时间。压缩将使用 spark.io.compression.codec 0.6.0
spark.serializer org.apache.spark.serializer.
JavaSerializer
用于序列化将通过网络发送或需要以序列化形式缓存的对象的类。Java 序列化的默认值适用于任何 Serializable Java 对象,但速度很慢,因此我们建议 在需要速度时使用 org.apache.spark.serializer.KryoSerializer 并配置 Kryo 序列化。可以是 org.apache.spark.Serializer 的任何子类。 0.5.0
spark.serializer.objectStreamReset 100 使用 org.apache.spark.serializer.JavaSerializer 进行序列化时,序列化器会缓存对象以防止写入冗余数据,但这会阻止对这些对象的垃圾回收。通过调用“reset”,您可以从序列化器中刷新这些信息,并允许回收旧对象。要关闭此定期重置,请将其设置为 -1。默认情况下,它将每 100 个对象重置一次序列化器。 1.0.0

内存管理

属性名称默认值含义自版本
spark.memory.fraction 0.6 (堆空间 - 300MB) 的一部分用于执行和存储。此值越低,溢出和缓存数据驱逐的频率越高。此配置的目的是为内部元数据、用户数据结构和稀疏、异常大的记录的非精确大小估计预留内存。建议保留默认值。有关更多详细信息,包括在增加此值时正确调整 JVM 垃圾回收的重要信息,请参阅 此说明 1.6.0
spark.memory.storageFraction 0.5 不受驱逐影响的存储内存量,表示为 spark.memory.fraction 预留区域大小的一部分。此值越高,可用于执行的工作内存就越少,任务可能会更频繁地溢出到磁盘。建议保留默认值。有关更多详细信息,请参阅 此说明 1.6.0
spark.memory.offHeap.enabled false 如果为 true,Spark 将尝试对某些操作使用堆外内存。如果启用了堆外内存使用,则 spark.memory.offHeap.size 必须为正。 1.6.0
spark.memory.offHeap.size 0 用于堆外分配的内存绝对量,以字节为单位,除非另有说明。此设置不会影响堆内存使用情况,因此,如果您的执行器总内存消耗必须符合某个硬性限制,请务必相应地缩小 JVM 堆大小。当 spark.memory.offHeap.enabled=true 时,此值必须设置为正值。 1.6.0
spark.storage.unrollMemoryThreshold 1024 * 1024 在展开任何块之前请求的初始内存。 1.1.0
spark.storage.replication.proactive false 启用 RDD 块的主动块复制。由于执行器故障而丢失的缓存的 RDD 块副本将在存在任何现有可用副本的情况下得到补充。这尝试将块的复制级别恢复到初始数量。 2.2.0
spark.storage.localDiskByExecutors.cacheSize 1000 存储本地目录的最大执行器数量。此大小同时应用于驱动程序和执行器端,以避免无限制的存储。此缓存将用于在从同一主机获取磁盘持久化 RDD 块或混洗块(当 spark.shuffle.readHostLocalDisk 设置时)的情况下避免网络。 3.0.0
spark.cleaner.periodicGC.interval 30 分钟 控制多久触发一次垃圾回收。

此上下文清理器仅在弱引用被垃圾回收时触发清理。在具有大型驱动程序 JVM 的长时间运行的应用程序中,如果驱动程序上的内存压力很小,这可能很少发生或根本不发生。完全不清理可能会导致执行器一段时间后耗尽磁盘空间。
1.6.0
spark.cleaner.referenceTracking true 启用或禁用上下文清理。 1.0.0
spark.cleaner.referenceTracking.blocking true 控制清理线程是否应该阻塞清理任务(除了混洗,混洗由 spark.cleaner.referenceTracking.blocking.shuffle Spark 属性控制)。 1.0.0
spark.cleaner.referenceTracking.blocking.shuffle false 控制清理线程是否应该阻塞混洗清理任务。 1.1.1
spark.cleaner.referenceTracking.cleanCheckpoints false 控制是否清理检查点文件,如果引用超出范围。 1.4.0

执行行为

属性名称默认值含义自版本
spark.broadcast.blockSize 4 MB TorrentBroadcastFactory 中每个块的块大小,以 KiB 为单位,除非另有说明。过大的值会降低广播期间的并行性(使其变慢);但是,如果它太小,BlockManager 可能会遇到性能问题。 0.5.0
spark.broadcast.checksum true 是否启用广播的校验和。如果启用,广播将包含校验和,这有助于检测损坏的块,但需要计算和发送更多数据。如果网络有其他机制来保证数据在广播期间不会损坏,则可以禁用它。 2.1.1
spark.broadcast.UDFCompressionThreshold 1 * 1024 * 1024 用户定义函数 (UDF) 和 Python RDD 命令通过广播压缩的阈值,以字节为单位,除非另有说明。 3.0.0
spark.executor.cores 在 YARN 模式下为 1,在独立和 Mesos 粗粒度模式下为工作程序上的所有可用核心。 每个执行器上要使用的核心数量。在独立和 Mesos 粗粒度模式下,有关更多详细信息,请参阅 此描述 1.0.0
spark.default.parallelism 对于分布式混洗操作(如 reduceByKeyjoin),父 RDD 中的最大分区数。对于像 parallelize 这样的没有父 RDD 的操作,它取决于集群管理器
  • 本地模式:本地机器上的核心数量
  • Mesos 细粒度模式:8
  • 其他:所有执行器节点上的总核心数或 2,取较大者
RDD 中的默认分区数,由 joinreduceByKeyparallelize 等转换返回,当用户未设置时。 0.5.0
spark.executor.heartbeatInterval 10 秒 每个执行器向驱动程序发送心跳的间隔。心跳让驱动程序知道执行器仍然存活,并用正在进行的任务的指标更新它。spark.executor.heartbeatInterval 应该明显小于 spark.network.timeout 1.1.0
spark.files.fetchTimeout 60 秒 从驱动程序获取通过 SparkContext.addFile() 添加的文件时要使用的通信超时。 1.0.0
spark.files.useFetchCache true 如果设置为 true(默认值),文件获取将使用本地缓存,该缓存由属于同一应用程序的执行器共享,这可以提高在同一主机上运行许多执行器时的任务启动性能。如果设置为 false,这些缓存优化将被禁用,所有执行器将获取自己的文件副本。为了使用位于 NFS 文件系统上的 Spark 本地目录,可以禁用此优化(有关更多详细信息,请参阅 SPARK-6313)。 1.2.2
spark.files.overwrite false 是否覆盖启动时存在的任何文件。即使此选项设置为 true,用户也不能覆盖 SparkContext.addFileSparkContext.addJar 添加的文件。 1.0.0
spark.files.ignoreCorruptFiles false 是否忽略损坏的文件。如果为 true,Spark 作业将在遇到损坏或不存在的文件时继续运行,并且已读取的内容仍将被返回。 2.1.0
spark.files.ignoreMissingFiles false 是否忽略丢失的文件。如果为 true,Spark 作业将在遇到丢失的文件时继续运行,并且已读取的内容仍将被返回。 2.4.0
spark.files.maxPartitionBytes 134217728(128 MiB) 读取文件时打包到单个分区中的最大字节数。 2.1.0
spark.files.openCostInBytes 4194304(4 MiB) 打开文件的估计成本,以可以同时扫描的字节数衡量。这在将多个文件放入分区时使用。最好高估,这样具有小文件的分区将比具有大文件的分区更快。 2.1.0
spark.hadoop.cloneConf false 如果设置为 true,则为每个任务克隆一个新的 Hadoop Configuration 对象。应启用此选项以解决 Configuration 线程安全问题(有关更多详细信息,请参阅 SPARK-2546)。默认情况下,它被禁用,以避免对不受这些问题影响的作业造成意外的性能下降。 1.0.3
spark.hadoop.validateOutputSpecs true 如果设置为 true,则验证 saveAsHadoopFile 和其他变体中使用的输出规范(例如,检查输出目录是否已存在)。可以禁用此选项以静默由于预先存在的输出目录而导致的异常。我们建议用户不要禁用此选项,除非试图实现与以前版本的 Spark 的兼容性。只需使用 Hadoop 的 FileSystem API 手动删除输出目录即可。此设置将被忽略,因为通过 Spark Streaming 的 StreamingContext 生成的作业,因为数据可能需要在检查点恢复期间重写到预先存在的输出目录。 1.0.1
spark.storage.memoryMapThreshold 2 MB Spark 从磁盘读取块时内存映射的块大小。默认单位为字节,除非另有说明。这可以防止 Spark 内存映射非常小的块。通常,内存映射对于接近或低于操作系统页面大小的块来说开销很大。 0.9.2
spark.storage.decommission.enabled false 是否在停用执行器时停用块管理器。 3.1.0
spark.storage.decommission.shuffleBlocks.enabled true 是否在块管理器停用期间传输混洗块。需要可迁移的混洗解析器(如基于排序的混洗)。 3.1.0
spark.storage.decommission.shuffleBlocks.maxThreads 8 迁移混洗文件时要使用的最大线程数。 3.1.0
spark.storage.decommission.rddBlocks.enabled true 是否在块管理器停用期间传输 RDD 块。 3.1.0
spark.storage.decommission.fallbackStorage.path (无) 块管理器停用期间回退存储的位置。例如,s3a://spark-storage/。如果为空,则回退存储被禁用。存储应由 TTL 管理,因为 Spark 不会清理它。 3.1.0
spark.storage.decommission.fallbackStorage.cleanUp false 如果为 true,Spark 会在关闭时清理其回退存储数据。 3.2.0
spark.storage.decommission.shuffleBlocks.maxDiskSize (无) 在拒绝远程混洗块之前要使用的最大磁盘空间。拒绝远程混洗块意味着执行器将不会接收任何混洗迁移,如果没有任何其他执行器可用于迁移,则混洗块将丢失,除非配置了 spark.storage.decommission.fallbackStorage.path 3.2.0
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1 文件输出提交器算法版本,有效的算法版本号:1 或 2。请注意,2 可能会导致 MAPREDUCE-7282 之类的正确性问题。 2.2.0

执行器指标

属性名称默认值含义自版本
spark.eventLog.logStageExecutorMetrics false 是否将每个阶段的执行器指标峰值(对于每个执行器)写入事件日志。
注意:指标是在执行器心跳中轮询(收集)并发送的,并且始终执行此操作;此配置仅用于确定是否将聚合的指标峰值写入事件日志。
3.0.0
spark.executor.processTreeMetrics.enabled false 是否在收集执行器指标时收集进程树指标(来自 /proc 文件系统)。
注意:仅当 /proc 文件系统存在时才会收集进程树指标。
3.0.0
spark.executor.metrics.pollingInterval 0 收集执行器指标的频率(以毫秒为单位)。
如果为 0,则轮询在执行器心跳时完成(因此在心跳间隔内,由 spark.executor.heartbeatInterval 指定)。如果为正数,则轮询以该间隔完成。
3.0.0
spark.eventLog.gcMetrics.youngGenerationGarbageCollectors Copy、PS Scavenge、ParNew、G1 Young Generation 支持的年轻代垃圾收集器的名称。名称通常是 GarbageCollectorMXBean.getName 的返回值。内置的年轻代垃圾收集器是 Copy、PS Scavenge、ParNew、G1 Young Generation。 3.0.0
spark.eventLog.gcMetrics.oldGenerationGarbageCollectors MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep、G1 Old Generation 支持的旧代垃圾收集器的名称。名称通常是 GarbageCollectorMXBean.getName 的返回值。内置的旧代垃圾收集器是 MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep、G1 Old Generation。 3.0.0
spark.executor.metrics.fileSystemSchemes file、hdfs 在执行器指标中报告的文件系统方案。 3.1.0

网络

属性名称默认值含义自版本
spark.rpc.message.maxSize 128 在“控制平面”通信中允许的最大消息大小(以 MiB 为单位);通常仅适用于在执行器和驱动程序之间发送的映射输出大小信息。如果您正在运行具有数千个映射和归约任务的作业,并且看到有关 RPC 消息大小的消息,请增加此值。 2.0.0
spark.blockManager.port (随机) 所有块管理器监听的端口。这些端口存在于驱动程序和执行器上。 1.1.0
spark.driver.blockManager.port spark.blockManager.port 的值) 块管理器监听的特定于驱动程序的端口,用于它无法使用与执行器相同的配置的情况。 2.1.0
spark.driver.bindAddress (spark.driver.host 的值) 绑定监听套接字的主机名或 IP 地址。此配置会覆盖 SPARK_LOCAL_IP 环境变量(见下文)。
它还允许将与本地地址不同的地址宣传给执行器或外部系统。例如,当使用桥接网络运行容器时,这很有用。为了使此功能正常工作,需要从容器的主机转发驱动程序使用的不同端口(RPC、块管理器和 UI)。
2.1.0
spark.driver.host (本地主机名) 驱动程序的主机名或 IP 地址。用于与执行器和独立 Master 通信。 0.7.0
spark.driver.port (随机) 驱动程序监听的端口。用于与执行器和独立 Master 通信。 0.7.0
spark.rpc.io.backLog 64 RPC 服务器的接受队列长度。对于大型应用程序,可能需要增加此值,以便在短时间内到达大量连接时不会丢弃传入连接。 3.0.0
spark.network.timeout 120s 所有网络交互的默认超时时间。如果未配置,此配置将用于代替 spark.storage.blockManagerHeartbeatTimeoutMsspark.shuffle.io.connectionTimeoutspark.rpc.askTimeoutspark.rpc.lookupTimeout 1.3.0
spark.network.timeoutInterval 60 秒 驱动程序检查和过期死执行器的间隔。 1.3.2
spark.network.io.preferDirectBufs true 如果启用,则共享分配器优先使用堆外缓冲区分配。堆外缓冲区用于减少 shuffle 和缓存块传输期间的垃圾回收。对于堆外内存严格限制的环境,用户可能希望将其关闭以强制所有分配都在堆内。 3.0.0
spark.port.maxRetries 16 绑定到端口时在放弃之前尝试的最大次数。当端口被赋予特定值(非 0)时,每次后续重试都会将前一次尝试中使用的端口加 1,然后再重试。这实际上允许它尝试从指定的起始端口到端口 + maxRetries 的一系列端口。 1.1.1
spark.rpc.askTimeout spark.network.timeout RPC ask 操作在超时之前等待的持续时间。 1.4.0
spark.rpc.lookupTimeout 120s RPC 远程端点查找操作在超时之前等待的持续时间。 1.4.0
spark.network.maxRemoteBlockSizeFetchToMem 200m 当块的大小超过此阈值(以字节为单位)时,远程块将被获取到磁盘。这样做是为了避免巨大的请求占用太多内存。请注意,此配置将影响 shuffle 获取和块管理器远程块获取。对于启用了外部 shuffle 服务的用户,此功能仅在外部 shuffle 服务至少为 2.3.0 时才有效。 3.0.0
spark.rpc.io.connectionTimeout spark.network.timeout 的值 RPC 对等体之间已建立连接的超时时间,如果存在未完成的 RPC 请求,但通道在至少 connectionTimeout 时间内没有流量,则将其标记为空闲并关闭。 1.2.0
spark.rpc.io.connectionCreationTimeout spark.rpc.io.connectionTimeout 的值 RPC 对等体之间建立连接的超时时间。 3.2.0

调度

属性名称默认值含义自版本
spark.cores.max (未设置) 独立部署集群“粗粒度”共享模式下的 Mesos 集群 上运行时,从整个集群(而不是从每台机器)请求应用程序的 CPU 内核的最大数量。如果未设置,默认值为 Spark 独立集群管理器上的 spark.deploy.defaultCores,或 Mesos 上的无限(所有可用内核)。 0.6.0
spark.locality.wait 3s 在放弃并将其启动到不太本地化的节点之前,等待启动数据本地任务的时间。相同的等待将用于逐步遍历多个本地化级别(进程本地、节点本地、机架本地,然后是任何)。还可以通过设置 spark.locality.wait.node 等来自定义每个级别的等待时间。如果您的任务很长并且看到本地化效果不佳,您应该增加此设置,但默认设置通常效果很好。 0.5.0
spark.locality.wait.node spark.locality.wait 自定义节点本地化的本地化等待时间。例如,您可以将其设置为 0 以跳过节点本地化并立即搜索机架本地化(如果您的集群具有机架信息)。 0.8.0
spark.locality.wait.process spark.locality.wait 自定义进程本地化的本地化等待时间。这会影响尝试访问特定执行器进程中的缓存数据的任务。 0.8.0
spark.locality.wait.rack spark.locality.wait 自定义机架本地化的本地化等待时间。 0.8.0
spark.scheduler.maxRegisteredResourcesWaitingTime 30s 在调度开始之前等待资源注册的最大时间。 1.1.1
spark.scheduler.minRegisteredResourcesRatio KUBERNETES 模式为 0.8;YARN 模式为 0.8;独立模式和 Mesos 粗粒度模式为 0.0 已注册资源的最小比率(已注册资源 / 预期总资源)(资源在 yarn 模式和 Kubernetes 模式下是执行器,在独立模式和 Mesos 粗粒度模式下是 CPU 内核 ['spark.cores.max' 值是 Mesos 粗粒度模式的预期总资源])在调度开始之前等待。指定为 0.0 到 1.0 之间的双精度数。无论是否已达到最小资源比率,在调度开始之前等待的最大时间都由配置 spark.scheduler.maxRegisteredResourcesWaitingTime 控制。 1.1.1
spark.scheduler.mode FIFO 提交到同一 SparkContext 的作业之间的 调度模式。可以设置为 FAIR 以使用公平共享而不是将作业一个接一个地排队。对多用户服务很有用。 0.8.0
spark.scheduler.revive.interval 1s 调度程序恢复工作程序资源提供以运行任务的间隔长度。 0.8.1
spark.scheduler.listenerbus.eventqueue.capacity 10000 事件队列的默认容量。Spark 将首先尝试使用 spark.scheduler.listenerbus.eventqueue.queueName.capacity 指定的容量初始化事件队列。如果未配置,Spark 将使用此配置指定的默认容量。请注意,容量必须大于 0。如果丢弃了侦听器事件,请考虑增加值(例如 20000)。增加此值可能会导致驱动程序使用更多内存。 2.3.0
spark.scheduler.listenerbus.eventqueue.shared.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark 侦听器总线中共享事件队列的容量,该队列保存注册到侦听器总线的外部侦听器(s)的事件。如果与共享队列相对应的侦听器事件被丢弃,请考虑增加值。增加此值可能会导致驱动程序使用更多内存。 3.0.0
spark.scheduler.listenerbus.eventqueue.appStatus.capacity spark.scheduler.listenerbus.eventqueue.capacity appStatus 事件队列的容量,该队列保存内部应用程序状态侦听器的事件。如果与 appStatus 队列相对应的侦听器事件被丢弃,请考虑增加值。增加此值可能会导致驱动程序使用更多内存。 3.0.0
spark.scheduler.listenerbus.eventqueue.executorManagement.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark 侦听器总线中 executorManagement 事件队列的容量,该队列保存内部执行器管理侦听器的事件。如果与 executorManagement 队列相对应的侦听器事件被丢弃,请考虑增加值。增加此值可能会导致驱动程序使用更多内存。 3.0.0
spark.scheduler.listenerbus.eventqueue.eventLog.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark 侦听器总线中 eventLog 队列的容量,该队列保存将事件写入 eventLogs 的事件日志侦听器的事件。如果与 eventLog 队列相对应的侦听器事件被丢弃,请考虑增加值。增加此值可能会导致驱动程序使用更多内存。 3.0.0
spark.scheduler.listenerbus.eventqueue.streams.capacity spark.scheduler.listenerbus.eventqueue.capacity Spark 侦听器总线中 streams 队列的容量,该队列保存内部流侦听器的事件。如果与 streams 队列相对应的侦听器事件被丢弃,请考虑增加值。增加此值可能会导致驱动程序使用更多内存。 3.0.0
spark.scheduler.resource.profileMergeConflicts false 如果设置为“true”,Spark 将在将组合成单个阶段的不同 RDD 中指定了不同的配置文件时合并 ResourceProfiles。合并时,Spark 选择每个资源的最大值并创建一个新的 ResourceProfile。默认值为 false,如果在进入同一阶段的 RDD 中发现多个不同的 ResourceProfiles,Spark 将抛出异常。 3.1.0
spark.scheduler.excludeOnFailure.unschedulableTaskSetTimeout 120s 在放弃无法调度的 TaskSet 之前等待获取新的执行器并调度任务的超时时间(以秒为单位),因为所有执行器由于任务失败而被排除在外。 2.4.1
spark.standalone.submit.waitAppCompletion false 如果设置为 true,Spark 将在将组合成单个阶段的不同 RDD 中指定了不同的配置文件时合并 ResourceProfiles。合并时,Spark 选择每个资源的最大值并创建一个新的 ResourceProfile。默认值为 false,如果在进入同一阶段的 RDD 中发现多个不同的 ResourceProfiles,Spark 将抛出异常。 3.1.0
spark.excludeOnFailure.enabled false 如果设置为“true”,则阻止 Spark 在由于太多任务失败而被排除的执行器上调度任务。用于排除执行器和节点的算法可以通过其他“spark.excludeOnFailure”配置选项进一步控制。 2.1.0
spark.excludeOnFailure.timeout 1h (实验性) 节点或执行器被排除在整个应用程序之外的时间,在它被无条件地从排除列表中删除以尝试运行新任务之前。 2.1.0
spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor 1 (实验性) 对于给定的任务,它可以在一个执行器上重试多少次,然后该执行器被排除在该任务之外。 2.1.0
spark.excludeOnFailure.task.maxTaskAttemptsPerNode 2 (实验性) 对于给定的任务,它可以在一个节点上重试多少次,然后整个节点被排除在该任务之外。 2.1.0
spark.excludeOnFailure.stage.maxFailedTasksPerExecutor 2 (实验性) 在一个阶段内,在一个执行器上必须失败多少个不同的任务,然后该执行器被排除在该阶段之外。 2.1.0
spark.excludeOnFailure.stage.maxFailedExecutorsPerNode 2 (实验性) 对于给定的阶段,有多少个不同的执行器被标记为排除,然后整个节点被标记为该阶段失败。 2.1.0
spark.excludeOnFailure.application.maxFailedTasksPerExecutor 2 (实验性) 在成功的任务集中,在一个执行器上必须失败多少个不同的任务,然后该执行器被排除在整个应用程序之外。在 spark.excludeOnFailure.timeout 指定的超时时间后,被排除的执行器将自动添加到可用资源池中。请注意,使用动态分配,执行器可能会被标记为空闲并被集群管理器回收。 2.2.0
spark.excludeOnFailure.application.maxFailedExecutorsPerNode 2 (实验性) 必须排除多少个不同的执行器才能排除整个应用程序,然后该节点被排除在整个应用程序之外。在 spark.excludeOnFailure.timeout 指定的超时时间后,被排除的节点将自动添加到可用资源池中。请注意,使用动态分配,节点上的执行器可能会被标记为空闲并被集群管理器回收。 2.2.0
spark.excludeOnFailure.killExcludedExecutors false (实验性) 如果设置为“true”,则允许 Spark 在由于获取失败或整个应用程序被排除(由 spark.killExcludedExecutors.application.* 控制)时自动杀死执行器。请注意,当整个节点被排除时,该节点上的所有执行器都将被杀死。 2.2.0
spark.excludeOnFailure.application.fetchFailure.enabled false (实验性) 如果设置为“true”,则 Spark 将在发生获取失败时立即排除执行器。如果启用了外部 shuffle 服务,则整个节点将被排除。 2.3.0
spark.speculation false 如果设置为“true”,则执行任务的推测性执行。这意味着如果一个或多个任务在一个阶段运行缓慢,它们将被重新启动。 0.6.0
spark.speculation.interval 100ms Spark 检查推测任务的频率。 0.6.0
spark.speculation.multiplier 1.5 任务比中位数慢多少倍才能被视为推测。 0.6.0
spark.speculation.quantile 0.75 必须完成的任务比例,才能为特定阶段启用推测。 0.6.0
spark.speculation.minTaskRuntime 100ms 任务在被视为推测之前运行的最小时间。这可以用来避免启动非常短的任务的推测性副本。 3.2.0
spark.speculation.task.duration.threshold 任务持续时间,在此之后调度程序将尝试推测性地运行任务。如果提供,如果当前阶段包含的任务数量小于或等于单个执行器上的插槽数量,并且任务花费的时间超过阈值,则将推测性地运行任务。此配置有助于推测具有很少任务的阶段。如果执行器插槽足够大,则常规推测配置也可能适用。例如,即使未达到阈值,如果执行器插槽足够大,并且有足够多的成功运行,则任务可能会被重新启动。插槽数量是根据 spark.executor.cores 和 spark.task.cpus 的配置值计算的,最小值为 1。默认单位为字节,除非另有说明。 3.0.0
spark.speculation.efficiency.processRateMultiplier 0.75 评估低效任务时使用的乘数。乘数越高,可能被视为低效的任务就越多。 3.4.0
spark.speculation.efficiency.longRunTaskFactor 2 只要任务持续时间超过因子和时间阈值(无论是 spark.speculation.multiplier * successfulTaskDurations.median 还是 spark.speculation.minTaskRuntime)的乘积,无论其数据处理速率是否良好,都会推测任务。这避免了在任务速度与数据处理速率无关时错过低效的任务。 3.4.0
spark.speculation.efficiency.enabled true 设置为 true 时,Spark 将通过阶段任务指标或其持续时间评估任务处理的效率,并且只需要推测低效的任务。当 1) 其数据处理速率小于阶段中所有成功任务的平均数据处理速率乘以乘数,或 2) 其持续时间超过 spark.speculation.efficiency.longRunTaskFactor 和时间阈值(无论是 spark.speculation.multiplier * successfulTaskDurations.median 还是 spark.speculation.minTaskRuntime)的乘积时,任务被认为是低效的。 3.4.0
spark.task.cpus 1 为每个任务分配的内核数量。 0.5.0
spark.task.resource.{resourceName}.amount 1 为每个任务分配的特定资源类型数量,请注意,这可以是双精度数。如果指定了此项,则还必须提供执行器配置 spark.executor.resource.{resourceName}.amount 和任何相应的发现配置,以便创建具有该资源类型的执行器。除了整数量之外,还可以指定小数数量(例如,0.25,表示 1/4 的资源)。小数数量必须小于或等于 0.5,换句话说,资源共享的最小数量是每个资源 2 个任务。此外,小数数量将被向下取整以分配资源插槽(例如,0.2222 的配置,或 1/0.2222 个插槽将变为 4 个任务/资源,而不是 5 个)。 3.0.0
spark.task.maxFailures 4 在放弃作业之前,任何特定任务连续失败的次数。跨不同任务的总失败次数不会导致作业失败;特定任务必须连续失败此次数的尝试。如果任何尝试成功,则该任务的失败计数将被重置。应大于或等于 1。允许的重试次数 = 此值 - 1。 0.8.0
spark.task.reaper.enabled false 启用对已杀死/中断任务的监控。设置为 true 时,任何被杀死的任务都将由执行器监控,直到该任务实际完成执行。有关如何控制此监控的精确行为的详细信息,请参阅其他 spark.task.reaper.* 配置。设置为 false(默认值)时,任务杀死将使用缺乏此类监控的旧代码路径。 2.0.3
spark.task.reaper.pollingInterval 10 秒 spark.task.reaper.enabled = true 时,此设置控制执行器轮询已杀死任务状态的频率。如果在轮询时已杀死任务仍在运行,则会记录警告,并且默认情况下会记录任务的线程转储(可以通过 spark.task.reaper.threadDump 设置禁用此线程转储,该设置将在下面介绍)。 2.0.3
spark.task.reaper.threadDump true spark.task.reaper.enabled = true 时,此设置控制是否在定期轮询已杀死任务期间记录任务线程转储。将其设置为 false 以禁用线程转储的收集。 2.0.3
spark.task.reaper.killTimeout -1 spark.task.reaper.enabled = true 时,此设置指定一个超时时间,在此之后,如果已杀死任务尚未停止运行,则执行器 JVM 将自行杀死。默认值 -1 禁用此机制,并阻止执行器自毁。此设置的目的是充当安全网,以防止失控的不可取消任务使执行器无法使用。 2.0.3
spark.stage.maxConsecutiveAttempts 4 在中止阶段之前允许的连续阶段尝试次数。 2.2.0
spark.stage.ignoreDecommissionFetchFailure false 是否忽略在计数 spark.stage.maxConsecutiveAttempts 时由执行器退役引起的阶段获取失败。 3.4.0

屏障执行模式

属性名称默认值含义自版本
spark.barrier.sync.timeout 365d 来自屏障任务的每个 barrier() 调用的超时时间(以秒为单位)。如果协调器在配置的时间内没有收到来自屏障任务的所有同步消息,则抛出 SparkException 以使所有任务失败。默认值为 31536000(3600 * 24 * 365),因此 barrier() 调用将等待一年。 2.4.0
spark.scheduler.barrier.maxConcurrentTasksCheck.interval 15s 在最大并发任务检查失败和下一次检查之间等待的时间(以秒为单位)。最大并发任务检查确保集群可以启动比屏障阶段在作业提交时所需的并发任务更多。如果集群刚刚启动并且没有足够的执行器注册,则检查可能会失败,因此我们会等待一段时间并尝试再次执行检查。如果检查对于作业失败的次数超过配置的最大失败次数,则使当前作业提交失败。请注意,此配置仅适用于包含一个或多个屏障阶段的作业,我们不会对非屏障作业执行检查。 2.4.0
spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures 40 在使作业提交失败之前允许的最大并发任务检查失败次数。最大并发任务检查确保集群可以启动比屏障阶段在作业提交时所需的并发任务更多。如果集群刚刚启动并且没有足够的执行器注册,则检查可能会失败,因此我们会等待一段时间并尝试再次执行检查。如果检查对于作业失败的次数超过配置的最大失败次数,则使当前作业提交失败。请注意,此配置仅适用于包含一个或多个屏障阶段的作业,我们不会对非屏障作业执行检查。 2.4.0

动态分配

属性名称默认值含义自版本
spark.dynamicAllocation.enabled false 是否使用动态资源分配,它根据工作负载动态调整与该应用程序注册的执行器数量。有关更多详细信息,请参阅 此处 的说明。

这需要以下条件之一:1) 通过 spark.shuffle.service.enabled 启用外部 shuffle 服务,或 2) 通过 spark.dynamicAllocation.shuffleTracking.enabled 启用 shuffle 跟踪,或 3) 通过 spark.decommission.enabledspark.storage.decommission.shuffleBlocks.enabled 启用 shuffle 块退役,或 4) (实验性) 配置 spark.shuffle.sort.io.plugin.class 以使用自定义 ShuffleDataIO,其 ShuffleDriverComponents 支持可靠存储。以下配置也相关:spark.dynamicAllocation.minExecutorsspark.dynamicAllocation.maxExecutorsspark.dynamicAllocation.initialExecutors spark.dynamicAllocation.executorAllocationRatio
1.2.0
spark.dynamicAllocation.executorIdleTimeout 60 秒 如果启用了动态分配,并且执行器空闲时间超过此持续时间,则将删除该执行器。有关更多详细信息,请参阅 此处 的说明。 1.2.0
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity 如果启用了动态分配,并且缓存了数据块的执行器空闲时间超过此持续时间,则将删除该执行器。有关更多详细信息,请参阅 此处 的说明。 1.4.0
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors 如果启用了动态分配,则要运行的初始执行器数量。

如果设置了 --num-executors(或 spark.executor.instances)并且大于此值,则将用作初始执行器数量。
1.3.0
spark.dynamicAllocation.maxExecutors infinity 如果启用了动态分配,则执行器数量的上限。 1.2.0
spark.dynamicAllocation.minExecutors 0 如果启用了动态分配,则执行器数量的下限。 1.2.0
spark.dynamicAllocation.executorAllocationRatio 1 默认情况下,动态分配将请求足够的执行器以根据要处理的任务数量最大限度地提高并行度。虽然这最大限度地减少了作业的延迟,但对于小型任务,此设置可能会由于执行器分配开销而浪费大量资源,因为某些执行器甚至可能不会执行任何工作。此设置允许设置一个比率,该比率将用于减少与完全并行度相关的执行器数量。默认为 1.0 以提供最大并行度。0.5 将将目标执行器数量除以 2。动态分配计算的目标执行器数量仍然可以被 spark.dynamicAllocation.minExecutorsspark.dynamicAllocation.maxExecutors 设置覆盖。 2.4.0
spark.dynamicAllocation.schedulerBacklogTimeout 1s 如果启用了动态分配,并且待处理任务的积压时间超过此持续时间,则将请求新的执行器。有关更多详细信息,请参阅 此处 的说明。 1.2.0
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout spark.dynamicAllocation.schedulerBacklogTimeout 相同,但仅用于后续的执行器请求。有关更多详细信息,请参阅 此处 的说明。 1.2.0
spark.dynamicAllocation.shuffleTracking.enabled true 为执行器启用 shuffle 文件跟踪,这允许动态分配,而无需外部 shuffle 服务。此选项将尝试使存储活动作业的 shuffle 数据的执行器保持活动状态。 3.0.0
spark.dynamicAllocation.shuffleTracking.timeout infinity 启用 shuffle 跟踪时,控制保存 shuffle 数据的执行器的超时时间。默认值意味着 Spark 将依赖于 shuffle 被垃圾回收才能释放执行器。如果由于某种原因垃圾回收没有足够快地清理 shuffle,则可以使用此选项来控制何时使执行器超时,即使它们正在存储 shuffle 数据。 3.0.0

线程配置

根据作业和集群配置,我们可以在 Spark 的多个地方设置线程数量,以有效利用可用资源,从而获得更好的性能。在 Spark 3.0 之前,这些线程配置适用于 Spark 的所有角色,例如驱动程序、执行器、工作器和主节点。从 Spark 3.0 开始,我们可以更细粒度地配置驱动程序和执行器的线程。以下表格以 RPC 模块为例。对于其他模块,例如 shuffle,只需将属性名称中的“rpc”替换为“shuffle”,但 spark.{driver|executor}.rpc.netty.dispatcher.numThreads 除外,它仅适用于 RPC 模块。

属性名称默认值含义自版本
spark.{driver|executor}.rpc.io.serverThreads 回退到 spark.rpc.io.serverThreads 服务器线程池中使用的线程数量 1.6.0
spark.{driver|executor}.rpc.io.clientThreads 回退到 spark.rpc.io.clientThreads 客户端线程池中使用的线程数量 1.6.0
spark.{driver|executor}.rpc.netty.dispatcher.numThreads 回退到 spark.rpc.netty.dispatcher.numThreads RPC 消息调度程序线程池中使用的线程数量 3.0.0

与线程相关的配置键的默认值是驱动程序或执行器请求的内核数的最小值,或者在没有该值的情况下,JVM 可用的内核数(硬编码上限为 8)。

Spark Connect

服务器配置

服务器配置是在 Spark Connect 服务器中设置的,例如,当您使用 ./sbin/start-connect-server.sh 启动 Spark Connect 服务器时。它们通常通过配置文件和命令行选项使用 --conf/-c 设置。

属性名称默认值含义自版本
spark.connect.grpc.binding.port 15002 Spark Connect 服务器绑定的端口。 3.4.0
spark.connect.grpc.interceptor.classes (无) 必须实现 io.grpc.ServerInterceptor 接口的类名列表,用逗号分隔。 3.4.0
spark.connect.grpc.arrow.maxBatchSize 4 MB 使用 Apache Arrow 时,限制从服务器端发送到客户端端的单个 Arrow 批次的最大大小。目前,我们保守地使用其 70%,因为大小不是准确的,而是估计的。 3.4.0
spark.connect.grpc.maxInboundMessageSize 134217728 设置 gRPC 请求的最大入站消息大小。具有更大有效载荷的请求将失败。 3.4.0
spark.connect.extensions.relation.classes (无) 实现特征 org.apache.spark.sql.connect.plugin.RelationPlugin 的类名列表,以支持 proto 中的自定义关系类型。 3.4.0
spark.connect.extensions.expression.classes (无) 实现特征 org.apache.spark.sql.connect.plugin.ExpressionPlugin 的类名列表,以支持 proto 中的自定义表达式类型。 3.4.0
spark.connect.extensions.command.classes (无) 实现特征 org.apache.spark.sql.connect.plugin.CommandPlugin 的类名列表,以支持 proto 中的自定义命令类型。 3.4.0

安全

有关如何保护不同 Spark 子系统,请参阅 安全 页面。

Spark SQL

运行时 SQL 配置

运行时 SQL 配置是每个会话的、可变的 Spark SQL 配置。它们可以通过配置文件和命令行选项使用 --conf/-c 前缀设置初始值,或者通过设置用于创建 SparkSessionSparkConf 设置。此外,它们可以通过 SET 命令设置和查询,并通过 RESET 命令重置为其初始值,或者通过运行时 SparkSession.conf 的 setter 和 getter 方法设置和查询。

属性名称默认值含义自版本
spark.sql.adaptive.advisoryPartitionSizeInBytes (spark.sql.adaptive.shuffle.targetPostShuffleInputSize 的值)

自适应优化期间(当 spark.sql.adaptive.enabled 为 true 时)shuffle 分区的建议大小(以字节为单位)。当 Spark 合并小的 shuffle 分区或拆分倾斜的 shuffle 分区时,它会生效。

3.0.0
spark.sql.adaptive.autoBroadcastJoinThreshold (无)

配置在执行连接时将广播到所有工作节点的表的最大大小(以字节为单位)。通过将此值设置为 -1 可以禁用广播。默认值与 spark.sql.autoBroadcastJoinThreshold 相同。请注意,此配置仅在自适应框架中使用。

3.2.0
spark.sql.adaptive.coalescePartitions.enabled true

当为 true 且 'spark.sql.adaptive.enabled' 为 true 时,Spark 将根据目标大小(由 'spark.sql.adaptive.advisoryPartitionSizeInBytes' 指定)合并连续的 shuffle 分区,以避免出现太多的小任务。

3.0.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum (无)

合并之前的初始 shuffle 分区数量。如果未设置,它等于 spark.sql.shuffle.partitions。此配置仅在 'spark.sql.adaptive.enabled' 和 'spark.sql.adaptive.coalescePartitions.enabled' 都为 true 时有效。

3.0.0
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB

合并后 shuffle 分区的最小大小。当自适应计算的目标大小在分区合并期间过小时,这很有用。

3.2.0
spark.sql.adaptive.coalescePartitions.parallelismFirst true

当为 true 时,Spark 在合并连续的 shuffle 分区时不会遵守 'spark.sql.adaptive.advisoryPartitionSizeInBytes' 指定的目标大小(默认值为 64MB),而是根据 Spark 集群的默认并行度自适应地计算目标大小。计算的大小通常小于配置的目标大小。这是为了最大限度地提高并行度,并在启用自适应查询执行时避免性能下降。建议将此配置设置为 false 并遵守配置的目标大小。

3.2.0
spark.sql.adaptive.customCostEvaluatorClass (无)

用于自适应执行的自定义成本评估器类。如果未设置,Spark 默认情况下将使用自己的 SimpleCostEvaluator。

3.2.0
spark.sql.adaptive.enabled true

当为 true 时,启用自适应查询执行,它根据准确的运行时统计信息在查询执行过程中重新优化查询计划。

1.6.0
spark.sql.adaptive.forceOptimizeSkewedJoin false

当为 true 时,即使它引入了额外的 shuffle,也会强制启用 OptimizeSkewedJoin。

3.3.0
spark.sql.adaptive.localShuffleReader.enabled true

当为 true 且 'spark.sql.adaptive.enabled' 为 true 时,Spark 尝试使用本地 shuffle 读取器读取 shuffle 数据,当 shuffle 分区不需要时,例如,在将排序合并连接转换为广播哈希连接之后。

3.0.0
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0b

配置每个分区允许构建本地哈希映射的最大大小(以字节为单位)。如果此值不小于 spark.sql.adaptive.advisoryPartitionSizeInBytes 并且所有分区大小都不大于此配置,则连接选择优先使用 shuffle 哈希连接而不是排序合并连接,而不管 spark.sql.join.preferSortMergeJoin 的值如何。

3.2.0
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled true

当为 true 且 'spark.sql.adaptive.enabled' 为 true 时,Spark 将优化 RebalancePartitions 中的倾斜 shuffle 分区,并根据目标大小(由 'spark.sql.adaptive.advisoryPartitionSizeInBytes' 指定)将它们拆分为更小的分区,以避免数据倾斜。

3.2.0
spark.sql.adaptive.optimizer.excludedRules (无)

配置要在自适应优化器中禁用的规则列表,其中规则由其规则名称指定,并用逗号分隔。优化器将记录实际已排除的规则。

3.1.0
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 0.2

如果分区的尺寸小于此因子乘以 spark.sql.adaptive.advisoryPartitionSizeInBytes,则该分区将在拆分期间被合并。

3.3.0
spark.sql.adaptive.skewJoin.enabled true

当为 true 且 'spark.sql.adaptive.enabled' 为 true 时,Spark 通过拆分(并在需要时复制)倾斜分区来动态处理 shuffle 连接(排序合并和 shuffle 哈希)中的倾斜。

3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5.0

如果分区的尺寸大于此因子乘以中位数分区尺寸,并且也大于 'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes',则该分区被视为倾斜。

3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB

如果分区的尺寸(以字节为单位)大于此阈值,并且也大于 'spark.sql.adaptive.skewJoin.skewedPartitionFactor' 乘以中位数分区尺寸,则该分区被视为倾斜。理想情况下,此配置应设置为大于 'spark.sql.adaptive.advisoryPartitionSizeInBytes'。

3.0.0
spark.sql.allowNamedFunctionArguments true

如果为 true,Spark 将为所有已实现命名参数的函数开启对命名参数的支持。

3.5.0
spark.sql.ansi.doubleQuotedIdentifiers false

当为 true 且 'spark.sql.ansi.enabled' 为 true 时,Spark SQL 将双引号 (") 中的文字读取为标识符。当为 false 时,它们被读取为字符串文字。

3.4.0
spark.sql.ansi.enabled false

当为 true 时,Spark SQL 使用 ANSI 兼容方言,而不是 Hive 兼容方言。例如,当 SQL 运算符/函数的输入无效时,Spark 将在运行时抛出异常,而不是返回空结果。有关此方言的完整详细信息,您可以在 Spark 文档的“ANSI 兼容性”部分中找到它们。某些 ANSI 方言功能可能不是直接来自 ANSI SQL 标准,但它们的行为与 ANSI SQL 的风格一致。

3.0.0
spark.sql.ansi.enforceReservedKeywords false

当为 true 且 'spark.sql.ansi.enabled' 为 true 时,Spark SQL 解析器将强制执行 ANSI 保留关键字,并禁止使用保留关键字作为别名和/或标识符(用于表、视图、函数等)的 SQL 查询。

3.3.0
spark.sql.ansi.relationPrecedence false

当为 true 且 'spark.sql.ansi.enabled' 为 true 时,JOIN 在组合关系时优先于逗号。例如,t1, t2 JOIN t3 应产生 t1 X (t2 X t3)。如果配置为 false,则结果为 (t1 X t2) X t3

3.4.0
spark.sql.autoBroadcastJoinThreshold 10MB

配置在执行连接时将广播到所有工作节点的表的最大大小(以字节为单位)。通过将此值设置为 -1 可以禁用广播。请注意,目前仅支持 Hive 元存储表(已运行 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令)和文件数据源表(直接在数据文件上计算统计信息)的统计信息。

1.1.0
spark.sql.avro.compression.codec snappy

写入 AVRO 文件时使用的压缩编解码器。支持的编解码器:uncompressed、deflate、snappy、bzip2、xz 和 zstandard。默认编解码器为 snappy。

2.4.0
spark.sql.avro.deflate.level -1

写入 AVRO 文件时使用的 deflate 编解码器的压缩级别。有效值必须在 1 到 9(含)或 -1 的范围内。默认值为 -1,对应于当前实现中的 6 级。

2.4.0
spark.sql.avro.filterPushdown.enabled true

当为 true 时,启用将过滤器下推到 Avro 数据源。

3.1.0
spark.sql.broadcastTimeout 300

广播连接中广播等待时间的超时时间(以秒为单位)。

1.3.0
spark.sql.bucketing.coalesceBucketsInJoin.enabled false

如果两个具有不同桶数的桶表进行连接,并且此选项设置为 true,则桶数较多的那一方将被合并,使其桶数与另一方相同。较大的桶数应能被较小的桶数整除。桶合并适用于排序合并连接和混洗哈希连接。注意:合并桶表可以避免连接中不必要的混洗,但也会降低并行度,并可能导致混洗哈希连接出现 OOM。

3.1.0
spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio 4

要应用桶合并,两个被合并的桶的桶数之比应小于或等于此值。此配置仅在 'spark.sql.bucketing.coalesceBucketsInJoin.enabled' 设置为 true 时有效。

3.1.0
spark.sql.catalog.spark_catalog (无)

将用作 Spark 内置 v1 目录的 v2 接口的目录实现:spark_catalog。此目录与其标识符命名空间共享,并且必须与其保持一致;例如,如果可以通过 spark_catalog 加载表,则此目录也必须返回表元数据。为了将操作委托给 spark_catalog,实现可以扩展 'CatalogExtension'。

3.0.0
spark.sql.cbo.enabled false

如果设置为 true,则启用 CBO 来估计计划统计信息。

2.2.0
spark.sql.cbo.joinReorder.dp.star.filter false

将星型连接过滤器启发式应用于基于成本的连接枚举。

2.2.0
spark.sql.cbo.joinReorder.dp.threshold 12

动态规划算法中允许的最大连接节点数。

2.2.0
spark.sql.cbo.joinReorder.enabled false

在 CBO 中启用连接重排序。

2.2.0
spark.sql.cbo.planStats.enabled false

如果为 true,则逻辑计划将从目录中获取行数和列统计信息。

3.0.0
spark.sql.cbo.starSchemaDetection false

如果为 true,则启用基于星型模式检测的连接重排序。

2.2.0
spark.sql.charAsVarchar false

如果为 true,Spark 将在 CREATE/REPLACE/ALTER TABLE 命令中用 VARCHAR 类型替换 CHAR 类型,这样新创建/更新的表将不会有 CHAR 类型列/字段。现有具有 CHAR 类型列/字段的表不受此配置的影响。

3.3.0
spark.sql.cli.print.header false

如果设置为 true,则 spark-sql CLI 将在查询输出中打印列的名称。

3.2.0
spark.sql.columnNameOfCorruptRecord _corrupt_record

用于存储无法解析的原始/未解析 JSON 和 CSV 记录的内部列的名称。

1.2.0
spark.sql.csv.filterPushdown.enabled true

如果为 true,则启用对 CSV 数据源的过滤器下推。

3.0.0
spark.sql.datetime.java8API.enabled false

如果配置属性设置为 true,则 Java 8 API 的 java.time.Instant 和 java.time.LocalDate 类将用作 Catalyst 的 TimestampType 和 DateType 的外部类型。如果设置为 false,则 java.sql.Timestamp 和 java.sql.Date 将用于相同目的。

3.0.0
spark.sql.debug.maxToStringFields 25

调试输出中可以转换为字符串的类似序列的条目的最大字段数。超出限制的任何元素都将被删除并替换为 "... N more fields" 占位符。

3.0.0
spark.sql.defaultCatalog spark_catalog

默认目录的名称。如果用户尚未显式设置当前目录,则这将是当前目录。

3.0.0
spark.sql.error.messageFormat PRETTY

如果为 PRETTY,则错误消息将包含错误类、消息和查询上下文的文本表示形式。MINIMAL 和 STANDARD 格式是漂亮的 JSON 格式,其中 STANDARD 包含一个额外的 JSON 字段 message。此配置属性会影响 Thrift Server 和 SQL CLI 在运行查询时显示的错误消息。

3.4.0
spark.sql.execution.arrow.enabled false

(自 Spark 3.0 起已弃用,请设置 'spark.sql.execution.arrow.pyspark.enabled'。)

2.3.0
spark.sql.execution.arrow.fallback.enabled true

(自 Spark 3.0 起已弃用,请设置 'spark.sql.execution.arrow.pyspark.fallback.enabled'。)

2.4.0
spark.sql.execution.arrow.localRelationThreshold 48MB

将 Arrow 批次转换为 Spark DataFrame 时,如果 Arrow 批次的字节大小小于此阈值,则在驱动程序端使用本地集合。否则,Arrow 批次将被发送到执行器并反序列化为 Spark 内部行。

3.4.0
spark.sql.execution.arrow.maxRecordsPerBatch 10000

使用 Apache Arrow 时,限制可以写入内存中的单个 ArrowRecordBatch 的最大记录数。如果设置为零或负数,则没有限制。

2.3.0
spark.sql.execution.arrow.pyspark.enabled (spark.sql.execution.arrow.enabled 的值)

如果为 true,则在 PySpark 中使用 Apache Arrow 进行列式数据传输。此优化适用于:1. pyspark.sql.DataFrame.toPandas。2. pyspark.sql.SparkSession.createDataFrame,当其输入是 Pandas DataFrame 或 NumPy ndarray 时。以下数据类型不受支持:TimestampType 的 ArrayType。

3.0.0
spark.sql.execution.arrow.pyspark.fallback.enabled (spark.sql.execution.arrow.fallback.enabled 的值)

如果为 true,则如果发生错误,则由 'spark.sql.execution.arrow.pyspark.enabled' 启用的优化将自动回退到非优化实现。

3.0.0
spark.sql.execution.arrow.pyspark.selfDestruct.enabled false

(实验性) 如果为 true,则在 PySpark 中使用 Apache Arrow 的自毁和拆分块选项进行列式数据传输,当从 Arrow 转换为 Pandas 时。这减少了内存使用量,但会增加一些 CPU 时间。此优化适用于:pyspark.sql.DataFrame.toPandas,当 'spark.sql.execution.arrow.pyspark.enabled' 设置时。

3.2.0
spark.sql.execution.arrow.sparkr.enabled false

如果为 true,则在 SparkR 中使用 Apache Arrow 进行列式数据传输。此优化适用于:1. createDataFrame,当其输入是 R DataFrame 时 2. collect 3. dapply 4. gapply 以下数据类型不受支持:FloatType、BinaryType、ArrayType、StructType 和 MapType。

3.0.0
spark.sql.execution.pandas.structHandlingMode legacy

创建 pandas DataFrame 时结构类型的转换模式。如果为 "legacy",1. 当 Arrow 优化被禁用时,转换为 Row 对象,2. 当 Arrow 优化被启用时,转换为 dict 或如果存在重复的嵌套字段名称则抛出异常。如果为 "row",则无论 Arrow 优化如何,都转换为 Row 对象。如果为 "dict",则转换为 dict 并使用后缀键名,例如 a_0、a_1,如果存在重复的嵌套字段名称,则无论 Arrow 优化如何。

3.5.0
spark.sql.execution.pandas.udf.buffer.size (spark.buffer.size 的值)

spark.buffer.size 相同,但仅适用于 Pandas UDF 执行。如果未设置,则回退为 spark.buffer.size。请注意,Pandas 执行需要超过 4 个字节。降低此值可能会使小的 Pandas UDF 批次被迭代和流水线化;但是,这可能会降低性能。请参阅 SPARK-27870。

3.0.0
spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled true

如果为 true,则简化 Python UDF 的回溯。它隐藏了 PySpark 中的 Python 工作器、(反)序列化等,并且只显示 UDF 中的异常消息。请注意,这仅适用于 CPython 3.7+。

3.1.0
spark.sql.execution.pythonUDF.arrow.enabled false

在常规 Python UDF 中启用 Arrow 优化。此优化仅在给定函数至少接受一个参数时才能启用。

3.4.0
spark.sql.execution.pythonUDTF.arrow.enabled false

为 Python UDTF 启用 Arrow 优化。

3.5.0
spark.sql.execution.topKSortFallbackThreshold 2147483632

在具有 SORT 后跟 LIMIT 的 SQL 查询中,例如 'SELECT x FROM t ORDER BY y LIMIT m',如果 m 小于此阈值,则在内存中执行 top-K 排序,否则执行全局排序,如果需要,则溢出到磁盘。

2.4.0
spark.sql.files.ignoreCorruptFiles false

是否忽略损坏的文件。如果为 true,则 Spark 作业将在遇到损坏的文件时继续运行,并且已读取的内容仍将被返回。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。

2.1.1
spark.sql.files.ignoreMissingFiles false

是否忽略丢失的文件。如果为 true,则 Spark 作业将在遇到丢失的文件时继续运行,并且已读取的内容仍将被返回。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。

2.3.0
spark.sql.files.maxPartitionBytes 128MB

读取文件时打包到单个分区中的最大字节数。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。

2.0.0
spark.sql.files.maxPartitionNum (无)

建议的(不保证)最大拆分文件分区数。如果设置了它,Spark 将重新调整每个分区的大小,以使分区数接近此值,如果初始分区数超过此值。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。

3.5.0
spark.sql.files.maxRecordsPerFile 0

写入单个文件的最大记录数。如果此值为零或负数,则没有限制。

2.2.0
spark.sql.files.minPartitionNum (无)

建议的(不保证)最小拆分文件分区数。如果未设置,则默认值为 spark.sql.leafNodeDefaultParallelism。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。

3.1.0
spark.sql.function.concatBinaryAsString false

如果此选项设置为 false 并且所有输入都是二进制的,则 functions.concat 将返回二进制输出。否则,它将返回字符串。

2.3.0
spark.sql.function.eltOutputAsString false

如果此选项设置为 false 并且所有输入都是二进制的,则 elt 将返回二进制输出。否则,它将返回字符串。

2.3.0
spark.sql.groupByAliases true

如果为 true,则 select 列表中的别名可以在 group by 子句中使用。如果为 false,则在这种情况发生时会抛出分析异常。

2.2.0
spark.sql.groupByOrdinal true

如果为 true,则 group by 子句中的序数将被视为 select 列表中的位置。如果为 false,则序数将被忽略。

2.0.0
spark.sql.hive.convertInsertingPartitionedTable true

如果设置为 true,并且 spark.sql.hive.convertMetastoreParquetspark.sql.hive.convertMetastoreOrc 为 true,则内置 ORC/Parquet 写入器将用于处理插入使用 HiveSQL 语法创建的分区 ORC/Parquet 表。

3.0.0
spark.sql.hive.convertMetastoreCtas true

如果设置为 true,Spark 将尝试使用内置数据源写入器而不是 Hive serde 在 CTAS 中。此标志仅在分别为 Parquet 和 ORC 格式启用了 spark.sql.hive.convertMetastoreParquetspark.sql.hive.convertMetastoreOrc 时有效。

3.0.0
spark.sql.hive.convertMetastoreInsertDir true

如果设置为 true,Spark 将尝试使用内置数据源写入器而不是 Hive serde 在 INSERT OVERWRITE DIRECTORY 中。此标志仅在分别为 Parquet 和 ORC 格式启用了 spark.sql.hive.convertMetastoreParquetspark.sql.hive.convertMetastoreOrc 时有效。

3.3.0
spark.sql.hive.convertMetastoreOrc true

如果设置为 true,则内置 ORC 读取器和写入器将用于处理使用 HiveQL 语法创建的 ORC 表,而不是 Hive serde。

2.0.0
spark.sql.hive.convertMetastoreParquet true

如果设置为 true,则内置 Parquet 读取器和写入器将用于处理使用 HiveQL 语法创建的 parquet 表,而不是 Hive serde。

1.1.1
spark.sql.hive.convertMetastoreParquet.mergeSchema false

如果为 true,则还尝试合并不同但兼容的 Parquet 架构(可能存在于不同的 Parquet 数据文件中)。此配置仅在 "spark.sql.hive.convertMetastoreParquet" 为 true 时有效。

1.3.1
spark.sql.hive.dropPartitionByName.enabled false

如果为 true,Spark 将获取分区名称而不是分区对象来删除分区,这可以提高删除分区的性能。

3.4.0
spark.sql.hive.filesourcePartitionFileCacheSize 262144000

如果非零,则启用在内存中缓存分区文件元数据。所有表共享一个缓存,该缓存最多可以使用指定的字节数来存储文件元数据。此配置仅在启用了 hive filesource 分区管理时有效。

2.1.1
spark.sql.hive.manageFilesourcePartitions true

如果为 true,则为文件源表启用元存储分区管理。这包括数据源表和转换后的 Hive 表。启用分区管理后,数据源表将分区存储在 Hive 元存储中,并在 spark.sql.hive.metastorePartitionPruning 设置为 true 时使用元存储来修剪查询规划期间的分区。

2.1.1
spark.sql.hive.metastorePartitionPruning true

如果为真,一些谓词将被推送到 Hive 元存储中,以便更早地消除不匹配的分区。

1.5.0
spark.sql.hive.metastorePartitionPruningFallbackOnException false

当从元存储中遇到 MetaException 时,是否回退到从 Hive 元存储中获取所有分区并在 Spark 客户端侧执行分区修剪。请注意,如果启用此选项并且有许多分区要列出,Spark 查询性能可能会下降。如果禁用此选项,Spark 将失败查询。

3.3.0
spark.sql.hive.metastorePartitionPruningFastFallback false

当此配置启用时,如果谓词不受 Hive 支持或 Spark 由于遇到来自元存储的 MetaException 而回退,Spark 将改为通过首先获取分区名称,然后在客户端侧评估过滤器表达式来修剪分区。请注意,具有 TimeZoneAwareExpression 的谓词不受支持。

3.3.0
spark.sql.hive.thriftServer.async true

当设置为 true 时,Hive Thrift 服务器以异步方式执行 SQL 查询。

1.5.0
spark.sql.hive.verifyPartitionPath false

如果为真,则在读取存储在 HDFS 中的数据时检查表根目录下的所有分区路径。此配置将在将来的版本中弃用,并由 spark.files.ignoreMissingFiles 替换。

1.4.0
spark.sql.inMemoryColumnarStorage.batchSize 10000

控制列式缓存的批次大小。较大的批次大小可以提高内存利用率和压缩,但在缓存数据时会冒 OOM 的风险。

1.1.1
spark.sql.inMemoryColumnarStorage.compressed true

当设置为 true 时,Spark SQL 将根据数据的统计信息自动为每列选择压缩编解码器。

1.0.1
spark.sql.inMemoryColumnarStorage.enableVectorizedReader true

为列式缓存启用矢量化读取器。

2.3.1
spark.sql.json.filterPushdown.enabled true

如果为真,则启用对 JSON 数据源的过滤器下推。

3.1.0
spark.sql.jsonGenerator.ignoreNullFields true

在 JSON 数据源和 JSON 函数(如 to_json)中生成 JSON 对象时是否忽略空字段。如果为 false,则在 JSON 对象中为空字段生成 null。

3.0.0
spark.sql.leafNodeDefaultParallelism (无)

生成数据的 Spark SQL 叶节点的默认并行度,例如文件扫描节点、本地数据扫描节点、范围节点等。此配置的默认值为 'SparkContext#defaultParallelism'。

3.2.0
spark.sql.mapKeyDedupPolicy EXCEPTION

在内置函数中对 map 键进行重复数据消除的策略:CreateMap、MapFromArrays、MapFromEntries、StringToMap、MapConcat 和 TransformKeys。当为 EXCEPTION 时,如果检测到重复的 map 键,则查询将失败。当为 LAST_WIN 时,最后插入的 map 键优先。

3.0.0
spark.sql.maven.additionalRemoteRepositories https://maven-central.storage-download.googleapis.com/maven2/

可选的额外远程 Maven 镜像存储库的逗号分隔字符串配置。这仅在默认的 Maven Central 存储库不可访问时用于在 IsolatedClientLoader 中下载 Hive jar 包。

3.0.0
spark.sql.maxMetadataStringLength 100

元数据字符串的输出字符数上限。例如,DataSourceScanExec 中的文件位置,如果超过长度,每个值都将被缩写。

3.1.0
spark.sql.maxPlanStringLength 2147483632

计划字符串的输出字符数上限。如果计划更长,则将截断进一步的输出。默认设置始终生成完整计划。如果计划字符串占用太多内存或在驱动程序或 UI 进程中导致内存不足错误,请将其设置为较低的值,例如 8k。

3.0.0
spark.sql.maxSinglePartitionBytes 9223372036854775807b

单个分区允许的最大字节数。否则,规划器将引入混洗以提高并行度。

3.4.0
spark.sql.optimizer.collapseProjectAlwaysInline false

是否始终折叠两个相邻的投影并内联表达式,即使这会导致额外的重复。

3.3.0
spark.sql.optimizer.dynamicPartitionPruning.enabled true

如果为真,当分区列用作连接键时,我们将生成分区列的谓词。

3.0.0
spark.sql.optimizer.enableCsvExpressionOptimization true

是否在 SQL 优化器中优化 CSV 表达式。它包括从 from_csv 中修剪不必要的列。

3.2.0
spark.sql.optimizer.enableJsonExpressionOptimization true

是否在 SQL 优化器中优化 JSON 表达式。它包括从 from_json 中修剪不必要的列,简化 from_json + to_json,to_json + named_struct(from_json.col1, from_json.col2, ....)。

3.1.0
spark.sql.optimizer.excludedRules (无)

配置要在优化器中禁用的规则列表,其中规则由其规则名称指定,并用逗号分隔。不能保证此配置中的所有规则最终都会被排除,因为某些规则对于正确性是必要的。优化器将记录实际被排除的规则。

2.4.0
spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold 10GB

布隆过滤器应用侧计划的聚合扫描大小的字节大小阈值。布隆过滤器应用侧的聚合扫描字节大小需要超过此值才能注入布隆过滤器。

3.3.0
spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold 10MB

布隆过滤器创建侧计划的大小阈值。估计大小需要低于此值才能尝试注入布隆过滤器。

3.3.0
spark.sql.optimizer.runtime.bloomFilter.enabled true

如果为真,并且混洗连接的一侧具有选择性谓词,我们将尝试在另一侧插入布隆过滤器以减少混洗数据量。

3.3.0
spark.sql.optimizer.runtime.bloomFilter.expectedNumItems 1000000

运行时布隆过滤器的预期项目数的默认值。

3.3.0
spark.sql.optimizer.runtime.bloomFilter.maxNumBits 67108864

用于运行时布隆过滤器的最大位数。

3.3.0
spark.sql.optimizer.runtime.bloomFilter.maxNumItems 4000000

运行时布隆过滤器允许的最大预期项目数。

3.3.0
spark.sql.optimizer.runtime.bloomFilter.numBits 8388608

用于运行时布隆过滤器的默认位数。

3.3.0
spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled true

为基于组的逐行操作启用运行时组过滤。替换数据组(例如文件、分区)的数据源可以在规划逐行操作扫描时使用提供的数据源过滤器修剪整个组。但是,这种过滤是有限的,因为并非所有表达式都可以转换为数据源过滤器,并且某些表达式只能由 Spark 评估(例如子查询)。由于重写组很昂贵,Spark 可以运行时执行查询以查找匹配逐行操作条件的记录。有关匹配记录的信息将传递回逐行操作扫描,允许数据源丢弃不需要重写的组。

3.4.0
spark.sql.optimizer.runtimeFilter.number.threshold 10

单个查询注入的运行时过滤器(非 DPP)的总数。这是为了防止驱动程序因太多布隆过滤器而导致内存不足。

3.3.0
spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled false

如果为真,并且混洗连接的一侧具有选择性谓词,我们将尝试在另一侧插入半连接以减少混洗数据量。

3.3.0
spark.sql.orc.aggregatePushdown false

如果为真,聚合将被推送到 ORC 以进行优化。支持 MIN、MAX 和 COUNT 作为聚合表达式。对于 MIN/MAX,支持布尔值、整数、浮点数和日期类型。对于 COUNT,支持所有数据类型。如果任何 ORC 文件页脚缺少统计信息,将抛出异常。

3.3.0
spark.sql.orc.columnarReaderBatchSize 4096

包含在 orc 矢量化读取器批次中的行数。应仔细选择此数字以最大程度地减少开销并避免在读取数据时出现内存不足错误。

2.4.0
spark.sql.orc.columnarWriterBatchSize 1024

包含在 orc 矢量化写入器批次中的行数。应仔细选择此数字以最大程度地减少开销并避免在写入数据时出现内存不足错误。

3.4.0
spark.sql.orc.compression.codec snappy

设置写入 ORC 文件时使用的压缩编解码器。如果在特定于表的选项/属性中指定了 compressionorc.compress,则优先级将是 compressionorc.compressspark.sql.orc.compression.codec。可接受的值包括:none、uncompressed、snappy、zlib、lzo、zstd、lz4。

2.3.0
spark.sql.orc.enableNestedColumnVectorizedReader true

为嵌套列启用矢量化 orc 解码。

3.2.0
spark.sql.orc.enableVectorizedReader true

启用矢量化 orc 解码。

2.3.0
spark.sql.orc.filterPushdown true

如果为真,则启用对 ORC 文件的过滤器下推。

1.4.0
spark.sql.orc.mergeSchema false

如果为真,Orc 数据源将合并从所有数据文件收集的模式,否则模式将从随机数据文件选择。

3.0.0
spark.sql.orderByOrdinal true

如果为真,则序数将被视为 select 列表中的位置。如果为 false,则 order/sort by 子句中的序数将被忽略。

2.0.0
spark.sql.parquet.aggregatePushdown false

如果为真,聚合将被推送到 Parquet 以进行优化。支持 MIN、MAX 和 COUNT 作为聚合表达式。对于 MIN/MAX,支持布尔值、整数、浮点数和日期类型。对于 COUNT,支持所有数据类型。如果任何 Parquet 文件页脚缺少统计信息,将抛出异常。

3.3.0
spark.sql.parquet.binaryAsString false

一些其他生成 Parquet 的系统,特别是 Impala 和旧版本的 Spark SQL,在写入 Parquet 模式时不会区分二进制数据和字符串。此标志告诉 Spark SQL 将二进制数据解释为字符串,以提供与这些系统的兼容性。

1.1.1
spark.sql.parquet.columnarReaderBatchSize 4096

包含在 parquet 矢量化读取器批次中的行数。应仔细选择此数字以最大程度地减少开销并避免在读取数据时出现内存不足错误。

2.4.0
spark.sql.parquet.compression.codec snappy

设置写入 Parquet 文件时使用的压缩编解码器。如果在特定于表的选项/属性中指定了 compressionparquet.compression,则优先级将是 compressionparquet.compressionspark.sql.parquet.compression.codec。可接受的值包括:none、uncompressed、snappy、gzip、lzo、brotli、lz4、lz4raw、lz4_raw、zstd。

1.1.1
spark.sql.parquet.enableNestedColumnVectorizedReader true

为嵌套列(例如结构体、列表、映射)启用矢量化 Parquet 解码。需要启用 spark.sql.parquet.enableVectorizedReader。

3.3.0
spark.sql.parquet.enableVectorizedReader true

启用矢量化 parquet 解码。

2.0.0
spark.sql.parquet.fieldId.read.enabled false

字段 ID 是 Parquet 模式规范的本机字段。启用后,Parquet 读取器将使用请求的 Spark 模式中的字段 ID(如果存在)来查找 Parquet 字段,而不是使用列名。

3.3.0
spark.sql.parquet.fieldId.read.ignoreMissing false

当 Parquet 文件没有任何字段 ID,但 Spark 读取模式使用字段 ID 来读取时,如果启用此标志,我们将静默返回 null,否则会报错。

3.3.0
spark.sql.parquet.fieldId.write.enabled true

字段 ID 是 Parquet 模式规范的原生字段。启用后,Parquet 写入器将填充 Spark 模式中的字段 ID 元数据(如果存在)到 Parquet 模式中。

3.3.0
spark.sql.parquet.filterPushdown true

设置为 true 时,启用 Parquet 过滤器下推优化。

1.2.0
spark.sql.parquet.inferTimestampNTZ.enabled true

启用后,在模式推断期间,带有注释 isAdjustedToUTC = false 的 Parquet 时间戳列将被推断为 TIMESTAMP_NTZ 类型。否则,所有 Parquet 时间戳列都将被推断为 TIMESTAMP_LTZ 类型。请注意,Spark 将输出模式写入文件写入时的 Parquet 页脚元数据中,并在文件读取时利用它。因此,此配置仅影响未由 Spark 写入的 Parquet 文件的模式推断。

3.4.0
spark.sql.parquet.int96AsTimestamp true

某些 Parquet 生成系统,特别是 Impala,将时间戳存储为 INT96。Spark 也将时间戳存储为 INT96,因为我们需要避免纳秒字段的精度丢失。此标志告诉 Spark SQL 将 INT96 数据解释为时间戳,以提供与这些系统的兼容性。

1.3.0
spark.sql.parquet.int96TimestampConversion false

这控制在将 INT96 数据转换为时间戳时,是否应将时间戳调整应用于 INT96 数据,用于 Impala 写入的数据。这是必要的,因为 Impala 以与 Hive 和 Spark 不同的时区偏移量存储 INT96 数据。

2.3.0
spark.sql.parquet.mergeSchema false

设置为 true 时,Parquet 数据源会合并从所有数据文件收集的模式,否则模式将从摘要文件或随机数据文件(如果不存在摘要文件)中选取。

1.5.0
spark.sql.parquet.outputTimestampType INT96

设置 Spark 将数据写入 Parquet 文件时要使用的 Parquet 时间戳类型。INT96 是 Parquet 中的一种非标准但常用的时间戳类型。TIMESTAMP_MICROS 是 Parquet 中的标准时间戳类型,它存储自 Unix 纪元以来的微秒数。TIMESTAMP_MILLIS 也是标准的,但精度为毫秒,这意味着 Spark 必须截断其时间戳值的微秒部分。

2.3.0
spark.sql.parquet.recordLevelFilter.enabled false

如果为 true,则启用 Parquet 的本机记录级过滤,使用下推的过滤器。此配置仅在 'spark.sql.parquet.filterPushdown' 启用且未使用矢量化读取器时才有效。可以通过将 'spark.sql.parquet.enableVectorizedReader' 设置为 false 来确保未使用矢量化读取器。

2.3.0
spark.sql.parquet.respectSummaryFiles false

设置为 true 时,我们假设 Parquet 的所有部分文件与摘要文件一致,并且在合并模式时将忽略它们。否则,如果为 false(默认值),我们将合并所有部分文件。这应被视为仅供专家使用的选项,在了解其确切含义之前不应启用。

1.5.0
spark.sql.parquet.writeLegacyFormat false

如果为 true,数据将以 Spark 1.4 及更早版本的方式写入。例如,十进制值将以 Apache Parquet 的固定长度字节数组格式写入,其他系统(如 Apache Hive 和 Apache Impala)使用这种格式。如果为 false,将使用 Parquet 中的较新格式。例如,小数将以基于整数的格式写入。如果 Parquet 输出旨在用于不支持这种较新格式的系统,请设置为 true。

1.6.0
spark.sql.parser.quotedRegexColumnNames false

设置为 true 时,SELECT 语句中的引号标识符(使用反引号)将被解释为正则表达式。

2.3.0
spark.sql.pivotMaxValues 10000

在不为枢轴列指定值的情况下执行枢轴时,这是将在没有错误的情况下收集的(不同)值的最大数量。

1.6.0
spark.sql.pyspark.inferNestedDictAsStruct.enabled false

默认情况下,PySpark 的 SparkSession.createDataFrame 将嵌套字典推断为映射。设置为 true 时,它将嵌套字典推断为结构。

3.3.0
spark.sql.pyspark.jvmStacktrace.enabled false

设置为 true 时,它将在面向用户的 PySpark 异常中显示 JVM 堆栈跟踪以及 Python 堆栈跟踪。默认情况下,它被禁用以隐藏 JVM 堆栈跟踪,并且仅显示面向 Python 的异常。请注意,这与日志级别设置无关。

3.0.0
spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled false

默认情况下,PySpark 的 SparkSession.createDataFrame 从数组中的所有值推断数组的元素类型。如果此配置设置为 true,它将恢复仅从第一个数组元素推断类型的旧行为。

3.4.0
spark.sql.readSideCharPadding true

设置为 true 时,Spark 在读取 CHAR 类型列/字段时应用字符串填充,除了写入端填充之外。此配置默认情况下为 true,以更好地在外部表等情况下强制执行 CHAR 类型语义。

3.4.0
spark.sql.redaction.options.regex (?i)url

用于确定 Spark SQL 命令的选项映射中哪些键包含敏感信息的正则表达式。名称与该正则表达式匹配的选项的值将在解释输出中被删除。此删除应用于由 spark.redaction.regex 定义的全局删除配置之上。

2.2.2
spark.sql.redaction.string.regex (spark.redaction.string.regex 的值)

用于确定 Spark 生成的字符串的哪些部分包含敏感信息的正则表达式。当此正则表达式匹配字符串的一部分时,该字符串部分将被替换为虚拟值。这目前用于删除 SQL 解释命令的输出。当未设置此配置时,将使用 spark.redaction.string.regex 中的值。

2.3.0
spark.sql.repl.eagerEval.enabled false

启用或禁用急切评估。设置为 true 时,仅当 REPL 支持急切评估时,才会显示数据集的前 K 行。目前,PySpark 和 SparkR 支持急切评估。在 PySpark 中,对于 Jupyter 等笔记本,将返回 HTML 表格(由 repr_html 生成)。对于纯 Python REPL,返回的输出格式类似于 dataframe.show()。在 SparkR 中,返回的输出显示类似于 R data.frame。

2.4.0
spark.sql.repl.eagerEval.maxNumRows 20

急切评估返回的行数最大值。这仅在 spark.sql.repl.eagerEval.enabled 设置为 true 时才生效。此配置的有效范围为 0 到 (Int.MaxValue - 1),因此负数和大于 (Int.MaxValue - 1) 的无效配置将被规范化为 0 和 (Int.MaxValue - 1)。

2.4.0
spark.sql.repl.eagerEval.truncate 20

急切评估返回的每个单元格的字符数最大值。这仅在 spark.sql.repl.eagerEval.enabled 设置为 true 时才生效。

2.4.0
spark.sql.session.localRelationCacheThreshold 67108864

在序列化后,在驱动程序端缓存本地关系的大小(以字节为单位)的阈值。

3.5.0
spark.sql.session.timeZone (本地时区的值)

会话本地时区的 ID,格式为基于区域的时区 ID 或时区偏移量。区域 ID 必须采用 'area/city' 的形式,例如 'America/Los_Angeles'。时区偏移量必须采用 '(+|-)HH'、'(+|-)HH:mm' 或 '(+|-)HH:mm:ss' 的格式,例如 '-08'、'+01:00' 或 '-13:33:33'。'UTC' 和 'Z' 也支持作为 '+00:00' 的别名。不建议使用其他简短名称,因为它们可能不明确。

2.2.0
spark.sql.shuffle.partitions 200

在为联接或聚合对数据进行混洗时要使用的分区数的默认值。注意:对于结构化流,此配置在从同一检查点位置重新启动查询之间不可更改。

1.1.0
spark.sql.shuffledHashJoinFactor 3

如果小端数据大小乘以该因子仍然小于大端数据大小,则可以选择混洗哈希联接。

3.3.0
spark.sql.sources.bucketing.autoBucketedScan.enabled true

设置为 true 时,根据查询计划自动决定是否对输入表进行桶扫描。如果 1. 查询没有利用桶的运算符(例如联接、分组等),或者 2. 这些运算符和表扫描之间存在交换运算符,则不要进行桶扫描。请注意,当 'spark.sql.sources.bucketing.enabled' 设置为 false 时,此配置不会生效。

3.1.0
spark.sql.sources.bucketing.enabled true

设置为 false 时,我们将把桶表视为普通表。

2.0.0
spark.sql.sources.bucketing.maxBuckets 100000

允许的最大桶数。

2.4.0
spark.sql.sources.default parquet

在输入/输出中使用的默认数据源。

1.3.0
spark.sql.sources.parallelPartitionDiscovery.threshold 32

在驱动程序端允许列出文件的最大路径数。如果在分区发现期间检测到的路径数超过此值,它将尝试使用另一个 Spark 分布式作业列出文件。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。

1.5.0
spark.sql.sources.partitionColumnTypeInference.enabled true

设置为 true 时,自动推断分区列的数据类型。

1.5.0
spark.sql.sources.partitionOverwriteMode STATIC

在 INSERT OVERWRITE 分区数据源表时,我们目前支持两种模式:静态和动态。在静态模式下,Spark 会在覆盖之前删除与 INSERT 语句中的分区规范(例如 PARTITION(a=1,b))匹配的所有分区。在动态模式下,Spark 不会提前删除分区,只会覆盖在运行时写入数据的那些分区。默认情况下,我们使用静态模式以保持 Spark 在 2.3 之前的行为。请注意,此配置不会影响 Hive serde 表,因为它们始终以动态模式覆盖。这也可以作为数据源的输出选项设置,使用键 partitionOverwriteMode(优先于此设置),例如 dataframe.write.option("partitionOverwriteMode", "dynamic").save(path)。

2.3.0
spark.sql.sources.v2.bucketing.enabled false

与 spark.sql.sources.bucketing.enabled 类似,此配置用于为 V2 数据源启用桶。启用后,Spark 将通过 SupportsReportPartitioning 识别 V2 数据源报告的特定分布,并在必要时尝试避免混洗。

3.3.0
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled false

在存储分区联接期间,是否允许输入分区部分聚类,当联接的两侧都是 KeyGroupedPartitioning 时。在规划时,Spark 将根据表统计信息选择数据大小较小的一侧,对其进行分组和复制以匹配另一侧。这是对倾斜联接的优化,可以帮助减少某些分区分配大量数据时的数据倾斜。此配置要求 spark.sql.sources.v2.bucketing.enabled 和 spark.sql.sources.v2.bucketing.pushPartValues.enabled 都启用。

3.4.0
spark.sql.sources.v2.bucketing.pushPartValues.enabled false

当 spark.sql.sources.v2.bucketing.enabled 启用时,是否要下推公共分区值。启用后,如果联接的两侧都是 KeyGroupedPartitioning,并且它们共享兼容的分区键,即使它们没有完全相同的分区值,Spark 也会计算分区值的超集,并将该信息下推到扫描节点,扫描节点将使用空分区来表示两侧缺少的分区值。这有助于消除不必要的混洗。

3.4.0
spark.sql.statistics.fallBackToHdfs false

设置为 true 时,如果表元数据中没有表统计信息,它将回退到 HDFS。这在确定表是否足够小以使用广播联接时很有用。此标志仅对非分区 Hive 表有效。对于非分区数据源表,如果表统计信息不可用,它将自动重新计算。对于分区数据源和分区 Hive 表,如果表统计信息不可用,它将是 'spark.sql.defaultSizeInBytes'。

2.0.0
spark.sql.statistics.histogram.enabled false

如果启用,在计算列统计信息时生成直方图。直方图可以提供更好的估计精度。目前,Spark 仅支持等高直方图。请注意,收集直方图需要额外的成本。例如,收集列统计信息通常只需要一次表扫描,但生成等高直方图会导致额外的表扫描。

2.3.0
spark.sql.statistics.size.autoUpdate.enabled false

启用对表大小的自动更新,一旦表的數據发生变化。请注意,如果表的总文件数非常大,这可能会很昂贵,并会减慢数据更改命令的速度。

2.3.0
spark.sql.storeAssignmentPolicy ANSI

当将值插入到具有不同数据类型的列时,Spark 将执行类型强制转换。目前,我们支持三种类型强制转换规则策略:ANSI、legacy 和 strict。使用 ANSI 策略,Spark 按照 ANSI SQL 执行类型强制转换。在实践中,行为与 PostgreSQL 大致相同。它不允许某些不合理的类型转换,例如将 string 转换为 intdouble 转换为 boolean。使用 legacy 策略,Spark 允许类型强制转换,只要它是有效的 Cast,这非常宽松。例如,允许将 string 转换为 intdouble 转换为 boolean。这也是 Spark 2.x 中唯一的行为,并且与 Hive 兼容。使用 strict 策略,Spark 不允许类型强制转换中任何可能的精度损失或数据截断,例如,不允许将 double 转换为 intdecimal 转换为 double

3.0.0
spark.sql.streaming.checkpointLocation (无)

用于存储流查询检查点数据的默认位置。

2.0.0
spark.sql.streaming.continuous.epochBacklogQueueSize 10000

要存储在队列中以等待延迟纪元的最大条目数。如果队列的大小超过了此参数,流将停止并出现错误。

3.0.0
spark.sql.streaming.disabledV2Writers

禁用 StreamWriteSupport 的完全限定数据源注册类名的逗号分隔列表。写入这些源将回退到 V1 接收器。

2.3.1
spark.sql.streaming.fileSource.cleaner.numThreads 1

文件源已完成文件清理器中使用的线程数。

3.0.0
spark.sql.streaming.forceDeleteTempCheckpointLocation false

当为真时,启用临时检查点位置强制删除。

3.0.0
spark.sql.streaming.metricsEnabled false

是否为活动流查询报告 Dropwizard/Codahale 指标。

2.0.2
spark.sql.streaming.multipleWatermarkPolicy min

当流查询中存在多个水印操作符时,用于计算全局水印值的策略。默认值为“min”,它选择跨多个操作符报告的最小水印。另一个备选值为“max”,它选择跨多个操作符的最大值。注意:此配置在从同一检查点位置重新启动查询之间不能更改。

2.4.0
spark.sql.streaming.noDataMicroBatches.enabled true

流微批引擎是否将在没有数据的情况下执行批次,以对有状态流查询进行急切状态管理。

2.4.1
spark.sql.streaming.numRecentProgressUpdates 100

要为流查询保留的进度更新数量

2.1.1
spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition false

当为真时,流会话窗口在混洗之前对本地分区中的会话进行排序和合并。这样做是为了减少要混洗的行,但只有在将大量行分配到同一会话的批次中时才有效。

3.2.0
spark.sql.streaming.stateStore.stateSchemaCheck true

当为真时,Spark 将验证状态模式与现有状态上的模式,如果它不兼容,则会使查询失败。

3.1.0
spark.sql.streaming.stopActiveRunOnRestart true

不支持同时运行同一流查询的多个运行。如果我们发现同一流查询(在同一集群上的相同或不同的 SparkSession 中)的并发活动运行,并且此标志为真,我们将停止旧的流查询运行以启动新的运行。

3.0.0
spark.sql.streaming.stopTimeout 0

在调用流查询的 stop() 方法时,等待流执行线程停止的毫秒数。0 或负值无限期等待。

3.0.0
spark.sql.thriftServer.interruptOnCancel true

当为真时,如果一个取消了查询,所有正在运行的任务都将被中断。当为假时,所有正在运行的任务都将保留到完成为止。

3.2.0
spark.sql.thriftServer.queryTimeout 0ms

在 Thrift Server 中设置查询持续时间超时(以秒为单位)。如果超时设置为正值,则运行的查询将在超时超过时自动取消,否则查询将继续运行直到完成。如果通过 java.sql.Statement.setQueryTimeout 为每个语句设置超时值,并且它们小于此配置值,则它们优先。如果您设置了此超时,并且希望立即取消查询,而不等待任务完成,请考虑同时启用 spark.sql.thriftServer.interruptOnCancel。

3.1.0
spark.sql.thriftserver.scheduler.pool (无)

为 JDBC 客户端会话设置公平调度程序池。

1.1.1
spark.sql.thriftserver.ui.retainedSessions 200

JDBC/ODBC Web UI 历史记录中保留的 SQL 客户端会话数。

1.4.0
spark.sql.thriftserver.ui.retainedStatements 200

JDBC/ODBC Web UI 历史记录中保留的 SQL 语句数。

1.4.0
spark.sql.timestampType TIMESTAMP_LTZ

配置 Spark SQL 的默认时间戳类型,包括 SQL DDL、Cast 子句、类型文字和数据源的模式推断。将配置设置为 TIMESTAMP_NTZ 将使用 TIMESTAMP WITHOUT TIME ZONE 作为默认类型,而将其设置为 TIMESTAMP_LTZ 将使用 TIMESTAMP WITH LOCAL TIME ZONE。在 3.4.0 版本之前,Spark 仅支持 TIMESTAMP WITH LOCAL TIME ZONE 类型。

3.4.0
spark.sql.tvf.allowMultipleTableArguments.enabled false

当为真时,允许表值函数使用多个表参数,接收这些表的行的笛卡尔积。

3.5.0
spark.sql.ui.explainMode formatted

配置 Spark SQL UI 中使用的查询解释模式。该值可以是“simple”、“extended”、“codegen”、“cost”或“formatted”。默认值为“formatted”。

3.1.0
spark.sql.variable.substitute true

这将启用使用 ${var}${system:var}${env:var} 等语法进行替换。

2.0.0

静态 SQL 配置

静态 SQL 配置是跨会话的、不可变的 Spark SQL 配置。它们可以通过配置文件和命令行选项(以 --conf/-c 为前缀)设置最终值,或者通过设置用于创建 SparkSessionSparkConf 设置。外部用户可以通过 SparkSession.conf 或通过 set 命令查询静态 sql 配置值,例如 SET spark.sql.extensions;,但不能设置/取消设置它们。

属性名称默认值含义自版本
spark.sql.cache.serializer org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer

实现 org.apache.spark.sql.columnar.CachedBatchSerializer 的类的名称。它将用于将 SQL 数据转换为可以更有效地缓存的格式。底层 API 可能会发生变化,因此请谨慎使用。不能指定多个类。该类必须具有无参数构造函数。

3.1.0
spark.sql.catalog.spark_catalog.defaultDatabase default

会话目录的默认数据库。

3.4.0
spark.sql.event.truncate.length 2147483647

SQL 长度的阈值,超过该阈值,它将在添加到事件之前被截断。默认情况下不截断。如果设置为 0,则会记录调用站点。

3.0.0
spark.sql.extensions (无)

实现 Function1[SparkSessionExtensions, Unit] 的类的逗号分隔列表,用于配置 Spark 会话扩展。这些类必须具有无参数构造函数。如果指定了多个扩展,则它们将按指定的顺序应用。对于规则和规划器策略,它们将按指定的顺序应用。对于解析器,将使用最后一个解析器,并且每个解析器都可以委托给其前一个解析器。对于函数名称冲突,将使用最后注册的函数名称。

2.2.0
spark.sql.hive.metastore.barrierPrefixes

应为 Spark SQL 与之通信的每个版本的 Hive 显式重新加载的类前缀的逗号分隔列表。例如,在通常应共享的前缀(即 org.apache.spark.*)中声明的 Hive UDF。

1.4.0
spark.sql.hive.metastore.jars builtin

应用于实例化 HiveMetastoreClient 的 jar 的位置。此属性可以是以下四个选项之一:1. “builtin” 使用 Hive 2.3.9,它在启用 -Phive 时与 Spark 程序集捆绑在一起。选择此选项时,spark.sql.hive.metastore.version 必须为 2.3.9 或未定义。2. “maven” 使用从 Maven 存储库下载的指定版本的 Hive jar。3. “path” 使用由 spark.sql.hive.metastore.jars.path 以逗号分隔的格式配置的 Hive jar。支持本地或远程路径。提供的 jar 应与 spark.sql.hive.metastore.version 版本相同。4. Hive 和 Hadoop 的标准格式的类路径。提供的 jar 应与 spark.sql.hive.metastore.version 版本相同。

1.4.0
spark.sql.hive.metastore.jars.path

用于实例化 HiveMetastoreClient 的 jar 的逗号分隔路径。仅当 spark.sql.hive.metastore.jars 设置为 path 时,此配置才有效。路径可以是以下任何格式:1. file://path/to/jar/foo.jar 2. hdfs://nameservice/path/to/jar/foo.jar 3. /path/to/jar/(没有 URI 方案的路径遵循 conf fs.defaultFS 的 URI 方案)4. [http/https/ftp]://path/to/jar/foo.jar 请注意,1、2 和 3 支持通配符。例如:1. file://path/to/jar/,file://path2/to/jar//.jar 2. hdfs://nameservice/path/to/jar/,hdfs://nameservice2/path/to/jar//.jar

3.1.0
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc

应使用 Spark SQL 和特定版本的 Hive 之间共享的类加载器加载的类前缀的逗号分隔列表。需要共享的类的示例是与元存储通信所需的 JDBC 驱动程序。需要共享的其他类是那些与已经共享的类交互的类。例如,由 log4j 使用的自定义追加器。

1.4.0
spark.sql.hive.metastore.version 2.3.9

Hive 元存储的版本。可用选项是 0.12.02.3.9 以及 3.0.03.1.3

1.4.0
spark.sql.hive.thriftServer.singleSession false

当设置为 true 时,Hive Thrift 服务器以单会话模式运行。所有 JDBC/ODBC 连接共享临时视图、函数注册表、SQL 配置和当前数据库。

1.6.0
spark.sql.hive.version 2.3.9

与 Spark 发行版捆绑在一起的已编译(也称为内置)Hive 版本。请注意,这是一个只读配置,仅用于报告内置的 hive 版本。如果您希望 Spark 使用不同的元存储客户端,请参考 spark.sql.hive.metastore.version。

1.1.1
spark.sql.metadataCacheTTLSeconds -1000ms

元数据缓存的生存时间 (TTL) 值:分区文件元数据缓存和会话目录缓存。此配置仅在该值具有正值 (> 0) 时才有效。它还需要将“spark.sql.catalogImplementation”设置为 hive,将“spark.sql.hive.filesourcePartitionFileCacheSize”设置为 > 0,并将“spark.sql.hive.manageFilesourcePartitions”设置为 true 以应用于分区文件元数据缓存。

3.1.0
spark.sql.queryExecutionListeners (无)

实现 QueryExecutionListener 的类名列表,这些类名将自动添加到新创建的会话中。这些类应该具有无参数构造函数,或者具有期望 SparkConf 参数的构造函数。

2.3.0
spark.sql.sources.disabledJdbcConnProviderList

配置被禁用的 JDBC 连接提供程序列表。该列表包含以逗号分隔的 JDBC 连接提供程序的名称。

3.1.0
spark.sql.streaming.streamingQueryListeners (无)

实现 StreamingQueryListener 的类名列表,这些类名将自动添加到新创建的会话中。这些类应该具有无参数构造函数,或者具有期望 SparkConf 参数的构造函数。

2.4.0
spark.sql.streaming.ui.enabled true

当 Spark Web UI 启用时,是否为 Spark 应用程序运行结构化流 Web UI。

3.0.0
spark.sql.streaming.ui.retainedProgressUpdates 100

用于结构化流 UI 的流式查询保留的进度更新数量。

3.0.0
spark.sql.streaming.ui.retainedQueries 100

用于结构化流 UI 保留的非活动查询数量。

3.0.0
spark.sql.ui.retainedExecutions 1000

在 Spark UI 中保留的执行次数。

1.5.0
spark.sql.warehouse.dir ($PWD/spark-warehouse 的值)

托管数据库和表的默认位置。

2.0.0

Spark Streaming

属性名称默认值含义自版本
spark.streaming.backpressure.enabled false 启用或禁用 Spark Streaming 的内部反压机制(从 1.5 版本开始)。这使 Spark Streaming 能够根据当前批处理调度延迟和处理时间来控制接收速率,以便系统仅以系统能够处理的速度接收数据。在内部,这会动态设置接收器的最大接收速率。如果设置了这些值(见下文),则此速率将受到 spark.streaming.receiver.maxRatespark.streaming.kafka.maxRatePerPartition 值的上限限制。 1.5.0
spark.streaming.backpressure.initialRate 未设置 这是启用反压机制时,每个接收器在第一批处理中接收数据的初始最大接收速率。 2.0.0
spark.streaming.blockInterval 200ms Spark Streaming 接收器接收到的数据被分成数据块的间隔,然后将这些数据块存储在 Spark 中。建议的最小值 - 50 毫秒。有关更多详细信息,请参阅 Spark Streaming 编程指南中的 性能调优 部分。 0.8.0
spark.streaming.receiver.maxRate 未设置 每个接收器接收数据的最大速率(每秒记录数)。实际上,每个流每秒最多会消耗这么多记录。将此配置设置为 0 或负数将不会限制速率。有关更多详细信息,请参阅 Spark Streaming 编程指南中的 部署指南 1.0.2
spark.streaming.receiver.writeAheadLog.enable false 为接收器启用预写日志。通过接收器接收的所有输入数据都将保存到预写日志中,这将允许在驱动程序故障后恢复数据。有关更多详细信息,请参阅 Spark Streaming 编程指南中的 部署指南 1.2.1
spark.streaming.unpersist true 强制 Spark Streaming 生成的并持久化的 RDD 自动从 Spark 的内存中取消持久化。Spark Streaming 接收的原始输入数据也会自动清除。将其设置为 false 将允许在流式应用程序之外访问原始数据和持久化的 RDD,因为它们不会被自动清除。但这会以 Spark 占用更多内存为代价。 0.9.0
spark.streaming.stopGracefullyOnShutdown false 如果为 true,则 Spark 会在 JVM 关闭时优雅地关闭 StreamingContext,而不是立即关闭。 1.4.0
spark.streaming.kafka.maxRatePerPartition 未设置 使用新的 Kafka 直接流 API 时,从每个 Kafka 分区读取数据的最大速率(每秒记录数)。有关更多详细信息,请参阅 Kafka 集成指南 1.3.0
spark.streaming.kafka.minRatePerPartition 1 使用新的 Kafka 直接流 API 时,从每个 Kafka 分区读取数据的最小速率(每秒记录数)。 2.4.0
spark.streaming.ui.retainedBatches 1000 Spark Streaming UI 和状态 API 在垃圾回收之前记住的批次数量。 1.0.0
spark.streaming.driver.writeAheadLog.closeFileAfterWrite false 在驱动程序上写入预写日志记录后是否关闭文件。当您想将 S3(或任何不支持刷新的文件系统)用于驱动程序上的元数据 WAL 时,将此设置为 'true'。 1.6.0
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite false 在接收器上写入预写日志记录后是否关闭文件。当您想将 S3(或任何不支持刷新的文件系统)用于接收器上的数据 WAL 时,将此设置为 'true'。 1.6.0

SparkR

属性名称默认值含义自版本
spark.r.numRBackendThreads 2 RBackend 用于处理来自 SparkR 包的 RPC 调用的线程数。 1.4.0
spark.r.command Rscript 用于在集群模式下为驱动程序和工作程序执行 R 脚本的可执行文件。 1.5.3
spark.r.driver.command spark.r.command 用于在客户端模式下为驱动程序执行 R 脚本的可执行文件。在集群模式下被忽略。 1.5.3
spark.r.shell.command R 用于在客户端模式下为驱动程序执行 sparkR shell 的可执行文件。在集群模式下被忽略。它与环境变量 SPARKR_DRIVER_R 相同,但优先于它。spark.r.shell.command 用于 sparkR shell,而 spark.r.driver.command 用于运行 R 脚本。 2.1.0
spark.r.backendConnectionTimeout 6000 R 进程在其与 RBackend 的连接上设置的连接超时时间(以秒为单位)。 2.1.0
spark.r.heartBeatInterval 100 从 SparkR 后端到 R 进程发送心跳的间隔,以防止连接超时。 2.1.0

GraphX

属性名称默认值含义自版本
spark.graphx.pregel.checkpointInterval -1 Pregel 中图形和消息的检查点间隔。它用于避免在大量迭代后由于长血统链而导致的 stackOverflowError。默认情况下,检查点被禁用。 2.2.0

部署

属性名称默认值含义自版本
spark.deploy.recoveryMode NONE 恢复模式设置,用于在集群模式下提交的 Spark 作业失败并重新启动时恢复作业。这仅适用于在使用独立模式或 Mesos 运行时使用集群模式。 0.8.1
spark.deploy.zookeeper.url spark.deploy.recoveryMode 设置为 ZOOKEEPER 时,此配置用于设置要连接的 ZooKeeper URL。 0.8.1
spark.deploy.zookeeper.dir spark.deploy.recoveryMode 设置为 ZOOKEEPER 时,此配置用于设置用于存储恢复状态的 ZooKeeper 目录。 0.8.1

集群管理器

Spark 中的每个集群管理器都有额外的配置选项。可以在每个模式的页面上找到配置。

YARN

Mesos

Kubernetes

独立模式

环境变量

某些 Spark 设置可以通过环境变量进行配置,这些环境变量从 Spark 安装目录中的 conf/spark-env.sh 脚本(或 Windows 上的 conf/spark-env.cmd)读取。在独立模式和 Mesos 模式下,此文件可以提供特定于机器的信息,例如主机名。在运行本地 Spark 应用程序或提交脚本时,也会引用它。

请注意,默认情况下,Spark 安装时 conf/spark-env.sh 不存在。但是,您可以复制 conf/spark-env.sh.template 来创建它。确保您使副本可执行。

可以在 spark-env.sh 中设置以下变量

环境变量含义
JAVA_HOME 安装 Java 的位置(如果它不在您的默认 PATH 中)。
PYSPARK_PYTHON 用于 PySpark 的 Python 二进制可执行文件,在驱动程序和工作程序中使用(默认情况下,如果可用,则为 python3,否则为 python)。如果设置了属性 spark.pyspark.python,则优先使用它。
PYSPARK_DRIVER_PYTHON 仅用于 PySpark 驱动程序的 Python 二进制可执行文件(默认情况下为 PYSPARK_PYTHON)。如果设置了属性 spark.pyspark.driver.python,则优先使用它。
SPARKR_DRIVER_R 用于 SparkR shell 的 R 二进制可执行文件(默认情况下为 R)。如果设置了属性 spark.r.shell.command,则优先使用它。
SPARK_LOCAL_IP 要绑定的机器的 IP 地址。
SPARK_PUBLIC_DNS 您的 Spark 程序将向其他机器宣传的主机名。

除了上述内容之外,还有用于设置 Spark 独立集群脚本 的选项,例如每台机器上要使用的核心数和最大内存。

由于 spark-env.sh 是一个 shell 脚本,因此其中一些可以以编程方式设置 - 例如,您可以通过查找特定网络接口的 IP 来计算 SPARK_LOCAL_IP

注意:在 YARN 上以 cluster 模式运行 Spark 时,需要使用 conf/spark-defaults.conf 文件中的 spark.yarn.appMasterEnv.[EnvironmentVariableName] 属性设置环境变量。在 cluster 模式下,在 spark-env.sh 中设置的环境变量不会反映在 YARN 应用程序主进程中。有关更多信息,请参阅 与 YARN 相关的 Spark 属性

配置日志记录

Spark 使用 log4j 进行日志记录。您可以通过在 conf 目录中添加 log4j2.properties 文件来配置它。一种方法是复制位于那里的现有 log4j2.properties.template

默认情况下,Spark 向 MDC(映射诊断上下文)添加 1 条记录:mdc.taskName,它显示类似于 task 1.0 in stage 0.0 的内容。您可以将 %X{mdc.taskName} 添加到您的 patternLayout 中,以便将其打印在日志中。此外,您可以使用 spark.sparkContext.setLocalProperty(s"mdc.$name", "value") 将用户特定数据添加到 MDC 中。MDC 中的键将是 “mdc.$name” 的字符串。

覆盖配置目录

要指定与默认的 “SPARK_HOME/conf” 不同的配置目录,您可以设置 SPARK_CONF_DIR。Spark 将使用来自此目录的配置文件(spark-defaults.conf、spark-env.sh、log4j2.properties 等)。

继承 Hadoop 集群配置

如果您计划使用 Spark 从 HDFS 读取和写入数据,则应该将两个 Hadoop 配置文件包含在 Spark 的类路径中

这些配置文件的位置在不同的 Hadoop 版本中有所不同,但常见的位置是在 /etc/hadoop/conf 中。某些工具会动态创建配置,但提供下载其副本的机制。

要使这些文件对 Spark 可见,请在 $SPARK_HOME/conf/spark-env.sh 中将 HADOOP_CONF_DIR 设置为包含配置文件的位置。

自定义 Hadoop/Hive 配置

如果您的 Spark 应用程序正在与 Hadoop、Hive 或两者交互,则 Spark 的类路径中可能存在 Hadoop/Hive 配置文件。

多个正在运行的应用程序可能需要不同的 Hadoop/Hive 客户端配置。您可以为每个应用程序复制和修改 Spark 类路径中的 hdfs-site.xmlcore-site.xmlyarn-site.xmlhive-site.xml。在 YARN 上运行的 Spark 集群中,这些配置文件是在集群范围内设置的,应用程序无法安全地更改它们。

更好的选择是使用 spark.hadoop.* 形式的 Spark Hadoop 属性,以及使用 spark.hive.* 形式的 Spark Hive 属性。例如,添加配置 “spark.hadoop.abc.def=xyz” 表示添加 Hadoop 属性 “abc.def=xyz”,添加配置 “spark.hive.abc=xyz” 表示添加 Hive 属性 “hive.abc=xyz”。它们可以被认为与可以在 $SPARK_HOME/conf/spark-defaults.conf 中设置的普通 Spark 属性相同。

在某些情况下,您可能希望避免在 SparkConf 中硬编码某些配置。例如,Spark 允许您简单地创建一个空的 conf 并设置 Spark/Spark Hadoop/Spark Hive 属性。

val conf = new SparkConf().set("spark.hadoop.abc.def", "xyz")
val sc = new SparkContext(conf)

此外,您可以在运行时修改或添加配置。

./bin/spark-submit \
  --name "My app" \
  --master local[4] \
  --conf spark.eventLog.enabled=false \
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  --conf spark.hadoop.abc.def=xyz \
  --conf spark.hive.abc=xyz
  myApp.jar

自定义资源调度和配置概述

GPU 和其他加速器已被广泛用于加速特殊工作负载,例如深度学习和信号处理。Spark 现在支持请求和调度通用资源,例如 GPU,但有一些注意事项。当前的实现要求资源具有可以由调度程序分配的地址。它要求您的集群管理器支持并正确配置资源。

可以使用以下配置请求驱动程序资源:spark.driver.resource.{resourceName}.amount,请求执行器资源:spark.executor.resource.{resourceName}.amount,并指定每个任务的要求:spark.task.resource.{resourceName}.amountspark.driver.resource.{resourceName}.discoveryScript 配置在 YARN、Kubernetes 和 Spark Standalone 上的客户端驱动程序上是必需的。spark.executor.resource.{resourceName}.discoveryScript 配置在 YARN 和 Kubernetes 上是必需的。Kubernetes 还需要 spark.driver.resource.{resourceName}.vendor 和/或 spark.executor.resource.{resourceName}.vendor。有关每个配置的更多信息,请参阅上面的配置说明。

Spark 将使用指定的配置首先从集群管理器请求具有相应资源的容器。获得容器后,Spark 在该容器中启动一个执行器,该执行器将发现容器拥有的资源以及与每个资源关联的地址。执行器将向驱动程序注册并报告该执行器可用的资源。然后,Spark 调度程序可以将任务调度到每个执行器,并根据用户指定的资源要求分配特定的资源地址。用户可以使用 TaskContext.get().resources api 查看分配给任务的资源。在驱动程序上,用户可以使用 SparkContext resources 调用查看分配的资源。然后,用户可以使用分配的地址进行他们想要的处理,或者将这些地址传递到他们正在使用的 ML/AI 框架中。

有关每个配置的具体要求和详细信息,请参阅您的集群管理器特定页面 - YARNKubernetes独立模式。目前,Mesos 或本地模式不支持此功能。另外请注意,不支持具有多个工作节点的本地集群模式(请参阅独立文档)。

阶段级调度概述

阶段级调度功能允许用户在阶段级指定任务和执行器资源要求。这允许不同的阶段使用具有不同资源的执行器运行。一个典型的例子是,一个 ETL 阶段使用仅具有 CPU 的执行器运行,下一个阶段是需要 GPU 的 ML 阶段。阶段级调度允许用户在 ML 阶段运行时请求具有 GPU 的不同执行器,而不是在应用程序开始时就获取具有 GPU 的执行器,并在 ETL 阶段运行时处于空闲状态。此功能仅适用于 Scala、Java 和 Python 中的 RDD API。当启用动态分配时,它在 YARN、Kubernetes 和独立模式下可用。当禁用动态分配时,它允许用户在阶段级指定不同的任务资源要求,并且目前在 YARN、Kubernetes 和独立集群上受支持。有关更多实现细节,请参阅 YARN 页面或 Kubernetes 页面或 独立 页面。

有关如何使用此功能,请参阅 RDD.withResourcesResourceProfileBuilder API。当禁用动态分配时,具有不同任务资源要求的任务将与 DEFAULT_RESOURCE_PROFILE 共享执行器。而当启用动态分配时,当前实现会为每个创建的 ResourceProfile 获取新的执行器,并且目前必须完全匹配。Spark 不会尝试将任务放入与执行器创建时使用的 ResourceProfile 不同的执行器中。未使用的执行器将使用动态分配逻辑超时空闲。此功能的默认配置是每个阶段只允许一个 ResourceProfile。如果用户将多个 ResourceProfile 与 RDD 关联,Spark 默认情况下会抛出异常。请参阅配置 spark.scheduler.resource.profileMergeConflicts 以控制该行为。当启用 spark.scheduler.resource.profileMergeConflicts 时,Spark 实现的当前合并策略是对冲突的 ResourceProfile 中每个资源的简单最大值。Spark 将创建一个新的 ResourceProfile,其中包含每个资源的最大值。

基于推送的混洗概述

基于推送的混洗有助于提高 Spark 混洗的可靠性和性能。它采用了一种尽力而为的方法,将映射任务生成的混洗块推送到远程外部混洗服务,以便按每个混洗分区进行合并。归约任务获取合并的混洗分区和原始混洗块的组合作为其输入数据,从而将外部混洗服务的小随机磁盘读取转换为大顺序读取。归约任务可能具有更好的数据局部性,这也有助于最大限度地减少网络 I/O。在某些情况下,基于推送的混洗优先于批量获取,例如,当合并的输出可用时,分区合并。

基于推送的混洗提高了长时间运行的作业/查询的性能,这些作业/查询在混洗期间涉及大量磁盘 I/O。目前,它不适合快速运行的作业/查询,这些作业/查询处理的混洗数据量较少。这将在未来的版本中得到进一步改进。

目前,基于推送的混洗仅支持在 YARN 上运行的 Spark,并使用外部混洗服务。

外部混洗服务(服务器)端配置选项

属性名称默认值含义自版本
spark.shuffle.push.server.mergedShuffleFileManagerImpl org.apache.spark.network.shuffle.
NoOpMergedShuffleFileManager
管理基于推送的混洗的 MergedShuffleFileManager 实现的类名。这充当服务器端配置,用于禁用或启用基于推送的混洗。默认情况下,服务器端禁用了基于推送的混洗。

要在服务器端启用基于推送的混洗,请将此配置设置为 org.apache.spark.network.shuffle.RemoteBlockPushResolver

3.2.0
spark.shuffle.push.server.minChunkSizeInMergedShuffleFile 2 MB

在基于推送的混洗期间将合并的混洗文件划分为多个块时,块的最小大小。合并的混洗文件由多个小的混洗块组成。在一次磁盘 I/O 中获取完整的合并的混洗文件会增加客户端和外部混洗服务的内存要求。相反,外部混洗服务以 MB 大小的块 提供合并的文件。
此配置控制块的大小。将为每个合并的混洗文件生成一个相应的索引文件,指示块边界。

设置过高会导致客户端和外部混洗服务的内存要求增加。

设置过低会导致不必要地增加对外部混洗服务的 RPC 请求总数。

3.2.0
spark.shuffle.push.server.mergedIndexCacheSize 100m 内存中缓存的最大大小,该缓存可用于基于推送的混洗,用于存储合并的索引文件。此缓存是通过 spark.shuffle.service.index.cache.size 配置的缓存的补充。 3.2.0

客户端配置选项

属性名称默认值含义自版本
spark.shuffle.push.enabled false 设置为 true 以在客户端启用基于推送的混洗,并与服务器端标志 spark.shuffle.push.server.mergedShuffleFileManagerImpl 协同工作。 3.2.0
spark.shuffle.push.finalize.timeout 10 秒 驱动程序在所有映射器完成给定混洗映射阶段后等待的秒数,然后它向远程外部混洗服务发送合并完成请求。这为外部混洗服务提供了额外的合并块的时间。设置过长可能会导致性能下降。 3.2.0
spark.shuffle.push.maxRetainedMergerLocations 500 为基于推送的混洗缓存的合并器位置的最大数量。目前,合并器位置是负责处理推送的块、合并它们并为以后的混洗获取提供合并的块的外部混洗服务的宿主。 3.2.0
spark.shuffle.push.mergersMinThresholdRatio 0.05 用于根据归约阶段的分区数量计算阶段所需的最小混洗合并器位置数量的比率。例如,具有 100 个分区并使用默认值 0.05 的归约阶段至少需要 5 个唯一的合并器位置才能启用基于推送的混洗。 3.2.0
spark.shuffle.push.mergersMinStaticThreshold 5 为了为阶段启用基于推送的混洗,混洗推送合并器位置的数量的静态阈值应该可用。请注意,此配置与 spark.shuffle.push.mergersMinThresholdRatio 协同工作。启用阶段的基于推送的混洗所需的合并器数量的 spark.shuffle.push.mergersMinStaticThresholdspark.shuffle.push.mergersMinThresholdRatio 比率的最大值。例如:对于子阶段的 1000 个分区,spark.shuffle.push.mergersMinStaticThreshold 为 5,spark.shuffle.push.mergersMinThresholdRatio 设置为 0.05,我们需要至少 50 个合并器才能为该阶段启用基于推送的混洗。 3.2.0
spark.shuffle.push.numPushThreads (无) 指定块推送池中的线程数。这些线程有助于创建连接并将块推送到远程外部混洗服务。默认情况下,线程池大小等于 Spark 执行器内核的数量。 3.2.0
spark.shuffle.push.maxBlockSizeToPush 1m

推送到远程外部混洗服务的单个块的最大大小。大于此阈值的块不会被推送到远程合并。这些混洗块将以原始方式获取。

设置过高会导致更多块被推送到远程外部混洗服务,但这些块已经通过现有机制有效地获取,导致将大型块推送到远程外部混洗服务会产生额外的开销。建议将 spark.shuffle.push.maxBlockSizeToPush 设置为小于 spark.shuffle.push.maxBlockBatchSize 配置的值。

设置过低会导致较少的块被合并,并且直接从映射器外部混洗服务获取会导致更高的随机小读取,从而影响整体磁盘 I/O 性能。

3.2.0
spark.shuffle.push.maxBlockBatchSize 3m 要分组到单个推送请求中的混洗块批次的最大大小。默认设置为 3m,以便略高于 spark.storage.memoryMapThreshold 默认值 2m,因为很可能每个块批次都被内存映射,这会导致更高的开销。 3.2.0
spark.shuffle.push.merge.finalizeThreads 8 驱动程序用于完成混洗合并的线程数。由于大型混洗的完成可能需要几秒钟,因此使用多个线程有助于驱动程序在启用基于推送的混洗时处理并发混洗合并完成请求。 3.3.0
spark.shuffle.push.minShuffleSizeToWait 500m 驱动程序将仅在总混洗数据大小超过此阈值时等待合并完成。如果总混洗大小较小,驱动程序将立即完成混洗输出。 3.3.0
spark.shuffle.push.minCompletedPushRatio 1.0 在基于推送的混洗期间,驱动程序开始混洗合并完成之前,应完成推送的最小映射分区的分数。 3.3.0