[2]:
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("error")
Chapter 6: Old SQL, New Tricks - Running SQL on PySpark#
Introduction#
This section explains how to use the Spark SQL API in PySpark and compares it with the DataFrame API. It also covers how to switch seamlessly between the two, along with some practical tips.
Running SQL in PySpark#
PySpark provides two main ways to perform SQL operations:
Using spark.sql()#
The spark.sql() function lets you execute SQL queries directly.
[10]:
# Create a table via spark.sql()
spark.sql("DROP TABLE IF EXISTS people")
spark.sql("""
CREATE TABLE people USING PARQUET
AS SELECT * FROM VALUES (1, 'Alice', 10), (2, 'Bob', 20), (3, 'Charlie', 30) t(id, name, age)
""")
[10]:
DataFrame[]
[11]:
# Use spark.sql() to select data from a table
spark.sql("SELECT name, age FROM people WHERE age > 21").show()
+-------+---+
| name|age|
+-------+---+
|Charlie| 30|
+-------+---+
Using the PySpark DataFrame API#
The PySpark DataFrame API provides functionality equivalent to SQL, but with a Pythonic approach.
[12]:
# Read a table using the DataFrame API
people_df = spark.read.table("people")
# Use DataFrame API to select data
people_df.select("name", "age").filter("age > 21").show()
+-------+---+
| name|age|
+-------+---+
|Charlie| 30|
+-------+---+
SQL vs. DataFrame API in PySpark#
Which API to use depends on your background and the task at hand:
SQL API:
- Best for users with a SQL background who are more comfortable writing SQL queries.
DataFrame API:
- Preferred by Python developers, since it aligns with Python syntax and idioms.
- Offers greater flexibility for complex transformations, especially when combined with user-defined functions (UDFs); see the sketch right after this list.
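As a quick illustration of that flexibility, here is a minimal sketch (hypothetical, not one of this chapter's numbered cells) that applies a Python UDF inside a chained DataFrame pipeline:
from pyspark.sql.functions import udf, col
# Hypothetical helper: label an age value.
@udf("string")
def age_bucket(age):
    return "adult" if age >= 18 else "minor"
# The UDF slots directly into a chained transformation.
(
    spark.read.table("people")
    .withColumn("bucket", age_bucket(col("age")))
    .filter(col("bucket") == "adult")
    .show()
)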
Code Examples: SQL vs. DataFrame API#
The following examples compare how common tasks are performed with the SQL API and with PySpark's DataFrame API, to give you a feel for their differences and for when one may be a better fit than the other.
Example: SELECT and FILTER Operations#
SQL API
[15]:
spark.sql("SELECT name FROM people WHERE age > 21").show()
+-------+
| name|
+-------+
|Charlie|
+-------+
DataFrame API
[16]:
spark.read.table("people").select("name").filter("age > 21").show()
+-------+
| name|
+-------+
|Charlie|
+-------+
Example: JOIN Operations#
[18]:
spark.sql("DROP TABLE IF EXISTS orders")
spark.sql("""
CREATE TABLE orders USING PARQUET
AS SELECT * FROM VALUES (101, 1, 200), (102, 2, 150), (103, 3, 300) t(order_id, customer_id, amount)
""")
[18]:
DataFrame[]
SQL API
[19]:
spark.sql("""
SELECT p.name, o.order_id
FROM people p
JOIN orders o ON p.id = o.customer_id
""").show()
+-------+--------+
| name|order_id|
+-------+--------+
|Charlie| 103|
| Alice| 101|
| Bob| 102|
+-------+--------+
DataFrame API
[20]:
people_df = spark.read.table("people")
orders_df = spark.read.table("orders")
(
people_df
.join(orders_df, people_df.id == orders_df.customer_id)
.select(people_df.name, orders_df.order_id)
.show()
)
+-------+--------+
| name|order_id|
+-------+--------+
|Charlie| 103|
| Alice| 101|
| Bob| 102|
+-------+--------+
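The join type can be varied as well. As a small aside (not one of the original cells), the same join expressed as a left outer join would keep people who have no matching order; with this toy data the result is identical to the inner join:
# Left outer join: people without a matching order appear with NULL order columns.
(
    people_df
    .join(orders_df, people_df.id == orders_df.customer_id, "left")
    .select(people_df.name, orders_df.order_id)
    .show()
)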
Example: GROUP BY and Aggregation Operations#
SQL API
[21]:
spark.sql("""
SELECT p.name, SUM(o.amount) AS total_amount
FROM people p
JOIN orders o ON p.id = o.customer_id
GROUP BY p.name
""").show()
+-------+------------+
| name|total_amount|
+-------+------------+
|Charlie| 300|
| Alice| 200|
| Bob| 150|
+-------+------------+
DataFrame API
[22]:
from pyspark.sql.functions import sum
(
people_df
.join(orders_df, people_df.id == orders_df.customer_id)
.groupBy("name")
.agg(sum("amount").alias("total_amount"))
.show()
)
+-------+------------+
| name|total_amount|
+-------+------------+
|Charlie| 300|
| Alice| 200|
| Bob| 150|
+-------+------------+
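agg() also accepts several aggregate expressions at once. A brief sketch (an extension of the example above, not an original cell) that computes a sum and a count per customer in a single pass:
from pyspark.sql.functions import count, sum
(
    people_df
    .join(orders_df, people_df.id == orders_df.customer_id)
    .groupBy("name")
    .agg(sum("amount").alias("total_amount"), count("*").alias("num_orders"))
    .show()
)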
Example: Window Operations#
SQL API
[23]:
spark.sql("""
SELECT
p.name,
o.amount,
RANK() OVER (PARTITION BY p.name ORDER BY o.amount DESC) AS rank
FROM people p
JOIN orders o ON p.id = o.customer_id
""").show()
+-------+------+----+
| name|amount|rank|
+-------+------+----+
| Alice| 200| 1|
| Bob| 150| 1|
|Charlie| 300| 1|
+-------+------+----+
DataFrame API
[24]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
# Define the window specification
window_spec = Window.partitionBy("name").orderBy(orders_df.amount.desc())
# Window operation with RANK
(
people_df
.join(orders_df, people_df.id == orders_df.customer_id)
.withColumn("rank", rank().over(window_spec))
.select("name", "amount", "rank")
.show()
)
+-------+------+----+
| name|amount|rank|
+-------+------+----+
| Alice| 200| 1|
| Bob| 150| 1|
|Charlie| 300| 1|
+-------+------+----+
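Because each person has exactly one order in this toy dataset, every rank within its name partition is 1. As a hypothetical variation, ranking all orders by amount in one global window would produce distinct ranks:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
# A window with no partitionBy ranks every order against all others
# (Spark warns that this moves all rows into a single partition).
global_window = Window.orderBy(orders_df.amount.desc())
orders_df.withColumn("overall_rank", rank().over(global_window)).show()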
Example: UNION Operations#
SQL API:
- The UNION operator combines the rows of two queries and removes duplicates by default.
[25]:
spark.sql("CREATE OR REPLACE TEMP VIEW people2 AS SELECT * FROM VALUES (1, 'Alice', 10), (4, 'David', 35) t(id, name, age)")
[25]:
DataFrame[]
[26]:
spark.sql("""
SELECT * FROM people
UNION
SELECT * FROM people2
""").show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 3|Charlie| 30|
| 1| Alice| 10|
| 2| Bob| 20|
| 4| David| 35|
+---+-------+---+
DataFrame API:
- The union() method combines two DataFrames but does not remove duplicates by default.
- To match SQL's UNION behavior, call .dropDuplicates() after the union to eliminate the duplicates.
[27]:
people_df = spark.read.table("people")
people2_df = spark.read.table("people2")
# This will have duplicate values.
people_df.union(people2_df).show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 3|Charlie| 30|
| 1| Alice| 10|
| 2| Bob| 20|
| 1| Alice| 10|
| 4| David| 35|
+---+-------+---+
[28]:
# Remove duplicate values
people_df.union(people2_df).dropDuplicates().show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 3|Charlie| 30|
| 1| Alice| 10|
| 2| Bob| 20|
| 4| David| 35|
+---+-------+---+
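Note that union() matches columns by position, not by name. If the second DataFrame has the same columns in a different order, the standard unionByName() method matches them by name instead; a minimal sketch:
# unionByName matches columns by name rather than by position.
reordered_df = people2_df.select("name", "id", "age")
people_df.unionByName(reordered_df).dropDuplicates().show()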
Example: SET Configuration#
SQL API
[29]:
spark.sql("SET spark.sql.shuffle.partitions=8")
[29]:
DataFrame[key: string, value: string]
[30]:
spark.sql("SET spark.sql.shuffle.partitions").show(truncate=False)
+----------------------------+-----+
|key |value|
+----------------------------+-----+
|spark.sql.shuffle.partitions|8 |
+----------------------------+-----+
DataFrame API
[31]:
spark.conf.set("spark.sql.shuffle.partitions", 10)
[32]:
spark.conf.get("spark.sql.shuffle.partitions")
[32]:
'10'
Example: Listing Tables and Views#
SQL API
[33]:
spark.sql("SHOW TABLES").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| default| orders| false|
| default| people| false|
| | people2| true|
+---------+---------+-----------+
DataFrame API
[34]:
tables = spark.catalog.listTables()
for table in tables:
print(f"Name: {table.name}, isTemporary: {table.isTemporary}")
Name: orders, isTemporary: False
Name: people, isTemporary: False
Name: people2, isTemporary: True
Functions Exclusive to the DataFrame API#
Some operations are exclusive to the DataFrame API and have no direct SQL equivalent, for example:
withColumn: adds a new column to a DataFrame or modifies an existing one.
[35]:
people_df.withColumn("new_col", people_df["age"] + 10).show()
+---+-------+---+-------+
| id| name|age|new_col|
+---+-------+---+-------+
| 3|Charlie| 30| 40|
| 1| Alice| 10| 20|
| 2| Bob| 20| 30|
+---+-------+---+-------+
[39]:
people_df.withColumn("age", people_df["age"] + 10).show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 3|Charlie| 40|
| 1| Alice| 20|
| 2| Bob| 30|
+---+-------+---+
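withColumnRenamed and drop are further DataFrame-only helpers; a short sketch (not an original cell) that renames one column and removes another:
# Rename a column, then drop another, purely through the DataFrame API.
(
    people_df
    .withColumnRenamed("age", "age_years")
    .drop("id")
    .show()
)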
Using SQL and the DataFrame API Interchangeably#
PySpark makes it easy to switch between SQL and the DataFrame API, so the two can be mixed freely.
Chaining DataFrame Operations on SQL Output#
PySpark's DataFrame API lets you chain multiple operations together to create efficient and readable transformations.
[36]:
# Chaining DataFrame operations on SQL results
spark.sql("SELECT name, age FROM people").filter("age > 21").show()
+-------+---+
| name|age|
+-------+---+
|Charlie| 30|
+-------+---+
Using selectExpr()#
The selectExpr() method allows you to run SQL expressions within the DataFrame API.
[37]:
people_df.selectExpr("name", "age + 1 AS age_plus_one").show()
+-------+------------+
| name|age_plus_one|
+-------+------------+
|Charlie| 31|
| Alice| 11|
| Bob| 21|
+-------+------------+
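A closely related helper is expr() from pyspark.sql.functions, which wraps a single SQL expression as a Column so it can be combined with other DataFrame operations; for example:
from pyspark.sql.functions import expr
# expr() turns a SQL fragment into a Column expression.
people_df.withColumn("age_plus_one", expr("age + 1")).show()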
Querying a DataFrame in SQL#
You can create a temporary view from a DataFrame and run SQL queries against it.
[38]:
# First create a temp view on top of the DataFrame.
people_df.createOrReplaceTempView("people_view")
# Then it can be referenced in SQL.
spark.sql("SELECT * FROM people_view WHERE age > 21").show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 3|Charlie| 30|
+---+-------+---+
Using Python User-Defined Functions in SQL#
You can register a Python user-defined function (UDF) for use in SQL queries, enabling custom transformations within SQL syntax.
[41]:
from pyspark.sql.functions import udf
# Define the UDF
@udf("string")
def uppercase_name(name):
return name.upper()
# Register the UDF
spark.udf.register("uppercase_name", uppercase_name)
# Use it in SQL
spark.sql("SELECT name, uppercase_name(name) FROM people_view WHERE age > 21").show()
+-------+--------------------+
| name|uppercase_name(name)|
+-------+--------------------+
|Charlie| CHARLIE|
+-------+--------------------+
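Because uppercase_name was created with the @udf decorator, the same function can also be used directly in the DataFrame API without registering it; a short sketch:
# The decorated UDF doubles as a column expression in the DataFrame API.
people_df.select("name", uppercase_name("name").alias("upper_name")).show()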