性能调优
对于某些工作负载,可以通过将数据缓存在内存中或启用一些实验性选项来提高性能。
在内存中缓存数据
Spark SQL 可以通过调用 spark.catalog.cacheTable("tableName")
或 dataFrame.cache()
来使用内存中的列式格式缓存表。 然后,Spark SQL 将仅扫描所需的列,并将自动调整压缩以最大限度地减少内存使用和 GC 压力。 您可以调用 spark.catalog.uncacheTable("tableName")
或 dataFrame.unpersist()
从内存中删除表。
可以使用 SparkSession
上的 setConf
方法或使用 SQL 运行 SET key=value
命令来完成内存中缓存的配置。
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.sql.inMemoryColumnarStorage.compressed |
true | 设置为 true 时,Spark SQL 将根据数据统计信息自动为每列选择压缩编解码器。 | 1.0.1 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | 控制列式缓存的批处理大小。 较大的批处理大小可以提高内存利用率和压缩率,但在缓存数据时存在 OOM 风险。 | 1.1.1 |
其他配置选项
以下选项也可用于调整查询执行的性能。 随着自动执行更多优化,这些选项可能会在将来的版本中弃用。
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.sql.files.maxPartitionBytes |
134217728 (128 MB) | 读取文件时要打包到单个分区中的最大字节数。 此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。 | 2.0.0 |
spark.sql.files.openCostInBytes |
4194304 (4 MB) | 打开文件的估计成本,以在相同时间内可以扫描的字节数来衡量。 这用于将多个文件放入一个分区。 最好高估,那么包含小文件的分区将比包含较大文件的分区(首先调度)更快。 此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。 | 2.0.0 |
spark.sql.files.minPartitionNum |
默认并行度 | 建议的(未保证的)最小拆分文件分区数。 如果未设置,则默认值为 `spark.sql.leafNodeDefaultParallelism`。 此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。 | 3.1.0 |
spark.sql.files.maxPartitionNum |
无 | 建议的(未保证的)最大拆分文件分区数。 如果设置了此项,如果初始分区数超过此值,Spark 将重新调整每个分区的大小,以使分区数接近此值。 此配置仅在使用基于文件的源(例如 Parquet、JSON 和 ORC)时有效。 | 3.5.0 |
spark.sql.broadcastTimeout |
300 |
广播连接中广播等待时间的超时时间(秒) |
1.3.0 |
spark.sql.autoBroadcastJoinThreshold |
10485760 (10 MB) | 配置执行联接时将广播到所有工作节点的一个表的最大大小(以字节为单位)。 通过将此值设置为 -1,可以禁用广播。 请注意,目前统计信息仅支持 Hive Metastore 表,其中已运行命令 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 。 |
1.1.0 |
spark.sql.shuffle.partitions |
200 | 配置在对联接或聚合的数据进行混洗时要使用的分区数。 | 1.1.0 |
spark.sql.sources.parallelPartitionDiscovery.threshold |
32 | 配置启用作业输入路径并行列出的阈值。 如果输入路径的数量大于此阈值,则 Spark 将使用 Spark 分布式作业列出文件。 否则,它将回退到顺序列表。 此配置仅在使用基于文件的源(例如 Parquet、ORC 和 JSON)时有效。 | 1.5.0 |
spark.sql.sources.parallelPartitionDiscovery.parallelism |
10000 | 配置作业输入路径的最大列表并行度。 如果输入路径的数量大于此值,则会将其限制为使用此值。 此配置仅在使用基于文件的源(例如 Parquet、ORC 和 JSON)时有效。 | 2.1.1 |
SQL 查询的 Join 策略提示
连接策略提示,即 BROADCAST
、MERGE
、SHUFFLE_HASH
和 SHUFFLE_REPLICATE_NL
,指示 Spark 在将每个指定的关联与另一个关联进行连接时,在每个指定的关联上使用提示的策略。 例如,当在表“t1”上使用 BROADCAST
提示时,以“t1”作为构建侧的广播连接(广播哈希连接或广播嵌套循环连接,具体取决于是否存在任何等值连接键)将被 Spark 优先考虑,即使统计信息建议的表“t1”的大小高于配置 spark.sql.autoBroadcastJoinThreshold
。
如果在连接的两侧指定了不同的连接策略提示,则 Spark 优先考虑 BROADCAST
提示,而不是 MERGE
提示,而不是 SHUFFLE_HASH
提示,而不是 SHUFFLE_REPLICATE_NL
提示。 如果两侧都使用 BROADCAST
提示或 SHUFFLE_HASH
提示指定,则 Spark 将根据连接类型和关联的大小选择构建侧。
请注意,不能保证 Spark 会选择提示中指定的连接策略,因为特定策略可能不支持所有连接类型。
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show();
src <- sql("SELECT * FROM src")
records <- sql("SELECT * FROM records")
head(join(src, hint(records, "broadcast"), src$key == records$key))
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
有关更多详细信息,请参阅 Join Hints 的文档。
SQL 查询的 Coalesce 提示
Coalesce 提示允许 Spark SQL 用户控制输出文件数量,就像 Dataset API 中的 coalesce
、repartition
和 repartitionByRange
一样,它们可用于性能调整和减少输出文件数量。“COALESCE”提示仅将分区号作为参数。“REPARTITION”提示将分区号、列或两者作为参数。“REPARTITION_BY_RANGE”提示必须具有列名,并且分区号是可选的。“REBALANCE”提示具有初始分区号、列或两者作为参数。
SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(3) */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
SELECT /*+ REBALANCE(3, c) */ * FROM t;
有关更多详细信息,请参阅 Partitioning Hints 的文档。
自适应查询执行
自适应查询执行 (AQE) 是 Spark SQL 中的一种优化技术,它利用运行时统计信息来选择最有效的查询执行计划,自 Apache Spark 3.2.0 起默认启用。 Spark SQL 可以通过 spark.sql.adaptive.enabled
作为总括配置来启用和禁用 AQE。 从 Spark 3.0 开始,AQE 中有三个主要功能:包括合并 shuffle 后的分区、将 sort-merge join 转换为 broadcast join 以及倾斜 join 优化。
合并 Shuffle 后的分区
当 spark.sql.adaptive.enabled
和 spark.sql.adaptive.coalescePartitions.enabled
配置都为 true 时,此功能会根据 map 输出统计信息合并 shuffle 后的分区。 此功能简化了运行查询时 shuffle 分区数的调整。 您无需设置适当的 shuffle 分区数来适应您的数据集。 一旦您通过 spark.sql.adaptive.coalescePartitions.initialPartitionNum
配置设置了足够大的初始 shuffle 分区数,Spark 就可以在运行时选择适当的 shuffle 分区数。
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.sql.adaptive.coalescePartitions.enabled |
true | 当为 true 且 spark.sql.adaptive.enabled 为 true 时,Spark 将根据目标大小(由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定)合并连续的 shuffle 分区,以避免任务过小。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.parallelismFirst |
true | 如果为 true,则 Spark 在合并连续的 shuffle 分区时会忽略 spark.sql.adaptive.advisoryPartitionSizeInBytes (默认 64MB)指定的目标大小,而仅考虑 spark.sql.adaptive.coalescePartitions.minPartitionSize (默认 1MB)指定的最小分区大小,以最大限度地提高并行度。 这是为了避免在启用自适应查询执行时出现性能下降。 建议将此配置设置为 false 并遵守 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定的目标大小。 |
3.2.0 |
spark.sql.adaptive.coalescePartitions.minPartitionSize |
1MB | 合并分区后 shuffle 分区的最小大小。这在分区合并期间忽略目标大小(默认情况)时很有用。 | 3.2.0 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum |
(无) | 合并前的初始 shuffle 分区数。如果未设置,则等于 spark.sql.shuffle.partitions 。 此配置仅在 spark.sql.adaptive.enabled 和 spark.sql.adaptive.coalescePartitions.enabled 都启用时生效。 |
3.0.0 |
spark.sql.adaptive.advisoryPartitionSizeInBytes |
64 MB | 自适应优化期间 shuffle 分区的建议大小(以字节为单位)(当 spark.sql.adaptive.enabled 为 true 时)。 当 Spark 合并小的 shuffle 分区或拆分倾斜的 shuffle 分区时,它会生效。 |
3.0.0 |
分割倾斜的 shuffle 分区
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled |
true | 当为 true 且 spark.sql.adaptive.enabled 为 true 时,Spark 将优化 RebalancePartitions 中的倾斜 shuffle 分区,并根据目标大小(由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定)将其拆分为更小的分区,以避免数据倾斜。 |
3.2.0 |
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor |
0.2 | 如果分区的大小小于此因子乘以 spark.sql.adaptive.advisoryPartitionSizeInBytes ,则将在拆分期间合并该分区。 |
3.3.0 |
将 sort-merge join 转换为 broadcast join
当任何连接侧的运行时统计信息小于自适应广播哈希连接阈值时,AQE 会将排序合并连接转换为广播哈希连接。 这不如一开始就规划广播哈希连接有效,但它比继续进行排序合并连接更好,因为我们可以节省连接双方的排序,并本地读取 shuffle 文件以节省网络流量(如果 spark.sql.adaptive.localShuffleReader.enabled
为 true)。
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.sql.adaptive.autoBroadcastJoinThreshold |
(无) | 配置执行连接时要广播到所有 worker 节点的最大表大小(以字节为单位)。 将此值设置为 -1 可以禁用广播。 默认值与 spark.sql.autoBroadcastJoinThreshold 相同。 请注意,此配置仅在自适应框架中使用。 |
3.2.0 |
spark.sql.adaptive.localShuffleReader.enabled |
true | 当为 true 且 spark.sql.adaptive.enabled 为 true 时,当不需要 shuffle 分区时(例如,在将排序合并连接转换为广播哈希连接之后),Spark 尝试使用本地 shuffle reader 来读取 shuffle 数据。 |
3.0.0 |
将 sort-merge join 转换为 shuffled hash join
当所有 shuffle 后的分区都小于阈值时,AQE 会将排序合并连接转换为 shuffled hash join,最大阈值可以参考配置 spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold
。
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold |
0 | 配置每个分区构建本地哈希映射允许的最大大小(以字节为单位)。 如果此值不小于 spark.sql.adaptive.advisoryPartitionSizeInBytes 且所有分区大小均不大于此配置,则无论 spark.sql.join.preferSortMergeJoin 的值如何,连接选择都倾向于使用 shuffled hash join 而不是 sort merge join。 |
3.2.0 |
优化倾斜 Join
数据倾斜会严重降低连接查询的性能。 此功能通过将倾斜任务拆分为大小大致均匀的任务(并在需要时复制)来动态处理排序合并连接中的倾斜。 当 spark.sql.adaptive.enabled
和 spark.sql.adaptive.skewJoin.enabled
配置都启用时,它会生效。
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.sql.adaptive.skewJoin.enabled |
true | 当为 true 且 spark.sql.adaptive.enabled 为 true 时,Spark 会通过拆分(并在需要时复制)倾斜分区来动态处理排序合并连接中的倾斜。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor |
5.0 | 如果分区的大小大于此因子乘以中值分区大小,并且也大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes ,则该分区被认为是倾斜的。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes |
256MB | 如果分区的大小(以字节为单位)大于此阈值,并且也大于 spark.sql.adaptive.skewJoin.skewedPartitionFactor 乘以中值分区大小,则该分区被认为是倾斜的。 理想情况下,此配置应设置为大于 spark.sql.adaptive.advisoryPartitionSizeInBytes 。 |
3.0.0 |
spark.sql.adaptive.forceOptimizeSkewedJoin |
false | 如果为 true,则强制启用 OptimizeSkewedJoin,这是一个自适应规则,用于优化倾斜连接以避免落后任务,即使它引入了额外的 shuffle。 | 3.3.0 |
其他
属性名称 | 默认值 | 含义 | 起始版本 |
---|---|---|---|
spark.sql.adaptive.optimizer.excludedRules |
(无) | 配置要在自适应优化器中禁用的规则列表,其中规则由其规则名称指定,并以逗号分隔。 优化器将记录已实际排除的规则。 | 3.1.0 |
spark.sql.adaptive.customCostEvaluatorClass |
(无) | 用于自适应执行的自定义成本评估器类。 如果未设置,Spark 默认将使用其自己的 SimpleCostEvaluator 。 |
3.2.0 |