RDD 编程指南

概述

从高层次来看,每个 Spark 应用程序都包含一个驱动程序 (driver program),它运行用户的 main 函数并在集群上执行各种并行操作。Spark 提供的主要抽象是弹性分布式数据集 (RDD),它是分布在集群节点上的元素集合,可以并行操作。RDD 是通过从 Hadoop 文件系统(或任何其他 Hadoop 支持的文件系统)中的文件,或驱动程序中的现有 Scala 集合开始,并对其进行转换来创建的。用户还可以要求 Spark 在内存中持久化 RDD,从而允许在并行操作中高效地重用它。最后,RDD 会自动从节点故障中恢复。

Spark 中的第二个抽象是可在并行操作中使用的共享变量。默认情况下,当 Spark 在不同节点上将函数作为一组任务并行运行时,它会将函数中使用的每个变量的副本发送给每个任务。有时,变量需要在任务之间或任务与驱动程序之间共享。Spark 支持两种类型的共享变量:广播变量(用于在所有节点的内存中缓存值)和累加器(仅用于“添加”的变量,例如计数器和总和)。

本指南在 Spark 的每种支持语言中展示了这些功能。如果您启动 Spark 的交互式 shell,即 Scala shell 的 bin/spark-shell 或 Python shell 的 bin/pyspark,将最容易学习。

链接 Spark

Spark 4.1.1 支持 Python 3.10+。它可以使用标准的 CPython 解释器,因此可以使用 NumPy 等 C 库。它也支持 PyPy 7.3.6+。

Python 中的 Spark 应用程序可以使用在运行时包含 Spark 的 bin/spark-submit 脚本运行,也可以通过将其包含在 setup.py 中来运行,如下所示

    install_requires=[
        'pyspark==4.1.1'
    ]

要在不使用 pip 安装 PySpark 的情况下运行 Python 中的 Spark 应用程序,请使用位于 Spark 目录中的 bin/spark-submit 脚本。该脚本将加载 Spark 的 Java/Scala 库,并允许您将应用程序提交到集群。您还可以使用 bin/pyspark 启动交互式 Python shell。

如果您希望访问 HDFS 数据,则需要使用链接到您的 HDFS 版本的 PySpark 构建版本。预构建包也可在 Spark 主页上找到,适用于常见的 HDFS 版本。

最后,您需要在程序中导入一些 Spark 类。添加以下行

from pyspark import SparkContext, SparkConf

PySpark 要求驱动程序和工作节点上的 Python 小版本号一致。它使用 PATH 中的默认 python 版本,您可以通过 PYSPARK_PYTHON 指定要使用的 Python 版本,例如

$ PYSPARK_PYTHON=python3.8 bin/pyspark
$ PYSPARK_PYTHON=/path-to-your-pypy/pypy bin/spark-submit examples/src/main/python/pi.py

Spark 4.1.1 默认构建并分发以支持 Scala 2.13。(Spark 也可以构建为支持其他版本的 Scala。)要用 Scala 编写应用程序,您需要使用兼容的 Scala 版本(例如 2.13.X)。

要编写 Spark 应用程序,您需要添加对 Spark 的 Maven 依赖项。Spark 可通过 Maven Central 获取

groupId = org.apache.spark
artifactId = spark-core_2.13
version = 4.1.1

此外,如果您希望访问 HDFS 集群,则需要为您使用的 HDFS 版本添加 hadoop-client 依赖项。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后,您需要在程序中导入一些 Spark 类。添加以下行

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(在 Spark 1.3.0 之前,您需要显式地 import org.apache.spark.SparkContext._ 来启用必要的隐式转换。)

Spark 4.1.1 支持 Lambda 表达式 以简洁地编写函数,否则您可以使用 org.apache.spark.api.java.function 包中的类。

注意,Java 7 的支持已在 Spark 2.2.0 中移除。

要用 Java 编写 Spark 应用程序,您需要添加对 Spark 的依赖项。Spark 可通过 Maven Central 获取

groupId = org.apache.spark
artifactId = spark-core_2.13
version = 4.1.1

此外,如果您希望访问 HDFS 集群,则需要为您使用的 HDFS 版本添加 hadoop-client 依赖项。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后,您需要在程序中导入一些 Spark 类。添加以下行

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

初始化 Spark

Spark 程序要做的第一件事是创建一个 SparkContext 对象,它告诉 Spark 如何访问集群。要创建一个 SparkContext,您首先需要构建一个包含应用程序信息的 SparkConf 对象。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

Spark 程序要做的第一件事是创建一个 SparkContext 对象,它告诉 Spark 如何访问集群。要创建一个 SparkContext,您首先需要构建一个包含应用程序信息的 SparkConf 对象。

每个 JVM 中只能有一个活跃的 SparkContext。在创建新的 SparkContext 之前,必须 stop() 当前活跃的 SparkContext。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

Spark 程序要做的第一件事是创建一个 JavaSparkContext 对象,它告诉 Spark 如何访问集群。要创建一个 SparkContext,您首先需要构建一个包含应用程序信息的 SparkConf 对象。

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

appName 参数是您的应用程序名称,将在集群 UI 上显示。masterSpark 或 YARN 集群 URL,或者是用于在本地模式下运行的特殊“local”字符串。在实践中,当在集群上运行时,您不会希望在程序中硬编码 master,而是通过 spark-submit 启动应用程序并在那里接收它。但是,对于本地测试和单元测试,您可以传入“local”以在进程内运行 Spark。

使用 Shell

在 PySpark shell 中,已经为您创建了一个特殊的、可识别解释器的 SparkContext,存储在变量 sc 中。自行创建 SparkContext 将无法工作。您可以使用 --master 参数设置上下文连接的 master,并且可以通过传递逗号分隔的列表给 --py-files 将 Python .zip、.egg 或 .py 文件添加到运行时路径中。对于第三方 Python 依赖项,请参阅 Python 包管理。您还可以通过向 --packages 参数提供逗号分隔的 Maven 坐标列表,将依赖项(例如 Spark Packages)添加到您的 shell 会话中。可以存在依赖项的任何额外存储库(例如 Sonatype)都可以传递给 --repositories 参数。例如,要在一个四核上运行 bin/pyspark,请使用

$ ./bin/pyspark --master "local[4]"

或者,要同时将 code.py 添加到搜索路径中(以便稍后能够 import code),请使用

$ ./bin/pyspark --master "local[4]" --py-files code.py

有关选项的完整列表,请运行 pyspark --help。在后台,pyspark 调用更通用的 spark-submit 脚本

也可以在 IPython(增强型 Python 解释器)中启动 PySpark shell。PySpark 适用于 IPython 1.0.0 及更高版本。要使用 IPython,请在运行 bin/pyspark 时将 PYSPARK_DRIVER_PYTHON 变量设置为 ipython

