快速入门

本教程简要介绍如何使用 Spark。我们将首先通过 Spark 的交互式 Shell(使用 Python 或 Scala)介绍 API,然后展示如何使用 Java、Scala 和 Python 编写应用程序。

要跟随本指南,请首先从 Spark 网站下载 Spark 的打包发行版。由于我们不使用 HDFS,您可以下载任何 Hadoop 版本的软件包。

请注意,在 Spark 2.0 之前,Spark 的主要编程接口是弹性分布式数据集 (RDD)。 在 Spark 2.0 之后,RDD 被 Dataset 替换,Dataset 具有与 RDD 类似的强类型,但在底层具有更丰富的优化。 RDD 接口仍然受支持,您可以在 RDD 编程指南中获得更详细的参考。 但是,我们强烈建议您切换到使用 Dataset,它比 RDD 具有更好的性能。 请参阅 SQL 编程指南以获取有关 Dataset 的更多信息。

使用 Spark Shell 进行交互式分析

基础知识

Spark 的 Shell 提供了一种学习 API 的简单方法,也是一种以交互方式分析数据的强大工具。 它可以采用 Scala(在 Java VM 上运行,因此是使用现有 Java 库的好方法)或 Python。 通过在 Spark 目录中运行以下命令启动它:

./bin/pyspark

或者,如果 PySpark 已通过 pip 安装在您当前的环境中:

pyspark

Spark 的主要抽象是名为 Dataset 的分布式项目集合。 Dataset 可以从 Hadoop InputFormat(例如 HDFS 文件)创建,也可以通过转换其他 Dataset 来创建。 由于 Python 的动态特性,我们不需要在 Python 中对 Dataset 进行强类型化。 因此,Python 中的所有 Dataset 都是 Dataset[Row],为了与 Pandas 和 R 中的数据框概念保持一致,我们将其称为 DataFrame。 让我们从 Spark 源代码目录中 README 文件的文本创建一个新的 DataFrame

>>> textFile = spark.read.text("README.md")

您可以通过调用一些操作直接从 DataFrame 获取值,或者转换 DataFrame 以获得新的 DataFrame。 有关更多详细信息,请阅读 _API 文档_。

>>> textFile.count()  # Number of rows in this DataFrame
126

>>> textFile.first()  # First row in this DataFrame
Row(value=u'# Apache Spark')

现在让我们将这个 DataFrame 转换为一个新的 DataFrame。 我们调用 filter 来返回一个新的 DataFrame,其中包含文件中行的子集。

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

我们可以将转换和操作链接在一起

>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15
./bin/spark-shell

Spark 的主要抽象是名为 Dataset 的分布式项目集合。 Dataset 可以从 Hadoop InputFormat(例如 HDFS 文件)创建,也可以通过转换其他 Dataset 来创建。 让我们从 Spark 源代码目录中 README 文件的文本创建一个新的 Dataset

scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

您可以通过调用一些操作直接从 Dataset 获取值,或者转换 Dataset 以获得新的 Dataset。 有关更多详细信息,请阅读 _API 文档_。

scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark

现在让我们将这个 Dataset 转换为一个新的 Dataset。 我们调用 filter 以返回一个新的 Dataset,其中包含文件中项目的子集。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

我们可以将转换和操作链接在一起

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

关于 Dataset 操作的更多信息

Dataset 操作和转换可用于更复杂的计算。 假设我们要找到包含单词最多的行

