性能调优
对于某些工作负载,可以通过将数据缓存到内存中或启用一些实验性选项来提高性能。
将数据缓存到内存中
Spark SQL 可以使用内存中的列式格式缓存表,方法是调用 spark.catalog.cacheTable("tableName")
或 dataFrame.cache()
。然后 Spark SQL 将仅扫描所需的列,并会自动调整压缩以最大程度地减少内存使用量和 GC 压力。您可以调用 spark.catalog.uncacheTable("tableName")
或 dataFrame.unpersist()
从内存中删除表。
内存缓存的配置可以通过 SparkSession
上的 setConf
方法完成,也可以通过使用 SQL 运行 SET key=value
命令完成。
属性名称 | 默认值 | 含义 | 自版本 |
---|---|---|---|
spark.sql.inMemoryColumnarStorage.compressed |
true | 设置为 true 时,Spark SQL 将根据数据的统计信息自动为每列选择压缩编解码器。 | 1.0.1 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | 控制列式缓存的批次大小。较大的批次大小可以提高内存利用率和压缩率,但在缓存数据时会增加 OOM 风险。 | 1.1.1 |
其他配置选项
以下选项也可用于调整查询执行的性能。随着更多优化自动执行,这些选项可能会在将来的版本中被弃用。
属性名称 | 默认值 | 含义 | 自版本 |
---|---|---|---|
spark.sql.files.maxPartitionBytes |
134217728 (128 MB) | 读取文件时打包到单个分区中的最大字节数。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 | 2.0.0 |
spark.sql.files.openCostInBytes |
4194304 (4 MB) | 打开文件的估计成本,以可以扫描的字节数衡量。这在将多个文件放入分区时使用。最好高估,这样包含小文件的分区将比包含大文件的分区更快(先调度)。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 | 2.0.0 |
spark.sql.files.minPartitionNum |
默认并行度 | 建议的(不保证)拆分文件分区的最小数量。如果未设置,则默认值为 `spark.sql.leafNodeDefaultParallelism`。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 | 3.1.0 |
spark.sql.files.maxPartitionNum |
无 | 建议的(不保证)拆分文件分区的最大数量。如果设置了它,Spark 将重新调整每个分区,以使分区数量接近此值(如果初始分区数量超过此值)。此配置仅在使用基于文件的源(如 Parquet、JSON 和 ORC)时有效。 | 3.5.0 |
spark.sql.broadcastTimeout |
300 |
广播连接中广播等待时间的超时时间(以秒为单位) |
1.3.0 |
spark.sql.autoBroadcastJoinThreshold |
10485760 (10 MB) | 配置在执行连接时将广播到所有工作节点的表的最大大小(以字节为单位)。通过将此值设置为 -1,可以禁用广播。请注意,目前仅支持 Hive 元存储表,其中已运行命令 `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan`。 | 1.1.0 |
spark.sql.shuffle.partitions |
200 | 配置在对数据进行洗牌以进行连接或聚合时要使用的分区数。 | 1.1.0 |
spark.sql.sources.parallelPartitionDiscovery.threshold |
32 | 配置启用作业输入路径的并行列表的阈值。如果输入路径的数量大于此阈值,Spark 将使用 Spark 分布式作业列出文件。否则,它将回退到顺序列表。此配置仅在使用基于文件的源(如 Parquet、ORC 和 JSON)时有效。 | 1.5.0 |
spark.sql.sources.parallelPartitionDiscovery.parallelism |
10000 | 配置作业输入路径的最大列表并行度。如果输入路径的数量大于此值,它将被限制为使用此值。此配置仅在使用基于文件的源(如 Parquet、ORC 和 JSON)时有效。 | 2.1.1 |
SQL 查询的连接策略提示
连接策略提示,即 BROADCAST
、MERGE
、SHUFFLE_HASH
和 SHUFFLE_REPLICATE_NL
,指示 Spark 在将它们与另一个关系连接时,对每个指定的关系使用提示的策略。例如,当在表“t1”上使用 BROADCAST
提示时,即使统计信息建议表“t1”的大小超过配置 spark.sql.autoBroadcastJoinThreshold
,Spark 也会优先使用广播连接(广播哈希连接或广播嵌套循环连接,具体取决于是否存在任何等值连接键),其中“t1”作为构建方。
当在连接的两侧指定不同的连接策略提示时,Spark 优先考虑 BROADCAST
提示,然后是 MERGE
提示,然后是 SHUFFLE_HASH
提示,最后是 SHUFFLE_REPLICATE_NL
提示。当两侧都指定了 BROADCAST
提示或 SHUFFLE_HASH
提示时,Spark 将根据连接类型和关系的大小选择构建方。
请注意,Spark 不保证会选择提示中指定的连接策略,因为特定策略可能不支持所有连接类型。
有关更多详细信息,请参阅 连接提示 的文档。
SQL 查询的合并提示
合并提示允许 Spark SQL 用户像 Dataset API 中的 coalesce
、repartition
和 repartitionByRange
一样控制输出文件的数量,它们可用于性能调优和减少输出文件的数量。“COALESCE”提示只有一个分区号作为参数。“REPARTITION”提示具有分区号、列或两者/都不作为参数。“REPARTITION_BY_RANGE”提示必须具有列名,分区号是可选的。“REBALANCE”提示具有初始分区号、列或两者/都不作为参数。
SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(3) */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
SELECT /*+ REBALANCE(3, c) */ * FROM t;
有关更多详细信息,请参阅 分区提示 的文档。
自适应查询执行
自适应查询执行 (AQE) 是 Spark SQL 中的一种优化技术,它利用运行时统计信息来选择最有效的查询执行计划,该技术从 Apache Spark 3.2.0 开始默认启用。Spark SQL 可以通过 spark.sql.adaptive.enabled
作为伞形配置来启用和禁用 AQE。从 Spark 3.0 开始,AQE 中有三个主要功能:包括合并后洗牌分区、将排序合并连接转换为广播连接以及倾斜连接优化。
合并后洗牌分区
当 spark.sql.adaptive.enabled
和 spark.sql.adaptive.coalescePartitions.enabled
配置都为 true 时,此功能会根据映射输出统计信息合并后洗牌分区。此功能简化了运行查询时的洗牌分区数量的调整。您无需设置适合数据集的适当洗牌分区数量。Spark 可以在运行时选择适当的洗牌分区数量,只要您通过 spark.sql.adaptive.coalescePartitions.initialPartitionNum
配置设置了足够大的初始洗牌分区数量。
属性名称 | 默认值 | 含义 | 自版本 |
---|---|---|---|
spark.sql.adaptive.coalescePartitions.enabled |
true | 当为 true 且 spark.sql.adaptive.enabled 为 true 时,Spark 将根据目标大小(由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定)合并连续的洗牌分区,以避免出现太多的小任务。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.parallelismFirst |
true | 当为 true 时,Spark 会忽略合并连续洗牌分区时由 spark.sql.adaptive.advisoryPartitionSizeInBytes (默认值为 64MB)指定的目标大小,并且只尊重由 spark.sql.adaptive.coalescePartitions.minPartitionSize (默认值为 1MB)指定的最小分区大小,以最大程度地提高并行度。这样做是为了避免在启用自适应查询执行时出现性能下降。建议将此配置设置为 false 并尊重由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定的目标大小。 |
3.2.0 |
spark.sql.adaptive.coalescePartitions.minPartitionSize |
1MB | 合并后洗牌分区的最小大小。它的值最多可以是 spark.sql.adaptive.advisoryPartitionSizeInBytes 的 20%。这在忽略目标大小进行分区合并时很有用,这是默认情况。 |
3.2.0 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum |
(无) | 合并前的初始洗牌分区数量。如果未设置,它等于 spark.sql.shuffle.partitions 。此配置仅在 spark.sql.adaptive.enabled 和 spark.sql.adaptive.coalescePartitions.enabled 都启用时有效。 |
3.0.0 |
spark.sql.adaptive.advisoryPartitionSizeInBytes |
64 MB | 自适应优化期间洗牌分区的建议大小(以字节为单位)(当 spark.sql.adaptive.enabled 为 true 时)。当 Spark 合并小的洗牌分区或拆分倾斜的洗牌分区时,它会生效。 |
3.0.0 |
拆分倾斜洗牌分区
属性名称 | 默认值 | 含义 | 自版本 |
---|---|---|---|
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled |
true | 当为 true 且 spark.sql.adaptive.enabled 为 true 时,Spark 将优化 RebalancePartitions 中的倾斜洗牌分区,并根据目标大小(由 spark.sql.adaptive.advisoryPartitionSizeInBytes 指定)将它们拆分为更小的分区,以避免数据倾斜。 |
3.2.0 |
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor |
0.2 | 如果分区的尺寸小于此因子乘以 spark.sql.adaptive.advisoryPartitionSizeInBytes ,则该分区将在拆分期间被合并。 |
3.3.0 |
将排序合并连接转换为广播连接
当任何连接侧的运行时统计信息小于自适应广播哈希连接阈值时,AQE 会将排序合并连接转换为广播哈希连接。这不如一开始就规划广播哈希连接效率高,但比继续进行排序合并连接要好,因为我们可以节省两侧连接的排序,并本地读取混洗文件以节省网络流量(如果 spark.sql.adaptive.localShuffleReader.enabled
为真)。
属性名称 | 默认值 | 含义 | 自版本 |
---|---|---|---|
spark.sql.adaptive.autoBroadcastJoinThreshold |
(无) | 配置在执行连接时将广播到所有工作节点的表的最大大小(以字节为单位)。通过将此值设置为 -1,可以禁用广播。默认值与 spark.sql.autoBroadcastJoinThreshold 相同。请注意,此配置仅在自适应框架中使用。 |
3.2.0 |
spark.sql.adaptive.localShuffleReader.enabled |
true | 当为真且 spark.sql.adaptive.enabled 为真时,Spark 会尝试使用本地混洗读取器来读取混洗数据,当混洗分区不需要时,例如,在将排序合并连接转换为广播哈希连接之后。 |
3.0.0 |
将排序合并连接转换为洗牌哈希连接
当所有后混洗分区都小于阈值时,AQE 会将排序合并连接转换为混洗哈希连接,最大阈值可以查看配置 spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold
。
属性名称 | 默认值 | 含义 | 自版本 |
---|---|---|---|
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold |
0 | 配置每个分区可以允许构建本地哈希映射的最大大小(以字节为单位)。如果此值不小于 spark.sql.adaptive.advisoryPartitionSizeInBytes 且所有分区大小都不大于此配置,则连接选择优先使用混洗哈希连接而不是排序合并连接,而不管 spark.sql.join.preferSortMergeJoin 的值如何。 |
3.2.0 |
优化倾斜连接
数据倾斜会严重降低连接查询的性能。此功能通过将倾斜的任务拆分为大致大小相等的任务(如果需要,还会复制任务)来动态处理排序合并连接中的倾斜。当 spark.sql.adaptive.enabled
和 spark.sql.adaptive.skewJoin.enabled
配置都启用时,它才会生效。
属性名称 | 默认值 | 含义 | 自版本 |
---|---|---|---|
spark.sql.adaptive.skewJoin.enabled |
true | 当为真且 spark.sql.adaptive.enabled 为真时,Spark 会通过拆分(如果需要,还会复制)倾斜分区来动态处理排序合并连接中的倾斜。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor |
5.0 | 如果分区的大小大于此因子乘以中位数分区大小,并且也大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes ,则该分区被视为倾斜。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes |
256MB | 如果分区的大小(以字节为单位)大于此阈值,并且也大于 spark.sql.adaptive.skewJoin.skewedPartitionFactor 乘以中位数分区大小,则该分区被视为倾斜。理想情况下,此配置应设置为大于 spark.sql.adaptive.advisoryPartitionSizeInBytes 。 |
3.0.0 |
spark.sql.adaptive.forceOptimizeSkewedJoin |
false | 当为真时,强制启用 OptimizeSkewedJoin,这是一个自适应规则,用于优化倾斜连接以避免落后任务,即使它引入了额外的混洗。 | 3.3.0 |
杂项
属性名称 | 默认值 | 含义 | 自版本 |
---|---|---|---|
spark.sql.adaptive.optimizer.excludedRules |
(无) | 配置要在自适应优化器中禁用的规则列表,其中规则由其规则名称指定,并用逗号分隔。优化器将记录实际被排除的规则。 | 3.1.0 |
spark.sql.adaptive.customCostEvaluatorClass |
(无) | 要用于自适应执行的自定义成本评估器类。如果未设置,Spark 默认情况下将使用自己的 SimpleCostEvaluator 。 |
3.2.0 |