$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

要使用 Jupyter notebook(以前称为 IPython notebook),

$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark

您可以通过设置 PYSPARK_DRIVER_PYTHON_OPTS 来自定义 ipythonjupyter 命令。

Jupyter Notebook 服务器启动后,您可以从“Files”选项卡创建一个新的 notebook。在 notebook 中,您可以输入命令 %pylab inline 作为 notebook 的一部分,然后再开始尝试从 Jupyter notebook 中使用 Spark。

在 Spark shell 中,已经为您创建了一个特殊的、可识别解释器的 SparkContext,存储在变量 sc 中。自行创建 SparkContext 将无法工作。您可以使用 --master 参数设置上下文连接的 master,并且可以通过传递逗号分隔的列表给 --jars 参数将 JAR 文件添加到类路径中。您还可以通过向 --packages 参数提供逗号分隔的 Maven 坐标列表,将依赖项(例如 Spark Packages)添加到您的 shell 会话中。可以存在依赖项的任何额外存储库(例如 Sonatype)都可以传递给 --repositories 参数。例如,要在一个四核上运行 bin/spark-shell,请使用

$ ./bin/spark-shell --master "local[4]"

或者,要同时将 code.jar 添加到其类路径中,请使用

$ ./bin/spark-shell --master "local[4]" --jars code.jar

要使用 Maven 坐标包含依赖项

$ ./bin/spark-shell --master "local[4]" --packages "org.example:example:0.1"

有关选项的完整列表,请运行 spark-shell --help。在后台,spark-shell 调用更通用的 spark-submit 脚本

弹性分布式数据集 (RDDs)

Spark 围绕弹性分布式数据集 (RDD) 的概念展开,这是一种可容错的元素集合,可以并行操作。创建 RDD 有两种方法:并行化驱动程序中的现有集合,或引用外部存储系统(例如共享文件系统、HDFS、HBase 或任何提供 Hadoop InputFormat 的数据源)中的数据集。

并行集合

并行集合是通过在驱动程序的现有可迭代对象或集合上调用 SparkContextparallelize 方法创建的。集合中的元素被复制以形成一个可以并行操作的分布式数据集。例如,以下是如何创建包含数字 1 到 5 的并行集合

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

一旦创建,就可以并行操作该分布式数据集 (distData)。例如,我们可以调用 distData.reduce(lambda a, b: a + b) 将列表中的元素相加。我们稍后将描述分布式数据集上的操作。

并行集合是通过在驱动程序的现有集合(Scala Seq)上调用 SparkContextparallelize 方法创建的。集合中的元素被复制以形成一个可以并行操作的分布式数据集。例如,以下是如何创建包含数字 1 到 5 的并行集合

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

一旦创建,就可以并行操作该分布式数据集 (distData)。例如,我们可以调用 distData.reduce((a, b) => a + b) 将数组中的元素相加。我们稍后将描述分布式数据集上的操作。

并行集合是通过在驱动程序的现有 Collection 上调用 JavaSparkContextparallelize 方法创建的。集合中的元素被复制以形成一个可以并行操作的分布式数据集。例如,以下是如何创建包含数字 1 到 5 的并行集合

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

一旦创建,就可以并行操作该分布式数据集 (distData)。例如,我们可以调用 distData.reduce((a, b) -> a + b) 将列表中的元素相加。我们稍后将描述分布式数据集上的操作。

并行集合的一个重要参数是将数据集切分成的分区数量。Spark 将为集群的每个分区运行一个任务。通常,您希望集群中每个 CPU 有 2-4 个分区。正常情况下,Spark 会尝试根据您的集群自动设置分区数量。但是,您也可以通过将其作为第二个参数传递给 parallelize 来手动设置它(例如 sc.parallelize(data, 10))。注意:代码中的某些地方使用术语切片 (slices)(分区的同义词)来保持向后兼容性。

外部数据集

PySpark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括您的本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持文本文件、SequenceFiles 和任何其他 Hadoop InputFormat

文本文件 RDD 可以使用 SparkContexttextFile 方法创建。此方法接受文件的 URI(机器上的本地路径,或 hdfs://s3a:// 等 URI),并将其作为行集合读取。这是一个调用示例

>>> distFile = sc.textFile("data.txt")

一旦创建,就可以通过数据集操作对 distFile 进行操作。例如,我们可以使用 mapreduce 操作将所有行的大小相加,如下所示:distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

关于使用 Spark 读取文件的一些说明

  • 如果使用本地文件系统上的路径,则该文件在工作节点上也必须位于相同的路径下。要么将文件复制到所有工作节点,要么使用网络挂载的共享文件系统。

  • Spark 的所有基于文件的方法(包括 textFile)也支持在目录、压缩文件和通配符上运行。例如,您可以使用 textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")

  • textFile 方法还接受一个可选的第二个参数,用于控制文件的分区数量。默认情况下,Spark 为文件的每个块创建一个分区(HDFS 中块默认为 128MB),但您也可以通过传递更大的值来要求更多的分区。注意,分区数不能少于块数。

除了文本文件外,Spark 的 Python API 还支持其他几种数据格式

  • SparkContext.wholeTextFiles 允许您读取包含多个小文本文件的目录,并将它们中的每一个作为 (文件名, 内容) 对返回。这与 textFile 形成对比,后者会为每个文件中的每一行返回一条记录。

  • RDD.saveAsPickleFileSparkContext.pickleFile 支持以由 pickled Python 对象组成的简单格式保存 RDD。批处理用于 pickle 序列化,默认批处理大小为 10。

  • SequenceFile 和 Hadoop 输入/输出格式

注意:此功能目前标记为 Experimental,供高级用户使用。将来可能会被基于 Spark SQL 的读/写支持所取代,届时 Spark SQL 将是首选方法。

Writable 支持

PySpark SequenceFile 支持加载 Java 中的键值对 RDD,将 Writables 转换为基础 Java 类型,并使用 pickle 对生成的 Java 对象进行 pickle 处理。当将键值对 RDD 保存到 SequenceFile 时,PySpark 执行相反的操作。它将 Python 对象 unpickle 为 Java 对象,然后将它们转换为 Writables。以下 Writables 会自动转换

Writable 类型Python 类型
Textstr
IntWritableint
FloatWritablefloat
DoubleWritablefloat
BooleanWritablebool
BytesWritablebytearray
NullWritableNone
MapWritabledict

数组无法直接开箱即用。用户在读取或写入时需要指定自定义 ArrayWritable 子类型。写入时,用户还需要指定将数组转换为自定义 ArrayWritable 子类型的自定义转换器。读取时,默认转换器会将自定义 ArrayWritable 子类型转换为 Java Object[],然后将其 pickle 到 Python 元组。要获取基本类型数组的 Python array.array,用户需要指定自定义转换器。

保存和加载 SequenceFiles

与文本文件类似,可以通过指定路径来保存和加载 SequenceFiles。可以指定键和值类,但对于标准 Writables,这不是必需的。

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]

