Spark 性能调优

由于大多数 Spark 计算都是基于内存的,Spark 程序可能受到集群中任何资源的瓶颈:CPU、网络带宽或内存。通常,如果数据能完全放入内存,瓶颈会是网络带宽,但有时您也需要进行一些调优,例如以序列化形式存储 RDD,以减少内存使用。本指南将涵盖两个主要主题:数据序列化(对于良好的网络性能至关重要,并且可以减少内存使用)和内存调优。我们还将简要介绍几个较小的主题。

数据序列化

序列化在任何分布式应用程序的性能中都扮演着重要角色。序列化对象缓慢或消耗大量字节的格式会极大地拖慢计算速度。通常,这会是您优化 Spark 应用程序时首先应该调整的地方。Spark 旨在便利性(允许您在操作中使用任何 Java 类型)和性能之间取得平衡。它提供了两种序列化库:

您可以通过使用 SparkConf 初始化作业并调用 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 来切换使用 Kryo。此设置不仅配置了工作节点之间数据混洗(shuffling)所使用的序列化器,也配置了将 RDD 序列化到磁盘时所使用的序列化器。Kryo 不是默认序列化器的唯一原因是其自定义注册要求,但我们建议在任何网络密集型应用程序中尝试使用它。自 Spark 2.0.0 起,当混洗(shuffling)简单类型、简单类型数组或字符串类型的 RDD 时,我们内部使用 Kryo 序列化器。

Spark 自动包含 Kryo 序列化器,用于 Twitter chill 库中 AllScalaRegistrar 所涵盖的许多常用核心 Scala 类。

要使用 Kryo 注册您自己的自定义类,请使用 registerKryoClasses 方法。

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

Kryo 文档描述了更高级的注册选项,例如添加自定义序列化代码。

如果您的对象很大,您可能还需要增加 spark.kryoserializer.buffer 配置。此值需要足够大,以容纳您将序列化的最大对象。

最后,如果您不注册自定义类,Kryo 仍然会工作,但它将不得不为每个对象存储完整的类名,这是浪费的。

内存调优

在调优内存使用方面有三个考量:对象使用的内存(您可能希望整个数据集都适合内存)、访问这些对象的开销,以及垃圾回收的开销(如果您有高对象周转率)。

默认情况下,Java 对象的访问速度很快,但它们很容易比其字段中的“原始”数据多消耗 2-5 倍的空间。这有几个原因:

本节将首先概述 Spark 中的内存管理,然后讨论用户可以采取哪些特定策略来更有效地利用其应用程序中的内存。特别是,我们将描述如何确定对象的内存使用情况,以及如何改进它——无论是通过更改数据结构,还是通过以序列化格式存储数据。然后我们将介绍 Spark 缓存大小和 Java 垃圾回收器的调优。

内存管理概述

Spark 中的内存使用主要分为两类:执行内存和存储内存。执行内存指用于混洗(shuffles)、连接(joins)、排序(sorts)和聚合(aggregations)计算的内存,而存储内存指用于缓存和在集群中传播内部数据的内存。在 Spark 中,执行内存和存储内存共享一个统一区域 (M)。当没有执行内存被使用时,存储内存可以获取所有可用内存,反之亦然。执行内存必要时可以逐出存储内存,但仅限于总存储内存使用量低于某个阈值 (R) 时。换句话说,R 描述了 M 内的一个子区域,其中缓存块永远不会被逐出。由于实现的复杂性,存储内存不能逐出执行内存。

这种设计确保了几个理想的特性。首先,不使用缓存的应用程序可以将整个空间用于执行,从而避免了不必要的磁盘溢出。其次,使用缓存的应用程序可以保留一个最小存储空间 (R),其数据块在该空间中不会被逐出。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而无需用户了解内存内部是如何划分的专业知识。

尽管有两个相关的配置,但典型用户无需调整它们,因为默认值适用于大多数工作负载:

spark.memory.fraction 的值应该被设置,以使这部分堆空间舒适地适应 JVM 的老年代或“永久代”。有关详细信息,请参阅下面的高级 GC 调优讨论。

