快速入门
本教程提供了一个使用 Spark 的快速入门。我们将首先通过 Spark 的交互式 shell(在 Python 或 Scala 中)介绍 API,然后展示如何在 Java、Scala 和 Python 中编写应用程序。
要跟随本指南,首先从 Spark 网站 下载 Spark 的打包版本。由于我们不会使用 HDFS,您可以下载任何 Hadoop 版本的包。
请注意,在 Spark 2.0 之前,Spark 的主要编程接口是弹性分布式数据集 (RDD)。在 Spark 2.0 之后,RDD 被 Dataset 取代,Dataset 与 RDD 一样是强类型化的,但在内部有更丰富的优化。RDD 接口仍然受支持,您可以在 RDD 编程指南 中获得更详细的参考。但是,我们强烈建议您切换到使用 Dataset,它比 RDD 具有更好的性能。请参阅 SQL 编程指南 以获取有关 Dataset 的更多信息。
使用 Spark Shell 进行交互式分析
基础
Spark 的 shell 提供了一种简单的方法来学习 API,以及一个强大的工具来交互式地分析数据。它可以在 Scala(在 Java VM 上运行,因此是使用现有 Java 库的好方法)或 Python 中使用。在 Spark 目录中运行以下命令启动它
./bin/pyspark
或者,如果 PySpark 在您当前的环境中使用 pip 安装
pyspark
Spark 的主要抽象是一个称为 Dataset 的分布式项目集合。Dataset 可以从 Hadoop InputFormat(如 HDFS 文件)创建,也可以通过转换其他 Dataset 创建。由于 Python 的动态特性,我们不需要 Dataset 在 Python 中是强类型化的。因此,Python 中的所有 Dataset 都是 Dataset[Row],我们将其称为 DataFrame
以与 Pandas 和 R 中的数据框概念保持一致。让我们从 Spark 源目录中 README 文件的文本创建一个新的 DataFrame
>>> textFile = spark.read.text("README.md")
您可以通过调用一些操作直接从 DataFrame 获取值,或者转换 DataFrame 以获得一个新的 DataFrame。有关更多详细信息,请阅读API 文档。
>>> textFile.count() # Number of rows in this DataFrame
126
>>> textFile.first() # First row in this DataFrame
Row(value=u'# Apache Spark')
现在让我们将此 DataFrame 转换为一个新的 DataFrame。我们调用 filter
以返回一个包含文件中部分行的新的 DataFrame。
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
我们可以将转换和操作链接在一起
>>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"?
15
./bin/spark-shell
Spark 的主要抽象是一个称为 Dataset 的分布式项目集合。Dataset 可以从 Hadoop InputFormat(如 HDFS 文件)创建,也可以通过转换其他 Dataset 创建。让我们从 Spark 源目录中 README 文件的文本创建一个新的 Dataset
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
您可以通过调用一些操作直接从 Dataset 获取值,或者转换 Dataset 以获得一个新的 Dataset。有关更多详细信息,请阅读API 文档。
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark
现在让我们将此 Dataset 转换为一个新的 Dataset。我们调用 filter
以返回一个包含文件中部分项目的新的 Dataset。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
我们可以将转换和操作链接在一起
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
更多关于 Dataset 操作
Dataset 操作和转换可用于更复杂的计算。假设我们要找到单词最多的行
>>> from pyspark.sql import functions as sf
>>> textFile.select(sf.size(sf.split(textFile.value, "\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()
[Row(max(numWords)=15)]
这首先将一行映射到一个整数值并将其别名为“numWords”,创建一个新的 DataFrame。在该 DataFrame 上调用 agg
以找到最大的字数。select
和 agg
的参数都是Column,我们可以使用 df.colName
从 DataFrame 获取列。我们还可以导入 pyspark.sql.functions,它提供许多方便的函数来从旧的 Column 构建新的 Column。
一种常见的数据流模式是 MapReduce,正如 Hadoop 所普及的那样。Spark 可以轻松地实现 MapReduce 流程
>>> wordCounts = textFile.select(sf.explode(sf.split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
在这里,我们在 select
中使用 explode
函数,将 Dataset 行转换为 Dataset 单词,然后组合 groupBy
和 count
以计算文件中每个单词的计数,作为包含 2 列的 DataFrame:“word” 和 “count”。要收集 shell 中的单词计数,我们可以调用 collect
>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 15
这首先将一行映射到一个整数值,创建一个新的 Dataset。在该 Dataset 上调用 reduce
以找到最大的字数。map
和 reduce
的参数是 Scala 函数字面量(闭包),可以使用任何语言特性或 Scala/Java 库。例如,我们可以轻松地调用在其他地方声明的函数。我们将使用 Math.max()
函数来使此代码更容易理解
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
一种常见的数据流模式是 MapReduce,正如 Hadoop 所普及的那样。Spark 可以轻松地实现 MapReduce 流程
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
在这里,我们调用 flatMap
将 Dataset 行转换为 Dataset 单词,然后组合 groupByKey
和 count
以计算文件中每个单词的计数,作为 (String, Long) 对的 Dataset。要收集 shell 中的单词计数,我们可以调用 collect
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
缓存
Spark 还支持将数据集拉入集群范围的内存缓存中。当数据被重复访问时,这非常有用,例如查询小型“热点”数据集或运行 PageRank 等迭代算法时。作为一个简单的例子,让我们标记我们的 linesWithSpark
数据集以进行缓存
>>> linesWithSpark.cache()
>>> linesWithSpark.count()
15
>>> linesWithSpark.count()
15
使用 Spark 来探索和缓存一个 100 行的文本文件似乎很愚蠢。有趣的是,这些相同的函数可以用于非常大的数据集,即使它们被条带化到数十或数百个节点上。您还可以通过将 bin/pyspark
连接到集群来交互式地执行此操作,如 RDD 编程指南 中所述。
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
使用 Spark 来探索和缓存一个 100 行的文本文件似乎很愚蠢。有趣的是,这些相同的函数可以用于非常大的数据集,即使它们被条带化到数十或数百个节点上。您还可以通过将 bin/spark-shell
连接到集群来交互式地执行此操作,如 RDD 编程指南 中所述。
自包含应用程序
假设我们希望使用 Spark API 编写一个自包含的应用程序。我们将逐步介绍一个简单的 Scala(使用 sbt)、Java(使用 Maven)和 Python(pip)应用程序。
现在我们将展示如何使用 Python API (PySpark) 编写应用程序。
如果您正在构建一个打包的 PySpark 应用程序或库,您可以将其添加到您的 setup.py 文件中,如下所示
install_requires=[
'pyspark==3.5.1'
]
例如,我们将创建一个简单的 Spark 应用程序,SimpleApp.py
"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop()
此程序仅计算文本文件中包含“a”的行数和包含“b”的行数。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 安装的位置。与 Scala 和 Java 示例一样,我们使用 SparkSession 创建 Dataset。对于使用自定义类或第三方库的应用程序,我们还可以通过其 --py-files
参数将代码依赖项添加到 spark-submit
,方法是将它们打包成 .zip 文件(有关详细信息,请参阅 spark-submit --help
)。SimpleApp
足够简单,因此我们不需要指定任何代码依赖项。
我们可以使用 bin/spark-submit
脚本运行此应用程序
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py
...
Lines with a: 46, Lines with b: 23
如果您在环境中安装了 PySpark pip(例如,pip install pyspark
),您可以使用常规 Python 解释器运行您的应用程序,或者使用提供的“spark-submit”,您可以根据自己的喜好进行选择。
# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23
我们将在 Scala 中创建一个非常简单的 Spark 应用程序——实际上非常简单,它被命名为 SimpleApp.scala
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]): Unit = {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
请注意,应用程序应该定义一个 main()
方法,而不是扩展 scala.App
。scala.App
的子类可能无法正常工作。
此程序仅计算 Spark README 中包含“a”的行数和包含“b”的行数。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 安装的位置。与之前使用 Spark shell 的示例不同,它初始化自己的 SparkSession,我们将在程序中初始化 SparkSession。
我们调用 SparkSession.builder
来构建一个 SparkSession
,然后设置应用程序名称,最后调用 getOrCreate
来获取 SparkSession
实例。
我们的应用程序依赖于 Spark API,因此我们还将包含一个 sbt 配置文件,build.sbt
,它解释了 Spark 是一个依赖项。此文件还添加了 Spark 依赖的存储库
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1"
为了使 sbt 正确工作,我们需要按照典型的目录结构布局 SimpleApp.scala
和 build.sbt
。完成此操作后,我们可以创建一个包含应用程序代码的 JAR 包,然后使用 spark-submit
脚本运行我们的程序。
# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23
本示例将使用 Maven 编译应用程序 JAR,但任何类似的构建系统都可以使用。
我们将创建一个非常简单的 Spark 应用程序,SimpleApp.java
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
Dataset<String> logData = spark.read().textFile(logFile).cache();
long numAs = logData.filter(s -> s.contains("a")).count();
long numBs = logData.filter(s -> s.contains("b")).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
spark.stop();
}
}
此程序仅计算 Spark README 中包含“a”的行数和包含“b”的行数。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 安装的位置。与之前使用 Spark shell 的示例不同,它初始化自己的 SparkSession,我们将在程序中初始化 SparkSession。
为了构建程序,我们还需要编写一个 Maven pom.xml
文件,该文件将 Spark 列为依赖项。请注意,Spark 工件带有 Scala 版本标签。
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
我们将这些文件按照规范的 Maven 目录结构布局
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
现在,我们可以使用 Maven 打包应用程序,并使用 ./bin/spark-submit
执行它。
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
其他依赖项管理工具,如 Conda 和 pip,也可以用于自定义类或第三方库。另请参见 Python 包管理。
下一步
恭喜您运行了第一个 Spark 应用程序!
- 要深入了解 API,请从 RDD 编程指南 和 SQL 编程指南 开始,或查看“编程指南”菜单以了解其他组件。
- 要将应用程序运行在集群上,请转到 部署概述。
- 最后,Spark 在
examples
目录中包含多个示例 (Scala,Java,Python,R)。您可以按如下方式运行它们
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R