性能调优

Spark 提供了许多用于调优 DataFrame 或 SQL 工作负载性能的技术。广义上讲,这些技术包括数据缓存、更改数据集分区方式、选择最佳连接策略以及为优化器提供可用于构建更高效执行计划的额外信息。

缓存数据

Spark SQL 可以通过调用 spark.catalog.cacheTable("tableName")dataFrame.cache() 来使用内存列式格式缓存表。然后 Spark SQL 将仅扫描所需的列,并自动调整压缩以最小化内存使用和 GC 压力。您可以调用 spark.catalog.uncacheTable("tableName")dataFrame.unpersist() 从内存中删除表。

内存缓存的配置可以通过 spark.conf.set 或通过使用 SQL 运行 SET key=value 命令来完成。

属性名称默认值含义起始版本
spark.sql.inMemoryColumnarStorage.compressed true 设置为 true 时,Spark SQL 将根据数据统计信息自动为每列选择压缩编解码器。 1.0.1
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列式缓存的批处理大小。更大的批处理大小可以提高内存利用率和压缩率,但在缓存数据时有 OOM 的风险。 1.1.1

调优分区

属性名称默认值含义起始版本
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 读取文件时打包到单个分区的最大字节数。此配置仅在使用基于文件的数据源(如 Parquet、JSON 和 ORC)时有效。 2.0.0
spark.sql.files.openCostInBytes 4194304 (4 MB) 打开文件的估计成本,以在相同时间内可以扫描的字节数衡量。这在将多个文件放入一个分区时使用。最好高估,这样小文件的分区将比大文件的分区更快(后者会先被调度)。此配置仅在使用基于文件的数据源(如 Parquet、JSON 和 ORC)时有效。 2.0.0
spark.sql.files.minPartitionNum 默认并行度 建议的(非保证的)最小拆分文件分区数。如果未设置,默认值为 `spark.sql.leafNodeDefaultParallelism`。此配置仅在使用基于文件的数据源(如 Parquet、JSON 和 ORC)时有效。 3.1.0
spark.sql.files.maxPartitionNum 建议的(非保证的)最大拆分文件分区数。如果设置了此值,并且初始分区数超过此值,Spark 将重新调整每个分区,使分区数接近此值。此配置仅在使用基于文件的数据源(如 Parquet、JSON 和 ORC)时有效。 3.5.0
spark.sql.shuffle.partitions 200 配置用于连接或聚合时 shuffle 数据的分区数。 1.1.0
spark.sql.sources.parallelPartitionDiscovery.threshold 32 配置启用作业输入路径并行列表的阈值。如果输入路径的数量大于此阈值,Spark 将使用 Spark 分布式作业列出文件。否则,它将回退到顺序列表。此配置仅在使用基于文件的数据源(如 Parquet、ORC 和 JSON)时有效。 1.5.0
spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 配置作业输入路径的最大列表并行度。如果输入路径的数量大于此值,它将被限制为使用此值。此配置仅在使用基于文件的数据源(如 Parquet、ORC 和 JSON)时有效。 2.1.1

Coalesce 提示

Coalesce 提示允许 Spark SQL 用户控制输出文件数量,就像 Dataset API 中的 coalescerepartitionrepartitionByRange 一样,它们可用于性能调优和减少输出文件数量。“COALESCE” 提示仅将分区号作为参数。“REPARTITION” 提示有分区号、列,或两者/两者皆无作为参数。“REPARTITION_BY_RANGE” 提示必须有列名,分区号是可选的。“REBALANCE” 提示有初始分区号、列,或两者/两者皆无作为参数。

SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(3) */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
SELECT /*+ REBALANCE(3, c) */ * FROM t;

更多详情请参阅分区提示的文档。

利用统计信息

Apache Spark 在众多可能选项中选择最佳执行计划的能力,部分取决于它对执行计划中每个节点(读取、过滤、连接等)将输出多少行的估计。这些估计又基于通过以下几种方式提供给 Spark 的统计信息:

缺失或不准确的统计信息将阻碍 Spark 选择最佳计划的能力,并可能导致查询性能不佳。因此,检查提供给 Spark 的统计信息以及它在查询规划和执行期间所做的估计会很有帮助。

