RDD 编程指南

概述

从高层次上看,每个 Spark 应用程序都由一个驱动程序组成,该驱动程序运行用户的 main 函数并在集群上执行各种并行操作。 Spark 提供的最主要的抽象是弹性分布式数据集 (RDD),它是跨集群节点分区的元素集合,可以并行操作。 RDD 的创建方式是从 Hadoop 文件系统(或任何其他 Hadoop 支持的文件系统)中的文件开始,或从驱动程序中现有的 Scala 集合开始,然后对其进行转换。 用户还可以要求 Spark 将 RDD 持久化到内存中,从而允许跨并行操作高效地重用它。 最后,RDD 会自动从节点故障中恢复。

Spark 中的第二个抽象是可以在并行操作中使用的共享变量。 默认情况下,当 Spark 将一个函数作为一组任务并行运行在不同的节点上时,它会将该函数中使用的每个变量的副本发送到每个任务。 有时,需要跨任务共享变量,或在任务与驱动程序之间共享变量。 Spark 支持两种类型的共享变量:广播变量,可用于在所有节点上缓存内存中的值;以及累加器,它们是仅“添加”到的变量,例如计数器和总和。

本指南展示了 Spark 支持的每种语言中的每个功能。 如果您启动 Spark 的交互式 Shell,则最容易跟进——Scala Shell 使用 bin/spark-shell,Python Shell 使用 bin/pyspark

与 Spark 链接

Spark 3.5.5 与 Python 3.8+ 配合使用。 它可以使用标准的 CPython 解释器,因此可以使用 NumPy 等 C 库。 它也可以与 PyPy 7.3.6+ 配合使用。

Python 中的 Spark 应用程序可以使用包含运行时 Spark 的 bin/spark-submit 脚本运行,或者通过将其包含在您的 setup.py 中运行,如下所示:

    install_requires=[
        'pyspark==3.5.5'
    ]

要在没有 pip 安装 PySpark 的情况下在 Python 中运行 Spark 应用程序,请使用位于 Spark 目录中的 bin/spark-submit 脚本。 此脚本将加载 Spark 的 Java/Scala 库,并允许您将应用程序提交到集群。 您也可以使用 bin/pyspark 来启动交互式 Python shell。

如果您希望访问 HDFS 数据,您需要使用链接到您的 HDFS 版本的 PySpark 构建版本。 预构建包也可在 Spark 主页上找到,适用于常见的 HDFS 版本。

最后,您需要将一些 Spark 类导入到您的程序中。 添加以下行

from pyspark import SparkContext, SparkConf

PySpark 要求 driver 和 worker 使用相同的 Python 次版本。 它使用 PATH 中的默认 python 版本,您可以使用 PYSPARK_PYTHON 指定要使用的 Python 版本,例如

$ PYSPARK_PYTHON=python3.8 bin/pyspark
$ PYSPARK_PYTHON=/path-to-your-pypy/pypy bin/spark-submit examples/src/main/python/pi.py

Spark 3.5.5 默认构建和分发以与 Scala 2.12 配合使用。(Spark 也可以构建为与其他版本的 Scala 配合使用。)要使用 Scala 编写应用程序,您需要使用兼容的 Scala 版本(例如 2.12.X)。

要编写 Spark 应用程序,您需要在 Maven 中添加 Spark 的依赖项。 Spark 可通过 Maven Central 获取,地址为

groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.5.5

此外,如果您希望访问 HDFS 集群,您需要为您的 HDFS 版本添加 hadoop-client 的依赖项。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后,您需要将一些 Spark 类导入到您的程序中。 添加以下行

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(在 Spark 1.3.0 之前,您需要显式地 import org.apache.spark.SparkContext._ 才能启用必要的隐式转换。)

Spark 3.5.5 支持 lambda 表达式,用于简洁地编写函数,否则您可以使用 org.apache.spark.api.java.function 包中的类。

请注意,Spark 2.2.0 中已删除对 Java 7 的支持。

要用 Java 编写 Spark 应用程序,您需要在 Maven 中添加 Spark 的依赖项。 Spark 可通过 Maven Central 获取,地址为

groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.5.5

此外,如果您希望访问 HDFS 集群,您需要为您的 HDFS 版本添加 hadoop-client 的依赖项。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后,您需要将一些 Spark 类导入到您的程序中。 添加以下行

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

初始化 Spark

Spark 程序必须做的第一件事是创建一个 SparkContext 对象,该对象告诉 Spark 如何访问集群。 要创建 SparkContext,您首先需要构建一个 SparkConf 对象,其中包含有关您的应用程序的信息。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

Spark 程序必须做的第一件事是创建一个 SparkContext 对象,该对象告诉 Spark 如何访问集群。 要创建 SparkContext,您首先需要构建一个 SparkConf 对象,其中包含有关您的应用程序的信息。

每个 JVM 应该只有一个活动的 SparkContext。 在创建新的 SparkContext 之前,必须 stop() 活跃的 SparkContext。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

Spark 程序必须做的第一件事是创建一个 JavaSparkContext 对象,该对象告诉 Spark 如何访问集群。 要创建 SparkContext,您首先需要构建一个 SparkConf 对象,其中包含有关您的应用程序的信息。

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

appName 参数是您的应用程序的名称,用于在集群 UI 上显示。 master 是一个 Spark、Mesos 或 YARN 集群 URL,或者一个特殊的“local”字符串,用于在本地模式下运行。 实际上,在集群上运行时,您不会希望在程序中硬编码 master,而是 使用 spark-submit 启动应用程序 并在那里接收它。 但是,对于本地测试和单元测试,您可以传递“local”以在进程内运行 Spark。

使用 Shell

在 PySpark Shell 中,已经为您创建了一个特殊的、感知解释器的 SparkContext,它位于名为 sc 的变量中。 制作自己的 SparkContext 将不起作用。 您可以使用 --master 参数设置上下文连接到的 master,并且可以通过将逗号分隔的列表传递给 --py-files 将 Python .zip、.egg 或 .py 文件添加到运行时路径。 有关第三方 Python 依赖项,请参阅 Python 包管理。 您还可以通过将 Maven 坐标的逗号分隔列表提供给 --packages 参数,将依赖项(例如 Spark 包)添加到您的 Shell 会话。 任何依赖项可能存在的其他存储库(例如 Sonatype)都可以传递给 --repositories 参数。 例如,要在完全四个核心上运行 bin/pyspark,请使用

$ ./bin/pyspark --master local[4]

或者,要也将 code.py 添加到搜索路径(以便稍后能够 import code),请使用

$ ./bin/pyspark --master local[4] --py-files code.py

有关选项的完整列表,请运行 pyspark --help。 在幕后,pyspark 调用更通用的 spark-submit 脚本

