PySpark 中的 Apache Arrow

Apache Arrow 是一种内存中的列式数据格式,Spark 使用它来高效地在 JVM 和 Python 进程之间传输数据。目前,这对于使用 Pandas/NumPy 数据的 Python 用户最为有利。它的使用不是自动的,可能需要对配置或代码进行一些小的更改才能充分利用并确保兼容性。本指南将提供如何在 Spark 中使用 Arrow 的高级描述,并突出显示在使用启用 Arrow 的数据时的任何差异。

确保已安装 PyArrow

要在 PySpark 中使用 Apache Arrow,应安装推荐版本的 PyArrow。如果您使用 pip 安装 PySpark,则可以使用命令 pip install pyspark[sql] 将 PyArrow 作为 SQL 模块的额外依赖项引入。否则,您必须确保 PyArrow 已安装并且在所有集群节点上都可用。您可以使用 pip 或 conda 从 conda-forge 频道安装它。有关详细信息,请参见 PyArrow 安装

启用与 Pandas 之间的转换

当使用调用 DataFrame.toPandas() 将 Spark DataFrame 转换为 Pandas DataFrame 时,以及当使用 SparkSession.createDataFrame() 从 Pandas DataFrame 创建 Spark DataFrame 时,Arrow 可用作优化。要在执行这些调用时使用 Arrow,用户需要首先将 Spark 配置 spark.sql.execution.arrow.pyspark.enabled 设置为 true。默认情况下,此配置处于禁用状态。

此外,如果 Spark 内的实际计算之前发生错误,由 spark.sql.execution.arrow.pyspark.enabled 启用的优化可能会自动回退到非 Arrow 优化实现。这可以通过 spark.sql.execution.arrow.pyspark.fallback.enabled 控制。

import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()

print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe()))

使用 Arrow 的上述优化将产生与未启用 Arrow 时相同的结果。

请注意,即使使用 Arrow,DataFrame.toPandas() 也会导致 DataFrame 中的所有记录都收集到驱动程序程序中,因此应在数据的较小子集上执行此操作。并非所有 Spark 数据类型都当前受支持,如果列具有不受支持的类型,则可能会引发错误。如果在 SparkSession.createDataFrame() 期间发生错误,Spark 将回退到不使用 Arrow 创建 DataFrame。

Pandas UDF(又名向量化 UDF)

Pandas UDF 是用户定义的函数,由 Spark 使用 Arrow 传输数据并使用 Pandas 来处理数据来执行,从而允许向量化操作。使用 pandas_udf() 作为装饰器或包装函数来定义 Pandas UDF,并且不需要其他配置。一般来说,Pandas UDF 的行为类似于常规 PySpark 函数 API。

在 Spark 3.0 之前,Pandas UDF 过去使用 pyspark.sql.functions.PandasUDFType 定义。从带有 Python 3.6+ 的 Spark 3.0 开始,您还可以使用 Python 类型提示。首选使用 Python 类型提示,并且将来版本将弃用 pyspark.sql.functions.PandasUDFType 的使用。

请注意,类型提示应在所有情况下都使用 pandas.Series,但是当输入或输出列属于 StructType 时,有一种变体应改为使用 pandas.DataFrame 作为其输入或输出类型提示。以下示例显示了一个 Pandas UDF,它接受 long 列、字符串列和结构列,并输出一个结构列。它要求该函数指定 pandas.Seriespandas.DataFrame 的类型提示,如下所示

import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf("col1 string, col2 long")  # type: ignore[call-overload]
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3

# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")

df.printSchema()
# root
# |-- long_column: long (nullable = true)
# |-- string_column: string (nullable = true)
# |-- struct_column: struct (nullable = true)
# |    |-- col1: string (nullable = true)

df.select(func("long_col", "string_col", "struct_col")).printSchema()
# |-- func(long_col, string_col, struct_col): struct (nullable = true)
# |    |-- col1: string (nullable = true)
# |    |-- col2: long (nullable = true)

在以下各节中,它描述了支持的类型提示的组合。为简单起见,省略了 pandas.DataFrame 变体。

Series 到 Series

类型提示可以表示为 pandas.Series, … -> pandas.Series

通过将 pandas_udf() 与具有上述类型提示的函数一起使用,它会创建一个 Pandas UDF,其中给定的函数接受一个或多个 pandas.Series 并输出一个 pandas.Series。函数的输出应始终与输入的长度相同。在内部,PySpark 将通过将列拆分为批处理并为每个批处理(作为数据的子集)调用函数,然后将结果连接在一起来执行 Pandas UDF。

