调优 Spark

由于大多数 Spark 计算的内存特性,Spark 程序可能会受到集群中任何资源的限制:CPU、网络带宽或内存。通常,如果数据适合内存,瓶颈是网络带宽,但有时,您还需要进行一些调优,例如以序列化形式存储 RDD,以减少内存使用量。本指南将涵盖两个主要主题:数据序列化,这对于良好的网络性能至关重要,也可以减少内存使用量;以及内存调优。我们还简要介绍了几个较小的论题。

数据序列化

序列化在任何分布式应用程序的性能中都起着重要作用。将对象序列化为对象的格式速度很慢,或者消耗大量字节,这将大大降低计算速度。通常,这将是您应该调整以优化 Spark 应用程序的第一件事。 Spark 旨在在便利性(允许您在操作中使用任何 Java 类型)和性能之间取得平衡。它提供了两个序列化库

您可以通过使用 SparkConf 初始化作业并调用 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 来切换到使用 Kryo。此设置配置了用于不仅在 worker 节点之间 shuffle 数据,而且在将 RDD 序列化到磁盘时使用的序列化器。Kryo 不是默认设置的唯一原因是自定义注册要求,但我们建议在任何网络密集型应用程序中尝试使用它。从 Spark 2.0.0 开始,我们在使用简单类型、简单类型数组或字符串类型 shuffle RDD 时,内部使用 Kryo 序列化器。

Spark 自动包含 Kryo 序列化器,用于 Twitter chill 库中 AllScalaRegistrar 涵盖的许多常用的核心 Scala 类。

要使用 Kryo 注册您自己的自定义类,请使用 registerKryoClasses 方法。

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

Kryo 文档 描述了更高级的注册选项,例如添加自定义序列化代码。

如果您的对象很大,您可能还需要增加 spark.kryoserializer.buffer config。此值需要足够大以容纳您将序列化的最大对象。

最后,如果您不注册自定义类,Kryo 仍然可以工作,但它必须为每个对象存储完整的类名,这是浪费的。

内存调优

调优内存使用量时有三个注意事项:对象使用的内存(您可能希望整个数据集都适合内存)、访问这些对象的成本以及 垃圾回收的开销(如果您在对象方面有很高的周转率)。

默认情况下,Java 对象可以快速访问,但与字段中“原始”数据相比,很容易消耗 2-5 倍以上的空间。这是由于以下几个原因

本节将首先概述 Spark 中的内存管理,然后讨论用户可以采取的具体策略,以更有效地利用其应用程序中的内存。特别是,我们将描述如何确定对象的内存使用情况,以及如何通过更改数据结构或以序列化格式存储数据来改进它。然后,我们将介绍调优 Spark 的缓存大小和 Java 垃圾回收器。

内存管理概述

Spark 中的内存使用量主要分为两类:执行和存储。执行内存是指用于 shuffle、join、sort 和聚合计算的内存,而存储内存是指用于缓存并在集群中传播内部数据的内存。在 Spark 中,执行和存储共享一个统一区域 (M)。当不使用执行内存时,存储可以获取所有可用内存,反之亦然。如有必要,执行可能会驱逐存储,但前提是总存储内存使用量低于某个阈值 (R)。换句话说,R 描述了 M 中的一个子区域,在该子区域中,缓存的块永远不会被驱逐。由于实现的复杂性,存储可能不会驱逐执行。

这种设计确保了几个理想的属性。首先,不使用缓存的应用程序可以使用整个空间进行执行,从而避免不必要的磁盘溢出。其次,使用缓存的应用程序可以保留最小的存储空间 (R),其数据块不受驱逐的影响。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而无需用户具备如何内部划分内存的专业知识。

虽然有两个相关的配置,但典型的用户无需调整它们,因为默认值适用于大多数工作负载

应设置 spark.memory.fraction 的值,以便将此堆空间量舒适地容纳在 JVM 的旧或“tenured”生成中。有关详细信息,请参见下面关于高级 GC 调优的讨论。

确定内存消耗

