SQL Pipe 语法
语法
概述
Apache Spark 支持 SQL pipe 语法,它允许通过运算符组合来构建查询。
- 任何查询都可以带有零个或多个 pipe 运算符作为后缀,由 pipe 字符
|>
分隔。 - 每个 pipe 运算符都以一个或多个 SQL 关键字开头,然后是其自身的语法,如下表所示。
- 这些运算符大多数都重用了标准 SQL 子句的现有语法。
- 运算符可以以任何顺序、任意次数应用。
FROM <tableName>
现在是受支持的独立查询,其行为与 TABLE <tableName>
相同。这为开始链式 pipe SQL 查询提供了一个方便的起点,尽管可以将一个或多个 pipe 运算符添加到任何有效的 Spark SQL 查询的末尾,并具有此处所述的相同一致行为。
请参阅本文档末尾的表格,了解所有受支持运算符及其语义的完整列表。
示例
例如,这是 TPC-H 基准测试中的查询 13
SELECT c_count, COUNT(*) AS custdist
FROM
(SELECT c_custkey, COUNT(o_orderkey) c_count
FROM customer
LEFT OUTER JOIN orders ON c_custkey = o_custkey
AND o_comment NOT LIKE '%unusual%packages%' GROUP BY c_custkey
) AS c_orders
GROUP BY c_count
ORDER BY custdist DESC, c_count DESC;
要使用 SQL pipe 运算符编写相同的逻辑,我们可以这样表达
FROM customer
|> LEFT OUTER JOIN orders ON c_custkey = o_custkey
AND o_comment NOT LIKE '%unusual%packages%'
|> AGGREGATE COUNT(o_orderkey) c_count
GROUP BY c_custkey
|> AGGREGATE COUNT(*) AS custdist
GROUP BY c_count
|> ORDER BY custdist DESC, c_count DESC;
源表
要使用 SQL pipe 语法开始新查询,请使用 FROM <tableName>
或 TABLE <tableName>
子句,它会创建一个包含源表中所有行的关系。然后在此子句末尾附加一个或多个 pipe 运算符以执行进一步的转换。
投影
SQL pipe 语法支持可组合的表达式求值方式。这些投影功能的一个主要优势是它们支持以增量方式基于先前表达式计算新表达式。这里不需要侧向列引用,因为每个运算符都独立于其输入表应用,无论运算符出现的顺序如何。然后,这些计算出的列都可用于后续运算符。
SELECT
通过对提供的表达式求值来生成一个新表。
可以根据需要使用 DISTINCT
和 *
。
这类似于常规 Spark SQL 中表子查询中最外层的 SELECT
。
EXTEND
通过对提供的表达式求值,向输入表添加新列。
这还会保留表别名。
这类似于常规 Spark SQL 中的 SELECT *, new_column
。
DROP
从输入表中删除列。
这类似于常规 Spark SQL 中的 SELECT * EXCEPT (column)
。
SET
替换输入表中的列值。
这类似于常规 Spark SQL 中的 SELECT * REPLACE (expression AS column)
。
AS
转发输入表并为每行引入一个新别名。
聚合
通常,使用 SQL pipe 语法进行聚合与常规 Spark SQL 的方式不同。
要执行全表聚合,请使用 AGGREGATE
运算符,并带上要计算的聚合表达式列表。这将在输出表中返回一行。
要执行分组聚合,请使用带有 GROUP BY
子句的 AGGREGATE
运算符。这将为分组表达式值的每个唯一组合返回一行。输出表包含已计算的分组表达式,后跟已计算的聚合函数。分组表达式支持为其分配别名,以便在将来的运算符中引用它们。通过这种方式,不需要在 GROUP BY
和 SELECT
之间重复整个表达式,因为 AGGREGATE
是一个执行两者的单个运算符。
其他转换
其余运算符用于其他转换,例如过滤、连接、排序、采样和集合操作。这些运算符通常与常规 Spark SQL 中的工作方式相同,如下表所述。
独立性与互操作性
SQL pipe 语法在 Spark 中工作,无需担心与现有 SQL 查询的向后兼容性;可以使用常规 Spark SQL、pipe 语法或两者的组合来编写任何查询。因此,以下不变量始终成立:
- 每个 pipe 运算符接收一个输入表,并以相同的方式对其行进行操作,无论该表是如何计算的。
- 对于任何有效的 N 个 SQL pipe 运算符链,前 M (M <= N) 个运算符的任何子集也表示一个有效的查询。
此属性对于自省和调试非常有用,例如通过选择部分行并使用 SQL 编辑器(如 Jupyter Notebooks)的“运行高亮文本”功能。 - 可以将 pipe 运算符附加到用常规 Spark SQL 编写的任何有效查询的末尾。
启动 pipe 语法查询的规范方式是使用FROM <tableName>
子句。
请注意,这是一个有效的独立查询,可以替换为任何其他 Spark SQL 查询,而不会损失通用性。 - 表子查询可以使用常规 Spark SQL 语法或 pipe 语法编写。
它们可以出现在用任一语法编写的封闭查询中。 - 其他 Spark SQL 语句(如视图以及 DDL 和 DML 命令)可以包含使用任一语法编写的查询。
支持的运算符
运算符 | 输出行 |
---|---|
FROM 或 TABLE | 返回源表中所有未经修改的输出行。 |
SELECT | 对输入表的每一行计算提供的表达式。 |
EXTEND | 通过对每个输入行评估指定的表达式,向输入表追加新列。 |
SET | 通过用评估提供的表达式的结果替换输入表的列来更新它们。 |
DROP | 按名称删除输入表的列。 |
AS | 保留输入表的相同行和列名,但使用新的表别名。 |
WHERE | 返回通过条件的输入行的子集。 |
LIMIT | 返回指定数量的输入行,保留排序(如果有)。 |
AGGREGATE | 执行带或不带分组的聚合。 |
JOIN | 连接两个输入的行,返回输入表和表参数的过滤后的笛卡尔积。 |
ORDER BY | 按指示排序后返回输入行。 |
UNION ALL | 对输入表和/或其他表参数的组合行执行并集或其他集合操作。 |
TABLESAMPLE | 返回由提供的采样算法选择的行子集。 |
PIVOT | 返回一个新表,其中输入行已转换为列(透视)。 |
UNPIVOT | 返回一个新表,其中输入列已转换为行(逆透视)。 |
此表列出了每个受支持的 pipe 运算符,并描述了它们生成的输出行。请注意,每个运算符都接受一个输入关系,该关系由 |>
符号之前的查询生成的行组成。
FROM 或 TABLE
FROM <tableName>
TABLE <tableName>
返回源表中所有未经修改的输出行。
例如
CREATE TABLE t AS VALUES (1, 2), (3, 4) AS t(a, b);
TABLE t;
+---+---+
| a| b|
+---+---+
| 1| 2|
| 3| 4|
+---+---+
SELECT
|> SELECT <expr> [[AS] alias], ...
对输入表的每一行计算提供的表达式。
通常,SQL pipe 语法不总是需要此运算符。可以在查询的末尾或接近末尾处使用它来评估表达式或指定输出列列表。
由于最终查询结果始终包含最后一个 pipe 运算符返回的列,当不出现 SELECT
运算符时,输出将包含整行中的所有列。此行为类似于标准 SQL 语法中的 SELECT *
。
可以根据需要使用 DISTINCT
和 *
。
这类似于常规 Spark SQL 中表子查询中最外层的 SELECT
。
窗口函数也支持在 SELECT
列表中。要使用它们,必须提供 OVER
子句。您可以在 WINDOW
子句中提供窗口规范。
此运算符不支持聚合函数。要执行聚合,请改用 AGGREGATE
运算符。
例如
CREATE TABLE t AS VALUES (0), (1) AS t(col);
FROM t
|> SELECT col * 2 AS result;
+------+
|result|
+------+
| 0|
| 2|
+------+
EXTEND
|> EXTEND <expr> [[AS] alias], ...
通过对每个输入行评估指定的表达式,向输入表追加新列。
在 EXTEND
操作之后,顶级列名会更新,但表别名仍指向原始行值(例如,两个表 lhs
和 rhs
之间的内连接,随后进行 EXTEND
,然后是 SELECT lhs.col, rhs.col
)。
例如
VALUES (0), (1) tab(col)
|> EXTEND col * 2 AS result;
+---+------+
|col|result|
+---+------+
| 0| 0|
| 1| 2|
+---+------+
SET
|> SET <column> = <expression>, ...
通过用评估提供的表达式的结果替换输入表的列来更新它们。每个此类列引用必须在输入表中恰好出现一次。
这类似于常规 Spark SQL 中的 SELECT * EXCEPT (column), <expression> AS column
。
可以在单个 SET
子句中执行多个赋值。每个赋值都可以引用先前赋值的结果。
在赋值之后,顶级列名会更新,但表别名仍指向原始行值(例如,两个表 lhs
和 rhs
之间的内连接,随后进行 SET
,然后是 SELECT lhs.col, rhs.col
)。
例如
VALUES (0), (1) tab(col)
|> SET col = col * 2;
+---+
|col|
+---+
| 0|
| 2|
+---+
VALUES (0), (1) tab(col)
|> SET col = col * 2;
+---+
|col|
+---+
| 0|
| 2|
+---+
DROP
|> DROP <column>, ...
按名称删除输入表的列。每个此类列引用必须在输入表中恰好出现一次。
这类似于常规 Spark SQL 中的 SELECT * EXCEPT (column)
。
在 DROP
操作之后,顶级列名会更新,但表别名仍指向原始行值(例如,两个表 lhs
和 rhs
之间的内连接,随后进行 DROP
,然后是 SELECT lhs.col, rhs.col
)。
例如
VALUES (0, 1) tab(col1, col2)
|> DROP col1;
+----+
|col2|
+----+
| 1|
+----+
AS
|> AS <alias>
保留输入表的相同行和列名,但使用新的表别名。
此运算符可用于为输入表引入新别名,该别名可在后续运算符中引用。表的任何现有别名都将被新别名替换。
在通过 SELECT
或 EXTEND
添加新列后,或在通过 AGGREGATE
执行聚合后,使用此运算符很有用。这简化了引用后续 JOIN
运算符中列的过程,并使查询更具可读性。
例如
VALUES (0, 1) tab(col1, col2)
|> AS new_tab
|> SELECT col1 + col2 FROM new_tab;
+-----------+
|col1 + col2|
+-----------+
| 1|
+-----------+
WHERE
|> WHERE <condition>
返回通过条件的输入行的子集。
由于此运算符可以出现在任何位置,因此不需要单独的 HAVING
或 QUALIFY
语法。
例如
VALUES (0), (1) tab(col)
|> WHERE col = 1;
+---+
|col|
+---+
| 1|
+---+
LIMIT
|> [LIMIT <n>] [OFFSET <m>]
返回指定数量的输入行,保留排序(如果有)。
支持同时使用 LIMIT
和 OFFSET
。LIMIT
子句也可以在没有 OFFSET
子句的情况下使用,而 OFFSET
子句也可以在没有 LIMIT
子句的情况下使用。
例如
VALUES (0), (0) tab(col)
|> LIMIT 1;
+---+
|col|
+---+
| 0|
+---+
AGGREGATE
-- Full-table aggregation
|> AGGREGATE <agg_expr> [[AS] alias], ...
-- Aggregation with grouping
|> AGGREGATE [<agg_expr> [[AS] alias], ...] GROUP BY <grouping_expr> [AS alias], ...
对分组行或整个输入表执行聚合。
如果不存在 GROUP BY
子句,则执行全表聚合,为每个聚合表达式返回一个结果行。否则,执行分组聚合,每组返回一行。可以直接在分组表达式上分配别名。
此运算符的输出列列表首先包括分组列(如果有),然后是聚合列。
每个 <agg_expr>
表达式可以包含标准聚合函数,如 COUNT
、SUM
、AVG
、MIN
,或 Spark SQL 支持的任何其他聚合函数。聚合函数之上或之下可以出现其他表达式,例如 MIN(FLOOR(col)) + 1
。每个 <agg_expr>
表达式必须至少包含一个聚合函数(否则查询将返回错误)。每个 <agg_expr>
表达式可以包含带 AS <alias>
的列别名,并且还可以包含 DISTINCT
关键字,以便在应用聚合函数之前移除重复值(例如,COUNT(DISTINCT col)
)。
如果存在,GROUP BY
子句可以包含任意数量的分组表达式,并且每个 <agg_expr>
表达式将对分组表达式值的每个唯一组合进行评估。输出表包含已评估的分组表达式,后跟已评估的聚合函数。GROUP BY
表达式可以包含基于一的序数。与常规 SQL 中此类序数引用伴随 SELECT
子句中的表达式不同,在 SQL pipe 语法中,它们引用的是前一个运算符生成的关系的列。例如,在 TABLE t |> AGGREGATE COUNT(*) GROUP BY 2
中,我们指的是输入表 t
的第二列。
不需要在 GROUP BY
和 SELECT
之间重复整个表达式,因为 AGGREGATE
运算符会自动在其输出中包含已评估的分组表达式。同理,在 AGGREGATE
运算符之后,通常不需要发出后续的 SELECT
运算符,因为 AGGREGATE
在一个步骤中返回分组列和聚合列。
例如
-- Full-table aggregation
VALUES (0), (1) tab(col)
|> AGGREGATE COUNT(col) AS count;
+-----+
|count|
+-----+
| 2|
+-----+
-- Aggregation with grouping
VALUES (0, 1), (0, 2) tab(col1, col2)
|> AGGREGATE COUNT(col2) AS count GROUP BY col1;
+----+-----+
|col1|count|
+----+-----+
| 0| 2|
+----+-----+
JOIN
|> [LEFT | RIGHT | FULL | CROSS | SEMI | ANTI | NATURAL | LATERAL] JOIN <table> [ON <condition> | USING(col, ...)]
连接来自两个输入的行,返回 pipe 输入表和 JOIN
关键字后面的表表达式的过滤后的笛卡尔积。这与常规 SQL 中的 JOIN
子句行为相似,其中 pipe 运算符的输入表成为连接的左侧,而表参数成为连接的右侧。
在 JOIN
关键字之前支持标准的连接修饰符,如 LEFT
、RIGHT
和 FULL
。
连接谓词可能需要引用连接的两个输入中的列。在这种情况下,如果两个输入都具有相同名称的列,则可能需要使用表别名来区分列。AS
运算符在这里很有用,它可以为成为连接左侧的 pipe 输入表引入新的别名。如果需要,使用标准语法为成为连接右侧的表参数分配别名。
例如
SELECT 0 AS a, 1 AS b
|> AS lhs
|> JOIN VALUES (0, 2) rhs(a, b) ON (lhs.a = rhs.a);
+---+---+---+---+
| a| b| c| d|
+---+---+---+---+
| 0| 1| 0| 2|
+---+---+---+---+
VALUES ('apples', 3), ('bananas', 4) t(item, sales)
|> AS produce_sales
|> LEFT JOIN
(SELECT "apples" AS item, 123 AS id) AS produce_data
USING (item)
|> SELECT produce_sales.item, sales, id;
/*---------+-------+------+
| item | sales | id |
+---------+-------+------+
| apples | 3 | 123 |
| bananas | 4 | NULL |
+---------+-------+------*/
ORDER BY
|> ORDER BY <expr> [ASC | DESC], ...
按指示排序后返回输入行。支持标准修饰符,包括 NULLS FIRST/LAST。
例如
VALUES (0), (1) tab(col)
|> ORDER BY col DESC;
+---+
|col|
+---+
| 1|
| 0|
+---+
UNION, INTERSECT, EXCEPT
|> {UNION | INTERSECT | EXCEPT} {ALL | DISTINCT} (<query>)
对输入表或子查询的组合行执行并集或其他集合操作。
例如
VALUES (0), (1) tab(a, b)
|> UNION ALL VALUES (2), (3) tab(c, d);
+---+----+
| a| b|
+---+----+
| 0| 1|
| 2| 3|
+---+----+
TABLESAMPLE
|> TABLESAMPLE <method>(<size> {ROWS | PERCENT})
返回由提供的采样算法选择的行子集。
例如
VALUES (0), (0), (0), (0) tab(col)
|> TABLESAMPLE (1 ROWS);
+---+
|col|
+---+
| 0|
+---+
VALUES (0), (0) tab(col)
|> TABLESAMPLE (100 PERCENT);
+---+
|col|
+---+
| 0|
| 0|
+---+
PIVOT
|> PIVOT (agg_expr FOR col IN (val1, ...))
返回一个新表,其中输入行已转换为列(透视)。
例如
VALUES
("dotNET", 2012, 10000),
("Java", 2012, 20000),
("dotNET", 2012, 5000),
("dotNET", 2013, 48000),
("Java", 2013, 30000)
courseSales(course, year, earnings)
|> PIVOT (
SUM(earnings)
FOR COURSE IN ('dotNET', 'Java')
)
+----+------+------+
|year|dotNET| Java|
+----+------+------+
|2012| 15000| 20000|
|2013| 48000| 30000|
+----+------+------+
UNPIVOT
|> UNPIVOT (value_col FOR key_col IN (col1, ...))
返回一个新表,其中输入列已转换为行(逆透视)。
例如
VALUES
("dotNET", 2012, 10000),
("Java", 2012, 20000),
("dotNET", 2012, 5000),
("dotNET", 2013, 48000),
("Java", 2013, 30000)
courseSales(course, year, earnings)
|> UNPIVOT (
earningsYear FOR `year` IN (`2012`, `2013`, `2014`)
+--------+------+--------+
| course| year|earnings|
+--------+------+--------+
| Java| 2012| 20000|
| Java| 2013| 30000|
| dotNET| 2012| 15000|
| dotNET| 2013| 48000|
| dotNET| 2014| 22500|
+--------+------+--------+