Spark 性能调优
由于大多数 Spark 计算都是基于内存的,Spark 程序可能受到集群中任何资源的瓶颈:CPU、网络带宽或内存。通常,如果数据能完全放入内存,瓶颈会是网络带宽,但有时您也需要进行一些调优,例如以序列化形式存储 RDD,以减少内存使用。本指南将涵盖两个主要主题:数据序列化(对于良好的网络性能至关重要,并且可以减少内存使用)和内存调优。我们还将简要介绍几个较小的主题。
数据序列化
序列化在任何分布式应用程序的性能中都扮演着重要角色。序列化对象缓慢或消耗大量字节的格式会极大地拖慢计算速度。通常,这会是您优化 Spark 应用程序时首先应该调整的地方。Spark 旨在便利性(允许您在操作中使用任何 Java 类型)和性能之间取得平衡。它提供了两种序列化库:
- Java 序列化:默认情况下,Spark 使用 Java 的
ObjectOutputStream
框架序列化对象,并且可以处理您创建的任何实现java.io.Serializable
接口的类。您还可以通过扩展java.io.Externalizable
来更精细地控制序列化性能。Java 序列化虽然灵活,但通常相当慢,并且导致许多类的序列化格式很大。 - Kryo 序列化:Spark 也可以使用 Kryo 库(版本 4)更快地序列化对象。Kryo 比 Java 序列化显著更快且更紧凑(通常快达 10 倍),但它不支持所有
Serializable
类型,并且为了获得最佳性能,要求您提前注册程序中将使用的类。
您可以通过使用 SparkConf 初始化作业并调用 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
来切换使用 Kryo。此设置不仅配置了工作节点之间数据混洗(shuffling)所使用的序列化器,也配置了将 RDD 序列化到磁盘时所使用的序列化器。Kryo 不是默认序列化器的唯一原因是其自定义注册要求,但我们建议在任何网络密集型应用程序中尝试使用它。自 Spark 2.0.0 起,当混洗(shuffling)简单类型、简单类型数组或字符串类型的 RDD 时,我们内部使用 Kryo 序列化器。
Spark 自动包含 Kryo 序列化器,用于 Twitter chill 库中 AllScalaRegistrar 所涵盖的许多常用核心 Scala 类。
要使用 Kryo 注册您自己的自定义类,请使用 registerKryoClasses
方法。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
Kryo 文档描述了更高级的注册选项,例如添加自定义序列化代码。
如果您的对象很大,您可能还需要增加 spark.kryoserializer.buffer
配置。此值需要足够大,以容纳您将序列化的最大对象。
最后,如果您不注册自定义类,Kryo 仍然会工作,但它将不得不为每个对象存储完整的类名,这是浪费的。
内存调优
在调优内存使用方面有三个考量:对象使用的内存量(您可能希望整个数据集都适合内存)、访问这些对象的开销,以及垃圾回收的开销(如果您有高对象周转率)。
默认情况下,Java 对象的访问速度很快,但它们很容易比其字段中的“原始”数据多消耗 2-5 倍的空间。这有几个原因:
- 每个独立的 Java 对象都有一个“对象头”,大约 16 字节,包含指向其类的信息等。对于数据量很少的对象(例如一个
Int
字段),这可能比数据本身更大。 - Java
String
s 比原始字符串数据多约 40 字节的开销(因为它们将其存储在Char
数组中并保留长度等额外数据),并且由于String
内部使用 UTF-16 编码,每个字符存储为两个字节。因此,一个 10 字符的字符串可以轻易消耗 60 字节。 - 常见的集合类,例如
HashMap
和LinkedList
,使用链式数据结构,其中每个条目都有一个“包装器”对象(例如Map.Entry
)。这个对象不仅有头,还有指向列表中下一个对象的指针(通常每个 8 字节)。 - 基本类型的集合通常将它们存储为“装箱”对象,例如
java.lang.Integer
。
本节将首先概述 Spark 中的内存管理,然后讨论用户可以采取哪些特定策略来更有效地利用其应用程序中的内存。特别是,我们将描述如何确定对象的内存使用情况,以及如何改进它——无论是通过更改数据结构,还是通过以序列化格式存储数据。然后我们将介绍 Spark 缓存大小和 Java 垃圾回收器的调优。
内存管理概述
Spark 中的内存使用主要分为两类:执行内存和存储内存。执行内存指用于混洗(shuffles)、连接(joins)、排序(sorts)和聚合(aggregations)计算的内存,而存储内存指用于缓存和在集群中传播内部数据的内存。在 Spark 中,执行内存和存储内存共享一个统一区域 (M)。当没有执行内存被使用时,存储内存可以获取所有可用内存,反之亦然。执行内存必要时可以逐出存储内存,但仅限于总存储内存使用量低于某个阈值 (R) 时。换句话说,R
描述了 M
内的一个子区域,其中缓存块永远不会被逐出。由于实现的复杂性,存储内存不能逐出执行内存。
这种设计确保了几个理想的特性。首先,不使用缓存的应用程序可以将整个空间用于执行,从而避免了不必要的磁盘溢出。其次,使用缓存的应用程序可以保留一个最小存储空间 (R),其数据块在该空间中不会被逐出。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而无需用户了解内存内部是如何划分的专业知识。
尽管有两个相关的配置,但典型用户无需调整它们,因为默认值适用于大多数工作负载:
spark.memory.fraction
表示M
的大小,它是 (JVM 堆空间 - 300MiB) 的一个分数(默认为 0.6)。剩余空间(40%)用于用户数据结构、Spark 内部元数据,以及在稀疏和异常大记录情况下防止 OOM(内存溢出)错误。spark.memory.storageFraction
表示R
的大小,它是M
的一个分数(默认为 0.5)。R
是M
内的存储空间,其中缓存块不会被执行内存逐出。
spark.memory.fraction
的值应该被设置,以使这部分堆空间舒适地适应 JVM 的老年代或“永久代”。有关详细信息,请参阅下面的高级 GC 调优讨论。
确定内存消耗
衡量数据集所需内存消耗量的最佳方法是创建一个 RDD,将其放入缓存,然后查看 Web UI 中的“存储(Storage)”页面。该页面会告诉您 RDD 占用了多少内存。
要估计特定对象的内存消耗,请使用 SizeEstimator
的 estimate
方法。这对于尝试不同的数据布局以削减内存使用量,以及确定广播变量在每个执行器(executor)堆上将占用多少空间非常有用。
调优数据结构
减少内存消耗的第一种方法是避免增加开销的 Java 特性,例如基于指针的数据结构和包装器对象。有几种方法可以实现这一点:
- 设计您的数据结构时,优先使用对象数组和基本类型,而不是标准的 Java 或 Scala 集合类(例如
HashMap
)。fastutil 库为基本类型提供了与 Java 标准库兼容的便捷集合类。 - 尽可能避免使用包含大量小对象和指针的嵌套结构。
- 考虑使用数字 ID 或枚举对象作为键,而不是字符串。
- 如果您的内存少于 32 GiB,请设置 JVM 标志
-XX:+UseCompressedOops
,使指针为四字节而不是八字节。您可以将这些选项添加到spark-env.sh
中。
序列化 RDD 存储
如果尽管进行了这些调优,您的对象仍然太大而无法高效存储,那么一个更简单的方法是使用 RDD 持久化 API 中的序列化存储级别(例如 MEMORY_ONLY_SER
)以序列化形式存储它们。Spark 将把每个 RDD 分区存储为一个大的字节数组。以序列化形式存储数据的唯一缺点是访问时间会变慢,因为需要即时反序列化每个对象。如果您想以序列化形式缓存数据,我们强烈建议使用 Kryo,因为它比 Java 序列化(当然也比原始 Java 对象)占用更小的空间。
垃圾回收调优
当您的程序存储的 RDD 发生大量“数据流失”(churn)时,JVM 垃圾回收可能成为一个问题。(对于那些只读取一次 RDD 然后在其上执行许多操作的程序来说,这通常不是问题。)当 Java 需要逐出旧对象以腾出空间给新对象时,它将需要遍历所有 Java 对象并找到未使用的对象。这里要记住的关键点是:垃圾回收的成本与 Java 对象的数量成正比,因此使用对象较少的数据结构(例如 Int
数组而不是 LinkedList
)会大大降低此成本。一个更好的方法是以序列化形式持久化对象,如上所述:现在每个 RDD 分区将只有一个对象(一个字节数组)。在尝试其他技术之前,如果 GC 是一个问题,首先要尝试的是使用序列化缓存。
GC 也可能因为任务的工作内存(运行任务所需的空间量)和节点上缓存的 RDD 之间的干扰而成为问题。我们将讨论如何控制分配给 RDD 缓存的空间以缓解此问题。
衡量 GC 的影响
GC 调优的第一步是收集垃圾回收发生的频率和 GC 所花费时间的统计信息。这可以通过向 Java 选项添加 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
来完成。(有关如何将 Java 选项传递给 Spark 作业的信息,请参阅配置指南。)下次运行 Spark 作业时,每当发生垃圾回收时,您都会在工作节点的日志中看到打印的消息。请注意,这些日志将在您集群的工作节点上(在其工作目录的 stdout
文件中),而不是在您的驱动程序中。
高级 GC 调优
为了进一步调优垃圾回收,我们首先需要了解一些关于 JVM 内存管理的基本信息:
-
Java 堆空间分为两个区域:新生代(Young)和老年代(Old)。新生代旨在存放生命周期短的对象,而老年代则用于存放生命周期较长的对象。
-
新生代进一步分为三个区域:[Eden, Survivor1, Survivor2]。
-
垃圾回收过程的简化描述:当 Eden 区满时,会在 Eden 区上运行一次 Minor GC,并将 Eden 和 Survivor1 中存活的对象复制到 Survivor2。Survivor 区域会进行交换。如果一个对象足够老或者 Survivor2 区已满,它就会被移到老年代。最后,当老年代接近满时,会触发一次 Full GC。
Spark 中 GC 调优的目标是确保只有生命周期长的 RDD 存储在老年代中,并且新生代的大小足以存储生命周期短的对象。这将有助于避免 Full GC 回收任务执行期间创建的临时对象。一些可能有用的步骤是:
-
通过收集 GC 统计信息来检查是否存在过多的垃圾回收。如果在任务完成之前多次调用 Full GC,则意味着没有足够的内存用于执行任务。
-
如果 Minor GC 过多但 Major GC 不多,为 Eden 区分配更多内存会有帮助。您可以将 Eden 区的大小设置为每个任务所需内存的过高估计值。如果 Eden 区的大小被确定为
E
,那么您可以使用选项-Xmn=4/3*E
来设置新生代的大小。(乘以 4/3 是为了同时考虑幸存者区(survivor regions)占用的空间。) -
在打印的 GC 统计信息中,如果老年代(OldGen)接近满载,通过降低
spark.memory.fraction
来减少用于缓存的内存量;缓存更少的对象优于减慢任务执行速度。或者,考虑减小新生代的大小。这意味着如果您已按上述方式设置了-Xmn
,则应降低其值。如果不是,尝试更改 JVM 的NewRatio
参数值。许多 JVM 默认将其设置为 2,这意味着老年代占用堆的 2/3。它应该足够大,使得该分数超过spark.memory.fraction
。 -
自 4.0.0 版本以来,Spark 默认使用 JDK 17,这也使得 G1GC 垃圾回收器成为默认值。请注意,对于大型执行器(executor)堆,使用
-XX:G1HeapRegionSize
增加 G1 区域大小可能很重要。 -
举例来说,如果您的任务正在从 HDFS 读取数据,任务使用的内存量可以通过从 HDFS 读取的数据块大小来估计。请注意,一个解压后的块的大小通常是原始块的 2 或 3 倍。因此,如果我们希望有 3 或 4 个任务的工作空间,并且 HDFS 块大小为 128 MiB,我们可以估计 Eden 区的大小为
4*3*128MiB
。 -
监控垃圾回收的频率和耗时在新设置下如何变化。
我们的经验表明,GC 调优的效果取决于您的应用程序和可用内存量。网上描述了更多调优选项,但从高层次来看,管理 Full GC 发生的频率有助于降低开销。
执行器(executor)的 GC 调优标志可以通过在作业配置中设置 spark.executor.defaultJavaOptions
或 spark.executor.extraJavaOptions
来指定。
其他考量
并行度
除非您将每个操作的并行度设置得足够高,否则集群将无法充分利用。Spark 会根据每个文件的大小自动设置要运行的“map”任务数量(尽管您可以通过 SparkContext.textFile
等的可选参数来控制),对于分布式“reduce”操作,例如 groupByKey
和 reduceByKey
,它会使用最大父 RDD 的分区数量。您可以将并行度作为第二个参数传递(请参阅 spark.PairRDDFunctions
文档),或者设置配置属性 spark.default.parallelism
来更改默认值。总的来说,我们建议您的集群中每个 CPU 核心分配 2-3 个任务。
输入路径的并行列表
有时,当作业输入包含大量目录时,您可能还需要增加目录列表的并行度,否则该过程可能需要很长时间,尤其是在面对像 S3 这样的对象存储时。如果您的作业使用 Hadoop 输入格式处理 RDD(例如,通过 SparkContext.sequenceFile
),则并行度通过 spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads
控制(当前默认值为 1)。
对于使用基于文件数据源的 Spark SQL,您可以调优 spark.sql.sources.parallelPartitionDiscovery.threshold
和 spark.sql.sources.parallelPartitionDiscovery.parallelism
以提高列表并行度。更多详细信息请参阅Spark SQL 性能调优指南。
Reduce 任务的内存使用
有时,您会遇到 OutOfMemoryError,不是因为您的 RDD 不适合内存,而是因为您的某个任务(例如 groupByKey
中的某个 reduce 任务)的工作集太大。Spark 的混洗操作(sortByKey
、groupByKey
、reduceByKey
、join
等)会在每个任务中构建一个哈希表来执行分组,这个哈希表通常可能很大。这里最简单的修复方法是提高并行度,以便每个任务的输入集更小。Spark 可以高效支持短至 200 毫秒的任务,因为它在多个任务之间重用一个执行器(executor)JVM 并且任务启动成本低,因此您可以安全地将并行度提高到超过集群中核心的数量。
广播大型变量
使用 SparkContext
中提供的广播功能可以大大减少每个序列化任务的大小,以及在集群上启动作业的成本。如果您的任务在其内部使用了来自驱动程序中的任何大型对象(例如静态查找表),请考虑将其转换为广播变量。Spark 会在主节点(master)上打印每个任务的序列化大小,因此您可以查看该信息来决定您的任务是否过大;通常,大于约 20 KiB 的任务可能值得优化。
数据本地性
数据本地性对 Spark 作业的性能有重大影响。如果数据和操作数据的代码在一起,那么计算往往会很快。但如果代码和数据分离,其中一个必须移动到另一个的位置。通常,将序列化代码从一个地方传输到另一个地方比传输一大块数据更快,因为代码大小远小于数据。Spark 的调度是围绕着数据本地性这个一般原则构建的。
数据本地性是指数据与处理其的代码之间的接近程度。根据数据当前位置,本地性有几个级别。从近到远依次为:
PROCESS_LOCAL
数据与运行中的代码在同一个 JVM 中。这是可能的最佳本地性。NODE_LOCAL
数据在同一个节点上。例如,数据可能在同一节点上的 HDFS 中,或者在同一节点上的另一个执行器(executor)中。这比PROCESS_LOCAL
稍慢,因为数据必须在进程之间传输。NO_PREF
数据从任何地方访问速度都一样快,没有本地性偏好。RACK_LOCAL
数据位于同一服务器机架上。数据在同一机架上的不同服务器上,因此需要通过网络发送,通常通过单个交换机。ANY
数据在网络上的其他地方,不在同一个机架上。
Spark 倾向于在最佳本地性级别调度所有任务,但这并非总是可行。在没有任何空闲执行器(executor)上没有未处理数据的情况下,Spark 会切换到较低的本地性级别。有两种选择:a) 等待繁忙的 CPU 释放以在同一服务器上的数据上启动任务,或者 b) 立即在较远的地方启动一个新任务,这需要将数据移动到那里。
Spark 通常会等待一段时间,希望能有繁忙的 CPU 释放出来。一旦超时,它就开始将数据从较远的地方移动到空闲的 CPU 上。每个级别之间的回退等待超时可以单独配置或在一个参数中一起配置;详细信息请参阅配置页面上的 spark.locality
参数。如果您的任务耗时较长且本地性较差,您应该增加这些设置,但默认值通常效果很好。
总结
这篇简短的指南旨在指出您在调优 Spark 应用程序时应了解的主要关注点——最重要的是数据序列化和内存调优。对于大多数程序,切换到 Kryo 序列化并以序列化形式持久化数据将解决大多数常见的性能问题。关于其他调优最佳实践,请随时在Spark 邮件列表上提问。