保存和加载其他 Hadoop 输入/输出格式

PySpark 还可以读取任何 Hadoop InputFormat 或写入任何 Hadoop OutputFormat,适用于“新”和“旧” Hadoop MapReduce API。如果需要,可以将 Hadoop 配置作为 Python 字典传入。这是使用 Elasticsearch ESInputFormat 的示例

$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}  # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                             "org.apache.hadoop.io.NullWritable",
                             "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                             conf=conf)
>>> rdd.first()  # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

注意,如果 InputFormat 仅依赖于 Hadoop 配置和/或输入路径,并且键和值类可以根据上表轻松转换,那么此方法在这些情况下应该效果很好。

如果您有自定义序列化的二进制数据(例如从 Cassandra / HBase 加载数据),那么您首先需要在 Scala/Java 端将该数据转换为可以由 pickle 的 pickler 处理的内容。为此提供了一个 Converter 特质 (trait)。只需扩展此特质并在 convert 方法中实现您的转换代码。请记住确保此类以及访问您的 InputFormat 所需的任何依赖项都打包到您的 Spark 作业 jar 中,并包含在 PySpark 类路径中。

请参阅 Python 示例Converter 示例,了解使用自定义转换器使用 Cassandra / HBase InputFormatOutputFormat 的示例。

Spark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括您的本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持文本文件、SequenceFiles 和任何其他 Hadoop InputFormat

