性能调优

对于某些工作负载,可以通过将数据缓存在内存中或启用一些实验性选项来提高性能。

在内存中缓存数据

Spark SQL 可以通过调用 spark.catalog.cacheTable("tableName")dataFrame.cache() 来使用内存中的列式格式缓存表。 然后,Spark SQL 将仅扫描所需的列,并将自动调整压缩以最大限度地减少内存使用和 GC 压力。 您可以调用 spark.catalog.uncacheTable("tableName")dataFrame.unpersist() 从内存中删除表。

可以使用 SparkSession 上的 setConf 方法或使用 SQL 运行 SET key=value 命令来完成内存中缓存的配置。

属性名称默认值含义起始版本
spark.sql.inMemoryColumnarStorage.compressed true 设置为 true 时,Spark SQL 将根据数据统计信息自动为每列选择压缩编解码器。 1.0.1
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列式缓存的批处理大小。 较大的批处理大小可以提高内存利用率和压缩率,但在缓存数据时存在 OOM 风险。 1.1.1

其他配置选项

以下选项也可用于调整查询执行的性能。 随着自动执行更多优化,这些选项可能会在将来的版本中弃用。

属性名称默认值含义起始版本
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 读取文件时要打包到单个分区中的最大字节数。 此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。 2.0.0
spark.sql.files.openCostInBytes 4194304 (4 MB) 打开文件的估计成本,以在相同时间内可以扫描的字节数来衡量。 这用于将多个文件放入一个分区。 最好高估,那么包含小文件的分区将比包含较大文件的分区(首先调度)更快。 此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。 2.0.0
spark.sql.files.minPartitionNum 默认并行度 建议的(未保证的)最小拆分文件分区数。 如果未设置,则默认值为 `spark.sql.leafNodeDefaultParallelism`。 此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。 3.1.0
spark.sql.files.maxPartitionNum 建议的(未保证的)最大拆分文件分区数。 如果设置了此项,如果初始分区数超过此值,Spark 将重新调整每个分区的大小,以使分区数接近此值。 此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。 3.5.0
spark.sql.broadcastTimeout 300

广播连接中广播等待时间的超时时间(秒)

1.3.0
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置执行联接时将广播到所有工作节点的一个表的最大大小(以字节为单位)。 通过将此值设置为 -1,可以禁用广播。 请注意,目前统计信息仅支持 Hive Metastore 表,其中已运行命令 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 1.1.0
spark.sql.shuffle.partitions 200 配置在对联接或聚合的数据进行混洗时要使用的分区数。 1.1.0
spark.sql.sources.parallelPartitionDiscovery.threshold 32 配置启用作业输入路径并行列出的阈值。 如果输入路径的数量大于此阈值,则 Spark 将使用 Spark 分布式作业列出文件。 否则,它将回退到顺序列表。 此配置仅在使用基于文件的源(例如 Parquet、ORC 和 JSON)时有效。 1.5.0
spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 配置作业输入路径的最大列表并行度。 如果输入路径的数量大于此值,则会将其限制为使用此值。 此配置仅在使用基于文件的源(例如 Parquet、ORC 和 JSON)时有效。 2.1.1

SQL 查询的 Join 策略提示

连接策略提示,即 BROADCASTMERGESHUFFLE_HASHSHUFFLE_REPLICATE_NL,指示 Spark 在将每个指定的关联与另一个关联进行连接时,在每个指定的关联上使用提示的策略。 例如,当在表“t1”上使用 BROADCAST 提示时,以“t1”作为构建侧的广播连接(广播哈希连接或广播嵌套循环连接,具体取决于是否存在任何等值连接键)将被 Spark 优先考虑,即使统计信息建议的表“t1”的大小高于配置 spark.sql.autoBroadcastJoinThreshold

