Apache Spark 示例

此页面展示了如何通过简单的示例使用不同的 Apache Spark API。

Spark 是处理小型和大型数据集的优秀引擎。 它可以与单节点/localhost 环境或分布式集群一起使用。 Spark 广泛的 API、卓越的性能和灵活性使其成为许多分析的理想选择。 本指南展示了以下 Spark API 的示例

  • DataFrames
  • SQL
  • 结构化流处理
  • RDDs

这些示例使用小型数据集,因此易于理解。

Spark DataFrame 示例

本节向您展示如何创建 Spark DataFrame 并运行简单的操作。 这些示例在一个小型 DataFrame 上进行,因此您可以轻松地了解其功能。

让我们首先创建一个 Spark Session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("demo").getOrCreate()

一些 Spark 运行时环境带有预先实例化的 Spark Sessions。 getOrCreate() 方法将使用现有的 Spark Session,如果不存在则创建一个新的 Spark Session。

创建一个 Spark DataFrame

首先创建一个包含 first_nameage 列以及四行数据的 DataFrame

df = spark.createDataFrame(
    [
        ("sue", 32),
        ("li", 3),
        ("bob", 75),
        ("heo", 13),
    ],
    ["first_name", "age"],
)

使用 show() 方法查看 DataFrame 的内容

df.show()

+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+

现在,让我们在 DataFrame 上执行一些数据处理操作。

向 Spark DataFrame 添加列

让我们向 DataFrame 添加一个 life_stage 列,如果年龄为 12 岁或以下,则返回“child”,如果年龄在 13 岁至 19 岁之间,则返回“teenager”,如果年龄为 20 岁或以上,则返回“adult”。

from pyspark.sql.functions import col, when

df1 = df.withColumn(
    "life_stage",
    when(col("age") < 13, "child")
    .when(col("age").between(13, 19), "teenager")
    .otherwise("adult"),
)

可以轻松地向 Spark DataFrame 添加列。 让我们查看 df1 的内容。

df1.show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|        li|  3|     child|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+

请注意原始 DataFrame 如何保持不变

df.show()

+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+

Spark 操作不会更改 DataFrame。 您必须将结果分配给一个新变量才能访问 DataFrame 更改以进行后续操作。

筛选 Spark DataFrame

现在,筛选 DataFrame,使其仅包含青少年和成年人。

df1.where(col("life_stage").isin(["teenager", "adult"])).show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+

在 Spark DataFrame 上进行 Group by 聚合

现在,让我们计算数据集中所有人的平均年龄

from pyspark.sql.functions import avg

df1.select(avg("age")).show()

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+

您还可以计算每个 life_stage 的平均年龄

df1.groupBy("life_stage").avg().show()

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+

如果您不想使用编程 API,Spark 允许您使用 SQL 对 DataFrames 运行查询。

使用 SQL 查询 DataFrame

以下是如何使用 SQL 计算所有人的平均年龄

spark.sql("select avg(age) from {df1}", df1=df1).show()

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+

以下是如何使用 SQL 按 life_stage 计算平均年龄

spark.sql("select life_stage, avg(age) from {df1} group by life_stage", df1=df1).show()

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+

Spark 允许您使用编程 API、SQL API 或两者的组合。 这种灵活性使 Spark 可供各种用户访问,并且功能强大。

Spark SQL 示例

让我们将 DataFrame 持久化到可以通过 SQL API 轻松访问的命名 Parquet 表中。

df1.write.saveAsTable("some_people")

确保可以通过表名访问该表

spark.sql("select * from some_people").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
|       sue| 32|     adult|
|       bob| 75|     adult|
|        li|  3|     child|
+----------+---+----------+

现在,让我们使用 SQL 向表中插入更多行数据

spark.sql("INSERT INTO some_people VALUES ('frank', 4, 'child')")

检查表内容以确认已插入该行

spark.sql("select * from some_people").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
|       sue| 32|     adult|
|       bob| 75|     adult|
|        li|  3|     child|
|     frank|  4|     child|
+----------+---+----------+

运行一个返回青少年的查询

spark.sql("select * from some_people where life_stage='teenager'").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
+----------+---+----------+

Spark 使注册表并使用纯 SQL 查询它们变得容易。

Spark 结构化流处理示例

Spark 还具有结构化流处理 API,允许您创建批量或实时流处理应用程序。

让我们看看如何使用 Spark 结构化流处理从 Kafka 读取数据并每小时将其写入 Parquet 表。

假设您有一个 Kafka 流,其中不断填充以下数据

{"student_name":"someXXperson", "graduation_year":"2023", "major":"math"}
{"student_name":"liXXyao", "graduation_year":"2025", "major":"physics"}

以下是如何将 Kafka 源读取到 Spark DataFrame 中

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", subscribeTopic)
    .load()
)

创建一个清理输入数据的函数。

schema = StructType([
 StructField("student_name", StringType()),
 StructField("graduation_year", StringType()),
 StructField("major", StringType()),
])

def with_normalized_names(df, schema):
    parsed_df = (
        df.withColumn("json_data", from_json(col("value").cast("string"), schema))
        .withColumn("student_name", col("json_data.student_name"))
        .withColumn("graduation_year", col("json_data.graduation_year"))
        .withColumn("major", col("json_data.major"))
        .drop(col("json_data"))
        .drop(col("value"))
    )
    split_col = split(parsed_df["student_name"], "XX")
    return (
        parsed_df.withColumn("first_name", split_col.getItem(0))
        .withColumn("last_name", split_col.getItem(1))
        .drop("student_name")
    )

现在,创建一个函数,该函数将在每次运行时读取 Kafka 中的所有新数据。

def perform_available_now_update():
    checkpointPath = "data/tmp_students_checkpoint/"
    path = "data/tmp_students"
    return df.transform(lambda df: with_normalized_names(df)).writeStream.trigger(
        availableNow=True
    ).format("parquet").option("checkpointLocation", checkpointPath).start(path)

调用 perform_available_now_update() 函数并查看 Parquet 表的内容。

您可以设置一个 cron 作业来每小时运行 perform_available_now_update() 函数,以便定期更新您的 Parquet 表。

Spark RDD 示例

Spark RDD API 适用于非结构化数据。

对于结构化数据,Spark DataFrame API 更简单且性能更高。

假设您有一个名为 some_text.txt 的文本文件,其中包含以下三行数据

these are words
these are more words
words in english

您想计算文本文件中每个单词的计数。 以下是如何使用 Spark RDD 执行此计算

text_file = spark.sparkContext.textFile("some_words.txt")

counts = (
    text_file.flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)

让我们看一下结果

counts.collect()

[('these', 2),
 ('are', 2),
 ('more', 1),
 ('in', 1),
 ('words', 3),
 ('english', 1)]

Spark 允许有效执行查询,因为它并行化了此计算。 许多其他查询引擎无法并行化计算。

结论

这些示例表明,Spark 为小型数据集的计算提供了良好的用户 API。 Spark 可以将这些相同的代码示例扩展到分布式集群上的大型数据集。 Spark 能够处理大型和小型数据集真是太棒了。

与其他查询引擎相比,Spark 还具有广泛的 API。 Spark 允许您使用编程 API 执行 DataFrame 操作、编写 SQL、执行流处理分析以及进行机器学习。 Spark 可以让您免于学习多个框架并将各种库拼凑在一起以执行分析。

更多示例

许多其他示例与 Spark 一起分发

最新消息

存档