也可以在 IPython(增强的 Python 解释器)中启动 PySpark Shell。 PySpark 适用于 IPython 1.0.0 及更高版本。 要使用 IPython,请在运行 bin/pyspark 时将 PYSPARK_DRIVER_PYTHON 变量设置为 ipython

$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

要使用 Jupyter notebook(以前称为 IPython notebook),请

$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark

您可以通过设置 PYSPARK_DRIVER_PYTHON_OPTS 来自定义 ipythonjupyter 命令。

启动 Jupyter Notebook 服务器后,您可以从“Files”(文件)选项卡创建一个新的 notebook。在 notebook 内部,您可以在开始尝试从 Jupyter notebook 使用 Spark 之前,输入命令 %pylab inline 作为 notebook 的一部分。

在 Spark shell 中,已经为您创建了一个特殊的、支持解释器的 SparkContext,它存储在名为 sc 的变量中。您自己创建 SparkContext 将无法工作。您可以使用 --master 参数设置上下文连接到的 master,并且可以通过将逗号分隔的列表传递给 --jars 参数来将 JAR 添加到 classpath 中。您还可以通过将逗号分隔的 Maven 坐标列表提供给 --packages 参数,将依赖项(例如,Spark Packages)添加到您的 shell 会话中。任何可能存在依赖项的其他存储库(例如,Sonatype)都可以传递给 --repositories 参数。例如,要在四个核心上运行 bin/spark-shell,请使用:

$ ./bin/spark-shell --master local[4]

或者,要也将 code.jar 添加到其 classpath,请使用:

$ ./bin/spark-shell --master local[4] --jars code.jar

要使用 Maven 坐标包含依赖项:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

要获取完整的选项列表,请运行 spark-shell --help。在幕后,spark-shell 调用更通用的 spark-submit 脚本

弹性分布式数据集(RDD)

Spark 围绕着弹性分布式数据集 (RDD) 的概念,它是一个可以并行操作的容错元素集合。有两种创建 RDD 的方法:并行化驱动程序中现有的集合,或引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase 或任何提供 Hadoop InputFormat 的数据源。

并行化集合

并行化集合是通过在驱动程序中使用 SparkContextparallelize 方法对现有的可迭代对象或集合进行调用来创建的。集合的元素被复制以形成可以并行操作的分布式数据集。例如,以下是如何创建一个包含数字 1 到 5 的并行化集合:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

创建后,可以并行操作分布式数据集(distData)。例如,我们可以调用 distData.reduce(lambda a, b: a + b) 来将列表中的元素相加。我们将在后面描述分布式数据集上的操作。

并行化集合是通过在驱动程序中使用 SparkContextparallelize 方法对现有的集合(Scala Seq)进行调用来创建的。集合的元素被复制以形成可以并行操作的分布式数据集。例如,以下是如何创建一个包含数字 1 到 5 的并行化集合:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

创建后,可以并行操作分布式数据集(distData)。例如,我们可以调用 distData.reduce((a, b) => a + b) 来将数组中的元素相加。我们将在后面描述分布式数据集上的操作。

并行化集合是通过在驱动程序中使用 JavaSparkContextparallelize 方法对现有的 Collection 进行调用来创建的。集合的元素被复制以形成可以并行操作的分布式数据集。例如,以下是如何创建一个包含数字 1 到 5 的并行化集合:

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

创建后,可以并行操作分布式数据集(distData)。例如,我们可以调用 distData.reduce((a, b) -> a + b) 来将列表中的元素相加。我们将在后面描述分布式数据集上的操作。

并行集合的一个重要参数是将数据集分割成的分区的数量。Spark 将为集群的每个分区运行一个任务。通常,您希望集群中每个 CPU 有 2-4 个分区。通常,Spark 会尝试根据您的集群自动设置分区数。但是,您也可以通过将其作为第二个参数传递给 parallelize 来手动设置它(例如,sc.parallelize(data, 10))。注意:代码中的某些地方使用术语 slices(分区的同义词)来保持向后兼容性。

外部数据集

PySpark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括您的本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持文本文件、SequenceFiles 和任何其他的 Hadoop InputFormat

可以使用 SparkContexttextFile 方法创建文本文件 RDD。此方法接受文件的 URI(可以是机器上的本地路径,也可以是 hdfs://s3a:// 等 URI),并将其读取为行的集合。这是一个示例调用:

>>> distFile = sc.textFile("data.txt")

创建后,distFile 可以通过数据集操作进行操作。例如,我们可以使用 mapreduce 操作将所有行的长度加起来,如下所示:distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

关于使用 Spark 读取文件的一些注意事项:

  • 如果使用本地文件系统上的路径,则该文件还必须在 worker 节点上的相同路径上可访问。可以将文件复制到所有 worker,或者使用网络挂载的共享文件系统。

  • Spark 的所有基于文件的输入方法(包括 textFile)都支持在目录、压缩文件和通配符上运行。例如,您可以使用 textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")

  • textFile 方法还接受一个可选的第二个参数,用于控制文件的分区数。默认情况下,Spark 为文件的每个块创建一个分区(默认情况下,HDFS 中的块为 128MB),但您也可以通过传递一个更大的值来请求更高的分区数。请注意,您的分区数不能少于块数。

除了文本文件之外,Spark 的 Python API 还支持几种其他数据格式:

  • SparkContext.wholeTextFiles 允许您读取包含多个小型文本文件的目录,并将每个文件作为 (filename, content) 对返回。这与 textFile 不同,后者将为每个文件中的每一行返回一条记录。

  • RDD.saveAsPickleFileSparkContext.pickleFile 支持以一种简单的格式保存 RDD,该格式由 pickled Python 对象组成。批量处理用于 pickle 序列化,默认批量大小为 10。

  • SequenceFile 和 Hadoop 输入/输出格式

注意:此功能目前标记为 Experimental,旨在供高级用户使用。将来可能会被基于 Spark SQL 的读/写支持所取代,在这种情况下,Spark SQL 是首选方法。

Writable 支持

PySpark SequenceFile 支持加载 Java 中的键值对 RDD,将 Writables 转换为基本 Java 类型,并使用 pickle 对生成的 Java 对象进行 pickle。当将键值对 RDD 保存到 SequenceFile 时,PySpark 会执行相反的操作。它将 Python 对象 unpickle 到 Java 对象中,然后将它们转换为 Writables。以下 Writables 会自动转换:

Writable 类型Python 类型
Textstr
IntWritableint
FloatWritablefloat
DoubleWritablefloat
BooleanWritablebool
BytesWritablebytearray
NullWritableNone
MapWritabledict

数组不直接处理。用户在读取或写入时需要指定自定义的 ArrayWritable 子类型。写入时,用户还需要指定自定义转换器,将数组转换为自定义的 ArrayWritable 子类型。读取时,默认转换器会将自定义的 ArrayWritable 子类型转换为 Java Object[],然后将其 pickle 到 Python 元组中。要获取原始类型的 Python array.array 数组,用户需要指定自定义转换器。

保存和加载 SequenceFiles

与文本文件类似,可以通过指定路径来保存和加载 SequenceFiles。可以指定键和值类,但对于标准 Writables,这不是必需的。

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]