文本文件 RDD 可以使用 SparkContexttextFile 方法创建。此方法接受文件的 URI(机器上的本地路径,或 hdfs://s3a:// 等 URI),并将其作为行集合读取。这是一个调用示例

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

一旦创建,就可以通过数据集操作对 distFile 进行操作。例如,我们可以使用 mapreduce 操作将所有行的大小相加,如下所示:distFile.map(s => s.length).reduce((a, b) => a + b)

关于使用 Spark 读取文件的一些说明

  • 如果使用本地文件系统上的路径,则该文件在工作节点上也必须位于相同的路径下。要么将文件复制到所有工作节点,要么使用网络挂载的共享文件系统。

  • Spark 的所有基于文件的方法(包括 textFile)也支持在目录、压缩文件和通配符上运行。例如,您可以使用 textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")。当读取多个文件时,分区的顺序取决于文件系统返回文件的顺序。它可能(也可能不会)遵循文件的字典顺序。在分区内,元素根据它们在基础文件中的顺序进行排序。

  • textFile 方法还接受一个可选的第二个参数,用于控制文件的分区数量。默认情况下,Spark 为文件的每个块创建一个分区(HDFS 中块默认为 128MB),但您也可以通过传递更大的值来要求更多的分区。注意,分区数不能少于块数。

除了文本文件外,Spark 的 Scala API 还支持其他几种数据格式

  • SparkContext.wholeTextFiles 允许您读取包含多个小文本文件的目录,并将它们中的每一个作为 (文件名, 内容) 对返回。这与 textFile 形成对比,后者会为每个文件中的每一行返回一条记录。分区由数据本地性决定,在某些情况下,这可能导致分区过少。对于这些情况,wholeTextFiles 提供了一个可选的第二个参数来控制分区的最小数量。

  • 对于 SequenceFiles,请使用 SparkContext 的 sequenceFile[K, V] 方法,其中 KV 是文件中键和值的类型。这些应该是 Hadoop Writable 接口的子类,例如 IntWritableText。此外,Spark 允许您为少数常见的 Writables 指定原生类型;例如,sequenceFile[Int, String] 会自动读取 IntWritables 和 Texts。

  • 对于其他 Hadoop InputFormats,您可以使用 SparkContext.hadoopRDD 方法,它接受任意 JobConf 和输入格式类、键类和值类。按照您为带有输入源的 Hadoop 作业设置它们的方式来设置这些参数。您还可以对基于“新” MapReduce API (org.apache.hadoop.mapreduce) 的 InputFormats 使用 SparkContext.newAPIHadoopRDD

  • RDD.saveAsObjectFileSparkContext.objectFile 支持以由序列化 Java 对象组成的简单格式保存 RDD。虽然这不如 Avro 等专用格式高效,但它提供了一种保存任何 RDD 的简单方法。

Spark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括您的本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持文本文件、SequenceFiles 和任何其他 Hadoop InputFormat

文本文件 RDD 可以使用 SparkContexttextFile 方法创建。此方法接受文件的 URI(机器上的本地路径,或 hdfs://s3a:// 等 URI),并将其作为行集合读取。这是一个调用示例

JavaRDD<String> distFile = sc.textFile("data.txt");

一旦创建,就可以通过数据集操作对 distFile 进行操作。例如,我们可以使用 mapreduce 操作将所有行的大小相加,如下所示:distFile.map(s -> s.length()).reduce((a, b) => a + b)

关于使用 Spark 读取文件的一些说明

  • 如果使用本地文件系统上的路径,则该文件在工作节点上也必须位于相同的路径下。要么将文件复制到所有工作节点,要么使用网络挂载的共享文件系统。

  • Spark 的所有基于文件的方法(包括 textFile)也支持在目录、压缩文件和通配符上运行。例如,您可以使用 textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")

  • textFile 方法还接受一个可选的第二个参数,用于控制文件的分区数量。默认情况下,Spark 为文件的每个块创建一个分区(HDFS 中块默认为 128MB),但您也可以通过传递更大的值来要求更多的分区。注意,分区数不能少于块数。

除了文本文件外,Spark 的 Java API 还支持其他几种数据格式

  • JavaSparkContext.wholeTextFiles 允许您读取包含多个小文本文件的目录,并将它们中的每一个作为 (文件名, 内容) 对返回。这与 textFile 形成对比,后者会为每个文件中的每一行返回一条记录。

  • 对于 SequenceFiles,请使用 SparkContext 的 sequenceFile[K, V] 方法,其中 KV 是文件中键和值的类型。这些应该是 Hadoop Writable 接口的子类,例如 IntWritableText

  • 对于其他 Hadoop InputFormats,您可以使用 JavaSparkContext.hadoopRDD 方法,它接受任意 JobConf 和输入格式类、键类和值类。按照您为带有输入源的 Hadoop 作业设置它们的方式来设置这些参数。您还可以对基于“新” MapReduce API (org.apache.hadoop.mapreduce) 的 InputFormats 使用 JavaSparkContext.newAPIHadoopRDD

  • JavaRDD.saveAsObjectFileJavaSparkContext.objectFile 支持以由序列化 Java 对象组成的简单格式保存 RDD。虽然这不如 Avro 等专用格式高效,但它提供了一种保存任何 RDD 的简单方法。

RDD 操作

RDD 支持两种类型的操作:转换 (transformations)(从现有数据集创建新数据集)和动作 (actions)(在数据集上运行计算后将值返回给驱动程序)。例如,map 是一种转换,它将每个数据集元素通过一个函数并返回一个代表结果的新 RDD。另一方面,reduce 是一种动作,它使用某个函数聚合 RDD 的所有元素,并将最终结果返回给驱动程序(尽管也有一个并行 reduceByKey,它返回一个分布式数据集)。

Spark 中的所有转换都是惰性的,也就是说,它们不会立即计算结果。相反,它们只是记住应用于某个基础数据集(例如文件)的转换。这些转换仅在动作需要将结果返回给驱动程序时才会计算。这种设计使 Spark 能够更高效地运行。例如,我们可以意识到通过 map 创建的数据集将在 reduce 中使用,并仅将 reduce 的结果返回给驱动程序,而不是返回较大的映射数据集。

默认情况下,每个转换后的 RDD 在您每次对其运行动作时都可能被重新计算。但是,您也可以使用 persist(或 cache)方法在内存中持久化 RDD,在这种情况下,Spark 将把这些元素保留在集群上,以便您下次查询它时能更快地访问。还支持将 RDD 持久化到磁盘,或在多个节点上进行复制。

基础

为了说明 RDD 的基础知识,请考虑下面的简单程序

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

第一行从外部文件定义了一个基础 RDD。此数据集未加载到内存中或未被操作:lines 只是指向该文件的指针。第二行将 lineLengths 定义为 map 转换的结果。同样,由于惰性,lineLengths 不会立即被计算。最后,我们运行 reduce,这是一个动作。此时,Spark 将计算拆分为在不同机器上运行的任务,每台机器运行其映射部分和本地缩减,并将答案仅返回给驱动程序。

如果我们以后也想再次使用 lineLengths,我们可以添加

lineLengths.persist()

reduce 之前,这将导致 lineLengths 在第一次计算后被保存在内存中。

为了说明 RDD 的基础知识,请考虑下面的简单程序

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

第一行从外部文件定义了一个基础 RDD。此数据集未加载到内存中或未被操作:lines 只是指向该文件的指针。第二行将 lineLengths 定义为 map 转换的结果。同样,由于惰性,lineLengths 不会立即被计算。最后,我们运行 reduce,这是一个动作。此时,Spark 将计算拆分为在不同机器上运行的任务,每台机器运行其映射部分和本地缩减,并将答案仅返回给驱动程序。

如果我们以后也想再次使用 lineLengths,我们可以添加

lineLengths.persist()

reduce 之前,这将导致 lineLengths 在第一次计算后被保存在内存中。

为了说明 RDD 的基础知识,请考虑下面的简单程序

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

第一行从外部文件定义了一个基础 RDD。此数据集未加载到内存中或未被操作:lines 只是指向该文件的指针。第二行将 lineLengths 定义为 map 转换的结果。同样,由于惰性,lineLengths 不会立即被计算。最后,我们运行 reduce,这是一个动作。此时,Spark 将计算拆分为在不同机器上运行的任务,每台机器运行其映射部分和本地缩减,并将答案仅返回给驱动程序。

如果我们以后也想再次使用 lineLengths,我们可以添加

lineLengths.persist(StorageLevel.MEMORY_ONLY());

reduce 之前,这将导致 lineLengths 在第一次计算后被保存在内存中。

向 Spark 传递函数

Spark 的 API 严重依赖于在驱动程序中传递函数以在集群上运行。有三种推荐的方法来执行此操作

  • Lambda 表达式,适用于可以编写为表达式的简单函数。(Lambda 不支持多语句函数或不返回值的语句。)
  • 函数内部的本地 def,用于更长的代码。
  • 模块中的顶级函数。

例如,要传递比 lambda 所支持的更长的函数,请考虑下面的代码

"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

注意,虽然也可以传递对类实例中方法的引用(而不是单例对象),但这需要发送包含该方法的对象。例如,考虑

class MyClass(object):
    def func(self, s):
        return s
    def doStuff(self, rdd):
        return rdd.map(self.func)

在这里,如果我们创建一个 new MyClass 并对其调用 doStuff,其中的 map 引用了MyClass 实例func 方法,因此整个对象都需要发送到集群。

类似地,访问外部对象的字段将引用整个对象

class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        return rdd.map(lambda s: self.field + s)

为了避免这个问题,最简单的方法是将 field 复制到本地变量中,而不是在外部访问它

def doStuff(self, rdd):
    field = self.field
    return rdd.map(lambda s: field + s)

Spark 的 API 严重依赖于在驱动程序中传递函数以在集群上运行。有两种推荐的方法来执行此操作

  • 匿名函数语法,可用于简短的代码片段。
  • 全局单例对象中的静态方法。例如,您可以定义 object MyFunctions 然后像下面这样传递 MyFunctions.func1
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

注意,虽然也可以传递对类实例中方法的引用(而不是单例对象),但这需要发送包含该方法的对象。例如,考虑

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

在这里,如果我们创建一个新的 MyClass 实例并对其调用 doStuff,其中的 map 引用了MyClass 实例func1 方法,因此整个对象都需要发送到集群。这类似于编写 rdd.map(x => this.func1(x))

类似地,访问外部对象的字段将引用整个对象

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

等同于编写 rdd.map(x => this.field + x),它引用了整个 this。为了避免这个问题,最简单的方法是将 field 复制到本地变量中,而不是在外部访问它

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

Spark 的 API 严重依赖于在驱动程序中传递函数以在集群上运行。在 Java 中,函数由实现 org.apache.spark.api.java.function 包中接口的类来表示。有两种方法可以创建此类函数

  • 在您自己的类中实现 Function 接口,可以是匿名内部类或命名类,并将其实例传递给 Spark。
  • 使用 Lambda 表达式 简洁地定义实现。

虽然本指南的大部分内容为了简洁而使用 lambda 语法,但同样可以使用所有相同的 API 长形式。例如,我们可以按如下方式编写上面的代码

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});

或者,如果内联编写函数比较笨重

class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

请注意,Java 中的匿名内部类也可以访问封闭作用域中的变量,只要它们被标记为 final。Spark 会像对待其他语言一样,将这些变量的副本发送到每个工作节点。

理解闭包

Spark 最困难的事情之一是理解在集群中执行代码时变量和方法的作用域和生命周期。修改作用域外变量的 RDD 操作通常是困惑的来源。在下面的示例中,我们将查看使用 foreach() 递增计数器的代码,但类似的问题也可能发生在其他操作中。

