第一章:DataFrame - 结构化数据视图#

[1]:
pip install pyspark
Requirement already satisfied: pyspark in /Users/amanda.liu/anaconda3/lib/python3.10/site-packages (3.5.0)
Requirement already satisfied: py4j==0.10.9.7 in /Users/amanda.liu/anaconda3/lib/python3.10/site-packages (from pyspark) (0.10.9.7)
Note: you may need to restart the kernel to use updated packages.
[2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

本节介绍 PySpark 中最基本的数据结构:DataFrame。

DataFrame 是一种二维的带标签数据结构,其列可能包含不同类型的数据。您可以将 DataFrame 看作电子表格、SQL 表或序列对象的字典。Apache Spark DataFrame 支持丰富的 API 集(选择列、过滤、连接、聚合等),使您能够高效地解决常见数据分析问题。

与传统关系型数据库相比,Spark DataFrame 在大数据处理和分析方面具有以下几个关键优势:

  • 分布式计算:Spark 将数据分布在集群中的多个节点上,从而实现大数据的并行处理

  • 内存处理:Spark 在内存中执行计算,这比基于磁盘的处理要快得多

  • 模式灵活性:与传统数据库不同,PySpark DataFrame 支持模式演变和动态类型。

  • 容错性:PySpark DataFrame 构建在弹性分布式数据集 (RDD) 之上,它们本身具有容错性。Spark 自动处理节点故障和数据复制,确保数据可靠性和完整性。

关于 RDD 的说明:从 Spark 4.0 开始,Spark Connect 不再直接支持 RDD。直接与 Spark DataFrame 交互使用统一的规划和优化引擎,使我们能够在 Databricks 上所有支持的语言(Python、SQL、Scala 和 R)中获得几乎相同的性能。

创建 DataFrame#

在 PySpark 中创建 DataFrame 有几种方法。

从字典列表创建#

最简单的方法是使用 createDataFrame() 方法,如下所示:

[3]:
employees = [{"name": "John D.", "age": 30},
  {"name": "Alice G.", "age": 25},
  {"name": "Bob T.", "age": 35},
  {"name": "Eve A.", "age": 28}]

# Create a DataFrame containing the employees data
df = spark.createDataFrame(employees)
df.show()
+---+--------+
|age|    name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35|  Bob T.|
| 28|  Eve A.|
+---+--------+

从本地文件创建#

我们也可以从本地 CSV 文件创建 DataFrame

[4]:
df = spark.read.csv("../data/employees.csv", header=True, inferSchema=True)
df.show()
+-----------+-----------------+-----------------+
|Employee ID|             Role|         Location|
+-----------+-----------------+-----------------+
|      19238|     Data Analyst|      Seattle, WA|
|      19239|Software Engineer|      Seattle, WA|
|      19240|    IT Specialist|      Seattle, WA|
|      19241|     Data Analyst|     New York, NY|
|      19242|        Recruiter|San Francisco, CA|
|      19243|  Product Manager|     New York, NY|
+-----------+-----------------+-----------------+

或者从本地 JSON 文件创建

[5]:
df = spark.read.option("multiline","true").json("../data/employees.json")
df.show()
+-----------+-----------------+-----------------+
|Employee ID|         Location|             Role|
+-----------+-----------------+-----------------+
|      19238|      Seattle, WA|     Data Analyst|
|      19239|      Seattle, WA|Software Engineer|
|      19240|      Seattle, WA|    IT Specialist|
|      19241|     New York, NY|     Data Analyst|
|      19242|San Francisco, CA|        Recruiter|
|      19243|     New York, NY|  Product Manager|
+-----------+-----------------+-----------------+

从现有 DataFrame 创建#

我们甚至可以通过选择特定列,从另一个现有 DataFrame 创建 DataFrame

[6]:
employees = [
  {"name": "John D.", "age": 30, "department": "HR"},
  {"name": "Alice G.", "age": 25, "department": "Finance"},
  {"name": "Bob T.", "age": 35, "department": "IT"},
  {"name": "Eve A.", "age": 28, "department": "Marketing"}
]
df = spark.createDataFrame(employees)

# Select only the name and age columns
new_df = df.select("name", "age")

从表创建#

如果您的 Spark 环境中有一个现有表 table_name,您可以这样创建 DataFrame

[7]:
df = spark.read.table("table_name")

从数据库创建#

如果您的表在数据库中,您可以使用 JDBC 将表读取到 DataFrame 中。

[9]:
url = "jdbc:mysql://localhost:3306/mydatabase"
table = "employees"
properties = {
  "user": "username",
  "password": "password"
}

# Read table into DataFrame
df = spark.read.jdbc(url=url, table=table, properties=properties)

查看 DataFrame#

我们可以使用 PySpark 查看并与 DataFrame 进行交互。

显示 DataFrame#

df.show() 显示 DataFrame 内容的基本可视化。根据我们上面 createDataFrame() 的示例

[10]:
employees = [{"name": "John D.", "age": 30},
  {"name": "Alice G.", "age": 25},
  {"name": "Bob T.", "age": 35},
  {"name": "Eve A.", "age": 28}]

# Create a DataFrame containing the employees data
df = spark.createDataFrame(employees)
[11]:
df.show()
+---+--------+
|age|    name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35|  Bob T.|
| 28|  Eve A.|
+---+--------+

df.show() 有 3 个可选参数:ntruncatevertical

默认情况下,df.show() 最多显示 DataFrame 的前 20 行。我们可以通过向 show() 方法传递参数来控制显示的行数

[12]:
df.show(n=2)
+---+--------+
|age|    name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
+---+--------+
only showing top 2 rows

truncate 参数控制显示列值的长度(默认值为 20)

[13]:
df.show(truncate=3)
+---+----+
|age|name|
+---+----+
| 30| Joh|
| 25| Ali|
| 35| Bob|
| 28| Eve|
+---+----+

如果我们将 vertical 设置为 True,DataFrame 将垂直显示,每行一个值

[14]:
df.show(vertical=True)
-RECORD 0--------
 age  | 30
 name | John D.
-RECORD 1--------
 age  | 25
 name | Alice G.
-RECORD 2--------
 age  | 35
 name | Bob T.
-RECORD 3--------
 age  | 28
 name | Eve A.

DataFrame 操作#

让我们看看如何转换我们的 DataFrame。

有关更详细的信息,请参阅数据操作部分,第六章:函数枢纽 - 使用 PySpark 进行数据操作

重命名列#

我们可以使用 withColumnRenamed() 方法重命名 DataFrame 列

[16]:
df.show()
df2 = df.withColumnRenamed("name", "full_name")
df2.show()
+---+--------+
|age|    name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35|  Bob T.|
| 28|  Eve A.|
+---+--------+

+---+---------+
|age|full_name|
+---+---------+
| 30|  John D.|
| 25| Alice G.|
| 35|   Bob T.|
| 28|   Eve A.|
+---+---------+

过滤行#

我们可以过滤出特定年龄范围内的员工。下面的 df.filter 将创建一个符合我们年龄条件的新 DataFrame

[17]:
filtered_df = df.filter((df["age"] > 26) & (df["age"] < 32))
filtered_df.show()
+---+-------+
|age|   name|
+---+-------+
| 30|John D.|
| 28| Eve A.|
+---+-------+

我们也可以使用 df.where 获得相同的结果

[18]:
where_df = df.where((df["age"] > 26) & (df["age"] < 32))
where_df.show()
+---+-------+
|age|   name|
+---+-------+
| 30|John D.|
| 28| Eve A.|
+---+-------+

DataFrame vs. 表#

DataFrame 是一个不可变的分布式数据集合,仅在当前 Spark 会话中可用。

表是一种持久性数据结构,可以在多个 Spark 会话中访问。

如果您希望将 DataFrame 提升为表,可以使用 createOrReplaceTempView() 方法

[19]:
df.createOrReplaceTempView("employees")

请注意,此临时表的生命周期与用于创建此 DataFrame 的 SparkSession 绑定。要使表在此 Spark 会话之外持久化,您需要将其保存到持久存储中。

将 DataFrame 保存到持久存储#

在 PySpark 中,有几种方法可以将 DataFrame 保存到持久存储中。有关将数据保存到本地环境的更详细信息,请参阅数据加载部分(TODO:添加链接)。

保存到基于文件的数据源#

对于基于文件的数据源(文本、parquet、json 等),您可以这样指定自定义表路径:

[20]:
df.write.option("path", "../dataout").saveAsTable("dataframes_savetable_example")

即使表被删除,自定义表路径和表数据仍将存在。

如果未指定自定义表路径,Spark 会将数据写入 warehouse 目录下的默认表路径。当表被删除时,默认表路径也将被移除。

保存到 Hive metastore#

要保存到 Hive metastore,您可以使用以下方法:

[21]:
df.write().mode("overwrite").saveAsTable("schemaName.tableName")