以下示例显示了如何创建此 Pandas UDF,该 UDF 计算 2 列的乘积。

import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())  # type: ignore[call-overload]

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

有关详细用法,请参见 pandas_udf()

Series 迭代器到 Series 迭代器

类型提示可以表示为 Iterator[pandas.Series] -> Iterator[pandas.Series]

通过将 pandas_udf() 与具有上述类型提示的函数一起使用,它会创建一个 Pandas UDF,其中给定的函数接受 pandas.Series 的迭代器并输出 pandas.Series 的迭代器。函数的整个输出的长度应与整个输入的长度相同;因此,只要长度相同,它就可以从输入迭代器中预取数据。在这种情况下,当调用 Pandas UDF 时,创建的 Pandas UDF 需要一个输入列。要使用多个输入列,需要不同的类型提示。请参见多个 Series 迭代器到 Series 迭代器。

当 UDF 执行需要初始化某些状态时,它也很有用,尽管在内部它的工作方式与 Series 到 Series 的情况相同。下面的伪代码说明了该示例。

@pandas_udf("long")
def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Do some expensive initialization with a state
    state = very_expensive_initialization()
    for x in iterator:
        # Use that state for the whole iterator.
        yield calculate_with_state(x, state)

df.select(calculate("value")).show()

以下示例显示了如何创建此 Pandas UDF

from typing import Iterator

import pandas as pd

from pyspark.sql.functions import pandas_udf

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# Declare the function and create the UDF
@pandas_udf("long")  # type: ignore[call-overload]
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in iterator:
        yield x + 1

df.select(plus_one("x")).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

有关详细用法,请参见 pandas_udf()

多个 Series 迭代器到 Series 迭代器

类型提示可以表示为 Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series]

通过使用 pandas_udf(),并且函数具有上述类型提示,它会创建一个 Pandas UDF,其中给定的函数接受一个由多个 pandas.Series 组成的元组的迭代器,并输出一个 pandas.Series 的迭代器。在这种情况下,创建的 Pandas UDF 需要多个输入列,其数量与调用 Pandas UDF 时元组中的 series 数量相同。否则,它与 Series 迭代器到 Series 迭代器的情况具有相同的特性和限制。

以下示例显示了如何创建此 Pandas UDF

from typing import Iterator, Tuple

import pandas as pd

from pyspark.sql.functions import pandas_udf

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# Declare the function and create the UDF
@pandas_udf("long")  # type: ignore[call-overload]
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

有关详细用法,请参见 pandas_udf()

Series 到标量

类型提示可以表示为 pandas.Series, … -> Any

通过使用 pandas_udf(),并且函数具有上述类型提示,它会创建一个类似于 PySpark 聚合函数的 Pandas UDF。给定的函数接受 pandas.Series 并返回一个标量值。返回类型应该是原始数据类型,返回的标量可以是 Python 原始类型,例如 intfloat,也可以是 numpy 数据类型,例如 numpy.int64numpy.float64Any 理想情况下应该是一个相应的特定标量类型。

此 UDF 也可以与 GroupedData.agg()Window 一起使用。它定义了从一个或多个 pandas.Series 到标量值的聚合,其中每个 pandas.Series 代表组或窗口中的一列。

请注意,这种类型的 UDF 不支持部分聚合,并且组或窗口的所有数据都将加载到内存中。此外,目前只有无界窗口支持分组聚合 Pandas UDF。以下示例展示了如何使用此类型的 UDF 通过 group-by 和窗口操作来计算平均值

import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")  # type: ignore[call-overload]
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

有关详细用法,请参见 pandas_udf()

Pandas 函数 API

Pandas 函数 API 可以通过使用 Pandas 实例将 Python 本机函数直接应用于整个 DataFrame。在内部,它的工作方式与 Pandas UDF 类似,使用 Arrow 传输数据,使用 Pandas 处理数据,从而实现向量化操作。但是,Pandas 函数 API 的行为类似于 PySpark DataFrame 下的常规 API,而不是 Column,并且 Pandas 函数 API 中的 Python 类型提示是可选的,并且目前不会影响其内部工作方式,尽管将来可能需要它们。

从 Spark 3.0 开始,分组映射 pandas UDF 现在被归类为一个单独的 Pandas 函数 API,DataFrame.groupby().applyInPandas()。仍然可以使用 pyspark.sql.functions.PandasUDFTypeDataFrame.groupby().apply() 来使用它;但是,建议直接使用 DataFrame.groupby().applyInPandas()。在未来,pyspark.sql.functions.PandasUDFType 将被弃用。

分组映射