保存和加载其他 Hadoop 输入/输出格式

PySpark 还可以读取任何 Hadoop InputFormat 或写入任何 Hadoop OutputFormat,适用于“新”和“旧” Hadoop MapReduce API。如果需要,可以将 Hadoop 配置作为 Python dict 传入。这是一个使用 Elasticsearch ESInputFormat 的示例:

$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}  # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                             "org.apache.hadoop.io.NullWritable",
                             "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                             conf=conf)
>>> rdd.first()  # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

请注意,如果 InputFormat 仅依赖于 Hadoop 配置和/或输入路径,并且可以根据上表轻松转换键和值类,则此方法应该适用于此类情况。

如果您有自定义序列化的二进制数据(例如,从 Cassandra / HBase 加载数据),那么您首先需要将该数据在 Scala / Java 端转换为可以由 pickle 的 pickler 处理的东西。为此提供了一个 Converter trait。只需扩展此 trait 并在 convert 方法中实现您的转换代码即可。请记住确保此类以及访问您的 InputFormat 所需的任何依赖项都打包到您的 Spark 作业 jar 中,并包含在 PySpark classpath 中。

有关将 Cassandra / HBase InputFormatOutputFormat 与自定义转换器一起使用的示例,请参见 Python 示例Converter 示例

Spark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括您的本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持文本文件、SequenceFiles 和任何其他的 Hadoop InputFormat