优化 Join 策略

自动广播连接

属性名称默认值含义起始版本
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置在执行连接时将广播到所有工作节点的表的字节最大大小。通过将此值设置为 -1,可以禁用广播。 1.1.0
spark.sql.broadcastTimeout 300

广播连接中广播等待时间的超时时间(秒)

1.3.0

Join 策略提示

Join 策略提示,即 BROADCASTMERGESHUFFLE_HASHSHUFFLE_REPLICATE_NL,指示 Spark 在将指定关系与另一个关系连接时,对每个指定关系使用提示的策略。例如,当在表 't1' 上使用 BROADCAST 提示时,即使根据统计信息表 't1' 的大小超过配置 spark.sql.autoBroadcastJoinThreshold,Spark 也会优先使用将 't1' 作为构建侧的广播连接(无论是广播哈希连接还是广播嵌套循环连接,取决于是否存在等值连接键)。

当连接的两侧都指定了不同的连接策略提示时,Spark 优先考虑 BROADCAST 提示,其次是 MERGE 提示,然后是 SHUFFLE_HASH 提示,最后是 SHUFFLE_REPLICATE_NL 提示。当两侧都指定了 BROADCAST 提示或 SHUFFLE_HASH 提示时,Spark 将根据连接类型和关系的尺寸选择构建侧。

请注意,Spark 不保证会选择提示中指定的连接策略,因为特定策略可能不支持所有连接类型。

spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show();
src <- sql("SELECT * FROM src")
records <- sql("SELECT * FROM records")
head(join(src, hint(records, "broadcast"), src$key == records$key))
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

更多详情请参阅Join 提示的文档。

自适应查询执行

自适应查询执行 (AQE) 是 Spark SQL 中的一种优化技术,它利用运行时统计信息选择最有效的查询执行计划,自 Apache Spark 3.2.0 起默认启用。Spark SQL 可以通过 spark.sql.adaptive.enabled 作为总开关来开启和关闭 AQE。

属性名称默认值含义起始版本
spark.sql.adaptive.enabled true 当为 true 时,启用自适应查询执行,它会根据准确的运行时统计信息在查询执行中途重新优化查询计划。 1.6.0

合并 Shuffle 后分区

spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 配置都为 true 时,此功能会根据 Map 输出统计信息合并 shuffle 后分区。此功能简化了运行查询时 shuffle 分区数的调优。您无需设置合适的分区数来适应您的数据集。一旦您通过 spark.sql.adaptive.coalescePartitions.initialPartitionNum 配置设置了足够大的初始 shuffle 分区数,Spark 可以在运行时选择合适的 shuffle 分区数。

属性名称默认值含义起始版本
spark.sql.adaptive.coalescePartitions.enabled true 当为 true 且 spark.sql.adaptive.enabled 为 true 时,Spark 将根据目标大小(由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定)合并连续的 shuffle 分区,以避免过多的任务碎片。 3.0.0
spark.sql.adaptive.coalescePartitions.parallelismFirst true 当为 true 时,Spark 在合并连续的 shuffle 分区时忽略 spark.sql.adaptive.advisoryPartitionSizeInBytes (默认 64MB) 指定的目标大小,而只遵守 spark.sql.adaptive.coalescePartitions.minPartitionSize (默认 1MB) 指定的最小分区大小,以最大化并行度。这是为了避免启用自适应查询执行时的性能退化。建议在繁忙的集群上将此配置设置为 false,以提高资源利用效率(避免过多小任务)。 3.2.0
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB 合并后 shuffle 分区的最小大小。当分区合并过程中忽略目标大小时(默认情况),此项很有用。 3.2.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum (无) 合并前的初始 shuffle 分区数。如果未设置,则等于 spark.sql.shuffle.partitions。此配置仅在 spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 都启用时生效。 3.0.0
spark.sql.adaptive.advisoryPartitionSizeInBytes 64 MB 自适应优化期间(当 spark.sql.adaptive.enabled 为 true 时)shuffle 分区的建议字节大小。当 Spark 合并小的 shuffle 分区或拆分倾斜的 shuffle 分区时生效。 3.0.0

拆分倾斜 Shuffle 分区

