Python 用户自定义表函数 (UDTF)

Spark 3.5 引入了 Python 用户自定义表函数 (UDTF),这是一种新型的用户自定义函数。与从每次调用返回单个结果值的标量函数不同,每个 UDTF 都在查询的 FROM 子句中调用,并返回整个表作为输出。每个 UDTF 调用可以接受零个或多个参数。这些参数可以是标量表达式,也可以是表示整个输入表的表参数。

实现 Python UDTF

要实现 Python UDTF,您首先需要定义一个类,该类实现以下方法

class PythonUDTF:

    def __init__(self) -> None:
        """
        Initializes the user-defined table function (UDTF). This is optional.

        This method serves as the default constructor and is called once when the
        UDTF is instantiated on the executor side.

        Any class fields assigned in this method will be available for subsequent
        calls to the `eval` and `terminate` methods. This class instance will remain
        alive until all rows in the current partition have been consumed by the `eval`
        method.

        Notes
        -----
        - This method does not accept any extra arguments. Only the default
          constructor is supported.
        - You cannot create or reference the Spark session within the UDTF. Any
          attempt to do so will result in a serialization error.
        """
        ...

    def eval(self, *args: Any) -> Iterator[Any]:
        """
        Evaluates the function using the given input arguments.

        This method is required and must be implemented.

        Argument Mapping:
        - Each provided scalar expression maps to exactly one value in the
          `*args` list.
        - Each provided table argument maps to a pyspark.sql.Row object containing
          the columns in the order they appear in the provided input table,
          and with the names computed by the query analyzer.

        This method is called on every input row, and can produce zero or more
        output rows. Each element in the output tuple corresponds to one column
        specified in the return type of the UDTF.

        Parameters
        ----------
        *args : Any
            Arbitrary positional arguments representing the input to the UDTF.

        Yields
        ------
        tuple
            A tuple representing a single row in the UDTF result table.
            Yield as many times as needed to produce multiple rows.

        Notes
        -----
        - The result of the function must be a tuple representing a single row
          in the UDTF result table.
        - UDTFs currently do not accept keyword arguments during the function call.

        Examples
        --------
        eval that returns one row and one column for each input.

        >>> def eval(self, x: int):
        ...     yield (x, )

        eval that returns two rows and two columns for each input.

        >>> def eval(self, x: int, y: int):
        ...     yield (x + y, x - y)
        ...     yield (y + x, y - x)
        """
        ...

    def terminate(self) -> Iterator[Any]:
        """
        Called when the UDTF has processed all input rows.

        This method is optional to implement and is useful for performing any
        cleanup or finalization operations after the UDTF has finished processing
        all rows. It can also be used to yield additional rows if needed.
        Table functions that consume all rows in the entire input partition
        and then compute and return the entire output table can do so from
        this method as well (please be mindful of memory usage when doing
        this).

        Yields
        ------
        tuple
            A tuple representing a single row in the UDTF result table.
            Yield this if you want to return additional rows during termination.

        Examples
        --------
        >>> def terminate(self) -> Iterator[Any]:
        >>>     yield "done", None
        """
        ...

UDTF 的返回类型定义了它输出的表的模式。它必须是 StructType,例如 StructType().add("c1", StringType()),或者表示结构类型的 DDL 字符串,例如 c1: string

UDTF 类实现的示例

这是一个简单的 UDTF 类实现示例

# Define the UDTF class and implement the required `eval` method.
class SquareNumbers:
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)

使用 ``udtf`` 装饰器实例化 UDTF

要使用 UDTF,您首先需要使用 @udtf 装饰器实例化它

from pyspark.sql.functions import lit, udtf

# Create a UDTF using the class definition and the `udtf` function.
square_num = udtf(SquareNumbers, returnType="num: int, squared: int")

# Invoke the UDTF in PySpark.
square_num(lit(1), lit(3)).show()
# +---+-------+
# |num|squared|
# +---+-------+
# |  1|      1|
# |  2|      4|
# |  3|      9|
# +---+-------+

使用 ``udtf`` 函数实例化 UDTF

创建 UDTF 的另一种方法是使用 udtf() 函数

from pyspark.sql.functions import lit, udtf

# Define a UDTF using the `udtf` decorator directly on the class.
@udtf(returnType="num: int, squared: int")
class SquareNumbers:
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)