可以使用 SparkContexttextFile 方法创建文本文件 RDD。此方法接受文件的 URI(可以是机器上的本地路径,也可以是 hdfs://s3a:// 等 URI),并将其读取为行的集合。这是一个示例调用:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

创建后,distFile 可以通过数据集操作进行操作。例如,我们可以使用 mapreduce 操作将所有行的长度加起来,如下所示:distFile.map(s => s.length).reduce((a, b) => a + b)

关于使用 Spark 读取文件的一些注意事项:

  • 如果使用本地文件系统上的路径,则该文件还必须在 worker 节点上的相同路径上可访问。可以将文件复制到所有 worker,或者使用网络挂载的共享文件系统。

  • Spark 所有基于文件的输入方法,包括 textFile,都支持在目录、压缩文件和通配符上运行。 例如,您可以使用 textFile("/my/directory"), textFile("/my/directory/*.txt"), 和 textFile("/my/directory/*.gz")。 当读取多个文件时,分区的顺序取决于文件系统返回文件的顺序。 例如,它可能遵循,也可能不遵循文件路径的字典顺序。 在一个分区中,元素的顺序与它们在底层文件中的顺序一致。

  • textFile 方法还接受一个可选的第二个参数,用于控制文件的分区数。默认情况下,Spark 为文件的每个块创建一个分区(默认情况下,HDFS 中的块为 128MB),但您也可以通过传递一个更大的值来请求更高的分区数。请注意,您的分区数不能少于块数。

除了文本文件,Spark 的 Scala API 还支持几种其他数据格式

  • SparkContext.wholeTextFiles 允许您读取包含多个小型文本文件的目录,并将每个文件作为 (filename, content) 对返回。 这与 textFile 不同,后者将为每个文件中的每一行返回一条记录。 分区由数据本地性决定,在某些情况下,可能导致分区过少。 对于这些情况,wholeTextFiles 提供了一个可选的第二个参数来控制最小分区数。

  • 对于 SequenceFiles,使用 SparkContext 的 sequenceFile[K, V] 方法,其中 KV 是文件中键和值的类型。 这些应该是 Hadoop 的 Writable 接口的子类,如 IntWritableText。 此外,Spark 允许您为一些常见的 Writables 指定原生类型;例如,sequenceFile[Int, String] 将自动读取 IntWritables 和 Texts。

  • 对于其他 Hadoop InputFormats,您可以使用 SparkContext.hadoopRDD 方法,该方法接受任意 JobConf 和输入格式类、键类和值类。 按照与使用输入源的 Hadoop 作业相同的方式设置这些。 您还可以使用 SparkContext.newAPIHadoopRDD 用于基于“new” MapReduce API (org.apache.hadoop.mapreduce) 的 InputFormats。

  • RDD.saveAsObjectFileSparkContext.objectFile 支持以一种由序列化的 Java 对象组成的简单格式保存 RDD。 虽然这不如像 Avro 这样的专用格式高效,但它提供了一种保存任何 RDD 的简便方法。

Spark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括您的本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持文本文件、SequenceFiles 和任何其他的 Hadoop InputFormat

可以使用 SparkContexttextFile 方法创建文本文件 RDD。此方法接受文件的 URI(可以是机器上的本地路径,也可以是 hdfs://s3a:// 等 URI),并将其读取为行的集合。这是一个示例调用:

JavaRDD<String> distFile = sc.textFile("data.txt");

创建后,可以通过数据集操作对 distFile 进行操作。 例如,我们可以使用 mapreduce 操作将所有行的长度相加,如下所示:distFile.map(s -> s.length()).reduce((a, b) -> a + b).

关于使用 Spark 读取文件的一些注意事项:

  • 如果使用本地文件系统上的路径,则该文件还必须在 worker 节点上的相同路径上可访问。可以将文件复制到所有 worker,或者使用网络挂载的共享文件系统。

  • Spark 的所有基于文件的输入方法(包括 textFile)都支持在目录、压缩文件和通配符上运行。例如,您可以使用 textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")

  • textFile 方法还接受一个可选的第二个参数,用于控制文件的分区数。默认情况下,Spark 为文件的每个块创建一个分区(默认情况下,HDFS 中的块为 128MB),但您也可以通过传递一个更大的值来请求更高的分区数。请注意,您的分区数不能少于块数。

除了文本文件,Spark 的 Java API 还支持几种其他数据格式

  • JavaSparkContext.wholeTextFiles 允许您读取包含多个小型文本文件的目录,并将每个文件作为 (filename, content) 对返回。 这与 textFile 不同,后者将为每个文件中的每一行返回一条记录。

  • 对于 SequenceFiles,使用 SparkContext 的 sequenceFile[K, V] 方法,其中 KV 是文件中键和值的类型。 这些应该是 Hadoop 的 Writable 接口的子类,如 IntWritableText

  • 对于其他 Hadoop InputFormats,您可以使用 JavaSparkContext.hadoopRDD 方法,该方法接受任意 JobConf 和输入格式类、键类和值类。 按照与使用输入源的 Hadoop 作业相同的方式设置这些。 您还可以使用 JavaSparkContext.newAPIHadoopRDD 用于基于“new” MapReduce API (org.apache.hadoop.mapreduce) 的 InputFormats。

  • JavaRDD.saveAsObjectFileJavaSparkContext.objectFile 支持以一种由序列化的 Java 对象组成的简单格式保存 RDD。 虽然这不如像 Avro 这样的专用格式高效,但它提供了一种保存任何 RDD 的简便方法。

RDD 操作

RDD 支持两种类型的操作:*转换 (transformations)*,它从现有数据集创建一个新数据集;以及*动作 (actions)*,它在数据集上运行计算后将值返回给驱动程序。 例如,map 是一种转换,它通过一个函数传递每个数据集元素,并返回一个表示结果的新 RDD。 另一方面,reduce 是一种动作,它使用某个函数聚合 RDD 的所有元素,并将最终结果返回给驱动程序(尽管也有一个并行的 reduceByKey 返回一个分布式数据集)。

Spark 中的所有转换都是惰性的,也就是说,它们不会立即计算结果。 相反,它们只记住应用于某些基本数据集(例如,文件)的转换。 只有当一个动作需要将结果返回给驱动程序时,才会计算这些转换。 这种设计使 Spark 能够更有效地运行。 例如,我们可以意识到通过 map 创建的数据集将在 reduce 中使用,并且只将 reduce 的结果返回给驱动程序,而不是更大的映射数据集。

默认情况下,每次您在转换后的 RDD 上运行一个动作时,该 RDD 可能会被重新计算。 但是,您也可以使用 persist(或 cache)方法将 RDD *持久化 (persist)* 在内存中,在这种情况下,Spark 会将这些元素保留在集群上,以便下次查询时更快地访问。 还支持将 RDD 持久化到磁盘上,或跨多个节点进行复制。

基础

为了说明 RDD 的基本概念,请考虑以下简单程序

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

第一行从外部文件定义了一个基本 RDD。 此数据集不会加载到内存中或以其他方式对其进行操作:lines 仅仅是指向该文件的指针。 第二行将 lineLengths 定义为 map 转换的结果。 同样,由于惰性,不会立即计算 lineLengths。 最后,我们运行 reduce,这是一个动作。 此时,Spark 将计算分解为在单独的机器上运行的任务,每台机器运行其部分的 map 和本地 reduce,只将其答案返回给驱动程序。

如果我们以后还想再次使用 lineLengths,我们可以添加

lineLengths.persist()

reduce 之前,这将导致 lineLengths 在第一次计算后保存在内存中。

为了说明 RDD 的基本概念,请考虑以下简单程序

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

第一行从外部文件定义了一个基本 RDD。 此数据集不会加载到内存中或以其他方式对其进行操作:lines 仅仅是指向该文件的指针。 第二行将 lineLengths 定义为 map 转换的结果。 同样,由于惰性,不会立即计算 lineLengths。 最后,我们运行 reduce,这是一个动作。 此时,Spark 将计算分解为在单独的机器上运行的任务,每台机器运行其部分的 map 和本地 reduce,只将其答案返回给驱动程序。

如果我们以后还想再次使用 lineLengths,我们可以添加

lineLengths.persist()

reduce 之前,这将导致 lineLengths 在第一次计算后保存在内存中。

为了说明 RDD 的基本概念,请考虑以下简单程序

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

第一行从外部文件定义了一个基本 RDD。 此数据集不会加载到内存中或以其他方式对其进行操作:lines 仅仅是指向该文件的指针。 第二行将 lineLengths 定义为 map 转换的结果。 同样,由于惰性,不会立即计算 lineLengths。 最后,我们运行 reduce,这是一个动作。 此时,Spark 将计算分解为在单独的机器上运行的任务,每台机器运行其部分的 map 和本地 reduce,只将其答案返回给驱动程序。

如果我们以后还想再次使用 lineLengths,我们可以添加

lineLengths.persist(StorageLevel.MEMORY_ONLY());

reduce 之前,这将导致 lineLengths 在第一次计算后保存在内存中。

将函数传递给 Spark

Spark 的 API 在很大程度上依赖于在驱动程序中传递函数以在集群上运行。 有三种推荐的方法可以做到这一点

  • Lambda 表达式,用于可以写成表达式的简单函数。 (Lambdas 不支持多语句函数或不返回值语句。)
  • 调用 Spark 的函数内部的局部 defs,用于更长的代码。
  • 模块中的顶级函数。

例如,要传递比使用 lambda 支持的更长的函数,请考虑以下代码

"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

请注意,虽然也可以传递对类实例(而不是单例对象)中的方法的引用,但这需要将包含该类的对象与该方法一起发送。 例如,考虑

class MyClass(object):
    def func(self, s):
        return s
    def doStuff(self, rdd):
        return rdd.map(self.func)

在这里,如果我们创建一个 new MyClass 并在其上调用 doStuff,那么其中的 map 引用MyClass 实例func 方法,因此需要将整个对象发送到集群。

以类似的方式,访问外部对象的字段将引用整个对象

class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        return rdd.map(lambda s: self.field + s)

为了避免这个问题,最简单的方法是将 field 复制到局部变量中,而不是从外部访问它

def doStuff(self, rdd):
    field = self.field
    return rdd.map(lambda s: field + s)

Spark 的 API 在很大程度上依赖于在驱动程序中传递函数以在集群上运行。 有两种推荐的方法可以做到这一点

  • 匿名函数语法,可用于短代码片段。
  • 全局单例对象中的静态方法。 例如,您可以定义 object MyFunctions,然后传递 MyFunctions.func1,如下所示
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

请注意,虽然也可以传递对类实例(而不是单例对象)中的方法的引用,但这需要将包含该类的对象与该方法一起发送。 例如,考虑

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

在这里,如果我们创建一个新的 MyClass 实例并在其上调用 doStuff,那么其中的 map 引用MyClass 实例func1 方法,因此需要将整个对象发送到集群。 它类似于编写 rdd.map(x => this.func1(x))

以类似的方式,访问外部对象的字段将引用整个对象

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

等价于编写 rdd.map(x => this.field + x),它引用所有的 this。 为了避免这个问题,最简单的方法是将 field 复制到局部变量中,而不是从外部访问它

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

Spark 的 API 很大程度上依赖于将函数从驱动程序传递到集群上运行。在 Java 中,函数由实现 org.apache.spark.api.java.function 包中接口的类表示。有两种方法可以创建这样的函数:

  • 在你自己的类中实现 Function 接口,可以作为匿名内部类或命名类,并将它的一个实例传递给 Spark。
  • 使用 lambda 表达式来简洁地定义一个实现。

尽管本指南的大部分内容为了简洁起见使用了 lambda 语法,但使用长格式使用所有相同的 API 也很容易。例如,我们可以将上面的代码编写如下:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});

或者,如果内联编写函数很笨拙:

class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

请注意,Java 中的匿名内部类也可以访问封闭范围内的变量,只要这些变量被标记为 final。Spark 会将这些变量的副本发送到每个 worker 节点,就像对其他语言一样。

理解闭包