确定数据集将需要的内存消耗量的最佳方法是创建一个 RDD,将其放入缓存,然后查看 Web UI 中的“存储”页面。该页面将告诉您 RDD 占用的内存量。

要估计特定对象的内存消耗量,请使用 SizeEstimatorestimate 方法。这对于试验不同的数据布局以减少内存使用量,以及确定广播变量在每个执行程序堆上将占用的空间量非常有用。

调整数据结构

减少内存消耗的第一种方法是避免添加开销的 Java 功能,例如基于指针的数据结构和包装器对象。有几种方法可以做到这一点

  1. 设计数据结构时,首选对象数组和原始类型,而不是标准的 Java 或 Scala 集合类(例如 HashMap)。fastutil 库为与 Java 标准库兼容的原始类型提供了方便的集合类。
  2. 尽可能避免包含大量小对象和指针的嵌套结构。
  3. 考虑使用数字 ID 或枚举对象代替字符串作为键。
  4. 如果你的内存小于 32 GiB,请设置 JVM 标志 -XX:+UseCompressedOops,使指针占用四个字节而不是八个字节。你可以在 spark-env.sh 中添加这些选项。

序列化 RDD 存储

如果你的对象仍然太大,无法有效地存储,尽管已经进行了此调整,一个更简单的方法是使用序列化形式存储它们,使用 RDD 持久化 API 中的序列化 StorageLevels,例如 MEMORY_ONLY_SER。Spark 将把每个 RDD 分区存储为一个大的字节数组。以序列化形式存储数据的唯一缺点是访问速度较慢,因为必须动态地反序列化每个对象。如果你想以序列化形式缓存数据,我们强烈建议使用 Kryo,因为它比 Java 序列化(当然比原始 Java 对象)占用更小的空间。

垃圾回收调优

当你的程序存储的 RDD 发生大量“震荡”时,JVM 垃圾回收可能会成为一个问题。(在只读取一次 RDD 然后对其执行许多操作的程序中,通常不是问题。)当 Java 需要驱逐旧对象以为新对象腾出空间时,它需要跟踪所有 Java 对象并找到未使用的对象。这里要记住的主要一点是,垃圾回收的成本与 Java 对象的数量成正比,因此使用对象较少的数据结构(例如,使用 Int 数组而不是 LinkedList)可以大大降低此成本。一个更好的方法是以序列化形式持久化对象,如上所述:现在每个 RDD 分区只有一个对象(一个字节数组)。在尝试其他技术之前,如果 GC 成为问题,首先要尝试的是使用序列化缓存

GC 也可能成为一个问题,因为你的任务的工作内存(运行任务所需的空间)与节点上缓存的 RDD 之间存在干扰。我们将讨论如何控制分配给 RDD 缓存的空间以缓解此问题。

衡量 GC 的影响

GC 调优的第一步是收集有关垃圾回收发生频率和花费在 GC 上的时间的统计信息。这可以通过将 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 添加到 Java 选项来实现。(有关将 Java 选项传递给 Spark 作业的信息,请参阅配置指南。)下次运行 Spark 作业时,你将在 worker 的日志中看到每次发生垃圾回收时打印的消息。请注意,这些日志将位于你的集群的 worker 节点上(位于其工作目录中的 stdout 文件中),而不是你的驱动程序程序上。

高级 GC 调优

要进一步调整垃圾回收,我们首先需要了解 JVM 中内存管理的一些基本信息

Spark 中 GC 调优的目标是确保只有长期存活的 RDD 存储在老年代中,并且年轻代的大小足以存储短期存活的对象。这将有助于避免 full GC 收集任务执行期间创建的临时对象。一些可能有用的步骤是

我们的经验表明,GC 调优的效果取决于你的应用程序和可用的内存量。网上描述了更多调优选项,但从更高的层面来看,管理 full GC 的发生频率有助于减少开销。

可以通过在作业的配置中设置 spark.executor.defaultJavaOptionsspark.executor.extraJavaOptions 来指定 executor 的 GC 调优标志。

其他考虑因素

并行度