属性名称默认值含义起始版本
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled true 当为 true 且 spark.sql.adaptive.enabled 为 true 时,Spark 将优化 RebalancePartitions 中倾斜的 shuffle 分区,并根据目标大小(由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定)将其拆分为更小的分区,以避免数据倾斜。 3.2.0
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 0.2 如果分区大小小于此因子乘以 spark.sql.adaptive.advisoryPartitionSizeInBytes,则该分区将在拆分期间被合并。 3.3.0

将排序合并连接转换为广播连接

当任何连接侧的运行时统计信息小于自适应广播哈希连接阈值时,AQE 会将排序合并连接转换为广播哈希连接。这不如一开始就规划广播哈希连接效率高,但比继续排序合并连接要好,因为我们可以避免对连接两侧进行排序,并本地读取 shuffle 文件以节省网络流量(前提是 spark.sql.adaptive.localShuffleReader.enabled 为 true)。

属性名称默认值含义起始版本
spark.sql.adaptive.autoBroadcastJoinThreshold (无) 配置在执行连接时将广播到所有工作节点的表的字节最大大小。通过将此值设置为 -1,可以禁用广播。默认值与 spark.sql.autoBroadcastJoinThreshold 相同。请注意,此配置仅在自适应框架中使用。 3.2.0
spark.sql.adaptive.localShuffleReader.enabled true 当为 true 且 spark.sql.adaptive.enabled 为 true 时,Spark 会尝试在不需要 shuffle 分区的情况下使用本地 shuffle 读取器读取 shuffle 数据,例如,在将排序合并连接转换为广播哈希连接之后。 3.0.0

将排序合并连接转换为 Shuffle 哈希连接

当所有 shuffle 后分区都小于 spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 中配置的阈值时,AQE 会将排序合并连接转换为 shuffle 哈希连接。

属性名称默认值含义起始版本
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0 配置每个分区允许构建本地哈希映射的最大字节大小。如果此值不小于 spark.sql.adaptive.advisoryPartitionSizeInBytes 并且所有分区大小都不大于此配置,则无论 spark.sql.join.preferSortMergeJoin 的值如何,连接选择都倾向于使用 shuffle 哈希连接而不是排序合并连接。 3.2.0

优化倾斜连接

数据倾斜会严重降低 Join 查询的性能。此功能通过将倾斜任务拆分(并在需要时复制)成大致均匀大小的任务,来动态处理排序合并连接中的倾斜。它在 spark.sql.adaptive.enabledspark.sql.adaptive.skewJoin.enabled 配置都启用时生效。

属性名称默认值含义起始版本
spark.sql.adaptive.skewJoin.enabled true 当为 true 且 spark.sql.adaptive.enabled 为 true 时,Spark 会通过拆分(并在需要时复制)倾斜分区来动态处理排序合并连接中的倾斜。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5.0 如果分区大小大于此因子乘以中间分区大小,并且也大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,则该分区被视为倾斜分区。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB 如果分区大小(字节)大于此阈值,并且也大于 spark.sql.adaptive.skewJoin.skewedPartitionFactor 乘以中间分区大小,则该分区被视为倾斜分区。理想情况下,此配置应设置为大于 spark.sql.adaptive.advisoryPartitionSizeInBytes 3.0.0
spark.sql.adaptive.forceOptimizeSkewedJoin false 当为 true 时,强制启用 OptimizeSkewedJoin,这是一个自适应规则,用于优化倾斜连接以避免慢任务,即使它引入额外的 shuffle。 3.3.0

高级定制

您可以通过提供自己的成本评估器类或排除 AQE 优化器规则来控制 AQE 的工作细节。

属性名称默认值含义起始版本
spark.sql.adaptive.optimizer.excludedRules (无) 配置自适应优化器中要禁用的规则列表,其中规则通过其规则名称指定并用逗号分隔。优化器将记录实际已被排除的规则。 3.1.0
spark.sql.adaptive.customCostEvaluatorClass (无) 用于自适应执行的自定义成本评估器类。如果未设置,Spark 默认将使用其自身的 SimpleCostEvaluator 3.2.0

存储分区连接

存储分区连接 (SPJ) 是 Spark SQL 中的一种优化技术,它利用现有的存储布局来避免 Shuffle 阶段。

