[2]:
# Suppress Python warnings to keep the notebook output clean
import warnings
warnings.filterwarnings('ignore')

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession and reduce Spark's log verbosity
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("error")

Chapter 6: Old SQL, New Tricks - Running SQL on PySpark#

Introduction#

This section explains how to use the Spark SQL API in PySpark and compares it with the DataFrame API. It also covers how to switch seamlessly between the two APIs, along with some practical tips.

Running SQL in PySpark#

PySpark provides two main ways to perform SQL operations:

Using spark.sql()#

The spark.sql() function lets you execute SQL queries directly.

[10]:
# Create a table via spark.sql()
spark.sql("DROP TABLE IF EXISTS people")
spark.sql("""
CREATE TABLE people USING PARQUET
AS SELECT * FROM VALUES (1, 'Alice', 10), (2, 'Bob', 20), (3, 'Charlie', 30) t(id, name, age)
""")
[10]:
DataFrame[]
[11]:
# Use spark.sql() to select data from a table
spark.sql("SELECT name, age FROM people WHERE age > 21").show()
+-------+---+
|   name|age|
+-------+---+
|Charlie| 30|
+-------+---+

Using the PySpark DataFrame API#

The PySpark DataFrame API provides functionality equivalent to SQL, but with a Pythonic approach.

[12]:
# Read a table using the DataFrame API
people_df = spark.read.table("people")

# Use DataFrame API to select data
people_df.select("name", "age").filter("age > 21").show()
+-------+---+
|   name|age|
+-------+---+
|Charlie| 30|
+-------+---+

SQL vs. DataFrame API in PySpark#

Which API to use depends on your background and the specific task:

SQL API:

- Well suited to users with a SQL background who are more comfortable writing SQL queries.

DataFrame API:

- Preferred by Python developers, since it aligns with Python syntax and idioms.
- Offers more flexibility for complex transformations, especially when combined with user-defined functions (UDFs); see the sketch after this list.
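
As a quick illustration of that flexibility, here is a minimal sketch (not part of the original examples) of applying a Python UDF directly through the DataFrame API. The age_group helper is hypothetical and assumes the people table created above.

[ ]:
from pyspark.sql.functions import col, udf

# Hypothetical UDF: bucket people into coarse age groups.
@udf("string")
def age_group(age):
    return "30 or older" if age >= 30 else "under 30"

# Apply the UDF directly in a DataFrame transformation chain.
people_df = spark.read.table("people")
people_df.withColumn("age_group", age_group(col("age"))).select("name", "age", "age_group").show()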

Code Examples: SQL vs. DataFrame API#

The following examples compare how common tasks are performed with the SQL API and with PySpark's DataFrame API, to give you a sense of how they differ and when one might be a better fit than the other.

Example: SELECT and FILTER Operations#

SQL API

[15]:
spark.sql("SELECT name FROM people WHERE age > 21").show()
+-------+
|   name|
+-------+
|Charlie|
+-------+

DataFrame API

[16]:
spark.read.table("people").select("name").filter("age > 21").show()
+-------+
|   name|
+-------+
|Charlie|
+-------+

Example: JOIN Operations#

[18]:
spark.sql("DROP TABLE IF EXISTS orders")
spark.sql("""
CREATE TABLE orders USING PARQUET
AS SELECT * FROM VALUES (101, 1, 200), (102, 2, 150), (103, 3, 300) t(order_id, customer_id, amount)
""")
[18]:
DataFrame[]

SQL API

[19]:
spark.sql("""
SELECT p.name, o.order_id
FROM people p
JOIN orders o ON p.id = o.customer_id
""").show()
+-------+--------+
|   name|order_id|
+-------+--------+
|Charlie|     103|
|  Alice|     101|
|    Bob|     102|
+-------+--------+

DataFrame API

[20]:
people_df = spark.read.table("people")
orders_df = spark.read.table("orders")
(
    people_df
        .join(orders_df, people_df.id == orders_df.customer_id)
        .select(people_df.name, orders_df.order_id)
        .show()
)
+-------+--------+
|   name|order_id|
+-------+--------+
|Charlie|     103|
|  Alice|     101|
|    Bob|     102|
+-------+--------+

Example: GROUP BY and Aggregation Operations#

SQL API

[21]:
spark.sql("""
SELECT p.name, SUM(o.amount) AS total_amount
FROM people p
JOIN orders o ON p.id = o.customer_id
GROUP BY p.name
""").show()
+-------+------------+
|   name|total_amount|
+-------+------------+
|Charlie|         300|
|  Alice|         200|
|    Bob|         150|
+-------+------------+

DataFrame API

[22]:
# Note: this import shadows Python's built-in sum within this cell
from pyspark.sql.functions import sum

(
    people_df
        .join(orders_df, people_df.id == orders_df.customer_id)
        .groupBy("name")
        .agg(sum("amount").alias("total_amount"))
        .show()
)
+-------+------------+
|   name|total_amount|
+-------+------------+
|Charlie|         300|
|  Alice|         200|
|    Bob|         150|
+-------+------------+

Example: Window Operations#

SQL API

[23]:
spark.sql("""
SELECT
    p.name,
    o.amount,
    RANK() OVER (PARTITION BY p.name ORDER BY o.amount DESC) AS rank
FROM people p
JOIN orders o ON p.id = o.customer_id
""").show()
+-------+------+----+
|   name|amount|rank|
+-------+------+----+
|  Alice|   200|   1|
|    Bob|   150|   1|
|Charlie|   300|   1|
+-------+------+----+

DataFrame API

[24]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Define the window specification
window_spec = Window.partitionBy("name").orderBy(orders_df.amount.desc())

# Window operation with RANK
(
    people_df
        .join(orders_df, people_df.id == orders_df.customer_id)
        .withColumn("rank", rank().over(window_spec))
        .select("name", "amount", "rank")
        .show()
)
+-------+------+----+
|   name|amount|rank|
+-------+------+----+
|  Alice|   200|   1|
|    Bob|   150|   1|
|Charlie|   300|   1|
+-------+------+----+

Example: UNION Operations#

SQL API:

- The UNION operator combines the rows of two queries and removes duplicates by default.

[25]:
spark.sql("CREATE OR REPLACE TEMP VIEW people2 AS SELECT * FROM VALUES (1, 'Alice', 10), (4, 'David', 35) t(id, name, age)")
[25]:
DataFrame[]
[26]:
spark.sql("""
SELECT * FROM people
UNION
SELECT * FROM people2
""").show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 30|
|  1|  Alice| 10|
|  2|    Bob| 20|
|  4|  David| 35|
+---+-------+---+

DataFrame API:

- The union() method combines two DataFrames but does not remove duplicates by default.
- To match SQL's UNION behavior, call .dropDuplicates() after the union to eliminate the duplicates.

[27]:
people_df = spark.read.table("people")
people2_df = spark.read.table("people2")
# This will have duplicate values.
people_df.union(people2_df).show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 30|
|  1|  Alice| 10|
|  2|    Bob| 20|
|  1|  Alice| 10|
|  4|  David| 35|
+---+-------+---+

[28]:
# Remove duplicate values
people_df.union(people2_df).dropDuplicates().show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 30|
|  1|  Alice| 10|
|  2|    Bob| 20|
|  4|  David| 35|
+---+-------+---+

Example: SET Configuration#

SQL API

[29]:
spark.sql("SET spark.sql.shuffle.partitions=8")
[29]:
DataFrame[key: string, value: string]
[30]:
spark.sql("SET spark.sql.shuffle.partitions").show(truncate=False)
+----------------------------+-----+
|key                         |value|
+----------------------------+-----+
|spark.sql.shuffle.partitions|8    |
+----------------------------+-----+

DataFrame API

[31]:
spark.conf.set("spark.sql.shuffle.partitions", 10)
[32]:
spark.conf.get("spark.sql.shuffle.partitions")
[32]:
'10'

Example: Listing Tables and Views#

SQL API

[33]:
spark.sql("SHOW TABLES").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|   orders|      false|
|  default|   people|      false|
|         |  people2|       true|
+---------+---------+-----------+

DataFrame API

[34]:
tables = spark.catalog.listTables()
for table in tables:
    print(f"Name: {table.name}, isTemporary: {table.isTemporary}")
Name: orders, isTemporary: False
Name: people, isTemporary: False
Name: people2, isTemporary: True

Functions Exclusive to the DataFrame API#

Some operations are exclusive to the DataFrame API and have no direct SQL equivalent, for example:

withColumn: adds a new column to a DataFrame or modifies an existing one.

[35]:
people_df.withColumn("new_col", people_df["age"] + 10).show()
+---+-------+---+-------+
| id|   name|age|new_col|
+---+-------+---+-------+
|  3|Charlie| 30|     40|
|  1|  Alice| 10|     20|
|  2|    Bob| 20|     30|
+---+-------+---+-------+

[39]:
people_df.withColumn("age", people_df["age"] + 10).show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 40|
|  1|  Alice| 20|
|  2|    Bob| 30|
+---+-------+---+

Using SQL and the DataFrame API Interchangeably#

PySpark supports switching between SQL and the DataFrame API, making it easy to mix the two.

Chaining DataFrame Operations on SQL Output#

PySpark's DataFrame API lets you chain multiple operations together to build efficient and readable transformations, even when the starting point is the result of a SQL query.

[36]:
# Chaining DataFrame operations on SQL results
spark.sql("SELECT name, age FROM people").filter("age > 21").show()
+-------+---+
|   name|age|
+-------+---+
|Charlie| 30|
+-------+---+

Using selectExpr()#

The selectExpr() method lets you run SQL expressions within the DataFrame API.

[37]:
people_df.selectExpr("name", "age + 1 AS age_plus_one").show()
+-------+------------+
|   name|age_plus_one|
+-------+------------+
|Charlie|          31|
|  Alice|          11|
|    Bob|          21|
+-------+------------+

Querying a DataFrame in SQL#

You can create a temporary view from a DataFrame and then run SQL queries against it.

[38]:
# First create a temp view on top of the DataFrame.
people_df.createOrReplaceTempView("people_view")

# Then it can be referenced in SQL.
spark.sql("SELECT * FROM people_view WHERE age > 21").show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 30|
+---+-------+---+

Using Python User-Defined Functions in SQL#

You can register Python user-defined functions (UDFs) for use in SQL queries, enabling custom transformations within SQL syntax.

[41]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the UDF
@udf("string")
def uppercase_name(name):
    return name.upper()

# Register the UDF
spark.udf.register("uppercase_name", uppercase_name)

# Use it in SQL
spark.sql("SELECT name, uppercase_name(name) FROM people_view WHERE age > 21").show()
+-------+--------------------+
|   name|uppercase_name(name)|
+-------+--------------------+
|Charlie|             CHARLIE|
+-------+--------------------+