Pandas 实例的分组映射操作由 DataFrame.groupby().applyInPandas() 支持,它需要一个 Python 函数,该函数接受一个 pandas.DataFrame 并返回另一个 pandas.DataFrame。它将每个组映射到 Python 函数中的每个 pandas.DataFrame

此 API 实现了“拆分-应用-组合”模式,该模式由三个步骤组成

  • 使用 DataFrame.groupBy() 将数据拆分为组。

  • 对每个组应用一个函数。函数的输入和输出都是 pandas.DataFrame。输入数据包含每个组的所有行和列。

  • 将结果组合成一个新的 PySpark DataFrame

要使用 DataFrame.groupby().applyInPandas(),用户需要定义以下内容

  • 一个 Python 函数,用于定义每个组的计算。

  • 一个 StructType 对象或一个字符串,用于定义输出 PySpark DataFrame 的 schema。

如果返回的 pandas.DataFrame 的列标签指定为字符串,则必须与定义的输出 schema 中的字段名称匹配,如果不是字符串,则必须按位置匹配字段数据类型,例如整数索引。有关如何在构造 pandas.DataFrame 时标记列,请参阅 pandas.DataFrame

请注意,在应用函数之前,会将组的所有数据加载到内存中。这可能导致内存不足异常,尤其是在组大小倾斜的情况下。对于 maxRecordsPerBatch 的配置不适用于组,并且由用户负责确保分组数据适合可用内存。

以下示例展示了如何使用 DataFrame.groupby().applyInPandas() 从组中的每个值中减去平均值。

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

有关详细用法,请参阅 GroupedData.applyInPandas()

映射

Pandas 实例的映射操作由 DataFrame.mapInPandas() 支持,它将 pandas.DataFrames 的迭代器映射到另一个 pandas.DataFrames 的迭代器,该迭代器表示当前的 PySpark DataFrame 并将结果作为 PySpark DataFrame 返回。该函数接受并输出 pandas.DataFrame 的迭代器。与某些 Pandas UDF 相比,它可以返回任意长度的输出,尽管在内部它的工作方式与 Series 到 Series Pandas UDF 类似。

以下示例展示了如何使用 DataFrame.mapInPandas()

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

有关详细用法,请参阅 DataFrame.mapInPandas()

协同分组映射

Pandas 实例的协同分组映射操作由 DataFrame.groupby().cogroup().applyInPandas() 支持,它允许两个 PySpark DataFrames 通过一个公共键进行协同分组,然后将一个 Python 函数应用于每个协同分组。它包括以下步骤

  • 对数据进行 shuffle,以便共享一个键的每个 dataframe 的组被协同分组在一起。

  • 将一个函数应用于每个协同分组。该函数的输入是两个 pandas.DataFrame(带有一个可选的元组表示键)。该函数的输出是一个 pandas.DataFrame

  • 将来自所有组的 pandas.DataFrames 组合成一个新的 PySpark DataFrame

要使用 groupBy().cogroup().applyInPandas(),用户需要定义以下内容

  • 一个 Python 函数,用于定义每个协同分组的计算。

  • 一个 StructType 对象或一个字符串,用于定义输出 PySpark DataFrame 的 schema。

如果返回的 pandas.DataFrame 的列标签指定为字符串,则必须与定义的输出 schema 中的字段名称匹配,如果不是字符串,则必须按位置匹配字段数据类型,例如整数索引。有关如何在构造 pandas.DataFrame 时标记列,请参阅 pandas.DataFrame.

请注意,cogroup 的所有数据都会在应用函数之前加载到内存中。这可能导致内存溢出异常,尤其是在组大小倾斜的情况下。 maxRecordsPerBatch 的配置不适用,用户需要确保 cogroup 的数据能够放入可用内存中。

以下示例展示了如何使用 DataFrame.groupby().cogroup().applyInPandas() 在两个数据集之间执行 asof 连接。

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def merge_ordered(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return pd.merge_ordered(left, right)

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    merge_ordered, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+----+
# |    time| id| v1|  v2|
# +--------+---+---+----+
# |20000101|  1|1.0|   x|
# |20000102|  1|3.0|null|
# |20000101|  2|2.0|   y|
# |20000102|  2|4.0|null|
# +--------+---+---+----+

有关详细用法,请参阅 PandasCogroupedOps.applyInPandas()

Arrow Python UDF

Arrow Python UDF 是用户定义的函数,它们逐行执行,利用 Arrow 进行高效的批量数据传输和序列化。要定义 Arrow Python UDF,您可以使用 udf() 装饰器或使用 udf() 方法包装该函数,确保 useArrow 参数设置为 True。此外,您可以通过将 Spark 配置 spark.sql .execution.pythonUDF.arrow.enabled 设置为 true,为整个 SparkSession 启用 Python UDF 的 Arrow 优化。需要注意的是,只有当 useArrow 未设置或设置为 None 时,Spark 配置才会生效。

Arrow Python UDF 的类型提示应以与默认的 pickled Python UDF 相同的方式指定。

这是一个演示默认的 pickled Python UDF 和 Arrow Python UDF 的用法的例子

from pyspark.sql.functions import udf

@udf(returnType='int')  # A default, pickled Python UDF
def slen(s):  # type: ignore[no-untyped-def]
    return len(s)

@udf(returnType='int', useArrow=True)  # An Arrow Python UDF
def arrow_slen(s):  # type: ignore[no-untyped-def]
    return len(s)

df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))

