调优 Spark

由于大多数 Spark 计算的内存特性,Spark 程序可能会受到集群中任何资源的瓶颈:CPU、网络带宽或内存。通常,如果数据适合内存,瓶颈是网络带宽,但有时,您还需要进行一些调优,例如 以序列化形式存储 RDD 以减少内存使用。本指南将涵盖两个主要主题:数据序列化,这对于良好的网络性能至关重要,也可以减少内存使用;以及内存调优。我们还将概述几个较小的主题。

数据序列化

序列化在任何分布式应用程序的性能中都起着重要作用。序列化对象速度慢或消耗大量字节的格式会极大地减慢计算速度。通常,这将是您优化 Spark 应用程序时应该调优的第一件事。Spark 旨在在便利性(允许您在操作中使用任何 Java 类型)和性能之间取得平衡。它提供了两个序列化库

您可以通过使用 SparkConf 初始化作业并调用 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 来切换到使用 Kryo。此设置不仅配置了用于在工作节点之间混洗数据的序列化器,而且还配置了将 RDD 序列化到磁盘时的序列化器。Kryo 不是默认值的原因仅仅是因为自定义注册要求,但我们建议在任何网络密集型应用程序中尝试它。从 Spark 2.0.0 开始,我们在使用简单类型、简单类型数组或字符串类型的 RDD 混洗时,内部使用 Kryo 序列化器。

Spark 自动包含 Kryo 序列化器,用于来自 Twitter chill 库的 AllScalaRegistrar 中涵盖的许多常用的核心 Scala 类。

要使用 Kryo 注册您自己的自定义类,请使用 registerKryoClasses 方法。

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

Kryo 文档 描述了更高级的注册选项,例如添加自定义序列化代码。

如果您的对象很大,您可能还需要增加 spark.kryoserializer.buffer 配置。此值需要足够大以容纳您将要序列化的最大对象。

最后,如果您没有注册自定义类,Kryo 仍然可以工作,但它必须将完整的类名与每个对象一起存储,这会浪费空间。

内存调优

在调优内存使用方面,有三个考虑因素:对象使用的内存量(您可能希望整个数据集都适合内存)、访问这些对象的成本以及垃圾回收的开销(如果您在对象方面有很高的周转率)。

默认情况下,Java 对象访问速度很快,但很容易消耗比其字段中的“原始”数据多 2-5 倍的空间。这是由于以下几个原因

本节将首先概述 Spark 中的内存管理,然后讨论用户可以采取的具体策略,以更有效地利用其应用程序中的内存。特别是,我们将描述如何确定对象的内存使用情况,以及如何改进它——要么通过更改数据结构,要么通过以序列化形式存储数据。然后,我们将介绍调优 Spark 的缓存大小和 Java 垃圾收集器。

内存管理概述

Spark 中的内存使用主要分为两类:执行和存储。执行内存是指用于混洗、连接、排序和聚合中的计算的内存,而存储内存是指用于缓存和在集群中传播内部数据的内存。在 Spark 中,执行和存储共享一个统一区域 (M)。当没有使用执行内存时,存储可以获取所有可用内存,反之亦然。如果需要,执行可能会驱逐存储,但仅限于总存储内存使用量低于某个阈值 (R) 时。换句话说,R 描述了 M 中的一个子区域,其中缓存的块永远不会被驱逐。由于实现上的复杂性,存储可能无法驱逐执行。

这种设计确保了几个理想的特性。首先,不使用缓存的应用程序可以使用整个空间进行执行,从而避免不必要的磁盘溢出。其次,使用缓存的应用程序可以保留一个最小存储空间 (R),其中它们的数据块不会被执行驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而无需用户了解内存是如何在内部划分的。

虽然有两个相关的配置,但典型的用户不需要调整它们,因为默认值适用于大多数工作负载

spark.memory.fraction 的值应该设置为使此数量的堆空间能够舒适地容纳在 JVM 的旧或“永久”代中。有关详细信息,请参阅下面关于高级 GC 调优的讨论。

确定内存消耗

