RDD 编程指南
概述
从高层次来看,每个 Spark 应用程序都包含一个驱动程序,它运行用户的 main
函数并在集群上执行各种并行操作。Spark 提供的主要抽象是弹性分布式数据集 (RDD),它是一个分布在集群节点上的元素集合,可以并行操作。RDD 是通过从 Hadoop 文件系统(或任何其他 Hadoop 支持的文件系统)中的文件或驱动程序中的现有 Scala 集合开始创建的,并对其进行转换。用户还可以要求 Spark持久化一个 RDD 到内存中,使其能够在并行操作中有效地重复使用。最后,RDD 会自动从节点故障中恢复。
Spark 中的第二个抽象是共享变量,它可以在并行操作中使用。默认情况下,当 Spark 并行运行一个函数作为一组在不同节点上的任务时,它会将函数中使用的每个变量的副本发送到每个任务。有时,需要在任务之间或任务和驱动程序之间共享一个变量。Spark 支持两种类型的共享变量:广播变量,它可以用来在所有节点的内存中缓存一个值,以及累加器,它是一些只能“添加”的变量,例如计数器和总和。
本指南展示了 Spark 支持的每种语言中的每个功能。如果你启动 Spark 的交互式 shell,最容易理解 - 对于 Scala shell,使用 bin/spark-shell
,对于 Python shell,使用 bin/pyspark
。
与 Spark 链接
Spark 3.5.1 与 Python 3.8+ 兼容。它可以使用标准的 CPython 解释器,因此可以使用 NumPy 等 C 库。它也与 PyPy 7.3.6+ 兼容。
Python 中的 Spark 应用程序可以使用包含 Spark 的 bin/spark-submit
脚本在运行时运行,或者将其包含在你的 setup.py 中,如下所示:
要运行没有通过 pip 安装 PySpark 的 Python 中的 Spark 应用程序,请使用 Spark 目录中的 bin/spark-submit
脚本。此脚本将加载 Spark 的 Java/Scala 库,并允许你将应用程序提交到集群。你也可以使用 bin/pyspark
启动交互式 Python shell。
如果你想访问 HDFS 数据,你需要使用链接到你的 HDFS 版本的 PySpark 构建。Spark 主页上也提供了 预构建包,用于常见的 HDFS 版本。
最后,你需要将一些 Spark 类导入你的程序。添加以下行:
PySpark 要求驱动程序和工作程序中的 Python 具有相同的次要版本。它使用 PATH 中的默认 python 版本,你可以通过 PYSPARK_PYTHON
指定要使用的 Python 版本,例如:
Spark 3.5.1 默认构建并分发为与 Scala 2.12 兼容。(Spark 也可以构建为与其他版本的 Scala 兼容。)要使用 Scala 编写应用程序,你需要使用兼容的 Scala 版本(例如 2.12.X)。
要编写 Spark 应用程序,你需要添加一个对 Spark 的 Maven 依赖项。Spark 可通过 Maven Central 获取,地址为:
groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.5.1
此外,如果你想访问 HDFS 集群,你需要添加对你的 HDFS 版本的 hadoop-client
的依赖项。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最后,你需要将一些 Spark 类导入你的程序。添加以下行:
(在 Spark 1.3.0 之前,你需要显式地 import org.apache.spark.SparkContext._
来启用必要的隐式转换。)
Spark 3.5.1 支持 lambda 表达式,以便简洁地编写函数,否则你可以使用 org.apache.spark.api.java.function 包中的类。
请注意,Spark 2.2.0 中删除了对 Java 7 的支持。
要使用 Java 编写 Spark 应用程序,你需要添加一个对 Spark 的依赖项。Spark 可通过 Maven Central 获取,地址为:
groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.5.1
此外,如果你想访问 HDFS 集群,你需要添加对你的 HDFS 版本的 hadoop-client
的依赖项。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最后,你需要将一些 Spark 类导入你的程序。添加以下行:
初始化 Spark
Spark 程序首先要做的是创建一个 SparkContext 对象,它告诉 Spark 如何访问集群。要创建一个 SparkContext
,你需要先构建一个 SparkConf 对象,其中包含有关你的应用程序的信息。
Spark 程序首先要做的是创建一个 SparkContext 对象,它告诉 Spark 如何访问集群。要创建一个 SparkContext
,你需要先构建一个 SparkConf 对象,其中包含有关你的应用程序的信息。
每个 JVM 应该只有一个 SparkContext 处于活动状态。在创建新的 SparkContext 之前,必须 stop()
活动的 SparkContext。
Spark 程序首先要做的是创建一个 JavaSparkContext 对象,它告诉 Spark 如何访问集群。要创建一个 SparkContext
,你需要先构建一个 SparkConf 对象,其中包含有关你的应用程序的信息。
appName
参数是你的应用程序的名称,将在集群 UI 上显示。 master
是一个 Spark、Mesos 或 YARN 集群 URL,或者是一个特殊的“local”字符串,用于在本地模式下运行。在实践中,当在集群上运行时,你不会希望在程序中硬编码 master
,而是 使用 spark-submit
启动应用程序 并在那里接收它。但是,对于本地测试和单元测试,你可以传递“local”来在进程内运行 Spark。
使用 Shell
在 PySpark shell 中,一个特殊的解释器感知的 SparkContext 已经为你创建好了,它位于名为 sc
的变量中。创建自己的 SparkContext 将不起作用。你可以使用 --master
参数设置上下文连接到的主节点,并且可以通过将逗号分隔的列表传递给 --py-files
参数来将 Python .zip、.egg 或 .py 文件添加到运行时路径。对于第三方 Python 依赖项,请参阅 Python 包管理。你也可以通过将逗号分隔的 Maven 坐标列表传递给 --packages
参数来将依赖项(例如 Spark 包)添加到你的 shell 会话中。任何可能存在依赖项的额外仓库(例如 Sonatype)都可以传递给 --repositories
参数。例如,要在正好四个核心上运行 bin/pyspark
,请使用:
或者,要将 code.py
添加到搜索路径(以便稍后能够 import code
),请使用:
有关选项的完整列表,请运行 pyspark --help
。在幕后,pyspark
调用了更通用的 spark-submit
脚本。
也可以在 IPython(增强的 Python 解释器)中启动 PySpark shell。PySpark 与 IPython 1.0.0 及更高版本兼容。要使用 IPython,在运行 bin/pyspark
时将 PYSPARK_DRIVER_PYTHON
变量设置为 ipython
要使用 Jupyter notebook(以前称为 IPython notebook),请执行以下操作:
你可以通过设置 PYSPARK_DRIVER_PYTHON_OPTS
来自定义 ipython
或 jupyter
命令。
启动 Jupyter Notebook 服务器后,你可以从“文件”选项卡中创建一个新的笔记本。在笔记本中,你可以在开始从 Jupyter notebook 中尝试 Spark 之前,将命令 %pylab inline
作为笔记本的一部分输入。
在 Spark shell 中,一个特殊的解释器感知的 SparkContext 已经为你创建好了,它位于名为 sc
的变量中。创建自己的 SparkContext 将不起作用。你可以使用 --master
参数设置上下文连接到的主节点,并且可以通过将逗号分隔的列表传递给 --jars
参数来将 JAR 添加到类路径中。你也可以通过将逗号分隔的 Maven 坐标列表传递给 --packages
参数来将依赖项(例如 Spark 包)添加到你的 shell 会话中。任何可能存在依赖项的额外仓库(例如 Sonatype)都可以传递给 --repositories
参数。例如,要在正好四个核心上运行 bin/spark-shell
,请使用:
或者,要将 code.jar
添加到其类路径中,请使用
要使用 Maven 坐标包含依赖项
有关所有选项的完整列表,请运行 spark-shell --help
。在幕后,spark-shell
调用更通用的 spark-submit
脚本。
弹性分布式数据集 (RDD)
Spark 围绕着弹性分布式数据集 (RDD) 的概念展开,RDD 是一个容错的元素集合,可以在并行环境中进行操作。创建 RDD 有两种方法:并行化驱动程序中的现有集合,或引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase 或任何提供 Hadoop InputFormat 的数据源。
并行化集合
并行化集合是通过在驱动程序中的现有可迭代对象或集合上调用 SparkContext
的 parallelize
方法来创建的。集合的元素被复制以形成一个分布式数据集,可以在并行环境中进行操作。例如,以下是如何创建一个包含数字 1 到 5 的并行化集合
创建后,分布式数据集 (distData
) 可以在并行环境中进行操作。例如,我们可以调用 distData.reduce(lambda a, b: a + b)
来将列表中的元素加起来。我们将在后面介绍对分布式数据集的操作。
并行化集合是通过在驱动程序中的现有集合(Scala Seq
)上调用 SparkContext
的 parallelize
方法来创建的。集合的元素被复制以形成一个分布式数据集,可以在并行环境中进行操作。例如,以下是如何创建一个包含数字 1 到 5 的并行化集合
创建后,分布式数据集 (distData
) 可以在并行环境中进行操作。例如,我们可以调用 distData.reduce((a, b) => a + b)
来将数组中的元素加起来。我们将在后面介绍对分布式数据集的操作。
并行化集合是通过在驱动程序中的现有 Collection
上调用 JavaSparkContext
的 parallelize
方法来创建的。集合的元素被复制以形成一个分布式数据集,可以在并行环境中进行操作。例如,以下是如何创建一个包含数字 1 到 5 的并行化集合
创建后,分布式数据集 (distData
) 可以在并行环境中进行操作。例如,我们可以调用 distData.reduce((a, b) -> a + b)
来将列表中的元素加起来。我们将在后面介绍对分布式数据集的操作。
并行化集合的一个重要参数是将数据集分成多少个分区。Spark 将为集群的每个分区运行一个任务。通常,您希望在集群中的每个 CPU 上有 2-4 个分区。通常,Spark 会尝试根据您的集群自动设置分区数量。但是,您也可以通过将其作为第二个参数传递给 parallelize
来手动设置它(例如 sc.parallelize(data, 10)
)。注意:代码中某些地方使用术语切片(分区的同义词)来保持向后兼容性。
外部数据集
PySpark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括您的本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持文本文件、SequenceFiles 和任何其他 Hadoop InputFormat。
文本文件 RDD 可以使用 SparkContext
的 textFile
方法创建。此方法接受文件的 URI(机器上的本地路径或 hdfs://
、s3a://
等 URI)并将其读取为行集合。以下是一个示例调用
创建后,可以使用数据集操作对 distFile
进行操作。例如,我们可以使用 map
和 reduce
操作将所有行的长度加起来,如下所示:distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
。
关于使用 Spark 读取文件的一些注意事项
-
如果使用本地文件系统上的路径,则工作节点上的相同路径也必须可以访问该文件。将文件复制到所有工作节点或使用网络挂载的共享文件系统。
-
Spark 的所有基于文件的输入方法(包括
textFile
)都支持在目录、压缩文件和通配符上运行。例如,您可以使用textFile("/my/directory")
、textFile("/my/directory/*.txt")
和textFile("/my/directory/*.gz")
。 -
textFile
方法还接受一个可选的第二个参数来控制文件的分区数量。默认情况下,Spark 为文件的每个块创建一个分区(块在 HDFS 中默认情况下为 128MB),但您也可以通过传递更大的值来请求更多分区。请注意,您不能拥有少于块的分区。
除了文本文件之外,Spark 的 Python API 还支持其他几种数据格式
-
SparkContext.wholeTextFiles
允许您读取包含多个小型文本文件的目录,并将每个文件作为 (文件名,内容) 对返回。这与textFile
形成对比,textFile
将为每个文件中的每一行返回一个记录。 -
RDD.saveAsPickleFile
和SparkContext.pickleFile
支持将 RDD 保存为由腌制 Python 对象组成的简单格式。在腌制序列化上使用批处理,默认批处理大小为 10。 -
SequenceFile 和 Hadoop 输入/输出格式
注意 此功能目前标记为 Experimental
,适用于高级用户。它可能会在将来被基于 Spark SQL 的读/写支持所取代,在这种情况下,Spark SQL 是首选方法。
Writable 支持
PySpark SequenceFile 支持在 Java 中加载键值对的 RDD,将 Writable 转换为基本 Java 类型,并使用 pickle 腌制生成的 Java 对象。将键值对的 RDD 保存到 SequenceFile 时,PySpark 会执行相反的操作。它将 Python 对象解开为 Java 对象,然后将其转换为 Writable。以下 Writable 会自动转换
Writable 类型 | Python 类型 |
---|---|
Text | str |
IntWritable | int |
FloatWritable | float |
DoubleWritable | float |
BooleanWritable | bool |
BytesWritable | bytearray |
NullWritable | None |
MapWritable | dict |
数组不会开箱即用地处理。用户需要在读取或写入时指定自定义 ArrayWritable
子类型。在写入时,用户还需要指定自定义转换器,将数组转换为自定义 ArrayWritable
子类型。在读取时,默认转换器将自定义 ArrayWritable
子类型转换为 Java Object[]
,然后将其腌制为 Python 元组。要获取 Python array.array
以用于基本类型的数组,用户需要指定自定义转换器。
保存和加载 SequenceFiles
与文本文件类似,可以通过指定路径来保存和加载 SequenceFiles。可以指定键和值类,但对于标准 Writable,这不是必需的。
保存和加载其他 Hadoop 输入/输出格式
PySpark 还可以读取任何 Hadoop InputFormat 或写入任何 Hadoop OutputFormat,适用于“新”和“旧” Hadoop MapReduce API。如果需要,可以将 Hadoop 配置作为 Python 字典传递。以下是如何使用 Elasticsearch ESInputFormat 的示例
请注意,如果 InputFormat 仅依赖于 Hadoop 配置和/或输入路径,并且键和值类可以根据上表轻松转换,那么这种方法对于此类情况应该很有效。
如果您有自定义序列化二进制数据(例如从 Cassandra/HBase 加载数据),那么您首先需要在 Scala/Java 端将该数据转换为 pickle 的 pickler 可以处理的内容。为此提供了一个 Converter 特性。只需扩展此特性并在 convert
方法中实现您的转换代码即可。请记住确保将此类以及访问您的 InputFormat
所需的任何依赖项打包到您的 Spark 作业 jar 中,并将其包含在 PySpark 类路径中。
请参阅 Python 示例 和 Converter 示例,以了解如何使用 Cassandra/HBase InputFormat
和 OutputFormat
以及自定义转换器。
Spark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括您的本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持文本文件、SequenceFiles 和任何其他 Hadoop InputFormat。
文本文件 RDD 可以使用 SparkContext
的 textFile
方法创建。此方法接受文件的 URI(机器上的本地路径或 hdfs://
、s3a://
等 URI)并将其读取为行集合。以下是一个示例调用
创建后,可以使用数据集操作对 distFile
进行操作。例如,我们可以使用 map
和 reduce
操作将所有行的长度加起来,如下所示:distFile.map(s => s.length).reduce((a, b) => a + b)
。
关于使用 Spark 读取文件的一些注意事项
-
如果使用本地文件系统上的路径,则工作节点上的相同路径也必须可以访问该文件。将文件复制到所有工作节点或使用网络挂载的共享文件系统。
-
Spark 的所有基于文件的输入方法(包括
textFile
)都支持在目录、压缩文件和通配符上运行。例如,您可以使用textFile("/my/directory")
、textFile("/my/directory/*.txt")
和textFile("/my/directory/*.gz")
。当读取多个文件时,分区的顺序取决于文件从文件系统返回的顺序。例如,它可能或可能不遵循按路径对文件进行的字典排序。在分区内,元素按其在基础文件中的顺序排序。 -
textFile
方法还接受一个可选的第二个参数来控制文件的分区数量。默认情况下,Spark 为文件的每个块创建一个分区(块在 HDFS 中默认情况下为 128MB),但您也可以通过传递更大的值来请求更多分区。请注意,您不能拥有少于块的分区。
除了文本文件之外,Spark 的 Scala API 还支持其他几种数据格式
-
SparkContext.wholeTextFiles
允许您读取包含多个小型文本文件的目录,并将其中的每个文件作为 (文件名,内容) 对返回。这与textFile
形成对比,后者将返回每个文件中每行的记录。分区由数据局部性决定,在某些情况下,这可能导致分区过少。对于这些情况,wholeTextFiles
提供了一个可选的第二个参数来控制最小分区数。 -
对于 SequenceFiles,请使用 SparkContext 的
sequenceFile[K, V]
方法,其中K
和V
是文件中键和值的类型。它们应该是 Hadoop 的 Writable 接口的子类,例如 IntWritable 和 Text。此外,Spark 允许您为一些常用的 Writables 指定原生类型;例如,sequenceFile[Int, String]
将自动读取 IntWritables 和 Texts。 -
对于其他 Hadoop InputFormats,您可以使用
SparkContext.hadoopRDD
方法,该方法接受任意JobConf
和输入格式类、键类和值类。设置这些参数的方式与您在 Hadoop 作业中使用输入源的方式相同。您还可以使用SparkContext.newAPIHadoopRDD
用于基于“新” MapReduce API (org.apache.hadoop.mapreduce
) 的 InputFormats。 -
RDD.saveAsObjectFile
和SparkContext.objectFile
支持以由序列化 Java 对象组成的简单格式保存 RDD。虽然这不如 Avro 等专用格式高效,但它提供了一种保存任何 RDD 的简单方法。
Spark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括您的本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持文本文件、SequenceFiles 和任何其他 Hadoop InputFormat。
文本文件 RDD 可以使用 SparkContext
的 textFile
方法创建。此方法接受文件的 URI(机器上的本地路径或 hdfs://
、s3a://
等 URI)并将其读取为行集合。以下是一个示例调用
创建后,可以使用数据集操作对 distFile
进行操作。例如,我们可以使用 map
和 reduce
操作将所有行的尺寸加起来,如下所示:distFile.map(s -> s.length()).reduce((a, b) -> a + b)
。
关于使用 Spark 读取文件的一些注意事项
-
如果使用本地文件系统上的路径,则工作节点上的相同路径也必须可以访问该文件。将文件复制到所有工作节点或使用网络挂载的共享文件系统。
-
Spark 的所有基于文件的输入方法(包括
textFile
)都支持在目录、压缩文件和通配符上运行。例如,您可以使用textFile("/my/directory")
、textFile("/my/directory/*.txt")
和textFile("/my/directory/*.gz")
。 -
textFile
方法还接受一个可选的第二个参数来控制文件的分区数量。默认情况下,Spark 为文件的每个块创建一个分区(块在 HDFS 中默认情况下为 128MB),但您也可以通过传递更大的值来请求更多分区。请注意,您不能拥有少于块的分区。
除了文本文件之外,Spark 的 Java API 还支持其他几种数据格式
-
JavaSparkContext.wholeTextFiles
允许您读取包含多个小型文本文件的目录,并将其中的每个文件作为 (文件名,内容) 对返回。这与textFile
形成对比,后者将返回每个文件中每行的记录。 -
对于 SequenceFiles,请使用 SparkContext 的
sequenceFile[K, V]
方法,其中K
和V
是文件中键和值的类型。它们应该是 Hadoop 的 Writable 接口的子类,例如 IntWritable 和 Text。 -
对于其他 Hadoop InputFormats,您可以使用
JavaSparkContext.hadoopRDD
方法,该方法接受任意JobConf
和输入格式类、键类和值类。设置这些参数的方式与您在 Hadoop 作业中使用输入源的方式相同。您还可以使用JavaSparkContext.newAPIHadoopRDD
用于基于“新” MapReduce API (org.apache.hadoop.mapreduce
) 的 InputFormats。 -
JavaRDD.saveAsObjectFile
和JavaSparkContext.objectFile
支持以由序列化 Java 对象组成的简单格式保存 RDD。虽然这不如 Avro 等专用格式高效,但它提供了一种保存任何 RDD 的简单方法。
RDD 操作
RDD 支持两种类型的操作:转换,它从现有数据集创建新数据集,以及操作,它在对数据集运行计算后将值返回给驱动程序。例如,map
是一种转换,它将每个数据集元素通过一个函数传递,并返回一个表示结果的新 RDD。另一方面,reduce
是一种操作,它使用某个函数聚合 RDD 中的所有元素,并将最终结果返回给驱动程序(尽管也存在一个并行的 reduceByKey
,它返回一个分布式数据集)。
Spark 中的所有转换都是惰性的,这意味着它们不会立即计算结果。相反,它们只记住应用于某个基本数据集(例如文件)的转换。只有当操作需要将结果返回给驱动程序时,才会计算转换。这种设计使 Spark 能够更有效地运行。例如,我们可以意识到,通过 map
创建的数据集将在 reduce
中使用,并且只将 reduce
的结果返回给驱动程序,而不是更大的映射数据集。
默认情况下,每次对转换后的 RDD 运行操作时,它都可能被重新计算。但是,您也可以使用 persist
(或 cache
)方法将 RDD 持久化到内存中,在这种情况下,Spark 将在集群中保留这些元素,以便下次查询时能够更快地访问它们。还支持将 RDD 持久化到磁盘,或复制到多个节点上。
基础
为了说明 RDD 的基础知识,请考虑以下简单程序
第一行从外部文件定义了一个基本 RDD。此数据集不会加载到内存中或以其他方式进行操作:lines
只是一个指向文件的指针。第二行将 lineLengths
定义为 map
转换的结果。同样,由于惰性,lineLengths
不会立即计算。最后,我们运行 reduce
,这是一个操作。此时,Spark 将计算分解为要在不同机器上运行的任务,每台机器都运行其部分的 map 和本地归约,只将其答案返回给驱动程序。
如果我们还想在以后再次使用 lineLengths
,我们可以添加
在 reduce
之前,这将导致 lineLengths
在第一次计算后保存在内存中。
为了说明 RDD 的基础知识,请考虑以下简单程序
第一行从外部文件定义了一个基本 RDD。此数据集不会加载到内存中或以其他方式进行操作:lines
只是一个指向文件的指针。第二行将 lineLengths
定义为 map
转换的结果。同样,由于惰性,lineLengths
不会立即计算。最后,我们运行 reduce
,这是一个操作。此时,Spark 将计算分解为要在不同机器上运行的任务,每台机器都运行其部分的 map 和本地归约,只将其答案返回给驱动程序。
如果我们还想在以后再次使用 lineLengths
,我们可以添加
在 reduce
之前,这将导致 lineLengths
在第一次计算后保存在内存中。
为了说明 RDD 的基础知识,请考虑以下简单程序
第一行从外部文件定义了一个基本 RDD。此数据集不会加载到内存中或以其他方式进行操作:lines
只是一个指向文件的指针。第二行将 lineLengths
定义为 map
转换的结果。同样,由于惰性,lineLengths
不会立即计算。最后,我们运行 reduce
,这是一个操作。此时,Spark 将计算分解为要在不同机器上运行的任务,每台机器都运行其部分的 map 和本地归约,只将其答案返回给驱动程序。
如果我们还想在以后再次使用 lineLengths
,我们可以添加
在 reduce
之前,这将导致 lineLengths
在第一次计算后保存在内存中。
将函数传递给 Spark
Spark 的 API 很大程度上依赖于在驱动程序中传递函数以在集群上运行。有三种推荐的方法
- Lambda 表达式,用于可以作为表达式编写的简单函数。(Lambda 不支持多语句函数或不返回值的语句。)
- 调用 Spark 的函数内部的本地
def
,用于更长的代码。 - 模块中的顶级函数。
例如,要传递一个比使用 lambda
支持的更长的函数,请考虑以下代码
请注意,虽然也可以传递类实例中的方法的引用(而不是单例对象),但这需要将包含该类的对象与该方法一起发送。例如,请考虑
在这里,如果我们创建一个 new MyClass
并对其调用 doStuff
,则其中的 map
会引用该 MyClass
实例的 func
方法,因此需要将整个对象发送到集群。
以类似的方式,访问外部对象的字段将引用整个对象
为了避免此问题,最简单的方法是将 field
复制到本地变量中,而不是从外部访问它
Spark 的 API 很大程度上依赖于在驱动程序中传递函数以在集群上运行。有两种推荐的方法
- 匿名函数语法,可用于简短的代码段。
- 全局单例对象中的静态方法。例如,您可以定义
object MyFunctions
,然后传递MyFunctions.func1
,如下所示
请注意,虽然也可以传递类实例中的方法的引用(而不是单例对象),但这需要将包含该类的对象与该方法一起发送。例如,请考虑
在这里,如果我们创建一个新的 MyClass
实例并对其调用 doStuff
,则其中的 map
会引用该 MyClass
实例的 func1
方法,因此需要将整个对象发送到集群。这类似于编写 rdd.map(x => this.func1(x))
。
以类似的方式,访问外部对象的字段将引用整个对象
等效于编写 rdd.map(x => this.field + x)
,它引用了所有 this
。为了避免此问题,最简单的方法是将 field
复制到本地变量中,而不是从外部访问它
Spark 的 API 很大程度上依赖于在驱动程序中传递函数以在集群上运行。在 Java 中,函数由实现 org.apache.spark.api.java.function 包中的接口的类表示。有两种方法可以创建此类函数
- 在您自己的类中实现 Function 接口,无论是作为匿名内部类还是命名类,并将其实例传递给 Spark。
- 使用 lambda 表达式 简洁地定义实现。
虽然本指南的大部分内容都使用 lambda 语法来简洁起见,但使用所有相同的 API 以长格式也很容易。例如,我们可以将上面的代码编写如下
或者,如果在内联编写函数很麻烦
请注意,Java 中的匿名内部类也可以访问封闭范围内的变量,只要它们被标记为 final
。Spark 会像其他语言一样,将这些变量的副本发送到每个工作节点。
理解闭包
Spark 最难理解的事情之一是,在跨集群执行代码时,变量和方法的范围和生命周期。修改其范围之外的变量的 RDD 操作可能是常见的混淆来源。在下面的示例中,我们将查看使用 foreach()
来递增计数器的代码,但类似的问题也可能发生在其他操作中。
示例
考虑以下简单的 RDD 元素求和,其行为可能会因执行是否在同一个 JVM 中而异。一个常见的例子是在本地模式 (local
模式 (--master = local[n]
)) 中运行 Spark 与将 Spark 应用程序部署到集群(例如通过 spark-submit 到 YARN)之间的区别。
本地模式与集群模式
上面代码的行为是未定义的,可能无法按预期工作。为了执行作业,Spark 将 RDD 操作的处理分解成任务,每个任务由一个执行器执行。在执行之前,Spark 会计算任务的**闭包**。闭包是指执行器在 RDD 上执行计算(在本例中为 foreach()
)时必须可见的那些变量和方法。此闭包被序列化并发送到每个执行器。
发送到每个执行器的闭包中的变量现在是副本,因此,当在 foreach
函数中引用**counter** 时,它不再是驱动程序节点上的**counter**。驱动程序节点的内存中仍然存在一个**counter**,但执行器无法访问它!执行器只能看到来自序列化闭包的副本。因此,**counter** 的最终值将仍然为零,因为对**counter** 的所有操作都引用了序列化闭包中的值。
在本地模式下,在某些情况下,foreach
函数实际上将在与驱动程序相同的 JVM 中执行,并将引用相同的原始**counter**,并且实际上可能会更新它。
为了确保在这些情况下有明确的行为,应该使用 Accumulator
。Spark 中的累加器专门用于提供一种机制,用于在集群中跨工作节点拆分执行时安全地更新变量。本指南的累加器部分将更详细地讨论这些内容。
一般来说,闭包(如循环或局部定义的方法)不应用于修改全局状态。Spark 没有定义或保证对从闭包外部引用的对象的修改的行为。某些执行此操作的代码可能在本地模式下工作,但这仅仅是偶然的,并且此类代码在分布式模式下不会按预期工作。如果需要全局聚合,请改用累加器。
打印 RDD 的元素
另一个常见的习惯用法是尝试使用 rdd.foreach(println)
或 rdd.map(println)
打印 RDD 的元素。在一台机器上,这将生成预期的输出并打印所有 RDD 的元素。但是,在 cluster
模式下,执行器调用的 stdout
现在写入执行器的 stdout
而不是驱动程序上的 stdout
,因此驱动程序上的 stdout
不会显示这些!要打印驱动程序上的所有元素,可以使用 collect()
方法首先将 RDD 带到驱动程序节点,如下所示:rdd.collect().foreach(println)
。但这会导致驱动程序内存不足,因为 collect()
将整个 RDD 提取到一台机器上;如果您只需要打印 RDD 的几个元素,更安全的方法是使用 take()
:rdd.take(100).foreach(println)
。
使用键值对
虽然大多数 Spark 操作在包含任何类型对象的 RDD 上工作,但一些特殊操作仅适用于包含键值对的 RDD。最常见的是分布式“shuffle”操作,例如按键对元素进行分组或聚合。
在 Python 中,这些操作在包含内置 Python 元组(如 (1, 2)
)的 RDD 上工作。只需创建此类元组,然后调用您想要的运算。
例如,以下代码使用 reduceByKey
操作对键值对进行操作,以计算文本文件中每行出现的次数。
我们还可以使用 counts.sortByKey()
(例如)按字母顺序对对进行排序,最后使用 counts.collect()
将它们作为对象列表带回驱动程序程序。
虽然大多数 Spark 操作在包含任何类型对象的 RDD 上工作,但一些特殊操作仅适用于包含键值对的 RDD。最常见的是分布式“shuffle”操作,例如按键对元素进行分组或聚合。
在 Scala 中,这些操作在包含 Tuple2 对象(语言中的内置元组,只需编写 (a, b)
即可创建)的 RDD 上自动可用。键值对操作在 PairRDDFunctions 类中可用,该类自动围绕元组的 RDD 进行包装。
例如,以下代码使用 reduceByKey
操作对键值对进行操作,以计算文本文件中每行出现的次数。
我们还可以使用 counts.sortByKey()
(例如)按字母顺序对对进行排序,最后使用 counts.collect()
将它们作为对象数组带回驱动程序程序。
注意:当在键值对操作中使用自定义对象作为键时,必须确保自定义 equals()
方法与匹配的 hashCode()
方法一起使用。有关完整详细信息,请参阅 Object.hashCode() 文档 中概述的契约。
虽然大多数 Spark 操作在包含任何类型对象的 RDD 上工作,但一些特殊操作仅适用于包含键值对的 RDD。最常见的是分布式“shuffle”操作,例如按键对元素进行分组或聚合。
在 Java 中,键值对使用来自 Scala 标准库的 scala.Tuple2 类表示。您只需调用 new Tuple2(a, b)
即可创建元组,并在以后使用 tuple._1()
和 tuple._2()
访问其字段。
键值对的 RDD 由 JavaPairRDD 类表示。您可以使用 map
操作的特殊版本(如 mapToPair
和 flatMapToPair
)从 JavaRDD 构造 JavaPairRDD。JavaPairRDD 将具有标准 RDD 函数和特殊的键值函数。
例如,以下代码使用 reduceByKey
操作对键值对进行操作,以计算文本文件中每行出现的次数。
我们还可以使用 counts.sortByKey()
(例如)按字母顺序对对进行排序,最后使用 counts.collect()
将它们作为对象数组带回驱动程序程序。
注意:当在键值对操作中使用自定义对象作为键时,必须确保自定义 equals()
方法与匹配的 hashCode()
方法一起使用。有关完整详细信息,请参阅 Object.hashCode() 文档 中概述的契约。
转换
下表列出了 Spark 支持的一些常见转换。有关详细信息,请参阅 RDD API 文档 (Scala、Java、Python、R) 和对 RDD 函数文档 (Scala、Java)。
操作
下表列出了 Spark 支持的一些常见操作。有关详细信息,请参阅 RDD API 文档(Scala、Java、Python、R)
操作 | 含义 |
---|---|
reduce(func) | 使用函数 func(接受两个参数并返回一个参数)聚合数据集中的元素。该函数应该是可交换的和可结合的,以便它可以在并行中正确计算。 |
collect() | 将数据集中的所有元素作为数组返回到驱动程序程序。这通常在过滤或其他操作之后有用,这些操作返回数据的小子集。 |
count() | 返回数据集中元素的数量。 |
first() | 返回数据集的第一个元素(类似于 take(1))。 |
take(n) | 返回一个数组,其中包含数据集的前 n 个元素。 |
takeSample(withReplacement, num, [seed]) | 返回一个数组,其中包含数据集的 num 个元素的随机样本,可以有放回或无放回,可以选择预先指定随机数生成器种子。 |
takeOrdered(n, [ordering]) | 使用其自然顺序或自定义比较器返回 RDD 的前 n 个元素。 |
saveAsTextFile(path) | 将数据集的元素写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定目录中的文本文件(或一组文本文件)。Spark 将对每个元素调用 toString 以将其转换为文件中的文本行。 |
saveAsSequenceFile(path) (Java 和 Scala) |
将数据集的元素写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统中的给定路径中的 Hadoop SequenceFile。这在实现 Hadoop 的 Writable 接口的键值对的 RDD 上可用。在 Scala 中,它也适用于可以隐式转换为 Writable 的类型(Spark 包含对基本类型(如 Int、Double、String 等)的转换)。 |
saveAsObjectFile(path) (Java 和 Scala) |
使用 Java 序列化以简单格式写入数据集的元素,然后可以使用 SparkContext.objectFile() 加载。 |
countByKey() | 仅适用于类型为 (K, V) 的 RDD。返回一个 (K, Int) 对的哈希映射,其中包含每个键的计数。 |
foreach(func) | 对数据集中的每个元素运行函数 func。这通常用于副作用,例如更新 累加器 或与外部存储系统交互。 注意:修改 foreach() 外部累加器以外的变量可能会导致未定义的行为。有关更多详细信息,请参阅 理解闭包 。 |
Spark RDD API 还公开了某些操作的异步版本,例如 foreachAsync
用于 foreach
,它会立即向调用者返回一个 FutureAction
,而不是阻塞在操作完成时。这可用于管理或等待操作的异步执行。
洗牌操作
Spark 中的某些操作会触发称为混洗的事件。混洗是 Spark 用于重新分配数据的机制,以便它在分区之间以不同的方式分组。这通常涉及在执行器和机器之间复制数据,使混洗成为一项复杂且昂贵的操作。
背景
为了理解混洗过程中发生了什么,我们可以考虑 reduceByKey
操作的示例。 reduceByKey
操作生成一个新的 RDD,其中单个键的所有值都组合成一个元组 - 键和对与该键关联的所有值执行 reduce 函数的结果。挑战在于,单个键并非所有值都一定位于同一个分区,甚至同一台机器上,但它们必须位于同一位置才能计算结果。
在 Spark 中,数据通常不会跨分区分布以处于特定操作所需的适当位置。在计算过程中,单个任务将对单个分区进行操作 - 因此,为了组织所有数据以执行单个 reduceByKey
reduce 任务,Spark 需要执行全对全操作。它必须从所有分区读取以查找所有键的所有值,然后将跨分区的这些值组合在一起以计算每个键的最终结果 - 这称为 **混洗**。
虽然新混洗数据的每个分区中的元素集将是确定性的,分区本身的排序也是如此,但这些元素的排序不是。如果希望在混洗后获得可预测排序的数据,则可以使用
mapPartitions
使用例如.sorted
对每个分区进行排序repartitionAndSortWithinPartitions
在同时重新分区时有效地对分区进行排序sortBy
创建全局排序的 RDD
可能导致混洗的操作包括 **重新分区** 操作(如 repartition
和 coalesce
)、**'ByKey** 操作(除计数外)(如 groupByKey
和 reduceByKey
)以及 **连接** 操作(如 cogroup
和 join
)。
性能影响
**混洗** 是一项昂贵的操作,因为它涉及磁盘 I/O、数据序列化和网络 I/O。为了组织混洗数据,Spark 生成一组任务 - map 任务来组织数据,以及一组 reduce 任务来聚合数据。这种命名法来自 MapReduce,与 Spark 的 map
和 reduce
操作没有直接关系。
在内部,单个 map 任务的结果将保存在内存中,直到它们无法容纳。然后,这些结果将根据目标分区进行排序并写入单个文件。在 reduce 端,任务读取相关的排序块。
某些混洗操作可能会消耗大量的堆内存,因为它们在传输记录之前或之后使用内存中数据结构来组织记录。具体来说,reduceByKey
和 aggregateByKey
在 map 端创建这些结构,而 'ByKey
操作在 reduce 端生成这些结构。当数据无法容纳在内存中时,Spark 会将这些表溢出到磁盘,从而导致磁盘 I/O 的额外开销和垃圾回收增加。
混洗还会在磁盘上生成大量中间文件。从 Spark 1.3 开始,这些文件将保留,直到相应的 RDD 不再使用并被垃圾回收。这样做是为了避免在重新计算血统时重新创建混洗文件。垃圾回收可能只在很长一段时间后发生,如果应用程序保留了对这些 RDD 的引用,或者如果 GC 不经常启动。这意味着长时间运行的 Spark 作业可能会消耗大量的磁盘空间。临时存储目录由配置 Spark 上下文时的 spark.local.dir
配置参数指定。
可以通过调整各种配置参数来调整混洗行为。请参阅 Spark 配置指南 中的“混洗行为”部分。
RDD 持久化
Spark 中最重要的功能之一是将数据集持久化(或缓存)在内存中以跨操作使用。当您持久化 RDD 时,每个节点都会将它计算的任何分区存储在内存中,并在对该数据集(或从该数据集派生的数据集)的其他操作中重复使用它们。这使得未来的操作速度快得多(通常快 10 倍以上)。缓存是迭代算法和快速交互式使用的关键工具。
您可以使用 persist()
或 cache()
方法将 RDD 标记为持久化。在操作中首次计算它时,它将保存在节点的内存中。Spark 的缓存是容错的 - 如果 RDD 的任何分区丢失,它将使用最初创建它的转换自动重新计算。
此外,每个持久化的 RDD 可以使用不同的存储级别进行存储,例如,您可以将数据集持久化到磁盘,将数据集持久化到内存中,但作为序列化的 Java 对象(以节省空间),在节点之间复制它。这些级别是通过将 StorageLevel
对象(Scala、Java、Python)传递给 persist()
来设置的。 cache()
方法是使用默认存储级别的简写,即 StorageLevel.MEMORY_ONLY
(在内存中存储反序列化的对象)。存储级别的完整集是
存储级别 | 含义 |
---|---|
MEMORY_ONLY | 将 RDD 存储为 JVM 中的反序列化 Java 对象。如果 RDD 无法容纳在内存中,则某些分区将不会被缓存,并且将在每次需要时动态重新计算。这是默认级别。 |
MEMORY_AND_DISK | 将 RDD 存储为 JVM 中的反序列化 Java 对象。如果 RDD 无法容纳在内存中,则将无法容纳在磁盘上的分区存储在磁盘上,并在需要时从磁盘读取它们。 |
MEMORY_ONLY_SER (Java 和 Scala) |
将 RDD 存储为序列化的 Java 对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间,尤其是在使用 快速序列化器 时,但读取时更占用 CPU。 |
MEMORY_AND_DISK_SER (Java 和 Scala) |
与 MEMORY_ONLY_SER 相似,但将无法容纳在内存中的分区溢出到磁盘,而不是在每次需要时动态重新计算它们。 |
DISK_ONLY | 仅将 RDD 分区存储在磁盘上。 |
MEMORY_ONLY_2、MEMORY_AND_DISK_2 等 | 与上面的级别相同,但在两个集群节点上复制每个分区。 |
OFF_HEAP(实验性) | 与 MEMORY_ONLY_SER 相似,但将数据存储在 堆外内存 中。这需要启用堆外内存。 |
注意:在 Python 中,存储的对象将始终使用 Pickle 库进行序列化,因此您选择序列化级别无关紧要。Python 中可用的存储级别包括 MEMORY_ONLY
、MEMORY_ONLY_2
、MEMORY_AND_DISK
、MEMORY_AND_DISK_2
、DISK_ONLY
、DISK_ONLY_2
和 DISK_ONLY_3
。
Spark 在 shuffle 操作(例如 reduceByKey
)中也会自动持久化一些中间数据,即使用户没有调用 persist
。这样做是为了避免在 shuffle 过程中节点发生故障时重新计算整个输入。我们仍然建议用户在计划重用结果 RDD 时调用 persist
。
选择哪个存储级别?
Spark 的存储级别旨在在内存使用和 CPU 效率之间提供不同的权衡。我们建议您按照以下步骤选择一个
-
如果您的 RDD 可以舒适地使用默认存储级别 (
MEMORY_ONLY
),请保持这种方式。这是最有效的 CPU 选项,允许对 RDD 的操作尽可能快地运行。 -
如果不是,请尝试使用
MEMORY_ONLY_SER
和 选择一个快速的序列化库 使对象更节省空间,但仍然可以相当快地访问。(Java 和 Scala) -
除非计算数据集的函数很昂贵,或者它们过滤了大量数据,否则不要溢出到磁盘。否则,重新计算分区可能与从磁盘读取一样快。
-
如果您希望快速故障恢复(例如,如果使用 Spark 为 Web 应用程序提供服务),请使用复制存储级别。所有存储级别都通过重新计算丢失的数据提供完整的容错能力,但复制存储级别允许您在 RDD 上继续运行任务,而无需等待重新计算丢失的分区。
删除数据
Spark 会自动监控每个节点上的缓存使用情况,并以最近最少使用 (LRU) 的方式删除旧数据分区。如果您想手动删除 RDD 而不是等待它从缓存中消失,请使用 RDD.unpersist()
方法。请注意,此方法默认情况下不会阻塞。要阻塞直到资源释放,请在调用此方法时指定 blocking=true
。
共享变量
通常,当传递给 Spark 操作(例如 map
或 reduce
)的函数在远程集群节点上执行时,它会在函数中使用的所有变量的单独副本上工作。这些变量被复制到每台机器上,并且对远程机器上的变量的更新不会传播回驱动程序。支持跨任务的一般读写共享变量将效率低下。但是,Spark 为两种常见的用法模式提供了两种有限类型的共享变量:广播变量和累加器。
广播变量
广播变量允许程序员将只读变量缓存在每台机器上,而不是将它的副本与任务一起发送。例如,它们可以用来以有效的方式为每个节点提供一个大型输入数据集的副本。Spark 还尝试使用高效的广播算法来分发广播变量,以降低通信成本。
Spark 操作通过一组阶段执行,这些阶段由分布式“shuffle”操作隔开。Spark 会自动广播每个阶段内任务所需的公共数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前反序列化。这意味着,只有当跨多个阶段的任务需要相同数据或当以反序列化形式缓存数据很重要时,显式创建广播变量才有用。
广播变量是从变量 v
创建的,方法是调用 SparkContext.broadcast(v)
。广播变量是 v
的包装器,它的值可以通过调用 value
方法来访问。下面的代码展示了这一点
创建广播变量后,应该在集群上运行的任何函数中使用它,而不是使用值 v
,这样 v
就不会被多次发送到节点。此外,在广播对象 v
后,不应该修改它,以确保所有节点都获得广播变量的相同值(例如,如果变量稍后被发送到新节点)。
要释放广播变量复制到执行器上的资源,请调用 .unpersist()
。如果广播变量之后再次使用,它将被重新广播。要永久释放广播变量使用的所有资源,请调用 .destroy()
。广播变量之后不能再使用。请注意,这些方法默认情况下不会阻塞。要阻塞直到资源释放,请在调用它们时指定 blocking=true
。
累加器
累加器是仅通过关联和可交换操作“添加”的变量,因此可以在并行中得到有效支持。它们可以用来实现计数器(如在 MapReduce 中)或总和。Spark 本地支持数字类型的累加器,程序员可以添加对新类型的支持。
作为用户,您可以创建命名或未命名的累加器。如下图所示,命名累加器(在本例中为 counter
)将显示在修改该累加器的阶段的 Web UI 中。Spark 在“任务”表中显示了每个任务修改的每个累加器的值。
跟踪 UI 中的累加器对于了解正在运行的阶段的进度很有用(注意:这在 Python 中尚不支持)。
累加器是从初始值 v
创建的,方法是调用 SparkContext.accumulator(v)
。在集群上运行的任务可以使用 add
方法或 +=
运算符向它添加内容。但是,它们不能读取它的值。只有驱动程序可以读取累加器的值,使用它的 value
方法。
下面的代码展示了累加器被用来将数组的元素加起来
虽然这段代码使用了对 Int 类型累加器的内置支持,但程序员也可以通过子类化 AccumulatorParam 来创建自己的类型。AccumulatorParam 接口有两个方法:zero
用于为您的数据类型提供“零值”,以及 addInPlace
用于将两个值加在一起。例如,假设我们有一个 Vector
类来表示数学向量,我们可以编写
可以通过调用 SparkContext.longAccumulator()
或 SparkContext.doubleAccumulator()
来创建数字累加器,分别累加 Long 或 Double 类型的数值。在集群上运行的任务可以使用 add
方法向它添加内容。但是,它们不能读取它的值。只有驱动程序可以读取累加器的值,使用它的 value
方法。
下面的代码展示了累加器被用来将数组的元素加起来
虽然这段代码使用了对 Long 类型累加器的内置支持,但程序员也可以通过子类化 AccumulatorV2 来创建自己的类型。AccumulatorV2 抽象类有几个方法需要重写:reset
用于将累加器重置为零,add
用于将另一个值添加到累加器中,merge
用于将另一个相同类型的累加器合并到此累加器中。必须重写的其他方法包含在 API 文档 中。例如,假设我们有一个 MyVector
类来表示数学向量,我们可以编写
请注意,当程序员定义自己的 AccumulatorV2 类型时,结果类型可能与添加的元素类型不同。
可以通过调用 SparkContext.longAccumulator()
或 SparkContext.doubleAccumulator()
来创建数字累加器,分别累加 Long 或 Double 类型的数值。在集群上运行的任务可以使用 add
方法向它添加内容。但是,它们不能读取它的值。只有驱动程序可以读取累加器的值,使用它的 value
方法。
下面的代码展示了累加器被用来将数组的元素加起来
虽然这段代码使用了对 Long 类型累加器的内置支持,但程序员也可以通过子类化 AccumulatorV2 来创建自己的类型。AccumulatorV2 抽象类有几个方法需要重写:reset
用于将累加器重置为零,add
用于将另一个值添加到累加器中,merge
用于将另一个相同类型的累加器合并到此累加器中。必须重写的其他方法包含在 API 文档 中。例如,假设我们有一个 MyVector
类来表示数学向量,我们可以编写
请注意,当程序员定义自己的 AccumulatorV2 类型时,结果类型可能与添加的元素类型不同。
警告:当 Spark 任务完成时,Spark 将尝试将此任务中累积的更新合并到累加器中。如果失败,Spark 将忽略失败,并将任务标记为成功,并继续运行其他任务。因此,有问题的累加器不会影响 Spark 作业,但它可能不会被正确更新,尽管 Spark 作业成功了。
对于仅在操作中执行的累加器更新,Spark 保证每个任务对累加器的更新只会被应用一次,即重新启动的任务不会更新值。在转换中,用户应该意识到,如果任务或作业阶段被重新执行,每个任务的更新可能会被应用多次。
累加器不会改变 Spark 的惰性求值模型。如果它们正在 RDD 上的操作中被更新,它们的价值只有在 RDD 作为操作的一部分被计算时才会被更新。因此,在像 map()
这样的惰性转换中进行时,累加器更新不会被保证执行。下面的代码片段演示了这一特性
部署到集群
应用程序提交指南 描述了如何将应用程序提交到集群。简而言之,一旦您将应用程序打包到 JAR(对于 Java/Scala)或一组 .py
或 .zip
文件(对于 Python)中,bin/spark-submit
脚本就可以让您将它提交到任何支持的集群管理器。
从 Java / Scala 启动 Spark 作业
org.apache.spark.launcher 包提供了用于使用简单的 Java API 启动 Spark 作业作为子进程的类。
单元测试
Spark 对使用任何流行的单元测试框架进行单元测试很友好。只需在您的测试中创建一个 SparkContext
,并将主 URL 设置为 local
,运行您的操作,然后调用 SparkContext.stop()
来拆除它。确保您在 finally
块或测试框架的 tearDown
方法中停止上下文,因为 Spark 不支持在同一个程序中同时运行两个上下文。
下一步
您可以在 Spark 网站上看到一些 示例 Spark 程序。此外,Spark 在 examples
目录中包含了几个示例 (Scala,Java,Python,R)。您可以通过将类名传递给 Spark 的 bin/run-example
脚本运行 Java 和 Scala 示例;例如
./bin/run-example SparkPi
对于 Python 示例,请使用 spark-submit
代替
./bin/spark-submit examples/src/main/python/pi.py
对于 R 示例,请使用 spark-submit
代替
./bin/spark-submit examples/src/main/r/dataframe.R
有关优化程序的帮助,请参阅 配置 和 调优 指南,其中提供了最佳实践信息。这些指南对于确保您的数据以高效格式存储在内存中尤为重要。有关部署的帮助,请参阅 集群模式概述,其中描述了分布式操作中涉及的组件和支持的集群管理器。