示例

请考虑下面的朴素 RDD 元素求和,其行为可能因执行是否在同一个 JVM 内发生而异。这方面的一个常见示例是在 local 模式下运行 Spark (--master = "local[n]") 与将 Spark 应用程序部署到集群(例如通过 spark-submit 到 YARN)时

counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)
var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);

本地模式与集群模式

上述代码的行为未定义,可能无法按预期工作。为了执行作业,Spark 将 RDD 操作的处理分解为任务,每个任务由执行器 (executor) 执行。在执行之前,Spark 计算任务的 闭包 (closure)。闭包是执行器为了在 RDD 上执行计算(在本例中为 foreach())而必须可见的那些变量和方法。此闭包被序列化并发送到每个执行器。

发送到每个执行器的闭包中的变量现在是副本,因此,当在 foreach 函数中引用 counter 时,它不再是驱动程序节点上的 counter。驱动程序的内存中仍然有一个 counter,但这对执行器不再可见!执行器只能看到来自序列化闭包的副本。因此,counter 的最终值仍将为零,因为对 counter 的所有操作都是在引用序列化闭包内的值。

在本地模式下,在某些情况下,foreach 函数实际上会在与驱动程序相同的 JVM 中执行,并将引用同一个原始 counter,甚至可能更新它。

为了确保在这些场景中行为定义良好,应该使用 Accumulator。Spark 中的累加器专门用于提供一种在集群中跨工作节点拆分执行时安全更新变量的机制。本指南的累加器部分更详细地讨论了这些内容。

通常,闭包——如循环或局部定义的方法等结构,不应被用于改变某些全局状态。Spark 不定义或保证对闭包外部引用的对象进行修改的行为。执行此操作的某些代码可能在本地模式下工作,但这纯属偶然,此类代码在分布式模式下不会按预期运行。如果需要某些全局聚合,请使用累加器。

打印 RDD 元素

另一个常见的做法是尝试使用 rdd.foreach(println)rdd.map(println) 打印 RDD 的元素。在单台机器上,这将生成预期的输出并打印所有 RDD 的元素。但是,在 cluster 模式下,执行器调用的 stdout 输出现在正写入执行器的 stdout,而不是驱动程序上的那个,因此驱动程序上的 stdout 将不会显示这些!要在驱动程序上打印所有元素,可以使用 collect() 方法先将 RDD 带到驱动程序节点,如下:rdd.collect().foreach(println)。但这可能会导致驱动程序内存不足,因为 collect() 会将整个 RDD 提取到单台机器上;如果您只需要打印 RDD 的几个元素,一种更安全的方法是使用 take()rdd.take(100).foreach(println)

使用键值对

虽然大多数 Spark 操作适用于包含任何类型对象的 RDD,但一些特殊操作仅适用于键值对 RDD。最常见的是分布式“shuffle”操作,例如按键分组或聚合元素。

在 Python 中,这些操作适用于包含内置 Python 元组(如 (1, 2))的 RDD。只需创建此类元组,然后调用您所需的操作即可。

例如,以下代码在键值对上使用 reduceByKey 操作来统计文件中每一行文本出现的次数

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

我们还可以使用 counts.sortByKey()(例如,按字母顺序对键值对进行排序),最后使用 counts.collect() 将它们作为对象列表带回驱动程序。

虽然大多数 Spark 操作适用于包含任何类型对象的 RDD,但一些特殊操作仅适用于键值对 RDD。最常见的是分布式“shuffle”操作,例如按键分组或聚合元素。

在 Scala 中,这些操作自动适用于包含 Tuple2 对象(语言中的内置元组,只需编写 (a, b) 即可创建)的 RDD。键值对操作可在 PairRDDFunctions 类中找到,它会自动包装元组 RDD。

例如,以下代码在键值对上使用 reduceByKey 操作来统计文件中每一行文本出现的次数

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

我们还可以使用 counts.sortByKey()(例如,按字母顺序对键值对进行排序),最后使用 counts.collect() 将它们作为对象数组带回驱动程序。

注意:当在键值对操作中使用自定义对象作为键时,必须确保自定义的 equals() 方法配有匹配的 hashCode() 方法。有关完整详细信息,请参阅 Object.hashCode() 文档中概述的契约。

虽然大多数 Spark 操作适用于包含任何类型对象的 RDD,但一些特殊操作仅适用于键值对 RDD。最常见的是分布式“shuffle”操作,例如按键分组或聚合元素。

在 Java 中,键值对使用 Scala 标准库中的 scala.Tuple2 类表示。您可以直接调用 new Tuple2(a, b) 来创建一个元组,并稍后使用 tuple._1()tuple._2() 访问其字段。

键值对 RDD 由 JavaPairRDD 类表示。您可以使用特殊版本的 map 操作(如 mapToPairflatMapToPair)从 JavaRDD 构建 JavaPairRDD。JavaPairRDD 将同时具有标准 RDD 函数和特殊的键值函数。

例如,以下代码在键值对上使用 reduceByKey 操作来统计文件中每一行文本出现的次数

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

我们还可以使用 counts.sortByKey()(例如,按字母顺序对键值对进行排序),最后使用 counts.collect() 将它们作为对象数组带回驱动程序。

注意:当在键值对操作中使用自定义对象作为键时,必须确保自定义的 equals() 方法配有匹配的 hashCode() 方法。有关完整详细信息,请参阅 Object.hashCode() 文档中概述的契约。

转换 (Transformations)

下表列出了 Spark 支持的一些常见转换。有关详细信息,请参阅 RDD API 文档(Python, Scala, Java, R)和 Pair RDD 函数文档(Scala, Java)。

