调优 Spark
由于大多数 Spark 计算的内存特性,Spark 程序可能会受到集群中任何资源的限制:CPU、网络带宽或内存。通常,如果数据适合内存,瓶颈是网络带宽,但有时,您还需要进行一些调优,例如以序列化形式存储 RDD,以减少内存使用量。本指南将涵盖两个主要主题:数据序列化,这对于良好的网络性能至关重要,也可以减少内存使用量;以及内存调优。我们还简要介绍了几个较小的论题。
数据序列化
序列化在任何分布式应用程序的性能中都起着重要作用。将对象序列化为对象的格式速度很慢,或者消耗大量字节,这将大大降低计算速度。通常,这将是您应该调整以优化 Spark 应用程序的第一件事。 Spark 旨在在便利性(允许您在操作中使用任何 Java 类型)和性能之间取得平衡。它提供了两个序列化库
- Java 序列化:默认情况下,Spark 使用 Java 的
ObjectOutputStream
框架序列化对象,并且可以使用您创建的任何实现java.io.Serializable
的类。您还可以通过扩展java.io.Externalizable
来更密切地控制序列化的性能。Java 序列化很灵活,但通常很慢,并且对于许多类来说,会导致大型的序列化格式。 - Kryo 序列化:Spark 还可以使用 Kryo 库(版本 4)更快地序列化对象。Kryo 比 Java 序列化快得多,也更紧凑(通常高达 10 倍),但不支持所有
Serializable
类型,并且要求您提前注册程序中将要使用的类,以获得最佳性能。
您可以通过使用 SparkConf 初始化作业并调用 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
来切换到使用 Kryo。此设置配置了用于不仅在 worker 节点之间 shuffle 数据,而且在将 RDD 序列化到磁盘时使用的序列化器。Kryo 不是默认设置的唯一原因是自定义注册要求,但我们建议在任何网络密集型应用程序中尝试使用它。从 Spark 2.0.0 开始,我们在使用简单类型、简单类型数组或字符串类型 shuffle RDD 时,内部使用 Kryo 序列化器。
Spark 自动包含 Kryo 序列化器,用于 Twitter chill 库中 AllScalaRegistrar 涵盖的许多常用的核心 Scala 类。
要使用 Kryo 注册您自己的自定义类,请使用 registerKryoClasses
方法。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
Kryo 文档 描述了更高级的注册选项,例如添加自定义序列化代码。
如果您的对象很大,您可能还需要增加 spark.kryoserializer.buffer
config。此值需要足够大以容纳您将序列化的最大对象。
最后,如果您不注册自定义类,Kryo 仍然可以工作,但它必须为每个对象存储完整的类名,这是浪费的。
内存调优
调优内存使用量时有三个注意事项:对象使用的内存量(您可能希望整个数据集都适合内存)、访问这些对象的成本以及 垃圾回收的开销(如果您在对象方面有很高的周转率)。
默认情况下,Java 对象可以快速访问,但与字段中“原始”数据相比,很容易消耗 2-5 倍以上的空间。这是由于以下几个原因
- 每个不同的 Java 对象都有一个“对象头”,大约 16 个字节,其中包含诸如指向其类的指针之类的信息。对于数据很少的对象(例如一个
Int
字段),这可能比数据更大。 - Java
String
在原始字符串数据之上大约有 40 个字节的开销(因为它们将其存储在Char
数组中并保留额外的长度等数据),并且由于String
内部使用 UTF-16 编码,因此每个字符都存储为两个字节。因此,一个 10 个字符的字符串很容易消耗 60 个字节。 - 常见的集合类,例如
HashMap
和LinkedList
,使用链接的数据结构,其中每个条目都有一个“包装器”对象(例如Map.Entry
)。此对象不仅具有标头,还具有指向列表中下一个对象的指针(通常每个 8 个字节)。 - 原始类型的集合通常将它们存储为“boxed”对象,例如
java.lang.Integer
。
本节将首先概述 Spark 中的内存管理,然后讨论用户可以采取的具体策略,以更有效地利用其应用程序中的内存。特别是,我们将描述如何确定对象的内存使用情况,以及如何通过更改数据结构或以序列化格式存储数据来改进它。然后,我们将介绍调优 Spark 的缓存大小和 Java 垃圾回收器。
内存管理概述
Spark 中的内存使用量主要分为两类:执行和存储。执行内存是指用于 shuffle、join、sort 和聚合计算的内存,而存储内存是指用于缓存并在集群中传播内部数据的内存。在 Spark 中,执行和存储共享一个统一区域 (M)。当不使用执行内存时,存储可以获取所有可用内存,反之亦然。如有必要,执行可能会驱逐存储,但前提是总存储内存使用量低于某个阈值 (R)。换句话说,R
描述了 M
中的一个子区域,在该子区域中,缓存的块永远不会被驱逐。由于实现的复杂性,存储可能不会驱逐执行。
这种设计确保了几个理想的属性。首先,不使用缓存的应用程序可以使用整个空间进行执行,从而避免不必要的磁盘溢出。其次,使用缓存的应用程序可以保留最小的存储空间 (R),其数据块不受驱逐的影响。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而无需用户具备如何内部划分内存的专业知识。
虽然有两个相关的配置,但典型的用户无需调整它们,因为默认值适用于大多数工作负载
spark.memory.fraction
将M
的大小表示为(JVM 堆空间 - 300MiB)的一部分(默认为 0.6)。剩余空间 (40%) 保留用于用户数据结构、Spark 中的内部元数据以及在稀疏和异常大的记录的情况下防止 OOM 错误。spark.memory.storageFraction
将R
的大小表示为M
的一部分(默认为 0.5)。R
是M
中的存储空间,其中缓存的块不受执行驱逐的影响。
应设置 spark.memory.fraction
的值,以便将此堆空间量舒适地容纳在 JVM 的旧或“tenured”生成中。有关详细信息,请参见下面关于高级 GC 调优的讨论。
确定内存消耗
确定数据集将需要的内存消耗量的最佳方法是创建一个 RDD,将其放入缓存,然后查看 Web UI 中的“存储”页面。该页面将告诉您 RDD 占用的内存量。
要估计特定对象的内存消耗量,请使用 SizeEstimator
的 estimate
方法。这对于试验不同的数据布局以减少内存使用量,以及确定广播变量在每个执行程序堆上将占用的空间量非常有用。
调整数据结构
减少内存消耗的第一种方法是避免添加开销的 Java 功能,例如基于指针的数据结构和包装器对象。有几种方法可以做到这一点
- 设计数据结构时,首选对象数组和原始类型,而不是标准的 Java 或 Scala 集合类(例如
HashMap
)。fastutil 库为与 Java 标准库兼容的原始类型提供了方便的集合类。 - 尽可能避免包含大量小对象和指针的嵌套结构。
- 考虑使用数字 ID 或枚举对象代替字符串作为键。
- 如果你的内存小于 32 GiB,请设置 JVM 标志
-XX:+UseCompressedOops
,使指针占用四个字节而不是八个字节。你可以在spark-env.sh
中添加这些选项。
序列化 RDD 存储
如果你的对象仍然太大,无法有效地存储,尽管已经进行了此调整,一个更简单的方法是使用序列化形式存储它们,使用 RDD 持久化 API 中的序列化 StorageLevels,例如 MEMORY_ONLY_SER
。Spark 将把每个 RDD 分区存储为一个大的字节数组。以序列化形式存储数据的唯一缺点是访问速度较慢,因为必须动态地反序列化每个对象。如果你想以序列化形式缓存数据,我们强烈建议使用 Kryo,因为它比 Java 序列化(当然比原始 Java 对象)占用更小的空间。
垃圾回收调优
当你的程序存储的 RDD 发生大量“震荡”时,JVM 垃圾回收可能会成为一个问题。(在只读取一次 RDD 然后对其执行许多操作的程序中,通常不是问题。)当 Java 需要驱逐旧对象以为新对象腾出空间时,它需要跟踪所有 Java 对象并找到未使用的对象。这里要记住的主要一点是,垃圾回收的成本与 Java 对象的数量成正比,因此使用对象较少的数据结构(例如,使用 Int
数组而不是 LinkedList
)可以大大降低此成本。一个更好的方法是以序列化形式持久化对象,如上所述:现在每个 RDD 分区只有一个对象(一个字节数组)。在尝试其他技术之前,如果 GC 成为问题,首先要尝试的是使用序列化缓存。
GC 也可能成为一个问题,因为你的任务的工作内存(运行任务所需的空间)与节点上缓存的 RDD 之间存在干扰。我们将讨论如何控制分配给 RDD 缓存的空间以缓解此问题。
衡量 GC 的影响
GC 调优的第一步是收集有关垃圾回收发生频率和花费在 GC 上的时间的统计信息。这可以通过将 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
添加到 Java 选项来实现。(有关将 Java 选项传递给 Spark 作业的信息,请参阅配置指南。)下次运行 Spark 作业时,你将在 worker 的日志中看到每次发生垃圾回收时打印的消息。请注意,这些日志将位于你的集群的 worker 节点上(位于其工作目录中的 stdout
文件中),而不是你的驱动程序程序上。
高级 GC 调优
要进一步调整垃圾回收,我们首先需要了解 JVM 中内存管理的一些基本信息
-
Java 堆空间分为两个区域:年轻代和老年代。年轻代用于保存生命周期短的对象,而老年代用于保存生命周期长的对象。
-
年轻代进一步分为三个区域 [Eden, Survivor1, Survivor2]。
-
垃圾回收过程的简化描述:当 Eden 满时,会在 Eden 上运行 minor GC,并将来自 Eden 和 Survivor1 的存活对象复制到 Survivor2。Survivor 区域被交换。如果一个对象足够老或者 Survivor2 满了,它就会被移动到老年代。最后,当老年代接近满时,会调用 full GC。
Spark 中 GC 调优的目标是确保只有长期存活的 RDD 存储在老年代中,并且年轻代的大小足以存储短期存活的对象。这将有助于避免 full GC 收集任务执行期间创建的临时对象。一些可能有用的步骤是
-
通过收集 GC 统计信息,检查是否存在过多的垃圾回收。如果在任务完成之前多次调用 full GC,则意味着没有足够的内存可用于执行任务。
-
如果存在太多的 minor GC 但没有太多的 major GC,则为 Eden 分配更多的内存会有所帮助。你可以将 Eden 的大小设置为对每个任务所需的内存量的过度估计。如果 Eden 的大小确定为
E
,那么你可以使用选项-Xmn=4/3*E
设置年轻代的大小。(放大 4/3 是为了考虑幸存者区域使用的空间。) -
在打印的 GC 统计信息中,如果 OldGen 接近满,则通过降低
spark.memory.fraction
来减少用于缓存的内存量;缓存较少的对象比减慢任务执行速度更好。或者,考虑减小年轻代的大小。这意味着如果你已如上设置了-Xmn
,则降低它。如果不是,请尝试更改 JVM 的NewRatio
参数的值。许多 JVM 默认将其设置为 2,这意味着老年代占据堆的 2/3。它应该足够大,以至于该分数超过spark.memory.fraction
。 -
尝试使用
-XX:+UseG1GC
使用 G1GC 垃圾回收器。在垃圾回收成为瓶颈的某些情况下,它可以提高性能。请注意,对于较大的 executor 堆大小,增加 G1 区域大小(使用-XX:G1HeapRegionSize
)可能很重要。 -
例如,如果你的任务正在从 HDFS 读取数据,则可以使用从 HDFS 读取的数据块的大小来估计任务使用的内存量。请注意,解压缩块的大小通常是块大小的 2 或 3 倍。因此,如果我们希望有 3 或 4 个任务的工作空间,并且 HDFS 块大小为 128 MiB,我们可以估计 Eden 的大小为
4*3*128MiB
。 -
监控垃圾回收的频率和花费的时间如何随着新设置而变化。
我们的经验表明,GC 调优的效果取决于你的应用程序和可用的内存量。网上描述了更多调优选项,但从更高的层面来看,管理 full GC 的发生频率有助于减少开销。
可以通过在作业的配置中设置 spark.executor.defaultJavaOptions
或 spark.executor.extraJavaOptions
来指定 executor 的 GC 调优标志。
其他考虑因素
并行度
除非你为每个操作设置足够高的并行度,否则集群将无法得到充分利用。Spark 根据每个文件的大小自动设置要在每个文件上运行的“map”任务的数量(尽管你可以通过 SparkContext.textFile
等的可选参数来控制它),并且对于分布式“reduce”操作,例如 groupByKey
和 reduceByKey
,它使用最大的父 RDD 的分区数。你可以将并行度作为第二个参数传递(请参阅 spark.PairRDDFunctions
文档),或设置配置属性 spark.default.parallelism
以更改默认值。通常,我们建议每个 CPU 核心在集群中运行 2-3 个任务。
输入路径上的并行列表
有时,当作业输入包含大量目录时,你可能还需要增加目录列表并行度,否则该过程可能需要很长时间,尤其是在针对 S3 等对象存储时。如果你的作业使用 Hadoop 输入格式的 RDD(例如,通过 SparkContext.sequenceFile
),则并行度由 spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads
控制(目前默认为 1)。
对于具有基于文件的数据源的 Spark SQL,你可以调整 spark.sql.sources.parallelPartitionDiscovery.threshold
和 spark.sql.sources.parallelPartitionDiscovery.parallelism
以提高列表并行度。有关更多详细信息,请参阅Spark SQL 性能调优指南。
Reduce 任务的内存使用量
有时,你会收到 OutOfMemoryError,不是因为你的 RDD 不适合内存,而是因为你的某个任务(例如 groupByKey
中的一个 reduce 任务)的工作集太大。Spark 的 shuffle 操作(sortByKey
、groupByKey
、reduceByKey
、join
等)在每个任务中构建一个哈希表来执行分组,这通常可能很大。这里最简单的解决方法是增加并行度,以便每个任务的输入集更小。Spark 可以有效地支持短至 200 毫秒的任务,因为它在许多任务中重用一个 executor JVM,并且启动任务的成本很低,因此你可以安全地将并行度提高到高于集群中核心数量的水平。
广播大型变量
使用 SparkContext
中提供的 广播功能可以大大减少每个序列化任务的大小,以及在集群上启动作业的成本。如果你的任务使用驱动程序程序中的任何大型对象(例如,静态查找表),请考虑将其转换为广播变量。Spark 在 master 上打印每个任务的序列化大小,因此你可以查看它来确定你的任务是否太大;一般来说,大于约 20 KiB 的任务可能值得优化。
数据本地性
数据本地性可以对 Spark 作业的性能产生重大影响。如果数据和对其进行操作的代码在一起,那么计算往往会很快。但是,如果代码和数据分离,则必须将一个移动到另一个。通常,传输序列化代码比传输大量数据块要快,因为代码大小比数据小得多。Spark 基于数据本地性的这个通用原则构建其调度。
数据本地性是指数据与处理它的代码之间的距离。根据数据的当前位置,有几个级别的本地性。按从最近到最远排序:
PROCESS_LOCAL
数据与运行代码在同一个 JVM 中。这是最佳的本地性。NODE_LOCAL
数据在同一个节点上。例如,可能在同一节点上的 HDFS 中,或在同一节点上的另一个执行器中。这比PROCESS_LOCAL
慢一点,因为数据需要在进程之间传输。NO_PREF
从任何地方访问数据的速度都一样快,并且没有本地性偏好。RACK_LOCAL
数据在同一个服务器机架上。数据在同一个机架上的不同服务器上,因此需要通过网络发送,通常是通过单个交换机。ANY
数据在网络上的其他位置,并且不在同一个机架中。
Spark 倾向于以最佳本地性级别调度所有任务,但这并非总是可行。在任何空闲执行器上都没有未处理数据的情况下,Spark 会切换到较低的本地性级别。有两种选择:a) 等待繁忙的 CPU 空闲,以便在同一服务器上的数据上启动任务;或者 b) 立即在更远的地方启动新任务,这需要将数据移动到那里。
Spark 通常的做法是等待一段时间,希望繁忙的 CPU 可以空闲出来。一旦该超时时间到期,它就会开始将数据从远处移动到空闲的 CPU。每个级别之间回退的等待超时时间可以单独配置,也可以在一个参数中一起配置;有关详细信息,请参阅 配置页面上的 spark.locality
参数。如果您的任务时间很长并且本地性较差,则应增加这些设置,但默认值通常效果很好。
总结
这是一个简短的指南,旨在指出您在调整 Spark 应用程序时应了解的主要问题 - 最重要的是,数据序列化和内存调整。对于大多数程序,切换到 Kryo 序列化并将数据以序列化形式持久化将解决大多数常见的性能问题。随时在 Spark 邮件列表上询问其他调整最佳实践。