# Invoke the UDTF in PySpark using the SquareNumbers class directly.
SquareNumbers(lit(1), lit(3)).show()
# +---+-------+
# |num|squared|
# +---+-------+
# |  1|      1|
# |  2|      4|
# |  3|      9|
# +---+-------+

有关更详细的用法,请参阅 udtf()

在 SQL 中注册和使用 Python UDTF

Python UDTF 也可以在 SQL 查询中注册和使用。

from pyspark.sql.functions import udtf

@udtf(returnType="word: string")
class WordSplitter:
    def eval(self, text: str):
        for word in text.split(" "):
            yield (word.strip(),)

# Register the UDTF for use in Spark SQL.
spark.udtf.register("split_words", WordSplitter)

# Example: Using the UDTF in SQL.
spark.sql("SELECT * FROM split_words('hello world')").show()
# +-----+
# | word|
# +-----+
# |hello|
# |world|
# +-----+

# Example: Using the UDTF with a lateral join in SQL.
# The lateral join allows us to reference the columns and aliases
# in the previous FROM clause items as inputs to the UDTF.
spark.sql(
    "SELECT * FROM VALUES ('Hello World'), ('Apache Spark') t(text), "
    "LATERAL split_words(text)"
).show()
# +------------+------+
# |        text|  word|
# +------------+------+
# | Hello World| Hello|
# | Hello World| World|
# |Apache Spark|Apache|
# |Apache Spark| Spark|
# +------------+------+

Arrow 优化

Apache Arrow 是一种内存中的列式数据格式,用于 Spark 在 Java 和 Python 进程之间高效地传输数据。 默认情况下,Python UDTF 禁用 Apache Arrow。

当每个输入行从 UDTF 生成一个大型结果表时,Arrow 可以提高性能。

要启用 Arrow 优化,请将 spark.sql.execution.pythonUDTF.arrow.enabled 配置设置为 true。您也可以在声明 UDTF 时指定 useArrow 参数来启用它。

from pyspark.sql.functions import udtf

@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1

有关更多详细信息,请参阅 PySpark 中的 Apache Arrow

TABLE 输入参数

Python UDTF 还可以将 TABLE 作为输入参数,并且可以与标量输入参数结合使用。 默认情况下,出于性能原因,您只允许有一个 TABLE 参数作为输入。 如果您需要多个 TABLE 输入参数,可以通过将 spark.sql.tvf.allowMultipleTableArguments.enabled 配置设置为 true 来启用此功能。

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+

更多示例

将日期范围扩展为单个日期的 Python UDTF

from datetime import datetime, timedelta
from pyspark.sql.functions import lit, udtf

@udtf(returnType="date: string")
class DateExpander:
    def eval(self, start_date: str, end_date: str):
        current = datetime.strptime(start_date, '%Y-%m-%d')
        end = datetime.strptime(end_date, '%Y-%m-%d')
        while current <= end:
            yield (current.strftime('%Y-%m-%d'),)
            current += timedelta(days=1)

DateExpander(lit("2023-02-25"), lit("2023-03-01")).show()
# +----------+
# |      date|
# +----------+
# |2023-02-25|
# |2023-02-26|
# |2023-02-27|
# |2023-02-28|
# |2023-03-01|
# +----------+

具有 __init__terminate 的 Python UDTF

from pyspark.sql.functions import udtf

@udtf(returnType="cnt: int")
class CountUDTF:
    def __init__(self):
        # Initialize the counter to 0 when an instance of the class is created.
        self.count = 0

    def eval(self, x: int):
        # Increment the counter by 1 for each input value received.
        self.count += 1

    def terminate(self):
        # Yield the final count when the UDTF is done processing.
        yield self.count,

spark.udtf.register("count_udtf", CountUDTF)
spark.sql("SELECT * FROM range(0, 10, 1, 1), LATERAL count_udtf(id)").show()
# +---+---+
# | id|cnt|
# +---+---+
# |  9| 10|
# +---+---+
spark.sql("SELECT * FROM range(0, 10, 1, 2), LATERAL count_udtf(id)").show()
# +---+---+
# | id|cnt|
# +---+---+
# |  4|  5|
# |  9|  5|
# +---+---+