快速入门
本教程简要介绍如何使用 Spark。我们将首先通过 Spark 的交互式 Shell(使用 Python 或 Scala)介绍 API,然后展示如何使用 Java、Scala 和 Python 编写应用程序。
要跟随本指南,请首先从 Spark 网站下载 Spark 的打包发行版。由于我们不使用 HDFS,您可以下载任何 Hadoop 版本的软件包。
请注意,在 Spark 2.0 之前,Spark 的主要编程接口是弹性分布式数据集 (RDD)。 在 Spark 2.0 之后,RDD 被 Dataset 替换,Dataset 具有与 RDD 类似的强类型,但在底层具有更丰富的优化。 RDD 接口仍然受支持,您可以在 RDD 编程指南中获得更详细的参考。 但是,我们强烈建议您切换到使用 Dataset,它比 RDD 具有更好的性能。 请参阅 SQL 编程指南以获取有关 Dataset 的更多信息。
使用 Spark Shell 进行交互式分析
基础知识
Spark 的 Shell 提供了一种学习 API 的简单方法,也是一种以交互方式分析数据的强大工具。 它可以采用 Scala(在 Java VM 上运行,因此是使用现有 Java 库的好方法)或 Python。 通过在 Spark 目录中运行以下命令启动它:
./bin/pyspark
或者,如果 PySpark 已通过 pip 安装在您当前的环境中:
pyspark
Spark 的主要抽象是名为 Dataset 的分布式项目集合。 Dataset 可以从 Hadoop InputFormat(例如 HDFS 文件)创建,也可以通过转换其他 Dataset 来创建。 由于 Python 的动态特性,我们不需要在 Python 中对 Dataset 进行强类型化。 因此,Python 中的所有 Dataset 都是 Dataset[Row],为了与 Pandas 和 R 中的数据框概念保持一致,我们将其称为 DataFrame
。 让我们从 Spark 源代码目录中 README 文件的文本创建一个新的 DataFrame
>>> textFile = spark.read.text("README.md")
您可以通过调用一些操作直接从 DataFrame 获取值,或者转换 DataFrame 以获得新的 DataFrame。 有关更多详细信息,请阅读 _API 文档_。
>>> textFile.count() # Number of rows in this DataFrame
126
>>> textFile.first() # First row in this DataFrame
Row(value=u'# Apache Spark')
现在让我们将这个 DataFrame 转换为一个新的 DataFrame。 我们调用 filter
来返回一个新的 DataFrame,其中包含文件中行的子集。
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
我们可以将转换和操作链接在一起
>>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"?
15
./bin/spark-shell
Spark 的主要抽象是名为 Dataset 的分布式项目集合。 Dataset 可以从 Hadoop InputFormat(例如 HDFS 文件)创建,也可以通过转换其他 Dataset 来创建。 让我们从 Spark 源代码目录中 README 文件的文本创建一个新的 Dataset
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
您可以通过调用一些操作直接从 Dataset 获取值,或者转换 Dataset 以获得新的 Dataset。 有关更多详细信息,请阅读 _API 文档_。
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark
现在让我们将这个 Dataset 转换为一个新的 Dataset。 我们调用 filter
以返回一个新的 Dataset,其中包含文件中项目的子集。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
我们可以将转换和操作链接在一起
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
关于 Dataset 操作的更多信息
Dataset 操作和转换可用于更复杂的计算。 假设我们要找到包含单词最多的行
>>> from pyspark.sql import functions as sf
>>> textFile.select(sf.size(sf.split(textFile.value, "\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()
[Row(max(numWords)=15)]
首先将一行映射到一个整数值,并将其别名为 “numWords”,从而创建一个新的 DataFrame。 在该 DataFrame 上调用 agg
以查找最大的单词计数。 select
和 agg
的参数都是 _Column_,我们可以使用 df.colName
从 DataFrame 获取列。 我们还可以导入 pyspark.sql.functions,它提供了许多方便的函数来从旧列构建新列。
一种常见的数据流模式是 MapReduce,它因 Hadoop 而流行。 Spark 可以轻松实现 MapReduce 流程
>>> wordCounts = textFile.select(sf.explode(sf.split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
这里,我们使用 select
中的 explode
函数将行 Dataset 转换为单词 Dataset,然后组合 groupBy
和 count
以计算文件中每个单词的计数,并将结果作为包含 2 列(“word” 和 “count”)的 DataFrame。 要在我们的 Shell 中收集单词计数,我们可以调用 collect
>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 15
首先将一行映射到一个整数值,从而创建一个新的 Dataset。 在该 Dataset 上调用 reduce
以查找最大的单词计数。 map
和 reduce
的参数是 Scala 函数文字(闭包),可以使用任何语言特性或 Scala/Java 库。 例如,我们可以轻松调用在其他地方声明的函数。 我们将使用 Math.max()
函数来使此代码更易于理解
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
一种常见的数据流模式是 MapReduce,它因 Hadoop 而流行。 Spark 可以轻松实现 MapReduce 流程
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
这里,我们调用 flatMap
以将行 Dataset 转换为单词 Dataset,然后组合 groupByKey
和 count
以计算文件中每个单词的计数,并将结果作为 (String, Long) 对的 Dataset。 要在我们的 Shell 中收集单词计数,我们可以调用 collect
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
缓存
Spark 还支持将数据集拉入集群范围的内存缓存中。 当重复访问数据时,这非常有用,例如在查询小型“热”数据集时,或者在运行像 PageRank 这样的迭代算法时。 作为一个简单的例子,让我们将我们的 linesWithSpark
数据集标记为要缓存
>>> linesWithSpark.cache()
>>> linesWithSpark.count()
15
>>> linesWithSpark.count()
15
使用 Spark 来浏览和缓存 100 行的文本文件可能看起来很傻。 有趣的是,这些相同的函数可以用于非常大的数据集,即使它们分布在数十或数百个节点上。 您还可以通过将 bin/pyspark
连接到集群来以交互方式执行此操作,如 RDD 编程指南中所述。
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
使用 Spark 来浏览和缓存 100 行的文本文件可能看起来很傻。 有趣的是,这些相同的函数可以用于非常大的数据集,即使它们分布在数十或数百个节点上。 您还可以通过将 bin/spark-shell
连接到集群来以交互方式执行此操作,如 RDD 编程指南中所述。
自包含应用
假设我们希望使用 Spark API 编写一个自包含的应用程序。 我们将介绍一个简单的 Scala(使用 sbt)、Java(使用 Maven)和 Python(pip)应用程序。
现在我们将展示如何使用 Python API (PySpark) 编写应用程序。
如果您正在构建打包的 PySpark 应用程序或库,您可以将其添加到 setup.py 文件中,如下所示:
install_requires=[
'pyspark==3.5.5'
]
作为一个例子,我们将创建一个简单的 Spark 应用程序,SimpleApp.py
"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop()
此程序仅计算文本文件中包含 ‘a’ 的行数和包含 ‘b’ 的行数。 请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 的安装位置。 与 Scala 和 Java 示例一样,我们使用 SparkSession 来创建 Datasets。 对于使用自定义类或第三方库的应用程序,我们还可以通过将其打包到 .zip 文件中,通过 spark-submit
的 --py-files
参数将代码依赖项添加到 spark-submit
(有关详细信息,请参阅 spark-submit --help
)。 SimpleApp
非常简单,我们不需要指定任何代码依赖项。
我们可以使用 bin/spark-submit
脚本运行此应用程序
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py
...
Lines with a: 46, Lines with b: 23
如果您已将 PySpark pip 安装到您的环境中(例如,pip install pyspark
),您可以选择使用常规 Python 解释器运行您的应用程序,或者使用提供的“spark-submit”。
# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23
我们将用 Scala 创建一个非常简单的 Spark 应用程序——实际上,它非常简单,因此被命名为 SimpleApp.scala
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]): Unit = {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
请注意,应用程序应定义一个 main()
方法,而不是扩展 scala.App
。scala.App
的子类可能无法正常工作。
此程序仅计算 Spark README 中包含 ‘a’ 的行数和包含 ‘b’ 的行数。 请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 的安装位置。 与之前使用 Spark shell 的示例(它初始化自己的 SparkSession)不同,我们将 SparkSession 初始化为程序的一部分。
我们调用 SparkSession.builder
来构建一个 SparkSession
,然后设置应用程序名称,最后调用 getOrCreate
来获取 SparkSession
实例。
我们的应用程序依赖于 Spark API,因此我们还会包含一个 sbt 配置文件,build.sbt
,它说明 Spark 是一个依赖项。 此文件还添加了 Spark 所依赖的存储库。
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.5"
为了使 sbt 能够正确工作,我们需要按照典型的目录结构来布局 SimpleApp.scala
和 build.sbt
。 一旦完成,我们就可以创建一个包含应用程序代码的 JAR 包,然后使用 spark-submit
脚本来运行我们的程序。
# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23
本例将使用 Maven 来编译一个应用程序 JAR,但任何类似的构建系统都可以工作。
我们将创建一个非常简单的 Spark 应用程序,SimpleApp.java
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
Dataset<String> logData = spark.read().textFile(logFile).cache();
long numAs = logData.filter(s -> s.contains("a")).count();
long numBs = logData.filter(s -> s.contains("b")).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
spark.stop();
}
}
此程序仅计算 Spark README 中包含 ‘a’ 的行数和包含 ‘b’ 的行数。 请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 的安装位置。 与之前使用 Spark shell 的示例(它初始化自己的 SparkSession)不同,我们将 SparkSession 初始化为程序的一部分。
为了构建程序,我们还需要编写一个 Maven pom.xml
文件,其中将 Spark 列为依赖项。 请注意,Spark 工件使用 Scala 版本进行标记。
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.5</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
我们按照规范的 Maven 目录结构来布局这些文件。
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
现在,我们可以使用 Maven 打包应用程序,并使用 ./bin/spark-submit
执行它。
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
其他依赖管理工具(如 Conda 和 pip)也可用于自定义类或第三方库。 另请参阅 Python 包管理。
后续步骤
祝贺您运行了您的第一个 Spark 应用程序!
- 有关 API 的深入概述,请从 RDD 编程指南和 SQL 编程指南开始,或者查看“编程指南”菜单以获取其他组件。
- 有关在集群上运行应用程序的信息,请访问部署概述。
- 最后,Spark 在
examples
目录中包含几个示例(Scala, Java, Python, R)。 您可以按如下方式运行它们
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R