确定内存消耗

衡量数据集所需内存消耗量的最佳方法是创建一个 RDD,将其放入缓存,然后查看 Web UI 中的“存储(Storage)”页面。该页面会告诉您 RDD 占用了多少内存。

要估计特定对象的内存消耗,请使用 SizeEstimatorestimate 方法。这对于尝试不同的数据布局以削减内存使用量,以及确定广播变量在每个执行器(executor)堆上将占用多少空间非常有用。

调优数据结构

减少内存消耗的第一种方法是避免增加开销的 Java 特性,例如基于指针的数据结构和包装器对象。有几种方法可以实现这一点:

  1. 设计您的数据结构时,优先使用对象数组和基本类型,而不是标准的 Java 或 Scala 集合类(例如 HashMap)。fastutil 库为基本类型提供了与 Java 标准库兼容的便捷集合类。
  2. 尽可能避免使用包含大量小对象和指针的嵌套结构。
  3. 考虑使用数字 ID 或枚举对象作为键,而不是字符串。
  4. 如果您的内存少于 32 GiB,请设置 JVM 标志 -XX:+UseCompressedOops,使指针为四字节而不是八字节。您可以将这些选项添加到 spark-env.sh 中。

序列化 RDD 存储

如果尽管进行了这些调优,您的对象仍然太大而无法高效存储,那么一个更简单的方法是使用 RDD 持久化 API 中的序列化存储级别(例如 MEMORY_ONLY_SER)以序列化形式存储它们。Spark 将把每个 RDD 分区存储为一个大的字节数组。以序列化形式存储数据的唯一缺点是访问时间会变慢,因为需要即时反序列化每个对象。如果您想以序列化形式缓存数据,我们强烈建议使用 Kryo,因为它比 Java 序列化(当然也比原始 Java 对象)占用更小的空间。

垃圾回收调优

当您的程序存储的 RDD 发生大量“数据流失”(churn)时,JVM 垃圾回收可能成为一个问题。(对于那些只读取一次 RDD 然后在其上执行许多操作的程序来说,这通常不是问题。)当 Java 需要逐出旧对象以腾出空间给新对象时,它将需要遍历所有 Java 对象并找到未使用的对象。这里要记住的关键点是:垃圾回收的成本与 Java 对象的数量成正比,因此使用对象较少的数据结构(例如 Int 数组而不是 LinkedList)会大大降低此成本。一个更好的方法是以序列化形式持久化对象,如上所述:现在每个 RDD 分区将只有一个对象(一个字节数组)。在尝试其他技术之前,如果 GC 是一个问题,首先要尝试的是使用序列化缓存

GC 也可能因为任务的工作内存(运行任务所需的空间量)和节点上缓存的 RDD 之间的干扰而成为问题。我们将讨论如何控制分配给 RDD 缓存的空间以缓解此问题。

衡量 GC 的影响

GC 调优的第一步是收集垃圾回收发生的频率和 GC 所花费时间的统计信息。这可以通过向 Java 选项添加 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 来完成。(有关如何将 Java 选项传递给 Spark 作业的信息,请参阅配置指南。)下次运行 Spark 作业时,每当发生垃圾回收时,您都会在工作节点的日志中看到打印的消息。请注意,这些日志将在您集群的工作节点上(在其工作目录的 stdout 文件中),而不是在您的驱动程序中。

高级 GC 调优

为了进一步调优垃圾回收,我们首先需要了解一些关于 JVM 内存管理的基本信息:

Spark 中 GC 调优的目标是确保只有生命周期长的 RDD 存储在老年代中,并且新生代的大小足以存储生命周期短的对象。这将有助于避免 Full GC 回收任务执行期间创建的临时对象。一些可能有用的步骤是:

我们的经验表明,GC 调优的效果取决于您的应用程序和可用内存量。网上描述了更多调优选项,但从高层次来看,管理 Full GC 发生的频率有助于降低开销。

执行器(executor)的 GC 调优标志可以通过在作业配置中设置 spark.executor.defaultJavaOptionsspark.executor.extraJavaOptions 来指定。