除非你为每个操作设置足够高的并行度,否则集群将无法得到充分利用。Spark 根据每个文件的大小自动设置要在每个文件上运行的“map”任务的数量(尽管你可以通过 SparkContext.textFile 等的可选参数来控制它),并且对于分布式“reduce”操作,例如 groupByKeyreduceByKey,它使用最大的父 RDD 的分区数。你可以将并行度作为第二个参数传递(请参阅 spark.PairRDDFunctions 文档),或设置配置属性 spark.default.parallelism 以更改默认值。通常,我们建议每个 CPU 核心在集群中运行 2-3 个任务。

输入路径上的并行列表

有时,当作业输入包含大量目录时,你可能还需要增加目录列表并行度,否则该过程可能需要很长时间,尤其是在针对 S3 等对象存储时。如果你的作业使用 Hadoop 输入格式的 RDD(例如,通过 SparkContext.sequenceFile),则并行度由 spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads 控制(目前默认为 1)。

对于具有基于文件的数据源的 Spark SQL,你可以调整 spark.sql.sources.parallelPartitionDiscovery.thresholdspark.sql.sources.parallelPartitionDiscovery.parallelism 以提高列表并行度。有关更多详细信息,请参阅Spark SQL 性能调优指南

Reduce 任务的内存使用量

有时,你会收到 OutOfMemoryError,不是因为你的 RDD 不适合内存,而是因为你的某个任务(例如 groupByKey 中的一个 reduce 任务)的工作集太大。Spark 的 shuffle 操作(sortByKeygroupByKeyreduceByKeyjoin 等)在每个任务中构建一个哈希表来执行分组,这通常可能很大。这里最简单的解决方法是增加并行度,以便每个任务的输入集更小。Spark 可以有效地支持短至 200 毫秒的任务,因为它在许多任务中重用一个 executor JVM,并且启动任务的成本很低,因此你可以安全地将并行度提高到高于集群中核心数量的水平。

广播大型变量

使用 SparkContext 中提供的 广播功能可以大大减少每个序列化任务的大小,以及在集群上启动作业的成本。如果你的任务使用驱动程序程序中的任何大型对象(例如,静态查找表),请考虑将其转换为广播变量。Spark 在 master 上打印每个任务的序列化大小,因此你可以查看它来确定你的任务是否太大;一般来说,大于约 20 KiB 的任务可能值得优化。

数据本地性

数据本地性可以对 Spark 作业的性能产生重大影响。如果数据和对其进行操作的代码在一起,那么计算往往会很快。但是,如果代码和数据分离,则必须将一个移动到另一个。通常,传输序列化代码比传输大量数据块要快,因为代码大小比数据小得多。Spark 基于数据本地性的这个通用原则构建其调度。

数据本地性是指数据与处理它的代码之间的距离。根据数据的当前位置,有几个级别的本地性。按从最近到最远排序:

Spark 倾向于以最佳本地性级别调度所有任务,但这并非总是可行。在任何空闲执行器上都没有未处理数据的情况下,Spark 会切换到较低的本地性级别。有两种选择:a) 等待繁忙的 CPU 空闲,以便在同一服务器上的数据上启动任务;或者 b) 立即在更远的地方启动新任务,这需要将数据移动到那里。

Spark 通常的做法是等待一段时间,希望繁忙的 CPU 可以空闲出来。一旦该超时时间到期,它就会开始将数据从远处移动到空闲的 CPU。每个级别之间回退的等待超时时间可以单独配置,也可以在一个参数中一起配置;有关详细信息,请参阅 配置页面上的 spark.locality 参数。如果您的任务时间很长并且本地性较差,则应增加这些设置,但默认值通常效果很好。

总结

这是一个简短的指南,旨在指出您在调整 Spark 应用程序时应了解的主要问题 - 最重要的是,数据序列化和内存调整。对于大多数程序,切换到 Kryo 序列化并将数据以序列化形式持久化将解决大多数常见的性能问题。随时在 Spark 邮件列表上询问其他调整最佳实践。