第一章:DataFrame - 结构化数据视图#
[1]:
pip install pyspark
Requirement already satisfied: pyspark in /Users/amanda.liu/anaconda3/lib/python3.10/site-packages (3.5.0)
Requirement already satisfied: py4j==0.10.9.7 in /Users/amanda.liu/anaconda3/lib/python3.10/site-packages (from pyspark) (0.10.9.7)
Note: you may need to restart the kernel to use updated packages.
[2]:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
本节介绍 PySpark 中最基本的数据结构:DataFrame。
DataFrame 是一种二维的带标签数据结构,其列可能包含不同类型的数据。您可以将 DataFrame 看作电子表格、SQL 表或序列对象的字典。Apache Spark DataFrame 支持丰富的 API 集(选择列、过滤、连接、聚合等),使您能够高效地解决常见数据分析问题。
与传统关系型数据库相比,Spark DataFrame 在大数据处理和分析方面具有以下几个关键优势:
分布式计算:Spark 将数据分布在集群中的多个节点上,从而实现大数据的并行处理
内存处理:Spark 在内存中执行计算,这比基于磁盘的处理要快得多
模式灵活性:与传统数据库不同,PySpark DataFrame 支持模式演变和动态类型。
容错性:PySpark DataFrame 构建在弹性分布式数据集 (RDD) 之上,它们本身具有容错性。Spark 自动处理节点故障和数据复制,确保数据可靠性和完整性。
关于 RDD 的说明:从 Spark 4.0 开始,Spark Connect 不再直接支持 RDD。直接与 Spark DataFrame 交互使用统一的规划和优化引擎,使我们能够在 Databricks 上所有支持的语言(Python、SQL、Scala 和 R)中获得几乎相同的性能。
创建 DataFrame#
在 PySpark 中创建 DataFrame 有几种方法。
从字典列表创建#
最简单的方法是使用 createDataFrame() 方法,如下所示:
[3]:
employees = [{"name": "John D.", "age": 30},
{"name": "Alice G.", "age": 25},
{"name": "Bob T.", "age": 35},
{"name": "Eve A.", "age": 28}]
# Create a DataFrame containing the employees data
df = spark.createDataFrame(employees)
df.show()
+---+--------+
|age| name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35| Bob T.|
| 28| Eve A.|
+---+--------+
从本地文件创建#
我们也可以从本地 CSV 文件创建 DataFrame
[4]:
df = spark.read.csv("../data/employees.csv", header=True, inferSchema=True)
df.show()
+-----------+-----------------+-----------------+
|Employee ID| Role| Location|
+-----------+-----------------+-----------------+
| 19238| Data Analyst| Seattle, WA|
| 19239|Software Engineer| Seattle, WA|
| 19240| IT Specialist| Seattle, WA|
| 19241| Data Analyst| New York, NY|
| 19242| Recruiter|San Francisco, CA|
| 19243| Product Manager| New York, NY|
+-----------+-----------------+-----------------+
或者从本地 JSON 文件创建
[5]:
df = spark.read.option("multiline","true").json("../data/employees.json")
df.show()
+-----------+-----------------+-----------------+
|Employee ID| Location| Role|
+-----------+-----------------+-----------------+
| 19238| Seattle, WA| Data Analyst|
| 19239| Seattle, WA|Software Engineer|
| 19240| Seattle, WA| IT Specialist|
| 19241| New York, NY| Data Analyst|
| 19242|San Francisco, CA| Recruiter|
| 19243| New York, NY| Product Manager|
+-----------+-----------------+-----------------+
从现有 DataFrame 创建#
我们甚至可以通过选择特定列,从另一个现有 DataFrame 创建 DataFrame
[6]:
employees = [
{"name": "John D.", "age": 30, "department": "HR"},
{"name": "Alice G.", "age": 25, "department": "Finance"},
{"name": "Bob T.", "age": 35, "department": "IT"},
{"name": "Eve A.", "age": 28, "department": "Marketing"}
]
df = spark.createDataFrame(employees)
# Select only the name and age columns
new_df = df.select("name", "age")
从表创建#
如果您的 Spark 环境中有一个现有表 table_name
,您可以这样创建 DataFrame
[7]:
df = spark.read.table("table_name")
从数据库创建#
如果您的表在数据库中,您可以使用 JDBC 将表读取到 DataFrame 中。
[9]:
url = "jdbc:mysql://localhost:3306/mydatabase"
table = "employees"
properties = {
"user": "username",
"password": "password"
}
# Read table into DataFrame
df = spark.read.jdbc(url=url, table=table, properties=properties)
查看 DataFrame#
我们可以使用 PySpark 查看并与 DataFrame 进行交互。
显示 DataFrame#
df.show()
显示 DataFrame 内容的基本可视化。根据我们上面 createDataFrame()
的示例
[10]:
employees = [{"name": "John D.", "age": 30},
{"name": "Alice G.", "age": 25},
{"name": "Bob T.", "age": 35},
{"name": "Eve A.", "age": 28}]
# Create a DataFrame containing the employees data
df = spark.createDataFrame(employees)
[11]:
df.show()
+---+--------+
|age| name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35| Bob T.|
| 28| Eve A.|
+---+--------+
df.show()
有 3 个可选参数:n
、truncate
和 vertical
。
默认情况下,df.show()
最多显示 DataFrame 的前 20 行。我们可以通过向 show() 方法传递参数来控制显示的行数
[12]:
df.show(n=2)
+---+--------+
|age| name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
+---+--------+
only showing top 2 rows
truncate 参数控制显示列值的长度(默认值为 20)
[13]:
df.show(truncate=3)
+---+----+
|age|name|
+---+----+
| 30| Joh|
| 25| Ali|
| 35| Bob|
| 28| Eve|
+---+----+
如果我们将 vertical
设置为 True,DataFrame 将垂直显示,每行一个值
[14]:
df.show(vertical=True)
-RECORD 0--------
age | 30
name | John D.
-RECORD 1--------
age | 25
name | Alice G.
-RECORD 2--------
age | 35
name | Bob T.
-RECORD 3--------
age | 28
name | Eve A.
打印 DataFrame 模式#
我们可以使用 printSchema()
方法查看 DataFrame 模式的信息
[15]:
df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
DataFrame 操作#
让我们看看如何转换我们的 DataFrame。
有关更详细的信息,请参阅数据操作部分,第六章:函数枢纽 - 使用 PySpark 进行数据操作。
重命名列#
我们可以使用 withColumnRenamed()
方法重命名 DataFrame 列
[16]:
df.show()
df2 = df.withColumnRenamed("name", "full_name")
df2.show()
+---+--------+
|age| name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35| Bob T.|
| 28| Eve A.|
+---+--------+
+---+---------+
|age|full_name|
+---+---------+
| 30| John D.|
| 25| Alice G.|
| 35| Bob T.|
| 28| Eve A.|
+---+---------+
过滤行#
我们可以过滤出特定年龄范围内的员工。下面的 df.filter
将创建一个符合我们年龄条件的新 DataFrame
[17]:
filtered_df = df.filter((df["age"] > 26) & (df["age"] < 32))
filtered_df.show()
+---+-------+
|age| name|
+---+-------+
| 30|John D.|
| 28| Eve A.|
+---+-------+
我们也可以使用 df.where
获得相同的结果
[18]:
where_df = df.where((df["age"] > 26) & (df["age"] < 32))
where_df.show()
+---+-------+
|age| name|
+---+-------+
| 30|John D.|
| 28| Eve A.|
+---+-------+
DataFrame vs. 表#
DataFrame 是一个不可变的分布式数据集合,仅在当前 Spark 会话中可用。
表是一种持久性数据结构,可以在多个 Spark 会话中访问。
如果您希望将 DataFrame 提升为表,可以使用 createOrReplaceTempView()
方法
[19]:
df.createOrReplaceTempView("employees")
请注意,此临时表的生命周期与用于创建此 DataFrame 的 SparkSession 绑定。要使表在此 Spark 会话之外持久化,您需要将其保存到持久存储中。
将 DataFrame 保存到持久存储#
在 PySpark 中,有几种方法可以将 DataFrame 保存到持久存储中。有关将数据保存到本地环境的更详细信息,请参阅数据加载部分(TODO:添加链接)。
保存到基于文件的数据源#
对于基于文件的数据源(文本、parquet、json 等),您可以这样指定自定义表路径:
[20]:
df.write.option("path", "../dataout").saveAsTable("dataframes_savetable_example")
即使表被删除,自定义表路径和表数据仍将存在。
如果未指定自定义表路径,Spark 会将数据写入 warehouse 目录下的默认表路径。当表被删除时,默认表路径也将被移除。
保存到 Hive metastore#
要保存到 Hive metastore,您可以使用以下方法:
[21]:
df.write().mode("overwrite").saveAsTable("schemaName.tableName")