其他考量

并行度

除非您将每个操作的并行度设置得足够高,否则集群将无法充分利用。Spark 会根据每个文件的大小自动设置要运行的“map”任务数量(尽管您可以通过 SparkContext.textFile 等的可选参数来控制),对于分布式“reduce”操作,例如 groupByKeyreduceByKey,它会使用最大父 RDD 的分区数量。您可以将并行度作为第二个参数传递(请参阅 spark.PairRDDFunctions 文档),或者设置配置属性 spark.default.parallelism 来更改默认值。总的来说,我们建议您的集群中每个 CPU 核心分配 2-3 个任务。

输入路径的并行列表

有时,当作业输入包含大量目录时,您可能还需要增加目录列表的并行度,否则该过程可能需要很长时间,尤其是在面对像 S3 这样的对象存储时。如果您的作业使用 Hadoop 输入格式处理 RDD(例如,通过 SparkContext.sequenceFile),则并行度通过 spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads 控制(当前默认值为 1)。

对于使用基于文件数据源的 Spark SQL,您可以调优 spark.sql.sources.parallelPartitionDiscovery.thresholdspark.sql.sources.parallelPartitionDiscovery.parallelism 以提高列表并行度。更多详细信息请参阅Spark SQL 性能调优指南

Reduce 任务的内存使用

有时,您会遇到 OutOfMemoryError,不是因为您的 RDD 不适合内存,而是因为您的某个任务(例如 groupByKey 中的某个 reduce 任务)的工作集太大。Spark 的混洗操作(sortByKeygroupByKeyreduceByKeyjoin 等)会在每个任务中构建一个哈希表来执行分组,这个哈希表通常可能很大。这里最简单的修复方法是提高并行度,以便每个任务的输入集更小。Spark 可以高效支持短至 200 毫秒的任务,因为它在多个任务之间重用一个执行器(executor)JVM 并且任务启动成本低,因此您可以安全地将并行度提高到超过集群中核心的数量。

广播大型变量

使用 SparkContext 中提供的广播功能可以大大减少每个序列化任务的大小,以及在集群上启动作业的成本。如果您的任务在其内部使用了来自驱动程序中的任何大型对象(例如静态查找表),请考虑将其转换为广播变量。Spark 会在主节点(master)上打印每个任务的序列化大小,因此您可以查看该信息来决定您的任务是否过大;通常,大于约 20 KiB 的任务可能值得优化。

数据本地性

数据本地性对 Spark 作业的性能有重大影响。如果数据和操作数据的代码在一起,那么计算往往会很快。但如果代码和数据分离,其中一个必须移动到另一个的位置。通常,将序列化代码从一个地方传输到另一个地方比传输一大块数据更快,因为代码大小远小于数据。Spark 的调度是围绕着数据本地性这个一般原则构建的。

数据本地性是指数据与处理其的代码之间的接近程度。根据数据当前位置,本地性有几个级别。从近到远依次为:

Spark 倾向于在最佳本地性级别调度所有任务,但这并非总是可行。在没有任何空闲执行器(executor)上没有未处理数据的情况下,Spark 会切换到较低的本地性级别。有两种选择:a) 等待繁忙的 CPU 释放以在同一服务器上的数据上启动任务,或者 b) 立即在较远的地方启动一个新任务,这需要将数据移动到那里。

Spark 通常会等待一段时间,希望能有繁忙的 CPU 释放出来。一旦超时,它就开始将数据从较远的地方移动到空闲的 CPU 上。每个级别之间的回退等待超时可以单独配置或在一个参数中一起配置;详细信息请参阅配置页面上的 spark.locality 参数。如果您的任务耗时较长且本地性较差,您应该增加这些设置,但默认值通常效果很好。

总结

这篇简短的指南旨在指出您在调优 Spark 应用程序时应了解的主要关注点——最重要的是数据序列化和内存调优。对于大多数程序,切换到 Kryo 序列化并以序列化形式持久化数据将解决大多数常见的性能问题。关于其他调优最佳实践,请随时在Spark 邮件列表上提问。