转换含义
map(func) 返回一个新的分布式数据集,该数据集是通过将源的每个元素通过函数 func 形成的。
filter(func) 返回一个新的数据集,该数据集是通过选择源中 func 返回 true 的元素形成的。
flatMap(func) 类似于 map,但每个输入项可以映射到 0 个或多个输出项(因此 func 应该返回 Seq 而不是单个项)。
mapPartitions(func) 类似于 map,但在 RDD 的每个分区(块)上分别运行,因此在类型为 T 的 RDD 上运行时,func 必须是类型 Iterator<T> => Iterator<U>。
mapPartitionsWithIndex(func) 类似于 mapPartitions,但还向 func 提供一个表示分区索引的整数值,因此在类型为 T 的 RDD 上运行时,func 必须是类型 (Int, Iterator<T>) => Iterator<U>。
sample(withReplacement, fraction, seed) 使用给定的随机数生成器种子,有放回或无放回地对 fraction 部分的数据进行采样。
union(otherDataset) 返回一个包含源数据集和参数中元素并集的新数据集。
intersection(otherDataset) 返回一个包含源数据集和参数中元素交集的新 RDD。
distinct([numPartitions])) 返回一个包含源数据集中不同元素的新数据集。
groupByKey([numPartitions]) 当在 (K, V) 对的数据集上调用时,返回 (K, Iterable<V>) 对的数据集。
注意:如果您是为了在每个键上执行聚合(例如求和或平均值)而进行分组,使用 reduceByKeyaggregateByKey 将获得更好的性能。
注意:默认情况下,输出中的并行度取决于父 RDD 的分区数量。您可以传递一个可选的 numPartitions 参数来设置不同的任务数量。
reduceByKey(func, [numPartitions]) 当在 (K, V) 对的数据集上调用时,返回一个 (K, V) 对数据集,其中每个键的值使用给定的 reduce 函数 func 进行聚合,该函数必须是类型 (V,V) => V。像在 groupByKey 中一样,reduce 任务的数量可以通过可选的第二个参数进行配置。
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) 当在 (K, V) 对的数据集上调用时,返回一个 (K, U) 对数据集,其中每个键的值使用给定的组合函数和中性的 "zero" 值进行聚合。允许聚合值类型与输入值类型不同,同时避免不必要的分配。像在 groupByKey 中一样,reduce 任务的数量可以通过可选的第二个参数进行配置。
sortByKey([ascending], [numPartitions]) 当在 K 实现 Ordered 的 (K, V) 对的数据集上调用时,返回一个按键升序或降序(由布尔 ascending 参数指定)排序的 (K, V) 对数据集。
join(otherDataset, [numPartitions]) 当在类型为 (K, V) 和 (K, W) 的数据集上调用时,返回一个 (K, (V, W)) 对数据集,其中包含每个键的所有元素对。通过 leftOuterJoinrightOuterJoinfullOuterJoin 支持外部连接。
cogroup(otherDataset, [numPartitions]) 当在类型为 (K, V) 和 (K, W) 的数据集上调用时,返回 (K, (Iterable<V>, Iterable<W>)) 元组的数据集。此操作也称为 groupWith
cartesian(otherDataset) 当在类型为 T 和 U 的数据集上调用时,返回 (T, U) 对的数据集(元素的所有对)。
pipe(command, [envVars]) 通过 shell 命令(例如 Perl 或 bash 脚本)管道传输 RDD 的每个分区。RDD 元素被写入进程的 stdin,输出到 stdout 的行作为字符串 RDD 返回。
coalesce(numPartitions) 将 RDD 中的分区数量减少到 numPartitions。对于在大数据集过滤后更高效地运行操作非常有用。
repartition(numPartitions) 随机重新混洗 RDD 中的数据以创建更多或更少的分区,并使它们均衡。这总是通过网络对所有数据进行混洗。
repartitionAndSortWithinPartitions(partitioner) 根据给定的分区器对 RDD 进行重新分区,并在每个结果分区内按键对记录进行排序。这比调用 repartition 然后在每个分区内排序更高效,因为它可以将排序推入混洗机制。

动作 (Actions)