df.select(slen("name"), arrow_slen("name")).show()
# +----------+----------------+
# |slen(name)|arrow_slen(name)|
# +----------+----------------+
# |         8|               8|
# +----------+----------------+

与默认的 pickled Python UDF 相比,Arrow Python UDF 提供了更一致的类型强制转换机制。当 UDF 返回的 Python 实例与用户指定的返回类型不一致时,UDF 类型强制转换会带来挑战。默认的 pickled Python UDF 的类型强制转换具有一定的局限性,例如依赖 None 作为类型不匹配的后备方案,导致潜在的歧义和数据丢失。此外,将日期、日期时间和元组转换为字符串可能会产生不明确的结果。另一方面,Arrow Python UDF 利用 Arrow 的功能来标准化类型强制转换并有效地解决这些问题。

使用说明

支持的 SQL 类型

目前,所有 Spark SQL 数据类型都支持基于 Arrow 的转换,除了 ArrayTypeTimestampType。只有在使用 PyArrow 2.0.0 及以上版本时,才支持嵌套 StructTypeMapTypeArrayType

设置 Arrow 批处理大小

Spark 中的数据分区被转换为 Arrow 记录批次,这可能会暂时导致 JVM 中的高内存使用率。为避免可能的内存溢出异常,可以通过将 conf spark.sql.execution.arrow.maxRecordsPerBatch 设置为整数来调整 Arrow 记录批次的大小,该整数将确定每个批次的最大行数。默认值为每个批次 10,000 条记录。如果列数很大,则应相应地调整该值。使用此限制,每个数据分区将被分成 1 个或多个记录批次进行处理。

带时区语义的时间戳

Spark 内部将时间戳存储为 UTC 值,并且没有指定时区的时间戳数据将以微秒分辨率转换为本地时间到 UTC。当在 Spark 中导出或显示时间戳数据时,会话时区用于本地化时间戳值。会话时区由配置 spark.sql.session.timeZone 设置,如果未设置,则默认为 JVM 系统本地时区。 Pandas 使用 datetime64 类型,具有纳秒分辨率,datetime64[ns],可以选择按列设置时区。

当时间戳数据从 Spark 传输到 Pandas 时,它将被转换为纳秒,并且每列都将转换为 Spark 会话时区,然后本地化到该时区,这将删除时区并将值显示为本地时间。当使用时间戳列调用 DataFrame.toPandas()pandas_udf 时,会发生这种情况。

当时间戳数据从 Pandas 传输到 Spark 时,它将被转换为 UTC 微秒。当使用 Pandas DataFrame 调用 SparkSession.createDataFrame() 或从 pandas_udf 返回时间戳时,会发生这种情况。这些转换是自动完成的,以确保 Spark 具有预期格式的数据,因此无需自己进行任何这些转换。任何纳秒值都将被截断。

请注意,标准 UDF(非 Pandas)将加载时间戳数据作为 Python datetime 对象,这与 Pandas 时间戳不同。建议在使用 pandas_udf 处理时间戳时使用 Pandas 时间序列功能,以获得最佳性能,有关详细信息,请参见此处

设置 Arrow self_destruct 以节省内存

自 Spark 3.2 起,Spark 配置 spark.sql.execution.arrow.pyspark.selfDestruct.enabled 可用于启用 PyArrow 的 self_destruct 功能,该功能可以通过在构建 Pandas DataFrame 时释放 Arrow 分配的内存来节省内存。此选项是实验性的,并且某些操作可能会由于不可变的后备数组而在生成的 Pandas DataFrame 上失败。通常,您会看到错误 ValueError: buffer source array is read-only。较新版本的 Pandas 可能会通过改进对此类案例的支持来修复这些错误。您可以通过事先复制列来解决此错误。此外,此转换可能会较慢,因为它采用单线程。