>>> from pyspark.sql import functions as sf
>>> textFile.select(sf.size(sf.split(textFile.value, "\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()
[Row(max(numWords)=15)]

首先将一行映射到一个整数值,并将其别名为 “numWords”,从而创建一个新的 DataFrame。 在该 DataFrame 上调用 agg 以查找最大的单词计数。 selectagg 的参数都是 _Column_,我们可以使用 df.colName 从 DataFrame 获取列。 我们还可以导入 pyspark.sql.functions,它提供了许多方便的函数来从旧列构建新列。

一种常见的数据流模式是 MapReduce,它因 Hadoop 而流行。 Spark 可以轻松实现 MapReduce 流程

>>> wordCounts = textFile.select(sf.explode(sf.split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

这里,我们使用 select 中的 explode 函数将行 Dataset 转换为单词 Dataset,然后组合 groupBycount 以计算文件中每个单词的计数,并将结果作为包含 2 列(“word” 和 “count”)的 DataFrame。 要在我们的 Shell 中收集单词计数,我们可以调用 collect

>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 15

首先将一行映射到一个整数值,从而创建一个新的 Dataset。 在该 Dataset 上调用 reduce 以查找最大的单词计数。 mapreduce 的参数是 Scala 函数文字(闭包),可以使用任何语言特性或 Scala/Java 库。 例如,我们可以轻松调用在其他地方声明的函数。 我们将使用 Math.max() 函数来使此代码更易于理解

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

一种常见的数据流模式是 MapReduce,它因 Hadoop 而流行。 Spark 可以轻松实现 MapReduce 流程

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

这里,我们调用 flatMap 以将行 Dataset 转换为单词 Dataset,然后组合 groupByKeycount 以计算文件中每个单词的计数,并将结果作为 (String, Long) 对的 Dataset。 要在我们的 Shell 中收集单词计数,我们可以调用 collect

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

缓存

Spark 还支持将数据集拉入集群范围的内存缓存中。 当重复访问数据时,这非常有用,例如在查询小型“热”数据集时,或者在运行像 PageRank 这样的迭代算法时。 作为一个简单的例子,让我们将我们的 linesWithSpark 数据集标记为要缓存

>>> linesWithSpark.cache()

>>> linesWithSpark.count()
15

>>> linesWithSpark.count()
15

使用 Spark 来浏览和缓存 100 行的文本文件可能看起来很傻。 有趣的是,这些相同的函数可以用于非常大的数据集,即使它们分布在数十或数百个节点上。 您还可以通过将 bin/pyspark 连接到集群来以交互方式执行此操作,如 RDD 编程指南中所述。

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

使用 Spark 来浏览和缓存 100 行的文本文件可能看起来很傻。 有趣的是,这些相同的函数可以用于非常大的数据集,即使它们分布在数十或数百个节点上。 您还可以通过将 bin/spark-shell 连接到集群来以交互方式执行此操作,如 RDD 编程指南中所述。

自包含应用

假设我们希望使用 Spark API 编写一个自包含的应用程序。 我们将介绍一个简单的 Scala(使用 sbt)、Java(使用 Maven)和 Python(pip)应用程序。

现在我们将展示如何使用 Python API (PySpark) 编写应用程序。

如果您正在构建打包的 PySpark 应用程序或库,您可以将其添加到 setup.py 文件中,如下所示:

    install_requires=[
        'pyspark==3.5.5'
    ]

作为一个例子,我们将创建一个简单的 Spark 应用程序,SimpleApp.py

"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

此程序仅计算文本文件中包含 ‘a’ 的行数和包含 ‘b’ 的行数。 请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 的安装位置。 与 Scala 和 Java 示例一样,我们使用 SparkSession 来创建 Datasets。 对于使用自定义类或第三方库的应用程序,我们还可以通过将其打包到 .zip 文件中,通过 spark-submit--py-files 参数将代码依赖项添加到 spark-submit(有关详细信息,请参阅 spark-submit --help)。 SimpleApp 非常简单,我们不需要指定任何代码依赖项。

我们可以使用 bin/spark-submit 脚本运行此应用程序

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

如果您已将 PySpark pip 安装到您的环境中(例如,pip install pyspark),您可以选择使用常规 Python 解释器运行您的应用程序,或者使用提供的“spark-submit”。

# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23

我们将用 Scala 创建一个非常简单的 Spark 应用程序——实际上,它非常简单,因此被命名为 SimpleApp.scala

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

请注意,应用程序应定义一个 main() 方法,而不是扩展 scala.Appscala.App 的子类可能无法正常工作。

此程序仅计算 Spark README 中包含 ‘a’ 的行数和包含 ‘b’ 的行数。 请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 的安装位置。 与之前使用 Spark shell 的示例(它初始化自己的 SparkSession)不同,我们将 SparkSession 初始化为程序的一部分。

我们调用 SparkSession.builder 来构建一个 SparkSession,然后设置应用程序名称,最后调用 getOrCreate 来获取 SparkSession 实例。

我们的应用程序依赖于 Spark API,因此我们还会包含一个 sbt 配置文件,build.sbt,它说明 Spark 是一个依赖项。 此文件还添加了 Spark 所依赖的存储库。

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.18"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.5"

为了使 sbt 能够正确工作,我们需要按照典型的目录结构来布局 SimpleApp.scalabuild.sbt。 一旦完成,我们就可以创建一个包含应用程序代码的 JAR 包,然后使用 spark-submit 脚本来运行我们的程序。

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23

本例将使用 Maven 来编译一个应用程序 JAR,但任何类似的构建系统都可以工作。

我们将创建一个非常简单的 Spark 应用程序,SimpleApp.java

/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}

此程序仅计算 Spark README 中包含 ‘a’ 的行数和包含 ‘b’ 的行数。 请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 的安装位置。 与之前使用 Spark shell 的示例(它初始化自己的 SparkSession)不同,我们将 SparkSession 初始化为程序的一部分。

为了构建程序,我们还需要编写一个 Maven pom.xml 文件,其中将 Spark 列为依赖项。 请注意,Spark 工件使用 Scala 版本进行标记。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.5.5</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>

我们按照规范的 Maven 目录结构来布局这些文件。

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

现在,我们可以使用 Maven 打包应用程序,并使用 ./bin/spark-submit 执行它。

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

其他依赖管理工具(如 Conda 和 pip)也可用于自定义类或第三方库。 另请参阅 Python 包管理

后续步骤

祝贺您运行了您的第一个 Spark 应用程序!

# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R