关于 Spark 比较难理解的事情之一是在集群上执行代码时变量和方法的范围和生命周期。修改其范围之外的变量的 RDD 操作可能是造成混淆的常见原因。在下面的例子中,我们将看到使用 foreach() 递增计数器的代码,但类似的问题也可能发生在其他操作中。

示例

考虑下面简单的 RDD 元素求和,它的行为可能因执行是否发生在同一个 JVM 中而有所不同。一个常见的例子是在 local 模式下运行 Spark(--master = local[n]),而不是将 Spark 应用程序部署到集群(例如,通过 spark-submit 到 YARN)。

counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)
var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);

本地模式与集群模式

上述代码的行为是未定义的,可能无法按预期工作。为了执行作业,Spark 将 RDD 操作的处理分解为多个任务,每个任务由一个 executor 执行。在执行之前,Spark 计算任务的 **closure(闭包)**。闭包是那些 executor 必须可见才能对 RDD 执行计算的变量和方法(在本例中是 foreach())。这个闭包会被序列化并发送到每个 executor。

发送到每个 executor 的闭包中的变量现在都是副本,因此,当在 foreach 函数中引用 **counter** 时,它不再是驱动程序节点上的 **counter**。驱动程序节点的内存中仍然存在一个 **counter**,但 executor 看不到它!Executor 只能看到来自序列化闭包的副本。因此,**counter** 的最终值仍然是零,因为对 **counter** 的所有操作都在引用序列化闭包中的值。

在 local 模式下,在某些情况下,foreach 函数实际上将在与驱动程序相同的 JVM 中执行,并将引用相同的原始 **counter**,并且实际上可能会更新它。

为了确保在这些场景中定义良好的行为,应该使用 Accumulator。Spark 中的累加器专门用于提供一种机制,以便在执行在集群中的 worker 节点之间拆分时安全地更新变量。本指南的累加器部分将更详细地讨论这些内容。

一般来说,闭包——像循环或本地定义的方法这样的构造——不应该被用来改变一些全局状态。Spark 不定义或保证从闭包外部引用的对象的突变行为。一些执行此操作的代码可能在 local 模式下工作,但这只是偶然,并且此类代码在分布式模式下将无法按预期运行。如果需要一些全局聚合,请改用累加器。

打印 RDD 的元素

另一个常见的习惯用法是尝试使用 rdd.foreach(println)rdd.map(println) 打印出 RDD 的元素。在单台机器上,这将生成预期的输出并打印所有 RDD 的元素。但是,在 cluster 模式下,executor 调用的 stdout 输出现在写入 executor 的 stdout,而不是驱动程序上的 stdout,因此驱动程序上的 stdout 不会显示这些!要在驱动程序上打印所有元素,可以使用 collect() 方法首先将 RDD 带到驱动程序节点,如下所示:rdd.collect().foreach(println)。但是,这可能会导致驱动程序耗尽内存,因为 collect() 将整个 RDD 获取到单台机器;如果只需要打印 RDD 的一些元素,更安全的方法是使用 take()rdd.take(100).foreach(println)

使用键值对

虽然大多数 Spark 操作都可以在包含任何类型对象的 RDD 上工作,但一些特殊操作仅在键值对的 RDD 上可用。最常见的是分布式“shuffle”操作,例如按键分组或聚合元素。

在 Python 中,这些操作适用于包含内置 Python 元组(例如 (1, 2))的 RDD。只需创建这样的元组,然后调用所需的运算。

例如,以下代码使用键值对上的 reduceByKey 操作来计算文件中每行文本出现的次数:

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

我们也可以使用 counts.sortByKey() 例如按字母顺序对这些对进行排序,最后使用 counts.collect() 将它们作为对象列表返回到驱动程序程序。

虽然大多数 Spark 操作都可以在包含任何类型对象的 RDD 上工作,但一些特殊操作仅在键值对的 RDD 上可用。最常见的是分布式“shuffle”操作,例如按键分组或聚合元素。

在 Scala 中,这些操作在包含 Tuple2 对象(该语言中的内置元组,只需编写 (a, b) 即可创建)的 RDD 上自动可用。键值对操作在 PairRDDFunctions 类中可用,该类自动封装了元组的 RDD。

例如,以下代码使用键值对上的 reduceByKey 操作来计算文件中每行文本出现的次数:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

我们也可以使用 counts.sortByKey() 例如按字母顺序对这些对进行排序,最后使用 counts.collect() 将它们作为对象数组返回到驱动程序程序。

**注意:**当使用自定义对象作为键值对操作中的键时,你必须确保自定义的 equals() 方法与匹配的 hashCode() 方法一起使用。有关完整详细信息,请参阅 Object.hashCode() 文档中概述的约定。

虽然大多数 Spark 操作都可以在包含任何类型对象的 RDD 上工作,但一些特殊操作仅在键值对的 RDD 上可用。最常见的是分布式“shuffle”操作,例如按键分组或聚合元素。

在 Java 中,键值对使用 Scala 标准库中的 scala.Tuple2 类表示。你可以简单地调用 new Tuple2(a, b) 来创建一个元组,并在以后使用 tuple._1()tuple._2() 访问它的字段。

键值对的 RDD 由 JavaPairRDD 类表示。你可以使用特殊版本的 map 操作(如 mapToPairflatMapToPair)从 JavaRDD 构造 JavaPairRDD。JavaPairRDD 将同时具有标准 RDD 函数和特殊的键值函数。

例如,以下代码使用键值对上的 reduceByKey 操作来计算文件中每行文本出现的次数:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

我们也可以使用 counts.sortByKey() 例如按字母顺序对这些对进行排序,最后使用 counts.collect() 将它们作为对象数组返回到驱动程序程序。

**注意:**当使用自定义对象作为键值对操作中的键时,你必须确保自定义的 equals() 方法与匹配的 hashCode() 方法一起使用。有关完整详细信息,请参阅 Object.hashCode() 文档中概述的约定。

转换

下表列出了 Spark 支持的一些常见转换。有关详细信息,请参阅 RDD API 文档(ScalaJavaPythonR)和pair RDD 函数文档(ScalaJava)。

