快速入门
本教程提供了 Spark 的快速入门。我们将首先通过 Spark 的交互式 shell(Python 或 Scala)介绍 API,然后展示如何用 Java、Scala 和 Python 编写应用程序。
要跟随本指南,请首先从 Spark 官网下载 Spark 的打包发行版。由于我们不使用 HDFS,您可以下载适用于任何 Hadoop 版本的软件包。
请注意,在 Spark 2.0 之前,Spark 的主要编程接口是弹性分布式数据集 (Resilient Distributed Dataset, RDD)。Spark 2.0 之后,RDD 被 Dataset 取代,Dataset 像 RDD 一样是强类型的,但在底层具有更丰富的优化。RDD 接口仍然受支持,您可以在RDD 编程指南中获取更详细的参考。但是,我们强烈建议您改用 Dataset,它比 RDD 具有更好的性能。请参阅SQL 编程指南以获取有关 Dataset 的更多信息。
使用 Spark Shell 进行交互式分析
基础知识
Spark shell 提供了一种学习 API 的简单方法,也是一个强大的交互式数据分析工具。它提供 Scala(运行在 Java VM 上,因此是使用现有 Java 库的好方法)或 Python 版本。在 Spark 目录下运行以下命令启动它
./bin/pyspark
或者如果 PySpark 在当前环境中已通过 pip 安装
pyspark
Spark 的主要抽象是一个分布式项集合,称为 Dataset。Dataset 可以从 Hadoop InputFormats(例如 HDFS 文件)创建,也可以通过转换其他 Dataset 创建。由于 Python 的动态特性,我们不需要 Python 中的 Dataset 是强类型的。因此,Python 中的所有 Dataset 都是 Dataset[Row],我们将其称为 DataFrame
,以与 Pandas 和 R 中的数据帧概念保持一致。让我们从 Spark 源代码目录中的 README 文件的文本创建一个新的 DataFrame
>>> textFile = spark.read.text("README.md")
您可以直接从 DataFrame 获取值,通过调用一些操作,或者转换 DataFrame 来获得一个新的 DataFrame。更多详细信息,请阅读API 文档。
>>> textFile.count() # Number of rows in this DataFrame
126
>>> textFile.first() # First row in this DataFrame
Row(value=u'# Apache Spark')
现在让我们将此 DataFrame 转换为一个新的。我们调用 filter
以返回一个包含文件中行子集的新 DataFrame。
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
我们可以将转换和操作链接起来
>>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"?
15
./bin/spark-shell
Spark 的主要抽象是一个分布式项集合,称为 Dataset。Dataset 可以从 Hadoop InputFormats(例如 HDFS 文件)创建,也可以通过转换其他 Dataset 创建。让我们从 Spark 源代码目录中的 README 文件的文本创建一个新的 Dataset
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
您可以直接从 Dataset 获取值,通过调用一些操作,或者转换 Dataset 来获得一个新的 Dataset。更多详细信息,请阅读API 文档。
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark
现在让我们将此 Dataset 转换为一个新的。我们调用 filter
以返回一个包含文件中项子集的新 Dataset。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
我们可以将转换和操作链接起来
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
更多关于 Dataset 操作
Dataset 操作和转换可用于更复杂的计算。假设我们想找到单词最多的行
>>> from pyspark.sql import functions as sf
>>> textFile.select(sf.size(sf.split(textFile.value, "\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()
[Row(max(numWords)=15)]
这首先将一行映射到一个整数值,并将其别名为“numWords”,从而创建一个新的 DataFrame。在该 DataFrame 上调用 agg
以找到最大的单词计数。select
和 agg
的参数都是Column,我们可以使用 df.colName
从 DataFrame 获取一个列。我们还可以导入 pyspark.sql.functions,它提供了许多方便的函数来从旧的 Column 构建一个新的 Column。
一种常见的数据流模式是 MapReduce,由 Hadoop 推广。Spark 可以轻松实现 MapReduce 流
>>> wordCounts = textFile.select(sf.explode(sf.split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
在这里,我们在 select
中使用 explode
函数,将行 Dataset 转换为单词 Dataset,然后结合 groupBy
和 count
计算文件中每个单词的计数,作为一个包含“word”和“count”两列的 DataFrame。要在我们的 shell 中收集单词计数,我们可以调用 collect
>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 15
这首先将一行映射到一个整数值,从而创建一个新的 Dataset。在该 Dataset 上调用 reduce
以找到最大的单词计数。map
和 reduce
的参数是 Scala 函数字面量(闭包),并且可以使用任何语言特性或 Scala/Java 库。例如,我们可以轻松调用在其他地方声明的函数。我们将使用 Math.max()
函数来使这段代码更容易理解
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
一种常见的数据流模式是 MapReduce,由 Hadoop 推广。Spark 可以轻松实现 MapReduce 流
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
在这里,我们调用 flatMap
将行 Dataset 转换为单词 Dataset,然后结合 groupByKey
和 count
计算文件中每个单词的计数,作为一个 (String, Long) 对的 Dataset。要在我们的 shell 中收集单词计数,我们可以调用 collect
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
缓存
Spark 还支持将数据集拉取到集群范围的内存缓存中。当数据被反复访问时,这非常有用,例如查询一个小的“热”数据集或运行像 PageRank 这样的迭代算法时。作为一个简单的例子,让我们将 linesWithSpark
数据集标记为缓存
>>> linesWithSpark.cache()
>>> linesWithSpark.count()
15
>>> linesWithSpark.count()
15
使用 Spark 探索和缓存一个 100 行的文本文件可能看起来很傻。有趣的部分是,这些相同的功能可以在非常大的数据集上使用,即使它们分布在数十或数百个节点上。您还可以通过将 bin/pyspark
连接到集群来交互式地执行此操作,如RDD 编程指南中所述。
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
使用 Spark 探索和缓存一个 100 行的文本文件可能看起来很傻。有趣的部分是,这些相同的功能可以在非常大的数据集上使用,即使它们分布在数十或数百个节点上。您还可以通过将 bin/spark-shell
连接到集群来交互式地执行此操作,如RDD 编程指南中所述。
独立应用程序
假设我们希望使用 Spark API 编写一个独立的应用程序。我们将介绍一个用 Scala(使用 sbt)、Java(使用 Maven)和 Python(使用 pip)编写的简单应用程序。
现在我们将展示如何使用 Python API (PySpark) 编写应用程序。
如果您正在构建一个打包的 PySpark 应用程序或库,您可以将其添加到您的 setup.py 文件中,如下所示:
install_requires=[
'pyspark==4.0.0'
]
作为一个例子,我们将创建一个简单的 Spark 应用程序,SimpleApp.py
"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop()
该程序只是计算文本文件中包含“a”的行数和包含“b”的行数。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 的安装位置。与 Scala 和 Java 示例一样,我们使用 SparkSession 来创建 Datasets。对于使用自定义类或第三方库的应用程序,我们还可以通过 spark-submit
的 --py-files
参数将代码依赖项打包成 .zip 文件(详见 spark-submit --help
)来添加它们。SimpleApp
足够简单,我们不需要指定任何代码依赖项。
我们可以使用 bin/spark-submit
脚本运行此应用程序
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--master "local[4]" \
SimpleApp.py
...
Lines with a: 46, Lines with b: 23
如果您的环境中已通过 pip 安装 PySpark(例如,pip install pyspark
),您可以根据需要使用常规的 Python 解释器运行应用程序或使用提供的“spark-submit”。
# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23
我们将在 Scala 中创建一个非常简单的 Spark 应用程序——事实上,它简单到被命名为 SimpleApp.scala
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]): Unit = {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
请注意,应用程序应定义 main()
方法,而不是扩展 scala.App
。 scala.App
的子类可能无法正常工作。
该程序只是计算 Spark README 中包含“a”的行数和包含“b”的行数。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 的安装位置。与之前使用 Spark shell 的示例不同,Spark shell 会初始化自己的 SparkSession,而我们则将 SparkSession 初始化作为程序的一部分。
我们调用 SparkSession.builder
来构建一个 SparkSession
,然后设置应用程序名称,最后调用 getOrCreate
来获取 SparkSession
实例。
我们的应用程序依赖于 Spark API,因此我们还将包含一个 sbt 配置文件 build.sbt
,它说明 Spark 是一个依赖项。此文件还添加了 Spark 依赖的仓库
name := "Simple Project"
version := "1.0"
scalaVersion := "2.13.16"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "4.0.0"
为了让 sbt 正常工作,我们需要按照典型的目录结构布置 SimpleApp.scala
和 build.sbt
。一旦完成,我们就可以创建一个包含应用程序代码的 JAR 包,然后使用 spark-submit
脚本运行我们的程序。
# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.13/simple-project_2.13-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master "local[4]" \
target/scala-2.13/simple-project_2.13-1.0.jar
...
Lines with a: 46, Lines with b: 23
此示例将使用 Maven 编译应用程序 JAR,但任何类似的构建系统都可以。
我们将创建一个非常简单的 Spark 应用程序,SimpleApp.java
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
Dataset<String> logData = spark.read().textFile(logFile).cache();
long numAs = logData.filter(s -> s.contains("a")).count();
long numBs = logData.filter(s -> s.contains("b")).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
spark.stop();
}
}
该程序只是计算 Spark README 中包含“a”的行数和包含“b”的行数。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 的安装位置。与之前使用 Spark shell 的示例不同,Spark shell 会初始化自己的 SparkSession,而我们则将 SparkSession 初始化作为程序的一部分。
为了构建程序,我们还编写了一个 Maven pom.xml
文件,将 Spark 列为依赖项。请注意,Spark 工件会带有 Scala 版本标签。
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>4.0.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
我们按照规范的 Maven 目录结构布局这些文件
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
现在,我们可以使用 Maven 打包应用程序,并使用 ./bin/spark-submit
执行它。
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master "local[4]" \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
其他依赖管理工具,例如 Conda 和 pip,也可以用于自定义类或第三方库。另请参阅Python 包管理。
下一步
恭喜您运行了第一个 Spark 应用程序!
- 有关 API 的深入概述,请从RDD 编程指南和SQL 编程指南开始,或参阅“编程指南”菜单了解其他组件。
- 要在集群上运行应用程序,请前往部署概览。
- 最后,Spark 在
examples
目录中包含几个示例(Python、Scala、Java、R)。您可以按如下方式运行它们:
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R