确定数据集将需要多少内存消耗的最佳方法是创建一个 RDD,将其放入缓存,然后查看 Web UI 中的“存储”页面。该页面将告诉您 RDD 占用了多少内存。

要估计特定对象的内存消耗,请使用 SizeEstimatorestimate 方法。这对于试验不同的数据布局以修剪内存使用量以及确定广播变量将在每个执行器堆上占用多少空间很有用。

调优数据结构

减少内存消耗的第一种方法是避免添加开销的 Java 特性,例如基于指针的数据结构和包装器对象。有几种方法可以做到这一点

  1. 设计数据结构以优先考虑对象数组和原始类型,而不是标准的 Java 或 Scala 集合类(例如 HashMap)。fastutil 库为与 Java 标准库兼容的原始类型提供了方便的集合类。
  2. 尽可能避免使用包含大量小对象和指针的嵌套结构。
  3. 考虑使用数字 ID 或枚举对象来代替字符串作为键。
  4. 如果您有不到 32 GiB 的 RAM,请设置 JVM 标志 -XX:+UseCompressedOops,使指针为四个字节而不是八个字节。您可以在 spark-env.sh 中添加这些选项。

序列化 RDD 存储

当您的对象即使经过这种调优后仍然太大而无法有效地存储时,减少内存使用量的更简单方法是将它们以序列化形式存储,在 RDD 持久性 API 中使用序列化的 StorageLevels,例如 MEMORY_ONLY_SER。Spark 然后将每个 RDD 分区存储为一个大型字节数组。以序列化形式存储数据的唯一缺点是访问时间较慢,因为必须动态地反序列化每个对象。我们强烈建议 使用 Kryo,如果您想以序列化形式缓存数据,因为它会导致比 Java 序列化(当然也比原始 Java 对象)小得多的尺寸。

垃圾回收调优

当你的程序存储的 RDD 存在大量“ churn ”(即频繁创建和销毁)时,JVM 垃圾回收可能会成为问题。(对于只读取一次 RDD 并在其上执行大量操作的程序来说,这通常不是问题。)当 Java 需要清除旧对象以腾出空间给新对象时,它需要遍历所有 Java 对象并找到未使用的对象。这里需要记住的一点是,**垃圾回收的成本与 Java 对象的数量成正比**,因此使用对象更少的 数据结构(例如,使用 `Int` 数组而不是 `LinkedList`)可以大大降低这种成本。更好的方法是将对象以序列化形式持久化,如上所述:现在每个 RDD 分区只有一个对象(一个字节数组)。在尝试其他技术之前,如果 GC 成为问题,首先尝试使用 序列化缓存

GC 也可能由于任务的工作内存(运行任务所需的内存空间)与节点上缓存的 RDD 之间的干扰而成为问题。我们将讨论如何控制分配给 RDD 缓存的空间以缓解这种情况。

衡量 GC 的影响

GC 调优的第一步是收集有关垃圾回收频率和 GC 所用时间的统计信息。这可以通过在 Java 选项中添加 `-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps` 来实现。(有关将 Java 选项传递给 Spark 作业的信息,请参阅 配置指南。)下次运行 Spark 作业时,你将在每次垃圾回收发生时看到打印在工作节点日志中的消息。请注意,这些日志将位于集群的工作节点上(在其工作目录中的 `stdout` 文件中),**而不是**你的驱动程序程序上。

高级 GC 调优

为了进一步调整垃圾回收,我们首先需要了解一些关于 JVM 中内存管理的基本信息。

Spark 中 GC 调优的目标是确保只有长生命周期的 RDD 存储在老年代,并且年轻代的大小足以存储短生命周期的对象。这将有助于避免完全 GC 来收集任务执行期间创建的临时对象。一些可能有用步骤如下:

我们的经验表明,GC 调优的效果取决于你的应用程序和可用的内存量。在线上描述了 更多调优选项,但在高层次上,管理完全 GC 的频率可以帮助减少开销。

可以通过在作业的配置中设置 `spark.executor.defaultJavaOptions` 或 `spark.executor.extraJavaOptions` 来指定执行器的 GC 调优标志。

其他注意事项

并行度