这是 Bucket Joins 概念的泛化,后者仅适用于桶化表,而 SPJ 适用于通过 FunctionCatalog 中注册的函数进行分区的表。存储分区连接目前支持兼容的 V2 数据源。

以下 SQL 属性通过各种优化,在不同的连接查询中启用存储分区连接。

属性名称默认值含义起始版本
spark.sql.sources.v2.bucketing.enabled false 当为 true 时,尝试通过使用兼容的 V2 数据源报告的分区来消除 shuffle。 3.3.0
spark.sql.sources.v2.bucketing.pushPartValues.enabled true 启用时,如果连接的一侧缺少另一侧的分区值,则尝试消除 shuffle。此配置要求 spark.sql.sources.v2.bucketing.enabled 为 true。 3.4.0
spark.sql.requireAllClusterKeysForCoPartition true 当为 true 时,要求连接或 MERGE 键与分区键相同且顺序相同才能消除 shuffle。因此,在这种情况下,应设置为 false 以消除 shuffle。 3.4.0
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled false 当为 true,且连接不是全外连接时,启用倾斜优化以处理避免 shuffle 时具有大量数据的分区。一侧将根据表统计信息被选为大表,此侧的拆分将是部分聚类的。另一侧的拆分将被分组并复制以匹配。此配置要求 spark.sql.sources.v2.bucketing.enabledspark.sql.sources.v2.bucketing.pushPartValues.enabled 都为 true。 3.4.0
spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled false 启用时,如果连接或 MERGE 条件不包含所有分区列,则尝试避免 shuffle。此配置要求 spark.sql.sources.v2.bucketing.enabledspark.sql.sources.v2.bucketing.pushPartValues.enabled 都为 true,并且 spark.sql.requireAllClusterKeysForCoPartition 为 false。 4.0.0
spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled false 启用时,如果分区转换兼容但不完全相同,则尝试避免 shuffle。此配置要求 spark.sql.sources.v2.bucketing.enabledspark.sql.sources.v2.bucketing.pushPartValues.enabled 都为 true。 4.0.0
spark.sql.sources.v2.bucketing.shuffle.enabled false 启用时,通过识别 V2 数据源在另一侧报告的分区,尝试避免连接一侧的 shuffle。 4.0.0

如果执行存储分区连接,查询计划在连接之前将不包含 Exchange 节点。

以下示例使用 Iceberg (https://iceberg.apache.org/docs/latest/spark-getting-started/),这是一个支持存储分区连接的 Spark V2 数据源。

CREATE TABLE prod.db.target (id INT, salary INT, dep STRING)
USING iceberg
PARTITIONED BY (dep, bucket(8, id))

CREATE TABLE prod.db.source (id INT, salary INT, dep STRING)
USING iceberg
PARTITIONED BY (dep, bucket(8, id))

EXPLAIN SELECT * FROM target t INNER JOIN source s
ON t.dep = s.dep AND t.id = s.id

-- Plan without Storage Partition Join
== Physical Plan ==
* Project (12)
+- * SortMergeJoin Inner (11)
   :- * Sort (5)
   :  +- Exchange (4) // DATA SHUFFLE
   :     +- * Filter (3)
   :        +- * ColumnarToRow (2)
   :           +- BatchScan (1)
   +- * Sort (10)
      +- Exchange (9) // DATA SHUFFLE
         +- * Filter (8)
            +- * ColumnarToRow (7)
               +- BatchScan (6)


SET 'spark.sql.sources.v2.bucketing.enabled' 'true'
SET 'spark.sql.iceberg.planning.preserve-data-grouping' 'true'
SET 'spark.sql.sources.v2.bucketing.pushPartValues.enabled' 'true'
SET 'spark.sql.requireAllClusterKeysForCoPartition' 'false'
SET 'spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled' 'true'

-- Plan with Storage Partition Join
== Physical Plan ==
* Project (10)
+- * SortMergeJoin Inner (9)
   :- * Sort (4)
   :  +- * Filter (3)
   :     +- * ColumnarToRow (2)
   :        +- BatchScan (1)
   +- * Sort (8)
      +- * Filter (7)
         +- * ColumnarToRow (6)
            +- BatchScan (5)