调优 Spark
由于大多数 Spark 计算的内存特性,Spark 程序可能会受到集群中任何资源的瓶颈:CPU、网络带宽或内存。通常,如果数据适合内存,瓶颈是网络带宽,但有时,您还需要进行一些调优,例如 以序列化形式存储 RDD 以减少内存使用。本指南将涵盖两个主要主题:数据序列化,这对于良好的网络性能至关重要,也可以减少内存使用;以及内存调优。我们还将概述几个较小的主题。
数据序列化
序列化在任何分布式应用程序的性能中都起着重要作用。序列化对象速度慢或消耗大量字节的格式会极大地减慢计算速度。通常,这将是您优化 Spark 应用程序时应该调优的第一件事。Spark 旨在在便利性(允许您在操作中使用任何 Java 类型)和性能之间取得平衡。它提供了两个序列化库
- Java 序列化:默认情况下,Spark 使用 Java 的
ObjectOutputStream
框架序列化对象,并且可以与您创建的实现java.io.Serializable
的任何类一起使用。您还可以通过扩展java.io.Externalizable
来更密切地控制序列化性能。Java 序列化很灵活,但通常速度很慢,并且会导致许多类的序列化格式很大。 - Kryo 序列化:Spark 还可以使用 Kryo 库(版本 4)来更快地序列化对象。Kryo 比 Java 序列化快得多,也更紧凑(通常快 10 倍),但不支持所有
Serializable
类型,并且需要您注册程序中将使用的类以获得最佳性能。
您可以通过使用 SparkConf 初始化作业并调用 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
来切换到使用 Kryo。此设置不仅配置了用于在工作节点之间混洗数据的序列化器,而且还配置了将 RDD 序列化到磁盘时的序列化器。Kryo 不是默认值的原因仅仅是因为自定义注册要求,但我们建议在任何网络密集型应用程序中尝试它。从 Spark 2.0.0 开始,我们在使用简单类型、简单类型数组或字符串类型的 RDD 混洗时,内部使用 Kryo 序列化器。
Spark 自动包含 Kryo 序列化器,用于来自 Twitter chill 库的 AllScalaRegistrar 中涵盖的许多常用的核心 Scala 类。
要使用 Kryo 注册您自己的自定义类,请使用 registerKryoClasses
方法。
Kryo 文档 描述了更高级的注册选项,例如添加自定义序列化代码。
如果您的对象很大,您可能还需要增加 spark.kryoserializer.buffer
配置。此值需要足够大以容纳您将要序列化的最大对象。
最后,如果您没有注册自定义类,Kryo 仍然可以工作,但它必须将完整的类名与每个对象一起存储,这会浪费空间。
内存调优
在调优内存使用方面,有三个考虑因素:对象使用的内存量(您可能希望整个数据集都适合内存)、访问这些对象的成本以及垃圾回收的开销(如果您在对象方面有很高的周转率)。
默认情况下,Java 对象访问速度很快,但很容易消耗比其字段中的“原始”数据多 2-5 倍的空间。这是由于以下几个原因
- 每个不同的 Java 对象都有一个“对象头”,大约 16 字节,其中包含指向其类的指针等信息。对于数据很少的对象(例如一个
Int
字段),这可能比数据更大。 - Java
String
在原始字符串数据上大约有 40 字节的开销(因为它们将数据存储在Char
的数组中并保留额外的信息,例如长度),并将每个字符存储为两个字节,因为String
内部使用 UTF-16 编码。因此,一个 10 个字符的字符串很容易消耗 60 字节。 - 常见的集合类,例如
HashMap
和LinkedList
,使用链接数据结构,其中每个条目都有一个“包装器”对象(例如Map.Entry
)。此对象不仅有头,还有指向列表中下一个对象的指针(通常每个指针 8 字节)。 - 原始类型的集合通常将它们存储为“装箱”对象,例如
java.lang.Integer
。
本节将首先概述 Spark 中的内存管理,然后讨论用户可以采取的具体策略,以更有效地利用其应用程序中的内存。特别是,我们将描述如何确定对象的内存使用情况,以及如何改进它——要么通过更改数据结构,要么通过以序列化形式存储数据。然后,我们将介绍调优 Spark 的缓存大小和 Java 垃圾收集器。
内存管理概述
Spark 中的内存使用主要分为两类:执行和存储。执行内存是指用于混洗、连接、排序和聚合中的计算的内存,而存储内存是指用于缓存和在集群中传播内部数据的内存。在 Spark 中,执行和存储共享一个统一区域 (M)。当没有使用执行内存时,存储可以获取所有可用内存,反之亦然。如果需要,执行可能会驱逐存储,但仅限于总存储内存使用量低于某个阈值 (R) 时。换句话说,R
描述了 M
中的一个子区域,其中缓存的块永远不会被驱逐。由于实现上的复杂性,存储可能无法驱逐执行。
这种设计确保了几个理想的特性。首先,不使用缓存的应用程序可以使用整个空间进行执行,从而避免不必要的磁盘溢出。其次,使用缓存的应用程序可以保留一个最小存储空间 (R),其中它们的数据块不会被执行驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而无需用户了解内存是如何在内部划分的。
虽然有两个相关的配置,但典型的用户不需要调整它们,因为默认值适用于大多数工作负载
spark.memory.fraction
表示M
的大小,它是 (JVM 堆空间 - 300MiB) 的一部分(默认值为 0.6)。其余空间 (40%) 预留给用户数据结构、Spark 中的内部元数据以及在稀疏和异常大的记录情况下防止 OOM 错误。spark.memory.storageFraction
表示R
的大小,它是M
的一部分(默认值为 0.5)。R
是M
中的存储空间,其中缓存的块不会被执行驱逐。
spark.memory.fraction
的值应该设置为使此数量的堆空间能够舒适地容纳在 JVM 的旧或“永久”代中。有关详细信息,请参阅下面关于高级 GC 调优的讨论。
确定内存消耗
确定数据集将需要多少内存消耗的最佳方法是创建一个 RDD,将其放入缓存,然后查看 Web UI 中的“存储”页面。该页面将告诉您 RDD 占用了多少内存。
要估计特定对象的内存消耗,请使用 SizeEstimator
的 estimate
方法。这对于试验不同的数据布局以修剪内存使用量以及确定广播变量将在每个执行器堆上占用多少空间很有用。
调优数据结构
减少内存消耗的第一种方法是避免添加开销的 Java 特性,例如基于指针的数据结构和包装器对象。有几种方法可以做到这一点
- 设计数据结构以优先考虑对象数组和原始类型,而不是标准的 Java 或 Scala 集合类(例如
HashMap
)。fastutil 库为与 Java 标准库兼容的原始类型提供了方便的集合类。 - 尽可能避免使用包含大量小对象和指针的嵌套结构。
- 考虑使用数字 ID 或枚举对象来代替字符串作为键。
- 如果您有不到 32 GiB 的 RAM,请设置 JVM 标志
-XX:+UseCompressedOops
,使指针为四个字节而不是八个字节。您可以在spark-env.sh
中添加这些选项。
序列化 RDD 存储
当您的对象即使经过这种调优后仍然太大而无法有效地存储时,减少内存使用量的更简单方法是将它们以序列化形式存储,在 RDD 持久性 API 中使用序列化的 StorageLevels,例如 MEMORY_ONLY_SER
。Spark 然后将每个 RDD 分区存储为一个大型字节数组。以序列化形式存储数据的唯一缺点是访问时间较慢,因为必须动态地反序列化每个对象。我们强烈建议 使用 Kryo,如果您想以序列化形式缓存数据,因为它会导致比 Java 序列化(当然也比原始 Java 对象)小得多的尺寸。
垃圾回收调优
当你的程序存储的 RDD 存在大量“ churn ”(即频繁创建和销毁)时,JVM 垃圾回收可能会成为问题。(对于只读取一次 RDD 并在其上执行大量操作的程序来说,这通常不是问题。)当 Java 需要清除旧对象以腾出空间给新对象时,它需要遍历所有 Java 对象并找到未使用的对象。这里需要记住的一点是,**垃圾回收的成本与 Java 对象的数量成正比**,因此使用对象更少的 数据结构(例如,使用 `Int` 数组而不是 `LinkedList`)可以大大降低这种成本。更好的方法是将对象以序列化形式持久化,如上所述:现在每个 RDD 分区只有一个对象(一个字节数组)。在尝试其他技术之前,如果 GC 成为问题,首先尝试使用 序列化缓存。
GC 也可能由于任务的工作内存(运行任务所需的内存空间)与节点上缓存的 RDD 之间的干扰而成为问题。我们将讨论如何控制分配给 RDD 缓存的空间以缓解这种情况。
衡量 GC 的影响
GC 调优的第一步是收集有关垃圾回收频率和 GC 所用时间的统计信息。这可以通过在 Java 选项中添加 `-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps` 来实现。(有关将 Java 选项传递给 Spark 作业的信息,请参阅 配置指南。)下次运行 Spark 作业时,你将在每次垃圾回收发生时看到打印在工作节点日志中的消息。请注意,这些日志将位于集群的工作节点上(在其工作目录中的 `stdout` 文件中),**而不是**你的驱动程序程序上。
高级 GC 调优
为了进一步调整垃圾回收,我们首先需要了解一些关于 JVM 中内存管理的基本信息。
-
Java 堆空间被划分为两个区域:年轻代和老年代。年轻代用于保存短生命周期的对象,而老年代则用于保存长生命周期的对象。
-
年轻代进一步划分为三个区域:[Eden、Survivor1、Survivor2]。
-
垃圾回收过程的简化描述:当 Eden 满了时,会对 Eden 进行一次次要 GC,并将 Eden 和 Survivor1 中存活的对象复制到 Survivor2。Survivor 区域会进行交换。如果一个对象足够老或 Survivor2 满了,它会被移动到老年代。最后,当老年代接近满时,会调用一次完全 GC。
Spark 中 GC 调优的目标是确保只有长生命周期的 RDD 存储在老年代,并且年轻代的大小足以存储短生命周期的对象。这将有助于避免完全 GC 来收集任务执行期间创建的临时对象。一些可能有用步骤如下:
-
通过收集 GC 统计信息来检查是否发生了太多垃圾回收。如果在任务完成之前多次调用完全 GC,则意味着没有足够的内存可用于执行任务。
-
如果发生了太多次次要收集,但没有太多次主要 GC,则为 Eden 分配更多内存将有所帮助。你可以将 Eden 的大小设置为每个任务所需内存量的估计值。如果确定 Eden 的大小为 `E`,则可以使用选项 `-Xmn=4/3*E` 设置年轻代的大小。(乘以 4/3 是为了考虑 Survivor 区域使用的空间。)
-
在打印的 GC 统计信息中,如果 OldGen 接近满,则通过降低 `spark.memory.fraction` 来减少用于缓存的内存量;缓存更少的对象比减慢任务执行速度更好。或者,考虑减小年轻代的大小。这意味着如果已如上设置,则降低 `-Xmn`。如果没有,请尝试更改 JVM 的 `NewRatio` 参数的值。许多 JVM 默认将其设置为 2,这意味着老年代占堆空间的 2/3。它应该足够大,以至于这个比例超过 `spark.memory.fraction`。
-
尝试使用 `-XX:+UseG1GC` 使用 G1GC 垃圾收集器。在垃圾回收成为瓶颈的某些情况下,它可以提高性能。请注意,对于大型执行器堆大小,可能需要使用 `-XX:G1HeapRegionSize` 增加 G1 区域大小。
-
例如,如果你的任务是从 HDFS 读取数据,则可以使用从 HDFS 读取的数据块的大小来估计任务使用的内存量。请注意,解压缩后的块的大小通常是块大小的 2 或 3 倍。因此,如果我们希望有 3 或 4 个任务的工作空间,并且 HDFS 块大小为 128 MiB,我们可以估计 Eden 的大小为 `4*3*128MiB`。
-
监控垃圾回收频率和时间随着新设置的变化而变化。
我们的经验表明,GC 调优的效果取决于你的应用程序和可用的内存量。在线上描述了 更多调优选项,但在高层次上,管理完全 GC 的频率可以帮助减少开销。
可以通过在作业的配置中设置 `spark.executor.defaultJavaOptions` 或 `spark.executor.extraJavaOptions` 来指定执行器的 GC 调优标志。
其他注意事项
并行度
除非你为每个操作设置足够高的并行度,否则集群将无法充分利用。Spark 会根据文件的大小自动设置在每个文件上运行的“map”任务的数量(尽管你可以通过 `SparkContext.textFile` 等的可选参数来控制它),对于分布式“reduce”操作(例如 `groupByKey` 和 `reduceByKey`),它使用最大的父 RDD 的分区数。你可以将并行度作为第二个参数传递(请参阅 spark.PairRDDFunctions
文档),或者设置配置属性 `spark.default.parallelism` 来更改默认值。一般来说,我们建议在集群中每个 CPU 内核使用 2-3 个任务。
输入路径上的并行列表
有时,你可能还需要在作业输入具有大量目录时增加目录列表并行度,否则该过程可能需要很长时间,尤其是在针对 S3 等对象存储时。如果你的作业在具有 Hadoop 输入格式的 RDD 上工作(例如,通过 `SparkContext.sequenceFile`),则并行度通过 spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads
控制(当前默认值为 1)。
对于使用基于文件的數據源的 Spark SQL,你可以调整 `spark.sql.sources.parallelPartitionDiscovery.threshold` 和 `spark.sql.sources.parallelPartitionDiscovery.parallelism` 来提高列表并行度。有关更多详细信息,请参阅 Spark SQL 性能调优指南。
Reduce 任务的内存使用
有时,你可能会遇到 OutOfMemoryError,不是因为你的 RDD 不适合内存,而是因为你的某个任务的工作集(例如 `groupByKey` 中的某个 reduce 任务)太大。Spark 的 shuffle 操作(`sortByKey`、`groupByKey`、`reduceByKey`、`join` 等)在每个任务中构建一个哈希表来执行分组,这通常会很大。这里最简单的解决方法是**增加并行度**,这样每个任务的输入集就会更小。Spark 可以有效地支持短至 200 毫秒的任务,因为它在多个任务之间重用一个执行器 JVM,并且任务启动成本很低,因此你可以安全地将并行度增加到超过集群中内核数量。
广播大型变量
使用 广播功能(在 `SparkContext` 中可用)可以大大减少每个序列化任务的大小,以及在集群上启动作业的成本。如果你的任务在其中使用来自驱动程序程序的任何大型对象(例如静态查找表),请考虑将其转换为广播变量。Spark 在主节点上打印每个任务的序列化大小,因此你可以查看它来决定你的任务是否太大;一般来说,大于约 20 KiB 的任务可能值得优化。
数据局部性
数据局部性会对 Spark 作业的性能产生重大影响。如果数据和对其进行操作的代码在一起,则计算往往很快。但是,如果代码和数据分离,则必须将其中一个移动到另一个位置。通常,将序列化代码从一个地方发送到另一个地方比发送数据块更快,因为代码大小远小于数据大小。Spark 的调度围绕着这个数据局部性的基本原则构建。
数据局部性是指数据与处理它的代码的距离。根据数据的当前位置,存在几种局部性级别。从最近到最远,依次为:
PROCESS_LOCAL
数据与正在运行的代码位于同一个 JVM 中。这是可能的最佳局部性。NODE_LOCAL
数据位于同一个节点上。例如,可能位于同一个节点上的 HDFS 中,或者位于同一个节点上的另一个执行器中。这比PROCESS_LOCAL
稍微慢一些,因为数据必须在进程之间传输。NO_PREF
数据可以从任何地方以相同的速度访问,并且没有局部性偏好。RACK_LOCAL
数据位于同一个机架的服务器上。数据位于同一个机架上的不同服务器上,因此需要通过网络发送,通常通过单个交换机发送。ANY
数据位于网络上的其他地方,不在同一个机架上。
Spark 倾向于在最佳局部性级别调度所有任务,但这并不总是可能的。在没有空闲执行器上的未处理数据的状況下,Spark 会切换到较低的局部性级别。有两种选择:a) 等待繁忙的 CPU 空闲下来,以便在同一个服务器上的数据上启动任务,或者 b) 立即在需要将数据移动到更远的地方启动一个新任务。
Spark 通常会等待一段时间,希望繁忙的 CPU 能释放出来。一旦超时,它就开始将数据从远处移动到空闲的 CPU 上。每个级别之间回退的等待超时可以单独配置,也可以一起在一个参数中配置;有关详细信息,请参阅配置页面上的 spark.locality
参数。如果您的任务很长并且您看到位置不佳,您应该增加这些设置,但默认设置通常效果很好。
总结
这是一份简短的指南,指出了在调整 Spark 应用程序时您应该了解的主要问题 - 最重要的是数据序列化和内存调整。对于大多数程序,切换到 Kryo 序列化并将数据以序列化形式持久化将解决大多数常见的性能问题。欢迎您在 Spark 邮件列表中询问其他调整最佳实践。