除非你为每个操作设置足够高的并行度,否则集群将无法充分利用。Spark 会根据文件的大小自动设置在每个文件上运行的“map”任务的数量(尽管你可以通过 `SparkContext.textFile` 等的可选参数来控制它),对于分布式“reduce”操作(例如 `groupByKey` 和 `reduceByKey`),它使用最大的父 RDD 的分区数。你可以将并行度作为第二个参数传递(请参阅 spark.PairRDDFunctions 文档),或者设置配置属性 `spark.default.parallelism` 来更改默认值。一般来说,我们建议在集群中每个 CPU 内核使用 2-3 个任务。

输入路径上的并行列表

有时,你可能还需要在作业输入具有大量目录时增加目录列表并行度,否则该过程可能需要很长时间,尤其是在针对 S3 等对象存储时。如果你的作业在具有 Hadoop 输入格式的 RDD 上工作(例如,通过 `SparkContext.sequenceFile`),则并行度通过 spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads 控制(当前默认值为 1)。

对于使用基于文件的數據源的 Spark SQL,你可以调整 `spark.sql.sources.parallelPartitionDiscovery.threshold` 和 `spark.sql.sources.parallelPartitionDiscovery.parallelism` 来提高列表并行度。有关更多详细信息,请参阅 Spark SQL 性能调优指南

Reduce 任务的内存使用

有时,你可能会遇到 OutOfMemoryError,不是因为你的 RDD 不适合内存,而是因为你的某个任务的工作集(例如 `groupByKey` 中的某个 reduce 任务)太大。Spark 的 shuffle 操作(`sortByKey`、`groupByKey`、`reduceByKey`、`join` 等)在每个任务中构建一个哈希表来执行分组,这通常会很大。这里最简单的解决方法是**增加并行度**,这样每个任务的输入集就会更小。Spark 可以有效地支持短至 200 毫秒的任务,因为它在多个任务之间重用一个执行器 JVM,并且任务启动成本很低,因此你可以安全地将并行度增加到超过集群中内核数量。

广播大型变量

使用 广播功能(在 `SparkContext` 中可用)可以大大减少每个序列化任务的大小,以及在集群上启动作业的成本。如果你的任务在其中使用来自驱动程序程序的任何大型对象(例如静态查找表),请考虑将其转换为广播变量。Spark 在主节点上打印每个任务的序列化大小,因此你可以查看它来决定你的任务是否太大;一般来说,大于约 20 KiB 的任务可能值得优化。

数据局部性

数据局部性会对 Spark 作业的性能产生重大影响。如果数据和对其进行操作的代码在一起,则计算往往很快。但是,如果代码和数据分离,则必须将其中一个移动到另一个位置。通常,将序列化代码从一个地方发送到另一个地方比发送数据块更快,因为代码大小远小于数据大小。Spark 的调度围绕着这个数据局部性的基本原则构建。

数据局部性是指数据与处理它的代码的距离。根据数据的当前位置,存在几种局部性级别。从最近到最远,依次为:

Spark 倾向于在最佳局部性级别调度所有任务,但这并不总是可能的。在没有空闲执行器上的未处理数据的状況下,Spark 会切换到较低的局部性级别。有两种选择:a) 等待繁忙的 CPU 空闲下来,以便在同一个服务器上的数据上启动任务,或者 b) 立即在需要将数据移动到更远的地方启动一个新任务。

Spark 通常会等待一段时间,希望繁忙的 CPU 能释放出来。一旦超时,它就开始将数据从远处移动到空闲的 CPU 上。每个级别之间回退的等待超时可以单独配置,也可以一起在一个参数中配置;有关详细信息,请参阅配置页面上的 spark.locality 参数。如果您的任务很长并且您看到位置不佳,您应该增加这些设置,但默认设置通常效果很好。

总结

这是一份简短的指南,指出了在调整 Spark 应用程序时您应该了解的主要问题 - 最重要的是数据序列化和内存调整。对于大多数程序,切换到 Kryo 序列化并将数据以序列化形式持久化将解决大多数常见的性能问题。欢迎您在 Spark 邮件列表中询问其他调整最佳实践。