如果在连接的两侧指定了不同的连接策略提示,则 Spark 优先考虑 BROADCAST 提示,而不是 MERGE 提示,而不是 SHUFFLE_HASH 提示,而不是 SHUFFLE_REPLICATE_NL 提示。 如果两侧都使用 BROADCAST 提示或 SHUFFLE_HASH 提示指定,则 Spark 将根据连接类型和关联的大小选择构建侧。

请注意,不能保证 Spark 会选择提示中指定的连接策略,因为特定策略可能不支持所有连接类型。

spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show();
src <- sql("SELECT * FROM src")
records <- sql("SELECT * FROM records")
head(join(src, hint(records, "broadcast"), src$key == records$key))
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

有关更多详细信息,请参阅 Join Hints 的文档。

SQL 查询的 Coalesce 提示

Coalesce 提示允许 Spark SQL 用户控制输出文件数量,就像 Dataset API 中的 coalescerepartitionrepartitionByRange 一样,它们可用于性能调整和减少输出文件数量。“COALESCE”提示仅将分区号作为参数。“REPARTITION”提示将分区号、列或两者作为参数。“REPARTITION_BY_RANGE”提示必须具有列名,并且分区号是可选的。“REBALANCE”提示具有初始分区号、列或两者作为参数。

SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(3) */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
SELECT /*+ REBALANCE(3, c) */ * FROM t;

有关更多详细信息,请参阅 Partitioning Hints 的文档。

自适应查询执行

自适应查询执行 (AQE) 是 Spark SQL 中的一种优化技术,它利用运行时统计信息来选择最有效的查询执行计划,自 Apache Spark 3.2.0 起默认启用。 Spark SQL 可以通过 spark.sql.adaptive.enabled 作为总括配置来启用和禁用 AQE。 从 Spark 3.0 开始,AQE 中有三个主要功能:包括合并 shuffle 后的分区、将 sort-merge join 转换为 broadcast join 以及倾斜 join 优化。

合并 Shuffle 后的分区

spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 配置都为 true 时,此功能会根据 map 输出统计信息合并 shuffle 后的分区。 此功能简化了运行查询时 shuffle 分区数的调整。 您无需设置适当的 shuffle 分区数来适应您的数据集。 一旦您通过 spark.sql.adaptive.coalescePartitions.initialPartitionNum 配置设置了足够大的初始 shuffle 分区数,Spark 就可以在运行时选择适当的 shuffle 分区数。

属性名称默认值含义起始版本
spark.sql.adaptive.coalescePartitions.enabled true 当为 true 且 spark.sql.adaptive.enabled 为 true 时,Spark 将根据目标大小(由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定)合并连续的 shuffle 分区,以避免任务过小。 3.0.0
spark.sql.adaptive.coalescePartitions.parallelismFirst true 如果为 true,则 Spark 在合并连续的 shuffle 分区时会忽略 spark.sql.adaptive.advisoryPartitionSizeInBytes(默认 64MB)指定的目标大小,而仅考虑 spark.sql.adaptive.coalescePartitions.minPartitionSize(默认 1MB)指定的最小分区大小,以最大限度地提高并行度。 这是为了避免在启用自适应查询执行时出现性能下降。 建议将此配置设置为 false 并遵守 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定的目标大小。 3.2.0
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB 合并分区后 shuffle 分区的最小大小。这在分区合并期间忽略目标大小(默认情况)时很有用。 3.2.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum (无) 合并前的初始 shuffle 分区数。如果未设置,则等于 spark.sql.shuffle.partitions。 此配置仅在 spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 都启用时生效。 3.0.0
spark.sql.adaptive.advisoryPartitionSizeInBytes 64 MB 自适应优化期间 shuffle 分区的建议大小(以字节为单位)(当 spark.sql.adaptive.enabled 为 true 时)。 当 Spark 合并小的 shuffle 分区或拆分倾斜的 shuffle 分区时,它会生效。 3.0.0

分割倾斜的 shuffle 分区