转换含义
map(func) 返回一个新的分布式数据集,该数据集由通过函数 func 传递源的每个元素形成。
filter(func) 返回一个新的数据集,该数据集通过选择源中 func 返回 true 的那些元素形成。
flatMap(func) 类似于 map,但每个输入项可以映射到 0 个或多个输出项(因此 func 应返回 Seq 而不是单个项)。
mapPartitions(func) 类似于 map,但在 RDD 的每个分区(块)上单独运行,因此在类型为 T 的 RDD 上运行时,func 必须是 Iterator<T> => Iterator<U> 类型。
mapPartitionsWithIndex(func) 类似于 mapPartitions,但也为 func 提供一个整数值,表示分区的索引,因此在类型为 T 的 RDD 上运行时,func 必须是 (Int, Iterator<T>) => Iterator<U> 类型。
sample(withReplacement, fraction, seed) 使用给定的随机数生成器种子,对数据的一小部分 fraction 进行采样,可以进行替换或不进行替换。
union(otherDataset) 返回一个新的数据集,其中包含源数据集和参数中元素的并集。
intersection(otherDataset) 返回一个新的 RDD,其中包含源数据集和参数中元素的交集。
distinct([numPartitions])) 返回一个新的数据集,其中包含源数据集的不同元素。
groupByKey([numPartitions]) 在 (K, V) 对的数据集上调用时,返回 (K, Iterable<V>) 对的数据集。
注意: 如果你正在分组以对每个键执行聚合(例如总和或平均值),使用 reduceByKeyaggregateByKey 会产生更好的性能。
注意: 默认情况下,输出中的并行级别取决于父 RDD 的分区数。你可以传递一个可选的 numPartitions 参数来设置不同的任务数。
reduceByKey(func, [numPartitions]) 当在一个 (K, V) 键值对的数据集上调用时,返回一个 (K, V) 键值对的数据集,其中每个键的值都使用给定的 reduce 函数 *func* 进行聚合,*func* 必须是 (V,V) => V 类型。 像 groupByKey 中一样,reduce 任务的数量可以通过一个可选的第二个参数来配置。
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) 当在一个 (K, V) 键值对的数据集上调用时,返回一个 (K, U) 键值对的数据集,其中每个键的值都使用给定的组合函数和一个中性“零”值进行聚合。 允许聚合值的类型与输入值的类型不同,同时避免不必要的内存分配。 像 groupByKey 中一样,reduce 任务的数量可以通过一个可选的第二个参数来配置。
sortByKey([ascending], [numPartitions]) 当在一个 (K, V) 键值对的数据集上调用,且 K 实现了 Ordered 接口时,返回一个 (K, V) 键值对的数据集,该数据集按键以升序或降序排序,排序方式由布尔值 ascending 参数指定。
join(otherDataset, [numPartitions]) 当在类型为 (K, V) 和 (K, W) 的数据集上调用时,返回一个 (K, (V, W)) 键值对的数据集,其中包含每个键的所有元素对。 通过 leftOuterJoinrightOuterJoinfullOuterJoin 支持外连接。
cogroup(otherDataset, [numPartitions]) 当在类型为 (K, V) 和 (K, W) 的数据集上调用时,返回一个 (K, (Iterable<V>, Iterable<W>)) 元组的数据集。 此操作也称为 groupWith
cartesian(otherDataset) 当在类型为 T 和 U 的数据集上调用时,返回一个 (T, U) 键值对的数据集(所有元素的对)。
pipe(command, [envVars]) 通过 shell 命令(例如 Perl 或 bash 脚本)传递 RDD 的每个分区。 RDD 元素被写入进程的标准输入,而输出到其标准输出的行将作为字符串的 RDD 返回。
coalesce(numPartitions) 将 RDD 中的分区数减少到 numPartitions。 在过滤大型数据集后,对于更有效地运行操作很有用。
repartition(numPartitions) 随机重新混洗 RDD 中的数据,以创建更多或更少的分区,并在它们之间进行平衡。 这总是通过网络混洗所有数据。
repartitionAndSortWithinPartitions(partitioner) 根据给定的 partitioner 重新分区 RDD,并在每个结果分区中,按其键对记录进行排序。 这比先调用 repartition 然后在每个分区中排序更有效,因为它可以将排序推送到混洗机制中。

动作

