此页面展示了如何使用不同的 Apache Spark API 和简单的示例。
Spark 是一个适用于小型和大型数据集的出色引擎。它可以与单节点/本地主机环境或分布式集群一起使用。Spark 扩展的 API、出色的性能和灵活性使其成为许多分析的理想选择。本指南展示了以下 Spark API 的示例
这些示例使用小型数据集,因此易于理解。
本节展示了如何创建 Spark DataFrame 并运行简单的操作。这些示例基于小型 DataFrame,因此您可以轻松地看到其功能。
让我们从创建 Spark Session 开始
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()
一些 Spark 运行时环境附带预先实例化的 Spark Session。 getOrCreate()
方法将使用现有的 Spark Session,或者如果不存在则创建一个新的 Spark Session。
创建 Spark DataFrame
首先使用 first_name
和 age
列以及四行数据创建一个 DataFrame
df = spark.createDataFrame(
[
("sue", 32),
("li", 3),
("bob", 75),
("heo", 13),
],
["first_name", "age"],
)
使用 show()
方法查看 DataFrame 的内容
df.show()
+----------+---+
|first_name|age|
+----------+---+
| sue| 32|
| li| 3|
| bob| 75|
| heo| 13|
+----------+---+
现在,让我们对 DataFrame 执行一些数据处理操作。
向 Spark DataFrame 添加列
让我们向 DataFrame 添加一个 life_stage
列,如果年龄在 12 岁及以下则返回“child”,如果年龄在 13 岁到 19 岁之间则返回“teenager”,如果年龄在 20 岁及以上则返回“adult”。
from pyspark.sql.functions import col, when
df1 = df.withColumn(
"life_stage",
when(col("age") < 13, "child")
.when(col("age").between(13, 19), "teenager")
.otherwise("adult"),
)
向 Spark DataFrame 添加列很容易。让我们查看 df1
的内容。
df1.show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| sue| 32| adult|
| li| 3| child|
| bob| 75| adult|
| heo| 13| teenager|
+----------+---+----------+
请注意原始 DataFrame 未更改
df.show()
+----------+---+
|first_name|age|
+----------+---+
| sue| 32|
| li| 3|
| bob| 75|
| heo| 13|
+----------+---+
Spark 操作不会改变 DataFrame。您必须将结果分配给一个新变量才能访问 DataFrame 更改以进行后续操作。
过滤 Spark DataFrame
现在,过滤 DataFrame,使其仅包含青少年和成年人。
df1.where(col("life_stage").isin(["teenager", "adult"])).show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| sue| 32| adult|
| bob| 75| adult|
| heo| 13| teenager|
+----------+---+----------+
对 Spark DataFrame 进行分组聚合
现在,让我们计算数据集中的平均年龄
from pyspark.sql.functions import avg
df1.select(avg("age")).show()
+--------+
|avg(age)|
+--------+
| 30.75|
+--------+
您还可以计算每个 life_stage
的平均年龄
df1.groupBy("life_stage").avg().show()
+----------+--------+
|life_stage|avg(age)|
+----------+--------+
| adult| 53.5|
| child| 3.0|
| teenager| 13.0|
+----------+--------+
如果您不想使用编程 API,Spark 允许您使用 SQL 对 DataFrame 运行查询。
使用 SQL 查询 DataFrame
以下是如何使用 SQL 计算所有人的平均年龄
spark.sql("select avg(age) from {df1}", df1=df1).show()
+--------+
|avg(age)|
+--------+
| 30.75|
+--------+
以下是如何使用 SQL 按 life_stage
计算平均年龄
spark.sql("select life_stage, avg(age) from {df1} group by life_stage", df1=df1).show()
+----------+--------+
|life_stage|avg(age)|
+----------+--------+
| adult| 53.5|
| child| 3.0|
| teenager| 13.0|
+----------+--------+
Spark 允许您使用编程 API、SQL API 或两者的组合。这种灵活性使 Spark 能够被各种用户访问,并且表达能力强大。
让我们将 DataFrame 持久化到一个名为 Parquet 的表中,该表可以通过 SQL API 轻松访问。
df1.write.saveAsTable("some_people")
确保可以通过表名访问该表
spark.sql("select * from some_people").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
| sue| 32| adult|
| bob| 75| adult|
| li| 3| child|
+----------+---+----------+
现在,让我们使用 SQL 将更多行数据插入表中
spark.sql("INSERT INTO some_people VALUES ('frank', 4, 'child')")
检查表内容以确认已插入行
spark.sql("select * from some_people").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
| sue| 32| adult|
| bob| 75| adult|
| li| 3| child|
| frank| 4| child|
+----------+---+----------+
运行一个返回青少年的查询
spark.sql("select * from some_people where life_stage='teenager'").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
+----------+---+----------+
Spark 使得注册表并使用纯 SQL 查询它们变得容易。
Spark 还具有结构化流 API,允许您创建批处理或实时流应用程序。
让我们看看如何使用 Spark 结构化流从 Kafka 读取数据并每小时将其写入 Parquet 表。
假设您有一个 Kafka 流,该流不断填充以下数据
{"student_name":"someXXperson", "graduation_year":"2023", "major":"math"}
{"student_name":"liXXyao", "graduation_year":"2025", "major":"physics"}
以下是如何将 Kafka 源读取到 Spark DataFrame 中
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", subscribeTopic)
.load()
)
创建一个清理输入数据的函数。
schema = StructType([
StructField("student_name", StringType()),
StructField("graduation_year", StringType()),
StructField("major", StringType()),
])
def with_normalized_names(df, schema):
parsed_df = (
df.withColumn("json_data", from_json(col("value").cast("string"), schema))
.withColumn("student_name", col("json_data.student_name"))
.withColumn("graduation_year", col("json_data.graduation_year"))
.withColumn("major", col("json_data.major"))
.drop(col("json_data"))
.drop(col("value"))
)
split_col = split(parsed_df["student_name"], "XX")
return (
parsed_df.withColumn("first_name", split_col.getItem(0))
.withColumn("last_name", split_col.getItem(1))
.drop("student_name")
)
现在,创建一个函数,该函数将在每次运行时读取 Kafka 中的所有新数据。
def perform_available_now_update():
checkpointPath = "data/tmp_students_checkpoint/"
path = "data/tmp_students"
return df.transform(lambda df: with_normalized_names(df)).writeStream.trigger(
availableNow=True
).format("parquet").option("checkpointLocation", checkpointPath).start(path)
调用 perform_available_now_update()
函数并查看 Parquet 表的内容。
您可以设置一个 cron 作业,每小时运行一次 perform_available_now_update()
函数,以便定期更新您的 Parquet 表。
Spark RDD API 适用于非结构化数据。
对于结构化数据,Spark DataFrame API 更简单且性能更高。
假设您有一个名为 some_text.txt
的文本文件,其中包含以下三行数据
these are words
these are more words
words in english
您希望计算文本文件中每个单词的计数。以下是如何使用 Spark RDD 执行此计算
text_file = spark.sparkContext.textFile("some_words.txt")
counts = (
text_file.flatMap(lambda line: line.split(" "))
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
)
让我们看一下结果
counts.collect()
[('these', 2),
('are', 2),
('more', 1),
('in', 1),
('words', 3),
('english', 1)]
Spark 允许高效执行查询,因为它将此计算并行化。许多其他查询引擎无法并行化计算。
这些示例展示了 Spark 如何为小型数据集上的计算提供良好的用户 API。Spark 可以将这些相同的代码示例扩展到分布式集群上的大型数据集。Spark 能够处理大型和小型数据集,这真是太棒了。
与其他查询引擎相比,Spark 还具有扩展的 API。Spark 允许您使用编程 API 执行 DataFrame 操作、编写 SQL、执行流分析以及进行机器学习。Spark 使您免于学习多个框架并拼凑各种库来执行分析。
Spark 附带了许多其他示例