PySpark 中的 Apache Arrow#
Apache Arrow 是一种内存列式数据格式,在 Spark 中用于在 JVM 和 Python 进程之间高效传输数据。这目前对使用 Pandas/NumPy 数据的 Python 用户最为有利。它的使用不是自动的,可能需要对配置或代码进行一些小的更改才能充分利用并确保兼容性。本指南将高层次地描述如何在 Spark 中使用 Arrow,并强调使用 Arrow 启用数据时的任何差异。
确保 PyArrow 已安装#
要在 PySpark 中使用 Apache Arrow,应安装推荐版本的 PyArrow。如果您使用 pip 安装 PySpark,那么 PyArrow 可以作为 SQL 模块的额外依赖项通过命令 pip install pyspark[sql]
安装。否则,您必须确保 PyArrow 已安装并可在所有集群节点上使用。您可以使用 pip 或 conda 从 conda-forge 频道安装它。有关详细信息,请参阅 PyArrow 安装。
转换为/从 Arrow Table#
从 Spark 4.0 开始,您可以使用 SparkSession.createDataFrame()
从 PyArrow Table 创建 Spark DataFrame,并且可以使用 DataFrame.toArrow()
将 Spark DataFrame 转换为 PyArrow Table。
import pyarrow as pa
import numpy as np
# Create a PyArrow Table
table = pa.table([pa.array(np.random.rand(100)) for i in range(3)], names=["a", "b", "c"])
# Create a Spark DataFrame from the PyArrow Table
df = spark.createDataFrame(table)
# Convert the Spark DataFrame to a PyArrow Table
result_table = df.select("*").toArrow()
print(result_table.schema)
# a: double
# b: double
# c: double
请注意,DataFrame.toArrow()
会导致将 DataFrame 中的所有记录收集到驱动程序中,因此应在小部分数据上执行。并非所有 Spark 和 Arrow 数据类型目前都受支持,如果列具有不受支持的类型,则可能会引发错误。
启用转换为/从 Pandas#
Arrow 可用作优化,当使用 DataFrame.toPandas()
将 Spark DataFrame 转换为 Pandas DataFrame,以及使用 SparkSession.createDataFrame()
从 Pandas DataFrame 创建 Spark DataFrame 时。要在执行这些调用时使用 Arrow,用户需要首先将 Spark 配置 spark.sql.execution.arrow.pyspark.enabled
设置为 true
。此选项默认禁用。
此外,如果 Spark 内部实际计算之前发生错误,由 spark.sql.execution.arrow.pyspark.enabled
启用的优化可以自动回退到非 Arrow 优化实现。这可以通过 spark.sql.execution.arrow.pyspark.fallback.enabled
控制。
import numpy as np
import pandas as pd
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))
# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)
# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe()))
使用 Arrow 的上述优化将产生与未启用 Arrow 时相同的结果。
请注意,即使使用 Arrow,DataFrame.toPandas()
也会将 DataFrame 中的所有记录收集到驱动程序中,因此应在小部分数据上执行。并非所有 Spark 数据类型目前都受支持,如果列具有不受支持的类型,则可能会引发错误。如果在 SparkSession.createDataFrame()
期间发生错误,Spark 将回退到不使用 Arrow 创建 DataFrame。
Pandas UDFs (又名向量化 UDFs)#
Pandas UDF 是用户定义函数,由 Spark 使用 Arrow 传输数据和 Pandas 处理数据来执行,从而允许向量化操作。Pandas UDF 使用 pandas_udf()
作为装饰器或包装函数来定义,不需要额外的配置。Pandas UDF 通常表现为常规的 PySpark 函数 API。
在 Spark 3.0 之前,Pandas UDF 通常使用 pyspark.sql.functions.PandasUDFType
定义。从 Spark 3.0 和 Python 3.6+ 开始,您还可以使用 Python 类型提示。推荐使用 Python 类型提示,而 pyspark.sql.functions.PandasUDFType
将在未来版本中弃用。
请注意,类型提示在所有情况下都应使用 pandas.Series
,但有一种变体是当输入或输出列为 StructType
类型时,应将其输入或输出类型提示改为 pandas.DataFrame
。下面的示例展示了一个 Pandas UDF,它接受长整型列、字符串列和结构体列,并输出一个结构体列。它要求函数如下所示指定 pandas.Series
和 pandas.DataFrame
的类型提示:
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("col1 string, col2 long") # type: ignore[call-overload]
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
s3['col2'] = s1 + s2.str.len()
return s3
# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
[[1, "a string", ("a nested string",)]],
"long_col long, string_col string, struct_col struct<col1:string>")
df.printSchema()
# root
# |-- long_column: long (nullable = true)
# |-- string_column: string (nullable = true)
# |-- struct_column: struct (nullable = true)
# | |-- col1: string (nullable = true)
df.select(func("long_col", "string_col", "struct_col")).printSchema()
# |-- func(long_col, string_col, struct_col): struct (nullable = true)
# | |-- col1: string (nullable = true)
# | |-- col2: long (nullable = true)
在以下章节中,将描述支持的类型提示的组合。为简洁起见,省略了 pandas.DataFrame
变体。
Series 到 Series#
类型提示可以表示为 pandas.Series
, … -> pandas.Series
。
通过使用 pandas_udf()
并在函数中具有上述类型提示,它创建了一个 Pandas UDF,其中给定函数接受一个或多个 pandas.Series
并输出一个 pandas.Series
。函数的输出长度应始终与输入相同。在内部,PySpark 将通过将列分成批次,并对每个批次(作为数据的子集)调用函数,然后将结果连接起来,来执行 Pandas UDF。
以下示例展示了如何创建此 Pandas UDF,它计算 2 列的乘积。
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType()) # type: ignore[call-overload]
# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
有关详细用法,请参阅 pandas_udf()
。
Series 迭代器到 Series 迭代器#
类型提示可以表示为 Iterator[pandas.Series]
-> Iterator[pandas.Series]
。
通过使用 pandas_udf()
和具有上述类型提示的函数,它创建了一个 Pandas UDF,其中给定函数接受一个 pandas.Series
迭代器并输出一个 pandas.Series
迭代器。函数输出的整体长度应与整个输入长度相同;因此,只要长度相同,它就可以从输入迭代器中预取数据。在这种情况下,创建的 Pandas UDF 在调用时需要一个输入列。要使用多个输入列,需要不同的类型提示。请参阅“多个 Series 迭代器到 Series 迭代器”。
当 UDF 执行需要初始化某些状态时,它也很有用,尽管其内部工作方式与 Series 到 Series 的情况相同。下面的伪代码演示了该示例。
@pandas_udf("long")
def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
# Do some expensive initialization with a state
state = very_expensive_initialization()
for x in iterator:
# Use that state for the whole iterator.
yield calculate_with_state(x, state)
df.select(calculate("value")).show()
以下示例展示了如何创建此 Pandas UDF
from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# Declare the function and create the UDF
@pandas_udf("long") # type: ignore[call-overload]
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in iterator:
yield x + 1
df.select(plus_one("x")).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
有关详细用法,请参阅 pandas_udf()
。
多个 Series 迭代器到 Series 迭代器#
类型提示可以表示为 Iterator[Tuple[pandas.Series, ...]]
-> Iterator[pandas.Series]
。
通过使用 pandas_udf()
和具有上述类型提示的函数,它创建了一个 Pandas UDF,其中给定函数接受一个包含多个 pandas.Series
的元组迭代器,并输出一个 pandas.Series
迭代器。在这种情况下,创建的 Pandas UDF 在调用时需要与元组中 Series 数量一样多的输入列。否则,它具有与 Series 迭代器到 Series 迭代器情况相同的特性和限制。
以下示例展示了如何创建此 Pandas UDF
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import pandas_udf
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# Declare the function and create the UDF
@pandas_udf("long") # type: ignore[call-overload]
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
有关详细用法,请参阅 pandas_udf()
。
Series 到 Scalar#
类型提示可以表示为 pandas.Series
, … -> Any
。
通过使用 pandas_udf()
并让函数具有上述类型提示,它创建了一个类似于 PySpark 聚合函数的 Pandas UDF。给定函数接受 pandas.Series 并返回一个标量值。返回类型应为基本数据类型,返回的标量可以是 Python 基本类型,例如 int
或 float
,也可以是 NumPy 数据类型,例如 numpy.int64
或 numpy.float64
。Any
理想情况下应根据具体情况指定为特定的标量类型。
此 UDF 也可以与 GroupedData.agg()
和 Window 一起使用。它定义了一个从一个或多个 pandas.Series
到一个标量值的聚合,其中每个 pandas.Series
表示组或窗口内的一列。
请注意,这种类型的 UDF 不支持部分聚合,并且组或窗口的所有数据都将加载到内存中。此外,目前只有无界窗口支持分组聚合 Pandas UDF。以下示例展示了如何使用这种类型的 UDF 来计算带有 group-by 和窗口操作的平均值
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double") # type: ignore[call-overload]
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
有关详细用法,请参阅 pandas_udf()
。
Pandas 函数 API#
Pandas 函数 API 可以使用 Pandas 实例直接对整个 DataFrame
应用 Python 原生函数。其内部工作方式与 Pandas UDF 类似,通过使用 Arrow 传输数据和 Pandas 处理数据,从而允许向量化操作。然而,Pandas 函数 API 在 PySpark DataFrame
下的行为类似于常规 API,而不是 Column
,并且 Pandas 函数 API 中的 Python 类型提示是可选的,目前不影响其内部工作方式,尽管将来可能需要它们。
从 Spark 3.0 开始,分组映射 pandas UDF 现在被归类为独立的 Pandas 函数 API,即 DataFrame.groupby().applyInPandas()
。仍然可以使用 pyspark.sql.functions.PandasUDFType
和 DataFrame.groupby().apply()
来使用它;但是,更推荐直接使用 DataFrame.groupby().applyInPandas()
。使用 pyspark.sql.functions.PandasUDFType
将在未来版本中弃用。
分组映射#
使用 Pandas 实例进行分组映射操作由 DataFrame.groupby().applyInPandas()
支持,它需要一个 Python 函数,该函数接受一个 pandas.DataFrame
并返回另一个 pandas.DataFrame
。它将每个组映射到 Python 函数中的每个 pandas.DataFrame
。
此 API 实现了“拆分-应用-合并”模式,该模式包含三个步骤
使用
DataFrame.groupBy()
将数据拆分为组。对每个组应用一个函数。函数的输入和输出都是
pandas.DataFrame
。输入数据包含每个组的所有行和列。将结果合并到新的 PySpark
DataFrame
中。
要使用 DataFrame.groupBy().applyInPandas()
,用户需要定义以下内容
一个定义每个组计算的 Python 函数。
一个
StructType
对象或一个字符串,用于定义输出 PySparkDataFrame
的模式。
返回的 pandas.DataFrame
的列标签必须与定义的输出模式中的字段名称(如果指定为字符串)匹配,或者按位置与字段数据类型匹配(如果不是字符串,例如整数索引)。有关如何构造 pandas.DataFrame
时标记列的信息,请参阅 pandas.DataFrame。
请注意,在应用函数之前,一个组的所有数据都将加载到内存中。这可能导致内存不足异常,尤其是当组大小倾斜时。对 maxRecordsPerBatch 的配置不适用于组,由用户自行确保分组数据适合可用内存。
以下示例展示了如何使用 DataFrame.groupby().applyInPandas()
从组中的每个值中减去平均值。
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
有关详细用法,请参阅 GroupedData.applyInPandas()
映射#
使用 Pandas 实例进行的映射操作由 DataFrame.mapInPandas()
支持,它将一个 pandas.DataFrame
迭代器映射到另一个 pandas.DataFrame
迭代器,后者表示当前的 PySpark DataFrame
,并将结果作为 PySpark DataFrame
返回。该函数接受并输出一个 pandas.DataFrame
的迭代器。与某些 Pandas UDF 不同,它可以返回任意长度的输出,尽管其内部工作方式与 Series 到 Series Pandas UDF 类似。
以下示例展示了如何使用 DataFrame.mapInPandas()
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
有关详细用法,请参阅 DataFrame.mapInPandas()
。
协同分组映射#
使用 Pandas 实例的协同分组映射操作由 DataFrame.groupby().cogroup().applyInPandas()
支持,该方法允许两个 PySpark DataFrame
通过公共键进行协同分组,然后将 Python 函数应用于每个协同分组。它包括以下步骤:
混洗数据,使每个数据框中共享键的组协同分组在一起。
对每个协同分组应用一个函数。函数的输入是两个
pandas.DataFrame
(带可选的表示键的元组)。函数的输出是一个pandas.DataFrame
。将所有组中的
pandas.DataFrame
合并到一个新的 PySparkDataFrame
中。
要使用 groupBy().cogroup().applyInPandas()
,用户需要定义以下内容
一个定义每个协同分组计算的 Python 函数。
一个
StructType
对象或一个字符串,用于定义输出 PySparkDataFrame
的模式。
返回的 pandas.DataFrame
的列标签必须与定义的输出模式中的字段名称(如果指定为字符串)匹配,或者按位置与字段数据类型匹配(如果不是字符串,例如整数索引)。有关如何构造 pandas.DataFrame
时标记列的信息,请参阅 pandas.DataFrame。
请注意,在应用函数之前,协同分组的所有数据都将加载到内存中。这可能导致内存不足异常,尤其是当组大小倾斜时。maxRecordsPerBatch 的配置不适用,用户应确保协同分组的数据适合可用内存。
以下示例展示了如何使用 DataFrame.groupby().cogroup().applyInPandas()
在两个数据集之间执行 asof 连接。
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def merge_ordered(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
return pd.merge_ordered(left, right)
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
merge_ordered, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+----+
# | time| id| v1| v2|
# +--------+---+---+----+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0|null|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0|null|
# +--------+---+---+----+
有关详细用法,请参阅 PandasCogroupedOps.applyInPandas()
Arrow Python UDFs#
Arrow Python UDF 是用户定义函数,它们逐行执行,利用 Arrow 进行高效的批数据传输和序列化。要定义 Arrow Python UDF,您可以使用 udf()
装饰器或使用 udf()
方法包装函数,并确保将 useArrow
参数设置为 True。此外,您可以通过将 Spark 配置 spark.sql.execution.pythonUDF.arrow.enabled
设置为 true 来为整个 SparkSession 中的 Python UDF 启用 Arrow 优化。需要注意的是,Spark 配置仅在 useArrow
未设置或设置为 None 时生效。
Arrow Python UDF 的类型提示应以与默认的、pickle 序列化的 Python UDF 相同的方式指定。
以下示例演示了默认的、pickle 序列化的 Python UDF 和 Arrow Python UDF 的用法
from pyspark.sql.functions import udf
@udf(returnType='int') # A default, pickled Python UDF
def slen(s): # type: ignore[no-untyped-def]
return len(s)
@udf(returnType='int', useArrow=True) # An Arrow Python UDF
def arrow_slen(s): # type: ignore[no-untyped-def]
return len(s)
df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name"), arrow_slen("name")).show()
# +----------+----------------+
# |slen(name)|arrow_slen(name)|
# +----------+----------------+
# | 8| 8|
# +----------+----------------+
与默认的、pickle 序列化的 Python UDF 相比,Arrow Python UDF 提供了更连贯的类型强制转换机制。当 UDF 返回的 Python 实例与用户指定的返回类型不一致时,UDF 类型强制转换会带来挑战。默认的、pickle 序列化的 Python UDF 的类型强制转换存在某些限制,例如依赖 None 作为类型不匹配的后备,这可能导致潜在的歧义和数据丢失。此外,将日期、日期时间、元组转换为字符串可能会产生模糊的结果。另一方面,Arrow Python UDF 利用 Arrow 的能力来标准化类型强制转换并有效地解决这些问题。
使用注意事项#
支持的 SQL 类型#
目前,所有 Spark SQL 数据类型都受基于 Arrow 的转换支持,但 ArrayType
的 TimestampType
除外。MapType
和嵌套 StructType
的 ArrayType
仅在使用 PyArrow 2.0.0 及更高版本时才受支持。
设置 Arrow 批处理大小#
Spark 中的数据分区被转换为 Arrow 记录批处理,这可能导致 JVM 内存暂时使用过高。为了避免可能的内存不足异常,可以通过设置配置 spark.sql.execution.arrow.maxRecordsPerBatch
来调整 Arrow 记录批处理的大小,该整数将确定每个批处理的最大行数。默认值为每个批处理 10,000 条记录。如果列数很大,则应相应调整该值。使用此限制,每个数据分区将生成 1 个或更多记录批处理以进行处理。
带有时区语义的时间戳#
Spark 内部将时间戳存储为 UTC 值,并且没有指定时区的时间戳数据会以本地时间转换为具有微秒分辨率的 UTC。当时间戳数据在 Spark 中导出或显示时,会话时区用于本地化时间戳值。会话时区由配置 spark.sql.session.timeZone
设置,如果未设置,则默认为 JVM 系统本地时区。Pandas 使用具有纳秒分辨率的 datetime64
类型,即 datetime64[ns]
,并可选择按列指定时区。
当时间戳数据从 Spark 传输到 Pandas 时,它将被转换为纳秒,并且每列将被转换为 Spark 会话时区,然后本地化到该时区,这将移除时区并以本地时间显示值。这将在调用 DataFrame.toPandas()
或包含时间戳列的 pandas_udf
时发生。
当时间戳数据从 Spark 传输到 PyArrow Table 时,它将保持微秒分辨率并使用 UTC 时区。这在调用 DataFrame.toArrow()
并包含时间戳列时发生。
当时间戳数据从 Pandas 或 PyArrow 传输到 Spark 时,它将被转换为 UTC 微秒。这在调用 SparkSession.createDataFrame()
并使用 Pandas DataFrame 或 PyArrow Table 时,或从 pandas_udf
返回时间戳时发生。这些转换是自动完成的,以确保 Spark 将以预期格式获取数据,因此无需自行进行任何此类转换。任何纳秒值都将被截断。
请注意,标准 UDF(非 Pandas)将时间戳数据加载为 Python datetime 对象,这与 Pandas 时间戳不同。建议在使用 pandas_udf
处理时间戳时使用 Pandas 时间序列功能以获得最佳性能,详情请参见此处。
推荐的 Pandas 和 PyArrow 版本#
对于 pyspark.sql 的使用,Pandas 的最低支持版本为 2.0.0,PyArrow 为 11.0.0。可以使用更高版本,但不能保证兼容性和数据正确性,用户应自行验证。
为节省内存设置 Arrow self_destruct
#
自 Spark 3.2 以来,Spark 配置 spark.sql.execution.arrow.pyspark.selfDestruct.enabled
可用于启用 PyArrow 的 self_destruct
功能,该功能可在通过 toPandas
创建 Pandas DataFrame 时通过释放 Arrow 分配的内存来节省内存。此选项还可在通过 toArrow
创建 PyArrow Table 时节省内存。此选项是实验性的。当与 toPandas
一起使用时,由于不可变的支持数组,某些操作可能在生成的 Pandas DataFrame 上失败。通常,您会看到错误 ValueError: buffer source array is read-only
。较新版本的 Pandas 可能会通过改进对此类情况的支持来修复这些错误。您可以通过预先复制列来解决此错误。此外,此转换可能较慢,因为它是单线程的。