下表列出了 Spark 支持的一些常见动作。有关详细信息,请参阅 RDD API 文档(Python, Scala, Java, R

和 Pair RDD 函数文档(Scala, Java)。

动作含义
reduce(func) 使用函数 func(接受两个参数并返回一个)聚合数据集的元素。该函数应该是可交换且可结合的,以便它可以正确地并行计算。
collect() 以数组的形式在驱动程序中返回数据集的所有元素。这通常在过滤器或其他返回数据相当小的数据子集的操作之后非常有用。
count() 返回数据集中的元素数量。
first() 返回数据集的第一个元素(类似于 take(1))。
take(n) 返回一个包含数据集前 n 个元素的数组。
takeSample(withReplacement, num, [seed]) 返回一个包含数据集 num 个元素的随机样本的数组,可以有放回或无放回,并可选择预先指定随机数生成器种子。
takeOrdered(n, [ordering]) 使用自然顺序或自定义比较器返回 RDD 的前 n 个元素。
saveAsTextFile(path) 将数据集的元素作为文本文件(或一组文本文件)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定目录。Spark 将在每个元素上调用 toString,将其转换为文件中的一行文本。
saveAsSequenceFile(path)(Java 和 Scala) 将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定路径。这适用于实现 Hadoop Writable 接口的键值对 RDD。在 Scala 中,它也适用于可隐式转换为 Writable 的类型(Spark 包括对基本类型如 Int、Double、String 等的转换)。
saveAsObjectFile(path)(Java 和 Scala) 使用 Java 序列化以简单格式写入数据集的元素,然后可以使用 SparkContext.objectFile() 加载。
countByKey() 仅适用于 (K, V) 类型的 RDD。返回一个包含每个键计数的 (K, Int) 对的哈希映射。
foreach(func) 在数据集的每个元素上运行函数 func。这通常用于副作用,例如更新 累加器 或与外部存储系统交互。注意:修改除累加器之外的 foreach() 外部变量可能会导致未定义的行为。有关更多详细信息,请参阅 理解闭包

Spark RDD API 还公开了一些动作的异步版本,如 foreachforeachAsync,它会立即向调用者返回一个 FutureAction,而不是阻塞直到动作完成。这可用于管理或等待动作的异步执行。

Shuffle 操作

Spark 中的某些操作会触发称为 shuffle 的事件。Shuffle 是 Spark 用于重新分发数据的机制,以便它在分区之间以不同的方式进行分组。这通常涉及跨执行器和机器复制数据,使 shuffle 成为一个复杂且昂贵的操作。

背景

要了解 shuffle 期间发生的情况,我们可以考虑 reduceByKey 操作的示例。reduceByKey 操作会生成一个新 RDD,其中单个键的所有值都被合并为一个元组——键以及针对与该键关联的所有值执行 reduce 函数的结果。挑战在于单个键的所有值不一定位于同一个分区,甚至不一定位于同一台机器上,但必须将它们共置在一起才能计算结果。

在 Spark 中,数据通常不会跨分区分布,以便位于特定操作所需的位置。在计算过程中,单个任务将在单个分区上运行——因此,为了组织所有数据以使单个 reduceByKey reduce 任务能够执行,Spark 需要执行全对全操作。它必须从所有分区读取以查找所有键的所有值,然后跨分区收集值以计算每个键的最终结果——这被称为 shuffle

虽然新混洗数据每个分区中的元素集将是确定的,并且分区本身的顺序也是如此,但这些元素的顺序却不是。如果有人希望在混洗后获得可预测排序的数据,则可以使用

可能导致 shuffle 的操作包括 重新分区 操作,如 repartitioncoalesce‘ByKey 操作(除了计数之外),如 groupByKeyreduceByKey;以及 join 操作,如 cogroupjoin

性能影响

Shuffle 是一项昂贵的操作,因为它涉及磁盘 I/O、数据序列化和网络 I/O。为了组织 shuffle 的数据,Spark 生成了一组任务——用于组织数据的 map 任务,以及用于聚合它的一组 reduce 任务。这种命名法来自 MapReduce,并不直接与 Spark 的 mapreduce 操作相关。

在内部,各个 map 任务的结果会保存在内存中,直到无法容纳为止。然后,这些结果会根据目标分区进行排序并写入单个文件。在 reduce 端,任务读取相关的已排序块。

某些 shuffle 操作可能会消耗大量的堆内存,因为它们在传输记录之前或之后使用内存数据结构来组织记录。具体而言,reduceByKeyaggregateByKey 在 map 端创建这些结构,而 'ByKey 操作在 reduce 端生成这些结构。当数据无法放入内存时,Spark 会将这些表溢出到磁盘,从而产生额外的磁盘 I/O 开销并增加垃圾回收负担。

Shuffle 还会产生大量磁盘上的中间文件。从 Spark 1.3 开始,这些文件会被保留,直到对应的 RDD 不再被使用并被垃圾回收。这样做是为了在重新计算谱系 (lineage) 时无需重新创建 shuffle 文件。如果应用程序保留了对这些 RDD 的引用,或者 GC 不经常发生,垃圾回收可能只会在很长一段时间后才发生。这意味着运行时间长的 Spark 作业可能会消耗大量的磁盘空间。临时存储目录由配置 Spark 上下文时的 spark.local.dir 配置参数指定。

Shuffle 行为可以通过调整各种配置参数进行调优。请参阅 Spark 配置指南 中的“Shuffle 行为”部分。

RDD 持久化

Spark 中最重要的功能之一是跨操作在内存中 持久化(或 缓存)数据集。当您持久化 RDD 时,每个节点都会将其计算出的任何分区存储在内存中,并在该数据集(或派生自该数据集的数据集)的其他动作中重复使用它们。这使得未来的动作速度更快(通常快 10 倍以上)。缓存是迭代算法和快速交互式使用的关键工具。

您可以使用 RDD 上的 persist()cache() 方法将其标记为持久化。第一次在动作中计算它时,它将保存在节点内存中。Spark 的缓存是容错的——如果 RDD 的任何分区丢失,它将使用最初创建它的转换自动重新计算。

此外,每个持久化的 RDD 都可以使用不同的 存储级别 存储,例如,您可以将数据集持久化到磁盘、以序列化 Java 对象的形式持久化在内存中(以节省空间)、或在节点间复制。这些级别通过将 StorageLevel 对象(Python, Scala, Java)传递给 persist() 来设置。cache() 方法是使用默认存储级别的快捷方式,即 StorageLevel.MEMORY_ONLY(将反序列化的对象存储在内存中)。完整的存储级别集如下

存储级别含义
MEMORY_ONLY 以反序列化的 Java 对象形式将 RDD 存储在 JVM 中。如果 RDD 无法放入内存,一些分区将不会被缓存,并在每次需要时动态重新计算。这是默认级别。
MEMORY_AND_DISK 以反序列化的 Java 对象形式将 RDD 存储在 JVM 中。如果 RDD 无法放入内存,则将无法放入的分区存储到磁盘,并在需要时从那里读取它们。
MEMORY_ONLY_SER(Java 和 Scala) 序列化 的 Java 对象(每个分区一个字节数组)形式存储 RDD。这通常比反序列化的对象更节省空间,尤其是在使用 快速序列化器 时,但读取时 CPU 密集程度更高。
MEMORY_AND_DISK_SER(Java 和 Scala) 类似于 MEMORY_ONLY_SER,但将无法放入内存的分区溢出到磁盘,而不是在每次需要时动态重新计算。
DISK_ONLY 仅将 RDD 分区存储在磁盘上。
MEMORY_ONLY_2, MEMORY_AND_DISK_2 等。 与上述级别相同,但会在两个集群节点上复制每个分区。
OFF_HEAP(实验性) 与 MEMORY_ONLY_SER 类似,但将数据存储在堆外内存(off-heap memory)中。这要求开启堆外内存功能。

注意: 在 Python 中,存储的对象始终会使用 Pickle 库进行序列化,因此选择哪种序列化级别并没有区别。Python 中可用的存储级别包括 MEMORY_ONLYMEMORY_ONLY_2MEMORY_AND_DISKMEMORY_AND_DISK_2DISK_ONLYDISK_ONLY_2DISK_ONLY_3

即使在用户没有显式调用 persist 的情况下,Spark 也会在某些 shuffle 操作(如 reduceByKey)中自动持久化部分中间数据。这样做是为了避免在 shuffle 过程中某个节点失败时重新计算整个输入。如果用户计划重用生成的 RDD,我们仍然建议用户显式调用 persist

如何选择存储级别?

Spark 的存储级别旨在提供内存使用和 CPU 效率之间的不同权衡。我们建议通过以下流程来选择一个:

移除数据

Spark 会自动监控每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式丢弃旧的数据分区。如果你想手动移除一个 RDD 而不是等待它被移出缓存,请使用 RDD.unpersist() 方法。请注意,该方法默认是不阻塞的。如果需要等待资源被释放,请在调用此方法时指定 blocking=true

共享变量

通常,当传递给 Spark 操作(如 mapreduce)的函数在远程集群节点上执行时,它处理的是该函数所用变量的独立副本。这些变量会被复制到每台机器,且远程机器上对变量的任何更新都不会传播回驱动程序(driver program)。支持通用的读写共享变量在任务间效率低下。然而,Spark 确实为两种常见的模式提供了两种受限的共享变量:广播变量(broadcast variables)和累加器(accumulators)。

广播变量

广播变量允许程序员在每台机器上缓存一个只读变量,而不是随任务发送该变量的副本。例如,它们可以高效地为每个节点提供一个大型输入数据集的副本。Spark 还尝试使用高效的广播算法来分发广播变量,以降低通信成本。

Spark 操作通过一系列阶段执行,中间由分布式“shuffle”操作分隔。Spark 会自动广播每个阶段内任务所需的公共数据。以此方式广播的数据以序列化形式缓存,并在运行每个任务前进行反序列化。这意味着显式创建广播变量仅在以下情况有用:跨多个阶段的任务需要相同的数据,或者以反序列化形式缓存数据很重要时。

广播变量通过调用 SparkContext.broadcast(v) 从变量 v 创建。广播变量是 v 的一个包装器,其值可以通过调用 value 方法访问。以下代码展示了这一点:

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.core.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

广播变量创建后,在集群上运行的任何函数中都应使用它来代替变量 v,这样 v 就不会多次传输到节点。此外,为了确保所有节点获得相同的值(例如,如果变量稍后被发送到新节点),对象 v 在广播后不应被修改。

要释放广播变量在执行器(executors)上占用的资源,请调用 .unpersist()。如果之后再次使用该广播变量,它将被重新广播。要永久释放广播变量使用的所有资源,请调用 .destroy()。此后该广播变量将无法使用。请注意,这些方法默认不阻塞。如果需要等待资源被释放,请在调用时指定 blocking=true

累加器

累加器是只能通过关联和交换操作进行“加法”的变量,因此可以在并行计算中高效支持。它们可以用来实现计数器(如在 MapReduce 中)或求和。Spark 原生支持数值类型的累加器,程序员也可以添加对新类型的支持。

作为用户,你可以创建命名的或未命名的累加器。如下图所示,命名累加器(在本例中为 counter)会显示在修改该累加器的阶段对应的 Web UI 中。Spark 会在“Tasks”表格中显示由任务修改的每个累加器的值。

Accumulators in the Spark UI

在 UI 中跟踪累加器对于理解运行阶段的进度很有用(注意:此功能在 Python 中暂不支持)。

累加器通过调用 SparkContext.accumulator(v) 从初始值 v 创建。运行在集群上的任务可以使用 add 方法或 += 操作符对其进行累加。但是,任务无法读取其值。只有驱动程序(driver program)可以使用其 value 方法读取累加器的值。

以下代码展示了使用累加器来对数组元素求和:

>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

>>> accum.value
10

虽然上述代码使用了内置的 Int 类型累加器支持,但程序员也可以通过继承 AccumulatorParam 来创建自己的类型。AccumulatorParam 接口有两个方法:zero 用于为你的数据类型提供“零值”,以及 addInPlace 用于将两个值相加。例如,假设我们有一个代表数学向量的 Vector 类,我们可以编写:

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)

    def addInPlace(self, v1, v2):
        v1 += v2
        return v1

# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

可以通过调用 SparkContext.longAccumulator()SparkContext.doubleAccumulator() 来创建数值累加器,分别用于累加 Long 或 Double 类型的值。运行在集群上的任务可以使用 add 方法对其进行累加。但是,任务无法读取其值。只有驱动程序(driver program)可以使用其 value 方法读取累加器的值。

以下代码展示了使用累加器来对数组元素求和:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

虽然上述代码使用了内置的 Long 类型累加器支持,但程序员也可以通过继承 AccumulatorV2 来创建自己的类型。抽象类 AccumulatorV2 有几个必须重写的方法:reset 用于将累加器重置为零,add 用于将另一个值添加到累加器中,merge 用于将另一个同类型的累加器合并到此累加器中。其他必须重写的方法包含在 API 文档中。例如,假设我们有一个代表数学向量的 MyVector 类,我们可以编写:

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

请注意,当程序员定义自己的 AccumulatorV2 类型时,最终结果的类型可能与所添加元素的类型不同。

可以通过调用 SparkContext.longAccumulator()SparkContext.doubleAccumulator() 来创建数值累加器,分别用于累加 Long 或 Double 类型的值。运行在集群上的任务可以使用 add 方法对其进行累加。但是,任务无法读取其值。只有驱动程序(driver program)可以使用其 value 方法读取累加器的值。

以下代码展示了使用累加器来对数组元素求和:

LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

虽然上述代码使用了内置的 Long 类型累加器支持,但程序员也可以通过继承 AccumulatorV2 来创建自己的类型。抽象类 AccumulatorV2 有几个必须重写的方法:reset 用于将累加器重置为零,add 用于将另一个值添加到累加器中,merge 用于将另一个同类型的累加器合并到此累加器中。其他必须重写的方法包含在 API 文档中。例如,假设我们有一个代表数学向量的 MyVector 类,我们可以编写:

class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {

  private MyVector myVector = MyVector.createZeroVector();

  public void reset() {
    myVector.reset();
  }

  public void add(MyVector v) {
    myVector.add(v);
  }
  ...
}

// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");

请注意,当程序员定义自己的 AccumulatorV2 类型时,最终结果的类型可能与所添加元素的类型不同。

警告:当一个 Spark 任务完成时,Spark 会尝试将该任务中的累加更新合并到累加器中。如果合并失败,Spark 会忽略该失败,仍然将任务标记为成功,并继续运行其他任务。因此,有 Bug 的累加器不会影响 Spark 作业,但尽管 Spark 作业成功,累加器可能未被正确更新。

对于仅在动作(actions)内部执行的累加器更新,Spark 保证每个任务对累加器的更新只会应用一次,即重启的任务不会再次更新值。在转换(transformations)中,用户应注意:如果任务或作业阶段被重新执行,每个任务的更新可能会被应用多次。

累加器不会改变 Spark 的惰性求值模型。如果它们在 RDD 的操作中被更新,它们的值仅在 RDD 作为动作的一部分被计算时才会更新。因此,不能保证在像 map() 这样的惰性转换中执行的累加器更新会立即生效。以下代码片段演示了这一属性:

accum = sc.accumulator(0)
def g(x):
    accum.add(x)
    return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.

部署到集群

应用程序提交指南描述了如何向集群提交应用程序。简而言之,一旦你将应用程序打包成 JAR(对于 Java/Scala)或一组 .py.zip 文件(对于 Python),bin/spark-submit 脚本就可以让你将其提交到任何受支持的集群管理器。

从 Java / Scala 启动 Spark 作业

org.apache.spark.launcher 包提供了使用简单 Java API 将 Spark 作业作为子进程启动的类。

单元测试

Spark 对使用任何流行的单元测试框架进行单元测试都很友好。只需在测试中创建一个主 URL 设置为 localSparkContext,运行你的操作,然后调用 SparkContext.stop() 将其销毁。请确保在 finally 代码块或测试框架的 tearDown 方法中停止上下文,因为 Spark 不支持在同一个程序中同时运行两个上下文。

后续步骤

你可以在 Spark 网站上查看一些 Spark 示例程序。此外,Spark 在 examples 目录中包含多个示例(Python, Scala, Java, R)。你可以通过将类名传递给 Spark 的 bin/run-example 脚本来运行 Java 和 Scala 示例;例如:

./bin/run-example SparkPi

对于 Python 示例,请改用 spark-submit

./bin/spark-submit examples/src/main/python/pi.py

对于 R 示例,请改用 spark-submit

./bin/spark-submit examples/src/main/r/dataframe.R

关于优化程序的帮助,配置调优指南提供了最佳实践信息。它们对于确保数据以高效格式存储在内存中尤为重要。关于部署的帮助,集群模式概述描述了分布式操作中涉及的组件以及受支持的集群管理器。

最后,完整的 API 文档可在 PythonScalaJavaR 中获得。