下表列出了 Spark 支持的一些常见操作。 有关详细信息,请参阅 RDD API 文档(ScalaJavaPythonR

和键值对 RDD 函数文档(ScalaJava)。

Action(行动操作)含义
reduce(func) 使用函数 *func*(接受两个参数并返回一个)聚合数据集的元素。 该函数应该是可交换的和结合性的,以便可以并行地正确计算。
collect() 将数据集的所有元素作为驱动程序中的数组返回。 这通常在 filter 或其他操作返回数据的足够小的子集之后很有用。
count() 返回数据集中的元素数。
first() 返回数据集的第一个元素(类似于 take(1))。
take(n) 返回包含数据集的前 *n* 个元素的数组。
takeSample(withReplacement, num, [seed]) 返回包含数据集的 *num* 个元素的随机样本的数组,无论是否替换,可以选择预先指定随机数生成器种子。
takeOrdered(n, [ordering]) 使用其自然顺序或自定义比较器返回 RDD 的前 *n* 个元素。
saveAsTextFile(path) 将数据集的元素作为文本文件(或一组文本文件)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中给定的目录中。 Spark 将对每个元素调用 toString 以将其转换为文件中的文本行。
saveAsSequenceFile(path)
(Java 和 Scala)
将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中给定的路径中。 这在实现了 Hadoop 的 Writable 接口的键值对 RDD 上可用。 在 Scala 中,它也可用于隐式转换为 Writable 的类型(Spark 包括对基本类型(如 Int、Double、String 等)的转换)。
saveAsObjectFile(path)
(Java 和 Scala)
使用 Java 序列化以简单格式写入数据集的元素,然后可以使用 SparkContext.objectFile() 加载这些元素。
countByKey() 仅在类型为 (K, V) 的 RDD 上可用。 返回一个 (K, Int) 键值对的哈希映射,其中包含每个键的计数。
foreach(func) 在数据集的每个元素上运行函数 *func*。 这通常是为了副作用而完成的,例如更新 累加器 或与外部存储系统交互。
注意:修改 foreach() 之外的变量(累加器除外)可能会导致未定义的行为。 有关更多详细信息,请参阅 理解闭包

Spark RDD API 还公开了某些操作的异步版本,例如 foreachAsync 对于 foreach,它立即将 FutureAction 返回给调用方,而不是阻塞操作完成。 这可用于管理或等待操作的异步执行。

Shuffle 操作

Spark 中的某些操作会触发一个称为混洗的事件。 混洗是 Spark 用于重新分配数据的机制,以便以不同的方式跨分区对数据进行分组。 这通常涉及跨执行程序和计算机复制数据,这使得混洗成为一项复杂且昂贵的操作。

背景

为了理解混洗期间发生的情况,我们可以考虑 reduceByKey 操作的示例。 reduceByKey 操作生成一个新的 RDD,其中单个键的所有值都组合成一个元组 - 键和对与该键关联的所有值执行 reduce 函数的结果。 挑战在于,单个键的所有值不一定位于同一分区,甚至不位于同一台计算机上,但它们必须位于同一位置才能计算结果。

在 Spark 中,通常不会跨分区分配数据,以便位于特定操作所需的必要位置。 在计算期间,单个任务将对单个分区进行操作 - 因此,为了组织单个 reduceByKey reduce 任务执行的所有数据,Spark 需要执行一个 all-to-all 操作。 它必须从所有分区读取以找到所有键的所有值,然后将跨分区的值放在一起以计算每个键的最终结果 - 这称为 shuffle(混洗)。

尽管新混洗数据的每个分区中的元素集合将是确定性的,并且分区本身的排序也是如此,但这些元素的排序不是。 如果希望在混洗之后获得可预测的排序数据,则可以使用

可能导致混洗的操作包括 repartition 操作,例如 repartitioncoalesce‘ByKey 操作(计数除外),例如 groupByKeyreduceByKey,以及 join 操作,例如 cogroupjoin

性能影响

Shuffle(混洗)是一项昂贵的操作,因为它涉及磁盘 I/O、数据序列化和网络 I/O。 为了组织混洗的数据,Spark 会生成一组任务 - 用于组织数据的 *map* 任务,以及用于聚合数据的一组 *reduce* 任务。 此命名来自 MapReduce,与 Spark 的 mapreduce 操作没有直接关系。

在内部,各个 map 任务的结果保存在内存中,直到它们无法容纳为止。 然后,这些结果根据目标分区进行排序,并写入单个文件。 在 reduce 端,任务读取相关的排序块。

某些混洗操作会消耗大量的堆内存,因为它们在传输记录之前或之后使用内存中的数据结构来组织记录。 具体来说,reduceByKeyaggregateByKey 在 map 端创建这些结构,而 'ByKey 操作在 reduce 端生成这些结构。 当数据不适合内存时,Spark 会将这些表溢出到磁盘,从而导致额外的磁盘 I/O 开销并增加垃圾回收。

混洗还在磁盘上生成大量中间文件。 从 Spark 1.3 开始,这些文件会被保留,直到不再使用相应的 RDD 并进行垃圾回收。 这样做是为了在重新计算 lineage 时不需要重新创建混洗文件。 垃圾回收可能仅在很长一段时间后才会发生,如果应用程序保留对这些 RDD 的引用,或者 GC 没有频繁启动。 这意味着长时间运行的 Spark 作业可能会消耗大量磁盘空间。 临时存储目录由配置 Spark 上下文时的 spark.local.dir 配置参数指定。

可以通过调整各种配置参数来调整混洗行为。 请参阅Spark 配置指南中的“混洗行为”部分。

RDD 持久化

Spark 中最重要的功能之一是在操作中持久化(或缓存)数据集。 当你持久化一个 RDD 时,每个节点都会将它计算的任何分区存储在内存中,并在该数据集(或从中派生的数据集)的其他操作中重用它们。 这使得将来的操作更快(通常超过 10 倍)。 缓存是迭代算法和快速交互使用的关键工具。

你可以使用 RDD 上的 persist()cache() 方法将 RDD 标记为持久化。 第一次在操作中计算它时,它将保存在节点上的内存中。 Spark 的缓存是容错的 - 如果 RDD 的任何分区丢失,它将使用最初创建它的转换自动重新计算。

此外,每个持久化的 RDD 都可以使用不同的存储级别存储,例如,允许你将数据集持久化在磁盘上,将其持久化在内存中,但作为序列化的 Java 对象(以节省空间),并在节点之间复制它。 这些级别通过将 StorageLevel 对象(ScalaJavaPython)传递给 persist() 来设置。 cache() 方法是使用默认存储级别的简写,默认存储级别为 StorageLevel.MEMORY_ONLY(在内存中存储反序列化的对象)。 完整的存储级别集合是

Storage Level(存储级别)含义
MEMORY_ONLY 将 RDD 作为反序列化的 Java 对象存储在 JVM 中。如果 RDD 无法完全放入内存,则某些分区将不被缓存,而是在每次需要时动态重新计算。这是默认级别。
MEMORY_AND_DISK 将 RDD 作为反序列化的 Java 对象存储在 JVM 中。如果 RDD 无法完全放入内存,则将无法放入内存的分区存储到磁盘上,并在需要时从磁盘读取。
MEMORY_ONLY_SER
(Java 和 Scala)
将 RDD 作为序列化的 Java 对象存储(每个分区一个字节数组)。通常,这比反序列化的对象更节省空间,尤其是在使用快速序列化器时,但读取时会占用更多的 CPU。
MEMORY_AND_DISK_SER
(Java 和 Scala)
类似于 MEMORY_ONLY_SER,但是将无法放入内存的分区溢出到磁盘,而不是在每次需要时动态重新计算。
DISK_ONLY 仅将 RDD 分区存储在磁盘上。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 与上述级别相同,但在两个集群节点上复制每个分区。
OFF_HEAP (实验性) 类似于 MEMORY_ONLY_SER,但将数据存储在 堆外内存 中。这需要启用堆外内存。

注意: 在 Python 中,存储的对象将始终使用 Pickle 库进行序列化,因此选择序列化级别无关紧要。Python 中可用的存储级别包括 MEMORY_ONLYMEMORY_ONLY_2MEMORY_AND_DISKMEMORY_AND_DISK_2DISK_ONLYDISK_ONLY_2DISK_ONLY_3

即使没有用户调用 persist,Spark 也会自动持久化 shuffle 操作(例如 reduceByKey)中的一些中间数据。这样做是为了避免在 shuffle 期间节点发生故障时重新计算整个输入。我们仍然建议用户在计划重用结果 RDD 时调用 persist

如何选择存储级别?

Spark 的存储级别旨在提供内存使用率和 CPU 效率之间的不同权衡。我们建议按照以下流程选择一个

移除数据

Spark 会自动监控每个节点上的缓存使用情况,并以最近最少使用 (LRU) 的方式删除旧的数据分区。如果您想手动删除 RDD,而不是等待它从缓存中删除,请使用 RDD.unpersist() 方法。请注意,默认情况下此方法不会阻塞。要阻塞直到资源被释放,请在调用此方法时指定 blocking=true

共享变量

通常,当传递给 Spark 操作(例如 mapreduce)的函数在远程集群节点上执行时,它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器,并且对远程机器上的变量的更新不会传播回驱动程序。支持跨任务的通用读写共享变量效率很低。但是,Spark 确实为两种常见的用法模式提供了两种有限类型的共享变量:广播变量和累加器。

广播变量

广播变量允许程序员将只读变量缓存在每台机器上,而不是随任务一起发送它的副本。例如,它们可用于以有效的方式为每个节点提供大型输入数据集的副本。Spark 还会尝试使用有效的广播算法来分发广播变量,以降低通信成本。

Spark 操作通过一系列阶段执行,这些阶段由分布式“shuffle”操作分隔。Spark 会自动广播每个阶段中任务所需的常用数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前反序列化。这意味着仅当跨多个阶段的任务需要相同的数据时,或者缓存反序列化形式的数据很重要时,显式创建广播变量才有用。

广播变量是通过调用 SparkContext.broadcast(v) 从变量 v 创建的。广播变量是 v 的包装器,可以通过调用 value 方法访问其值。下面的代码显示了这一点

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

创建广播变量后,应在集群上运行的任何函数中使用它来代替值 v,以便 v 不会多次发送到节点。此外,广播对象 v 后不应修改它,以确保所有节点都获得广播变量的相同值(例如,如果该变量稍后发送到新节点)。

要释放广播变量复制到执行器的资源,请调用 .unpersist()。如果之后再次使用广播,它将被重新广播。要永久释放广播变量使用的所有资源,请调用 .destroy()。之后不能再使用广播变量。请注意,默认情况下这些方法不会阻塞。要阻塞直到资源被释放,请在调用它们时指定 blocking=true

累加器

累加器是只能通过关联和交换操作“添加”到的变量,因此可以在并行环境中有效地支持。它们可用于实现计数器(如在 MapReduce 中)或总和。Spark 本机支持数值类型的累加器,程序员可以添加对新类型的支持。

作为用户,您可以创建命名或未命名的累加器。如下图所示,命名累加器(在本例中为 counter)将显示在修改该累加器的阶段的 Web UI 中。Spark 会在“任务”表中显示任务修改的每个累加器的值。

Accumulators in the Spark UI

在 UI 中跟踪累加器对于理解正在运行的阶段的进度很有用(注意:Python 中尚不支持此功能)。

累加器是通过调用 SparkContext.accumulator(v) 从初始值 v 创建的。在集群上运行的任务可以使用 add 方法或 += 运算符向其添加内容。但是,它们无法读取其值。只有驱动程序可以使用其 value 方法读取累加器的值。

下面的代码显示了一个累加器,用于加总数组的元素

>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

>>> accum.value
10

虽然此代码使用了对 Int 类型累加器的内置支持,但程序员也可以通过子类化 AccumulatorParam 来创建自己的类型。AccumulatorParam 接口有两种方法:zero 用于为您的数据类型提供“零值”,addInPlace 用于将两个值相加。例如,假设我们有一个表示数学向量的 Vector 类,我们可以编写

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)

    def addInPlace(self, v1, v2):
        v1 += v2
        return v1

# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

可以通过调用 SparkContext.longAccumulator()SparkContext.doubleAccumulator() 分别创建数值累加器,以累加 Long 或 Double 类型的值。在集群上运行的任务可以使用 add 方法向其添加内容。但是,它们无法读取其值。只有驱动程序可以使用其 value 方法读取累加器的值。

下面的代码显示了一个累加器,用于加总数组的元素

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

虽然此代码使用了对 Long 类型累加器的内置支持,但程序员也可以通过子类化 AccumulatorV2 来创建自己的类型。AccumulatorV2 抽象类有几个必须覆盖的方法:reset 用于将累加器重置为零,add 用于将另一个值添加到累加器中,merge 用于将另一个相同类型的累加器合并到此累加器中。必须覆盖的其他方法包含在 API 文档中。例如,假设我们有一个表示数学向量的 MyVector 类,我们可以编写

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

请注意,当程序员定义自己的 AccumulatorV2 类型时,结果类型可能与添加的元素类型不同。

可以通过调用 SparkContext.longAccumulator()SparkContext.doubleAccumulator() 分别创建数值累加器,以累加 Long 或 Double 类型的值。在集群上运行的任务可以使用 add 方法向其添加内容。但是,它们无法读取其值。只有驱动程序可以使用其 value 方法读取累加器的值。

下面的代码显示了一个累加器,用于加总数组的元素

LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

虽然此代码使用了对 Long 类型累加器的内置支持,但程序员也可以通过子类化 AccumulatorV2 来创建自己的类型。AccumulatorV2 抽象类有几个必须覆盖的方法:reset 用于将累加器重置为零,add 用于将另一个值添加到累加器中,merge 用于将另一个相同类型的累加器合并到此累加器中。必须覆盖的其他方法包含在 API 文档中。例如,假设我们有一个表示数学向量的 MyVector 类,我们可以编写

class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {

  private MyVector myVector = MyVector.createZeroVector();

  public void reset() {
    myVector.reset();
  }

  public void add(MyVector v) {
    myVector.add(v);
  }
  ...
}

// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");

请注意,当程序员定义自己的 AccumulatorV2 类型时,结果类型可能与添加的元素类型不同。

警告:当 Spark 任务完成时,Spark 将尝试将此任务中累积的更新合并到累加器。如果失败,Spark 将忽略该失败,仍然将该任务标记为成功并继续运行其他任务。因此,有缺陷的累加器不会影响 Spark 作业,但即使 Spark 作业成功,它也可能无法正确更新。

对于仅在操作内部执行的累加器更新,Spark 保证每个任务对累加器的更新只会应用一次,即重新启动的任务不会更新该值。在转换中,用户应注意,如果任务或作业阶段被重新执行,则每个任务的更新可能会应用多次。

累加器不会改变 Spark 的惰性求值模型。如果在 RDD 上的操作中更新了它们,则只有在作为操作的一部分计算 RDD 后才会更新其值。因此,不能保证在惰性转换(如 map())中进行累加器更新时会执行这些更新。下面的代码片段演示了此属性

accum = sc.accumulator(0)
def g(x):
    accum.add(x)
    return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.

部署到集群

应用程序提交指南 介绍了如何将应用程序提交到集群。简而言之,一旦您将应用程序打包成 JAR 文件(对于 Java/Scala)或一组 .py.zip 文件(对于 Python),bin/spark-submit 脚本允许您将其提交到任何支持的集群管理器。

从 Java / Scala 启动 Spark 作业

org.apache.spark.launcher 包提供了使用简单的 Java API 启动 Spark 作业作为子进程的类。

单元测试

Spark 适用于使用任何流行的单元测试框架进行单元测试。 只需在测试中创建一个 SparkContext,并将 master URL 设置为 local,运行您的操作,然后调用 SparkContext.stop() 将其关闭。 请确保在 finally 块或测试框架的 tearDown 方法中停止上下文,因为 Spark 不支持两个上下文在同一程序中同时运行。

下一步

您可以在 Spark 网站上看到一些 Spark 程序示例。 此外,Spark 在 examples 目录中包含多个示例(ScalaJavaPythonR)。 您可以通过将类名传递给 Spark 的 bin/run-example 脚本来运行 Java 和 Scala 示例;例如

./bin/run-example SparkPi

对于 Python 示例,请改用 spark-submit

./bin/spark-submit examples/src/main/python/pi.py

对于 R 示例,请改用 spark-submit

./bin/spark-submit examples/src/main/r/dataframe.R

有关优化程序的帮助,配置调优 指南提供了有关最佳实践的信息。 它们对于确保您的数据以高效的格式存储在内存中尤为重要。 有关部署的帮助,集群模式概述 描述了分布式操作中涉及的组件和支持的集群管理器。

最后,完整的 API 文档可在 ScalaJavaPythonR 中找到。