Spark 调优
由于大多数 Spark 计算都是在内存中进行的,Spark 程序可能会受到集群中任何资源的瓶颈限制:CPU、网络带宽或内存。通常情况下,如果数据能够存入内存,瓶颈往往是网络带宽;但有时,你也需要进行一些调优(例如以序列化形式存储 RDD)来降低内存占用。本指南将涵盖两个主要主题:数据序列化(对获得良好的网络性能至关重要,也能减少内存占用)和内存调优。此外,我们还将简要介绍其他几个主题。
数据序列化
序列化在任何分布式应用的性能中都起着重要作用。如果序列化对象格式速度缓慢或占用大量字节,将会极大地拖慢计算速度。通常,这是优化 Spark 应用时首先应该考虑调优的地方。Spark 旨在兼顾便利性(允许你在操作中使用任何 Java 类型)与性能。它提供了两个序列化库:
- Java 序列化:默认情况下,Spark 使用 Java 的
ObjectOutputStream框架进行序列化,并且可以处理任何实现了java.io.Serializable的类。你还可以通过扩展java.io.Externalizable来更精确地控制序列化性能。Java 序列化虽然灵活,但通常速度非常慢,且对于许多类而言生成的序列化格式较大。 - Kryo 序列化:Spark 也可以使用 Kryo 库(第 4 版)来更快地序列化对象。Kryo 比 Java 序列化快得多且更紧凑(通常可达 10 倍以上),但它不支持所有
Serializable类型,并且为了获得最佳性能,你需要提前注册程序中要使用的类。
你可以通过在初始化作业时使用 SparkConf 并调用 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 来切换到使用 Kryo。此设置不仅配置了工作节点之间洗牌(Shuffle)数据时使用的序列化器,还配置了将 RDD 序列化到磁盘时使用的序列化器。Kryo 未被设为默认值的原因仅仅是因为需要手动注册,但我们建议在任何网络密集型应用中尝试使用它。从 Spark 2.0.0 开始,当对简单类型、简单类型数组或字符串类型进行 RDD 洗牌时,我们在内部已使用 Kryo 序列化器。
Spark 自动包含了对 Twitter chill 库中 AllScalaRegistrar 所涵盖的许多常用 Scala 核心类的 Kryo 序列化支持。
若要向 Kryo 注册你自己的自定义类,请使用 registerKryoClasses 方法。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)Kryo 文档介绍了更高级的注册选项,例如添加自定义序列化代码。
如果你的对象很大,你可能还需要增加 spark.kryoserializer.buffer 配置。该值必须足够大,才能容纳你将要序列化的最大对象。
最后,如果你不注册自定义类,Kryo 仍然可以工作,但它必须在每个对象中存储完整的类名,这会造成浪费。
内存调优
调优内存使用时有三个考虑因素:对象使用的内存总量(你可能希望整个数据集都能存入内存)、访问这些对象的开销,以及垃圾回收的开销(如果对象的周转率很高)。
默认情况下,Java 对象访问速度很快,但很容易比字段中的“原始”数据多消耗 2-5 倍的空间。这主要有以下几个原因:
- 每个独立的 Java 对象都有一个“对象头”,约为 16 字节,包含指向其类等信息。对于数据量很小的对象(例如仅有一个
Int字段),这可能比数据本身还要大。 - Java
String相较于原始字符串数据有约 40 字节的开销(因为它存储在Char数组中,并保存了长度等额外数据),且由于String内部使用 UTF-16 编码,每个字符存储为两个字节。因此,一个 10 字符的字符串很容易消耗 60 字节。 - 通用的集合类,如
HashMap和LinkedList,使用链式数据结构,其中每个条目都有一个“包装器”对象(例如Map.Entry)。该对象不仅有头部,还有指向列表中下一个对象的指针(通常每个 8 字节)。 - 原始类型的集合通常将它们存储为“装箱”后的对象,如
java.lang.Integer。
本节将首先概述 Spark 中的内存管理,然后讨论用户可以采取的具体策略,以提高应用中的内存使用效率。特别是,我们将描述如何确定对象的内存使用量,以及如何通过更改数据结构或以序列化格式存储数据来优化内存。接着,我们将讨论如何调优 Spark 的缓存大小和 Java 垃圾回收器。
内存管理概述
Spark 中的内存使用主要分为两大类:执行内存和存储内存。执行内存用于洗牌、连接、排序和聚合计算;存储内存用于在集群中缓存和传播内部数据。在 Spark 中,执行和存储共享一个统一的区域(M)。当不使用执行内存时,存储可以占用所有可用内存,反之亦然。如有必要,执行可能会驱逐存储,但前提是存储内存总量必须保持在某个阈值(R)之上。换句话说,R 描述了 M 内的一个子区域,其中的缓存块永远不会被驱逐。由于实现上的复杂性,存储不能驱逐执行内存。
这种设计确保了几个理想的属性。首先,不使用缓存的应用可以使用整个空间进行执行,从而避免不必要的磁盘溢出。其次,使用缓存的应用可以保留最小存储空间(R),其中的数据块不会被驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,无需用户具备关于内存如何内部分割的专业知识。
虽然有两个相关的配置项,但典型用户无需调整它们,因为默认值适用于大多数工作负载:
spark.memory.fraction表示M的大小占(JVM 堆空间 - 300MiB)的比例(默认为 0.6)。剩余空间(40%)保留给用户数据结构、Spark 内部元数据,以及在处理稀疏或异常巨大的记录时防止 OOM 错误。spark.memory.storageFraction表示R在M中所占的比例(默认为 0.5)。R是M内的存储空间,其中的缓存块免于被执行过程驱逐。
设置 spark.memory.fraction 的值应确保这部分堆空间能舒适地容纳在 JVM 的“老年代”(Old generation/tenured generation)中。详情请参阅下文关于高级 GC 调优的讨论。
确定内存消耗
衡量数据集所需内存消耗的最佳方法是创建一个 RDD,将其放入缓存,并查看 Web UI 中的“存储”(Storage)页面。该页面会告诉你 RDD 占用的内存大小。
要估算特定对象的内存消耗,请使用 SizeEstimator 的 estimate 方法。这对于尝试不同的数据布局以压缩内存使用,以及确定广播变量在每个执行器堆上将占用的空间大小非常有用。
数据结构调优
减少内存消耗的第一种方法是避免使用那些增加额外开销的 Java 特性,如基于指针的数据结构和包装对象。有几种方法可以实现这一点:
- 在设计数据结构时,优先使用对象数组和原始类型,而不是标准的 Java 或 Scala 集合类(例如
HashMap)。fastutil 库为与 Java 标准库兼容的原始类型提供了方便的集合类。 - 尽可能避免使用包含大量小对象和指针的嵌套结构。
- 考虑使用数值 ID 或枚举对象代替字符串作为键。
- 如果你的 RAM 小于 32 GiB,请设置 JVM 参数
-XX:+UseCompressedOops,使指针占用 4 字节而不是 8 字节。你可以在spark-env.sh中添加这些选项。
序列化 RDD 存储
当你的对象即便经过上述调优仍然太大而无法高效存储时,减少内存使用的一个更简单的方法是使用 RDD 持久化 API 中的序列化存储级别(例如 MEMORY_ONLY_SER)以序列化形式存储它们。Spark 随后会将每个 RDD 分区存储为一个大的字节数组。以序列化形式存储数据的唯一缺点是访问时间变慢,因为必须在运行时对每个对象进行反序列化。如果你希望以序列化形式缓存数据,我们强烈建议使用 Kryo,因为它生成的体积远小于 Java 序列化(肯定比原始 Java 对象要小)。
垃圾回收(GC)调优
当程序存储的 RDD 发生大规模“搅动”(churn)时,JVM 垃圾回收(GC)可能成为一个问题。(对于仅读取一次 RDD 然后进行多次操作的程序,这通常不是问题。)当 Java 需要驱逐旧对象为新对象腾出空间时,它需要遍历所有 Java 对象并找出未使用的。这里要记住的核心点是:垃圾回收的开销与 Java 对象的数量成正比,因此使用对象更少的数据结构(例如 Int 数组而不是 LinkedList)会极大地降低此开销。更好的方法是按照上述方法以序列化形式持久化对象:现在每个 RDD 分区将只有一个对象(字节数组)。在尝试其他技术之前,如果 GC 是个问题,首先要尝试的是使用序列化缓存。
由于任务的工作内存(运行任务所需的空间大小)与节点上缓存的 RDD 之间存在干扰,GC 也可能成为问题。我们将讨论如何控制分配给 RDD 缓存的空间以缓解这种情况。
衡量 GC 的影响
GC 调优的第一步是收集有关垃圾回收发生频率以及所花费时间的统计数据。这可以通过在 Java 选项中添加 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 来完成。(关于如何向 Spark 作业传递 Java 选项的信息,请参阅配置指南。)下次运行 Spark 作业时,你将在工作节点的日志中看到每次垃圾回收发生时打印的消息。注意,这些日志位于集群的工作节点上(在工作目录下的 stdout 文件中),而不是在你的驱动(Driver)程序中。
高级 GC 调优
要进一步调优垃圾回收,我们首先需要了解关于 JVM 内存管理的一些基本信息:
-
Java 堆空间被分为两个区域:年轻代(Young)和老年代(Old)。年轻代旨在容纳生命周期较短的对象,而老年代则用于长期存在的对象。
-
年轻代进一步分为三个区域:[Eden, Survivor1, Survivor2]。
-
垃圾回收过程的简化描述:当 Eden 满时,会对 Eden 进行一次小型 GC(Minor GC),Eden 和 Survivor1 中存活的对象会被复制到 Survivor2。Survivor 区域会进行交换。如果对象足够老或者 Survivor2 已满,它会被移到老年代。最后,当老年代接近满时,会触发一次完全 GC(Full GC)。
Spark 中 GC 调优的目标是确保只有长期存在的 RDD 存储在老年代,并且年轻代的大小足以存储短生命周期的对象。这将有助于避免为了回收任务执行期间产生的临时对象而触发 Full GC。以下是一些可能有用的步骤:
-
通过收集 GC 统计信息来检查是否存在过多的垃圾回收。如果在任务完成前触发了多次 Full GC,这意味着没有足够的可用内存供任务执行。
-
如果 Minor GC 过多但 Major GC 较少,为 Eden 分配更多内存会有帮助。你可以将 Eden 大小设置为任务所需内存的预估值。如果 Eden 的大小确定为
E,则可以使用选项-Xmn=4/3*E设置年轻代的大小。(按 4/3 的放大倍数是为了考虑 Survivor 区域所使用的空间。) -
在打印的 GC 统计信息中,如果 OldGen 接近已满,请通过降低
spark.memory.fraction来减少用于缓存的内存量;缓存更少的对象要比拖慢任务执行更好。或者,考虑减小年轻代的大小。这意味着如果你按照上述设置了-Xmn,请减小该值。如果没有,请尝试更改 JVM 的NewRatio参数。许多 JVM 默认将其设置为 2,意味着老年代占据堆空间的 2/3。它应该足够大,使得该比例超过spark.memory.fraction。 -
自 4.0.0 版本起,Spark 默认使用 JDK 17,这也使得 G1GC 垃圾回收器成为默认选项。注意,对于较大的执行器堆大小,使用
-XX:G1HeapRegionSize增加 G1 区域大小可能很重要。 -
例如,如果你的任务正在从 HDFS 读取数据,任务使用的内存量可以通过从 HDFS 读取的数据块大小来估算。请注意,解压后的块大小通常是原始块大小的 2 到 3 倍。因此,如果我们希望有 3 或 4 个任务的工作空间,且 HDFS 块大小为 128 MiB,我们可以估算 Eden 的大小为
4*3*128MiB。 -
监控垃圾回收的频率和耗时随新设置的变化。
我们的经验表明,GC 调优的效果取决于你的应用程序和可用内存量。网上有更多调优选项,但从宏观上看,管理 Full GC 发生的频率有助于减少开销。
执行器的 GC 调优标志可以通过在作业配置中设置 spark.executor.defaultJavaOptions 或 spark.executor.extraJavaOptions 来指定。
其他注意事项
并行度水平
除非将每个操作的并行度设置得足够高,否则集群将无法得到充分利用。Spark 会根据大小自动设置在每个文件上运行的“映射”(map)任务数量(尽管你可以通过 SparkContext.textFile 等的可选参数对其进行控制),而对于分布式“归约”(reduce)操作(如 groupByKey 和 reduceByKey),它使用最大父 RDD 的分区数量。你可以将并行度作为第二个参数传递(参阅 spark.PairRDDFunctions 文档),或者设置配置属性 spark.default.parallelism 来更改默认值。通常,我们建议每个集群 CPU 核心对应 2-3 个任务。
输入路径的并行列表
有时,当作业输入包含大量目录时,你可能还需要增加目录列表的并行度,否则该过程可能会耗时非常长,尤其是在处理 S3 等对象存储时。如果你的作业处理的是带有 Hadoop 输入格式的 RDD(例如,通过 SparkContext.sequenceFile),则并行度由 spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads 控制(目前默认值为 1)。
对于使用基于文件数据源的 Spark SQL,你可以调整 spark.sql.sources.parallelPartitionDiscovery.threshold 和 spark.sql.sources.parallelPartitionDiscovery.parallelism 来提高列表并行度。详情请参阅 Spark SQL 性能调优指南。
Reduce 任务的内存使用
有时,你遇到 OutOfMemoryError 并不是因为 RDD 存不下,而是因为某个任务的工作集(例如 groupByKey 中的 reduce 任务)太大。Spark 的洗牌操作(sortByKey, groupByKey, reduceByKey, join 等)会在每个任务内构建一个哈希表来执行分组,这通常会很大。这里最简单的修复方法是增加并行度,从而减小每个任务的输入集。Spark 可以高效支持短至 200 毫秒的任务,因为它会在多个任务之间重用执行器 JVM,且任务启动成本较低,因此你可以放心地将并行度增加到超过集群的核心数。
广播大变量
使用 SparkContext 中提供的广播功能可以极大地减少每个序列化任务的大小,以及在集群上启动作业的开销。如果你的任务在内部使用了来自驱动程序的任何大型对象(例如静态查找表),请考虑将其转换为广播变量。Spark 会在主节点上打印每个任务的序列化大小,你可以查看该值来决定任务是否过大;通常,大于 20 KiB 的任务可能值得优化。
数据本地性
数据本地性对 Spark 作业的性能有重大影响。如果数据和操作数据的代码在一起,计算通常会很快。但如果代码和数据分离,则必须移动其中之一。通常,发送序列化的代码比发送一大块数据要快,因为代码体积远小于数据。Spark 的调度构建在数据本地性这一通用原则之上。
数据本地性是指数据与处理它的代码之间的接近程度。根据数据当前的位置,有几个级别的本地性。从最近到最远依次为:
PROCESS_LOCAL:数据与正在运行的代码处于同一个 JVM 中。这是最好的本地性。NODE_LOCAL:数据在同一个节点上。例如在同一节点的 HDFS 中,或者在同一节点的另一个执行器中。由于数据必须在进程之间传输,这比PROCESS_LOCAL稍慢。NO_PREF:数据从任何地方访问速度一样快,没有本地性偏好。RACK_LOCAL:数据在同一个服务器机架上。数据位于同一机架上的不同服务器上,因此需要通过网络(通常经过单个交换机)进行发送。ANY:数据位于网络上的其他位置,不在同一个机架中。
Spark 倾向于在最佳本地性级别调度所有任务,但这并不总是可行。在任何空闲执行器上都没有未处理数据的情况下,Spark 会切换到更低的本地性级别。有两个选择:a) 等待忙碌的 CPU 空闲以启动该服务器上的数据任务,或 b) 立即在更远的地方启动新任务,这需要将数据移动到那里。
Spark 通常采取的做法是稍微等待,希望忙碌的 CPU 能够空闲下来。一旦等待超时,它就会开始将数据从远处移动到空闲的 CPU。每个级别之间回退的等待超时可以在一个参数中统一配置,也可以分别配置;详情请参阅配置页面上的 spark.locality 参数。如果你的任务很长且本地性较差,你应该增加这些设置,但默认值通常表现良好。
总结
这是一个简短的指南,旨在指出调优 Spark 应用时应注意的主要问题——最重要的是数据序列化和内存调优。对于大多数程序,切换到 Kryo 序列化并以序列化形式持久化数据将解决最常见的性能问题。欢迎在 Spark 邮件列表上询问其他调优的最佳实践。