快速入门
本教程提供了一个使用 Spark 的快速入门。我们将首先通过 Spark 的交互式 shell(在 Python 或 Scala 中)介绍 API,然后展示如何在 Java、Scala 和 Python 中编写应用程序。
要跟随本指南,首先从 Spark 网站 下载 Spark 的打包版本。由于我们不会使用 HDFS,您可以下载任何 Hadoop 版本的包。
请注意,在 Spark 2.0 之前,Spark 的主要编程接口是弹性分布式数据集 (RDD)。在 Spark 2.0 之后,RDD 被 Dataset 取代,Dataset 与 RDD 一样是强类型化的,但在内部有更丰富的优化。RDD 接口仍然受支持,您可以在 RDD 编程指南 中获得更详细的参考。但是,我们强烈建议您切换到使用 Dataset,它比 RDD 具有更好的性能。请参阅 SQL 编程指南 以获取有关 Dataset 的更多信息。
使用 Spark Shell 进行交互式分析
基础
Spark 的 shell 提供了一种简单的方法来学习 API,以及一个强大的工具来交互式地分析数据。它可以在 Scala(在 Java VM 上运行,因此是使用现有 Java 库的好方法)或 Python 中使用。在 Spark 目录中运行以下命令启动它
或者,如果 PySpark 在您当前的环境中使用 pip 安装
Spark 的主要抽象是一个称为 Dataset 的分布式项目集合。Dataset 可以从 Hadoop InputFormat(如 HDFS 文件)创建,也可以通过转换其他 Dataset 创建。由于 Python 的动态特性,我们不需要 Dataset 在 Python 中是强类型化的。因此,Python 中的所有 Dataset 都是 Dataset[Row],我们将其称为 DataFrame
以与 Pandas 和 R 中的数据框概念保持一致。让我们从 Spark 源目录中 README 文件的文本创建一个新的 DataFrame
您可以通过调用一些操作直接从 DataFrame 获取值,或者转换 DataFrame 以获得一个新的 DataFrame。有关更多详细信息,请阅读API 文档。
现在让我们将此 DataFrame 转换为一个新的 DataFrame。我们调用 filter
以返回一个包含文件中部分行的新的 DataFrame。
我们可以将转换和操作链接在一起
Spark 的主要抽象是一个称为 Dataset 的分布式项目集合。Dataset 可以从 Hadoop InputFormat(如 HDFS 文件)创建,也可以通过转换其他 Dataset 创建。让我们从 Spark 源目录中 README 文件的文本创建一个新的 Dataset
您可以通过调用一些操作直接从 Dataset 获取值,或者转换 Dataset 以获得一个新的 Dataset。有关更多详细信息,请阅读API 文档。
现在让我们将此 Dataset 转换为一个新的 Dataset。我们调用 filter
以返回一个包含文件中部分项目的新的 Dataset。
我们可以将转换和操作链接在一起
更多关于 Dataset 操作
Dataset 操作和转换可用于更复杂的计算。假设我们要找到单词最多的行
这首先将一行映射到一个整数值并将其别名为“numWords”,创建一个新的 DataFrame。在该 DataFrame 上调用 agg
以找到最大的字数。select
和 agg
的参数都是Column,我们可以使用 df.colName
从 DataFrame 获取列。我们还可以导入 pyspark.sql.functions,它提供许多方便的函数来从旧的 Column 构建新的 Column。
一种常见的数据流模式是 MapReduce,正如 Hadoop 所普及的那样。Spark 可以轻松地实现 MapReduce 流程
在这里,我们在 select
中使用 explode
函数,将 Dataset 行转换为 Dataset 单词,然后组合 groupBy
和 count
以计算文件中每个单词的计数,作为包含 2 列的 DataFrame:“word” 和 “count”。要收集 shell 中的单词计数,我们可以调用 collect
这首先将一行映射到一个整数值,创建一个新的 Dataset。在该 Dataset 上调用 reduce
以找到最大的字数。map
和 reduce
的参数是 Scala 函数字面量(闭包),可以使用任何语言特性或 Scala/Java 库。例如,我们可以轻松地调用在其他地方声明的函数。我们将使用 Math.max()
函数来使此代码更容易理解
一种常见的数据流模式是 MapReduce,正如 Hadoop 所普及的那样。Spark 可以轻松地实现 MapReduce 流程
在这里,我们调用 flatMap
将 Dataset 行转换为 Dataset 单词,然后组合 groupByKey
和 count
以计算文件中每个单词的计数,作为 (String, Long) 对的 Dataset。要收集 shell 中的单词计数,我们可以调用 collect
缓存
Spark 还支持将数据集拉入集群范围的内存缓存中。当数据被重复访问时,这非常有用,例如查询小型“热点”数据集或运行 PageRank 等迭代算法时。作为一个简单的例子,让我们标记我们的 linesWithSpark
数据集以进行缓存
使用 Spark 来探索和缓存一个 100 行的文本文件似乎很愚蠢。有趣的是,这些相同的函数可以用于非常大的数据集,即使它们被条带化到数十或数百个节点上。您还可以通过将 bin/pyspark
连接到集群来交互式地执行此操作,如 RDD 编程指南 中所述。
使用 Spark 来探索和缓存一个 100 行的文本文件似乎很愚蠢。有趣的是,这些相同的函数可以用于非常大的数据集,即使它们被条带化到数十或数百个节点上。您还可以通过将 bin/spark-shell
连接到集群来交互式地执行此操作,如 RDD 编程指南 中所述。
自包含应用程序
假设我们希望使用 Spark API 编写一个自包含的应用程序。我们将逐步介绍一个简单的 Scala(使用 sbt)、Java(使用 Maven)和 Python(pip)应用程序。
现在我们将展示如何使用 Python API (PySpark) 编写应用程序。
如果您正在构建一个打包的 PySpark 应用程序或库,您可以将其添加到您的 setup.py 文件中,如下所示
例如,我们将创建一个简单的 Spark 应用程序,SimpleApp.py
此程序仅计算文本文件中包含“a”的行数和包含“b”的行数。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 安装的位置。与 Scala 和 Java 示例一样,我们使用 SparkSession 创建 Dataset。对于使用自定义类或第三方库的应用程序,我们还可以通过其 --py-files
参数将代码依赖项添加到 spark-submit
,方法是将它们打包成 .zip 文件(有关详细信息,请参阅 spark-submit --help
)。SimpleApp
足够简单,因此我们不需要指定任何代码依赖项。
我们可以使用 bin/spark-submit
脚本运行此应用程序
如果您在环境中安装了 PySpark pip(例如,pip install pyspark
),您可以使用常规 Python 解释器运行您的应用程序,或者使用提供的“spark-submit”,您可以根据自己的喜好进行选择。
我们将在 Scala 中创建一个非常简单的 Spark 应用程序——实际上非常简单,它被命名为 SimpleApp.scala
请注意,应用程序应该定义一个 main()
方法,而不是扩展 scala.App
。scala.App
的子类可能无法正常工作。
此程序仅计算 Spark README 中包含“a”的行数和包含“b”的行数。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 安装的位置。与之前使用 Spark shell 的示例不同,它初始化自己的 SparkSession,我们将在程序中初始化 SparkSession。
我们调用 SparkSession.builder
来构建一个 SparkSession
,然后设置应用程序名称,最后调用 getOrCreate
来获取 SparkSession
实例。
我们的应用程序依赖于 Spark API,因此我们还将包含一个 sbt 配置文件,build.sbt
,它解释了 Spark 是一个依赖项。此文件还添加了 Spark 依赖的存储库
为了使 sbt 正确工作,我们需要按照典型的目录结构布局 SimpleApp.scala
和 build.sbt
。完成此操作后,我们可以创建一个包含应用程序代码的 JAR 包,然后使用 spark-submit
脚本运行我们的程序。
本示例将使用 Maven 编译应用程序 JAR,但任何类似的构建系统都可以使用。
我们将创建一个非常简单的 Spark 应用程序,SimpleApp.java
此程序仅计算 Spark README 中包含“a”的行数和包含“b”的行数。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 安装的位置。与之前使用 Spark shell 的示例不同,它初始化自己的 SparkSession,我们将在程序中初始化 SparkSession。
为了构建程序,我们还需要编写一个 Maven pom.xml
文件,该文件将 Spark 列为依赖项。请注意,Spark 工件带有 Scala 版本标签。
我们将这些文件按照规范的 Maven 目录结构布局
现在,我们可以使用 Maven 打包应用程序,并使用 ./bin/spark-submit
执行它。
其他依赖项管理工具,如 Conda 和 pip,也可以用于自定义类或第三方库。另请参见 Python 包管理。
下一步
恭喜您运行了第一个 Spark 应用程序!