属性名称默认值含义起始版本
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled true 当为 true 且 spark.sql.adaptive.enabled 为 true 时,Spark 将优化 RebalancePartitions 中的倾斜 shuffle 分区,并根据目标大小(由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定)将其拆分为更小的分区,以避免数据倾斜。 3.2.0
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 0.2 如果分区的大小小于此因子乘以 spark.sql.adaptive.advisoryPartitionSizeInBytes,则将在拆分期间合并该分区。 3.3.0

将 sort-merge join 转换为 broadcast join

当任何连接侧的运行时统计信息小于自适应广播哈希连接阈值时,AQE 会将排序合并连接转换为广播哈希连接。 这不如一开始就规划广播哈希连接有效,但它比继续进行排序合并连接更好,因为我们可以节省连接双方的排序,并本地读取 shuffle 文件以节省网络流量(如果 spark.sql.adaptive.localShuffleReader.enabled 为 true)。

属性名称默认值含义起始版本
spark.sql.adaptive.autoBroadcastJoinThreshold (无) 配置执行连接时要广播到所有 worker 节点的最大表大小(以字节为单位)。 将此值设置为 -1 可以禁用广播。 默认值与 spark.sql.autoBroadcastJoinThreshold 相同。 请注意,此配置仅在自适应框架中使用。 3.2.0
spark.sql.adaptive.localShuffleReader.enabled true 当为 true 且 spark.sql.adaptive.enabled 为 true 时,当不需要 shuffle 分区时(例如,在将排序合并连接转换为广播哈希连接之后),Spark 尝试使用本地 shuffle reader 来读取 shuffle 数据。 3.0.0

将 sort-merge join 转换为 shuffled hash join

当所有 shuffle 后的分区都小于阈值时,AQE 会将排序合并连接转换为 shuffled hash join,最大阈值可以参考配置 spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold

属性名称默认值含义起始版本
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0 配置每个分区构建本地哈希映射允许的最大大小(以字节为单位)。 如果此值不小于 spark.sql.adaptive.advisoryPartitionSizeInBytes 且所有分区大小均不大于此配置,则无论 spark.sql.join.preferSortMergeJoin 的值如何,连接选择都倾向于使用 shuffled hash join 而不是 sort merge join。 3.2.0

优化倾斜 Join

数据倾斜会严重降低连接查询的性能。 此功能通过将倾斜任务拆分为大小大致均匀的任务(并在需要时复制)来动态处理排序合并连接中的倾斜。 当 spark.sql.adaptive.enabledspark.sql.adaptive.skewJoin.enabled 配置都启用时,它会生效。

属性名称默认值含义起始版本
spark.sql.adaptive.skewJoin.enabled true 当为 true 且 spark.sql.adaptive.enabled 为 true 时,Spark 会通过拆分(并在需要时复制)倾斜分区来动态处理排序合并连接中的倾斜。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5.0 如果分区的大小大于此因子乘以中值分区大小,并且也大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,则该分区被认为是倾斜的。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB 如果分区的大小(以字节为单位)大于此阈值,并且也大于 spark.sql.adaptive.skewJoin.skewedPartitionFactor 乘以中值分区大小,则该分区被认为是倾斜的。 理想情况下,此配置应设置为大于 spark.sql.adaptive.advisoryPartitionSizeInBytes 3.0.0
spark.sql.adaptive.forceOptimizeSkewedJoin false 如果为 true,则强制启用 OptimizeSkewedJoin,这是一个自适应规则,用于优化倾斜连接以避免落后任务,即使它引入了额外的 shuffle。 3.3.0

其他

属性名称默认值含义起始版本
spark.sql.adaptive.optimizer.excludedRules (无) 配置要在自适应优化器中禁用的规则列表,其中规则由其规则名称指定,并以逗号分隔。 优化器将记录已实际排除的规则。 3.1.0
spark.sql.adaptive.customCostEvaluatorClass (无) 用于自适应执行的自定义成本评估器类。 如果未设置,Spark 默认将使用其自己的 SimpleCostEvaluator 3.2.0