选项和设置#

Spark 上的 Pandas API 具有一个选项系统,允许您自定义其行为的某些方面,其中与显示相关的选项是用户最可能调整的。

选项具有完整的“点分样式”名称,不区分大小写(例如 display.max_rows)。您可以直接将选项作为顶级 options 属性的属性来获取/设置

>>> import pyspark.pandas as ps
>>> ps.options.display.max_rows
1000
>>> ps.options.display.max_rows = 10
>>> ps.options.display.max_rows
10

该 API 由 3 个相关函数组成,可直接从 pandas_on_spark 命名空间中获取

注意:开发者可以查看 pyspark.pandas/config.py 获取更多信息。

>>> import pyspark.pandas as ps
>>> ps.get_option("display.max_rows")
1000
>>> ps.set_option("display.max_rows", 101)
>>> ps.get_option("display.max_rows")
101

获取和设置选项#

如上所述,get_option()set_option() 可从 pandas_on_spark 命名空间获取。要更改选项,请调用 set_option('option name', new_value)

>>> import pyspark.pandas as ps
>>> ps.get_option('compute.max_rows')
1000
>>> ps.set_option('compute.max_rows', 2000)
>>> ps.get_option('compute.max_rows')
2000

所有选项都有一个默认值,您可以使用 reset_option 来恢复默认值

>>> import pyspark.pandas as ps
>>> ps.reset_option("display.max_rows")
>>> import pyspark.pandas as ps
>>> ps.get_option("display.max_rows")
1000
>>> ps.set_option("display.max_rows", 999)
>>> ps.get_option("display.max_rows")
999
>>> ps.reset_option("display.max_rows")
>>> ps.get_option("display.max_rows")
1000

option_context 上下文管理器已通过顶层 API 公开,允许您使用给定选项值执行代码。当您退出 with 块时,选项值将自动恢复。

>>> with ps.option_context("display.max_rows", 10, "compute.max_rows", 5):
...    print(ps.get_option("display.max_rows"))
...    print(ps.get_option("compute.max_rows"))
10
5
>>> print(ps.get_option("display.max_rows"))
>>> print(ps.get_option("compute.max_rows"))
1000
1000

对不同 DataFrames 的操作#

Spark 上的 Pandas API 默认不允许对不同 DataFrames(或 Series)进行操作,以防止昂贵的操作。它内部执行连接操作,这通常会很昂贵。

可以通过将 compute.ops_on_diff_frames 设置为 True 来启用这种情况。请参阅下面的示例。

>>> import pyspark.pandas as ps
>>> ps.set_option('compute.ops_on_diff_frames', True)
>>> psdf1 = ps.range(5)
>>> psdf2 = ps.DataFrame({'id': [5, 4, 3]})
>>> (psdf1 - psdf2).sort_index()
    id
0 -5.0
1 -3.0
2 -1.0
3  NaN
4  NaN
>>> ps.reset_option('compute.ops_on_diff_frames')
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.ops_on_diff_frames', True)
>>> psdf = ps.range(5)
>>> psser_a = ps.Series([1, 2, 3, 4])
>>> # 'psser_a' is not from 'psdf' DataFrame. So it is considered as a Series not from 'psdf'.
>>> psdf['new_col'] = psser_a
>>> psdf
   id  new_col
0   0      1.0
1   1      2.0
3   3      4.0
2   2      3.0
4   4      NaN
>>> ps.reset_option('compute.ops_on_diff_frames')

默认索引类型#

在 Spark 上的 pandas API 中,默认索引在多种情况下使用,例如,当 Spark DataFrame 转换为 Spark 上的 pandas DataFrame 时。在这种情况下,Spark 上的 pandas API 内部会将一个默认索引附加到 Spark 上的 pandas DataFrame 中。

默认索引有几种类型,可以通过 compute.default_index_type 配置,如下所示

sequence(序列):它通过 PySpark 的 Window 函数实现一个递增序列,且不指定分区。因此,它可能导致整个分区都在单个节点上。当数据量大时应避免使用此索引类型。请参阅下面的示例

>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'sequence')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Index([0, 1, 2], dtype='int64')

这在概念上等同于下面的 PySpark 示例

>>> from pyspark.sql import functions as sf, Window
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> sequential_index = sf.row_number().over(
...    Window.orderBy(sf.monotonically_increasing_id().asc())) - 1
>>> spark_df.select(sequential_index).rdd.map(lambda r: r[0]).collect()
[0, 1, 2]

distributed-sequence(分布式序列,默认):它通过 group-by 和 group-map 方法以分布式方式实现一个递增序列。它仍然全局生成顺序索引。如果默认索引在大型数据集中必须是顺序的,则必须使用此索引。请参阅下面的示例

>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'distributed-sequence')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Index([0, 1, 2], dtype='int64')

这在概念上等同于下面的 PySpark 示例

>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> spark_df.rdd.zipWithIndex().map(lambda p: p[1]).collect()
[0, 1, 2]

警告

sequence 不同,由于 distributed-sequence 在分布式环境中执行,尽管索引本身仍然是全局顺序的,但每个索引对应的行可能会有所不同。

发生这种情况是因为行分布在多个分区和节点中,导致数据加载时行到索引的映射不确定。

此外,当使用诸如 apply()groupby()transform() 等操作时,可能会生成新的 distributed-sequence 索引,它不一定与 DataFrame 的原始索引匹配。这可能导致行到索引的映射错位,从而导致计算不正确。

为避免此问题,请参阅 处理分布式序列中的索引错位

distributed(分布式):它通过使用 PySpark 的 monotonically_increasing_id 函数以完全分布式方式实现单调递增序列。值是不确定的。如果索引不必是逐个递增的序列,则应使用此索引。在性能方面,与其他索引类型相比,此索引几乎没有开销。请参阅下面的示例

>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'distributed')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Index([25769803776, 60129542144, 94489280512], dtype='int64')

这在概念上等同于下面的 PySpark 示例

>>> from pyspark.sql import functions as sf
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> spark_df.select(sf.monotonically_increasing_id()) \
...     .rdd.map(lambda r: r[0]).collect()
[25769803776, 60129542144, 94489280512]

警告

这种索引类型不太可能用于计算两个不同的数据框,因为它不能保证在两个数据框中具有相同的索引。如果您使用此默认索引并打开 compute.ops_on_diff_frames,则由于索引值不确定,两个不同 DataFrame 之间的操作结果可能会是意外的输出。

可用选项#

选项

默认值

描述

display.max_rows

1000

这设置了 Spark 上的 pandas 在打印各种输出时应输出的最大行数。例如,此值决定了数据框中 repr() 显示的行数。设置为 None 以不限制输入长度。默认值为 1000。

compute.max_rows

1000

‘compute.max_rows’ 设置当前 Spark 上的 pandas DataFrame 的限制。设置为 None 以不限制输入长度。设置限制时,通过将数据收集到驱动程序,然后使用 pandas API 来执行快捷操作。如果未设置限制,则操作由 PySpark 执行。默认值为 1000。

compute.shortcut_limit

1000

‘compute.shortcut_limit’ 设置快捷方式的限制。它计算指定行数并使用其模式。当数据框长度大于此限制时,Spark 上的 pandas 使用 PySpark 进行计算。

compute.ops_on_diff_frames

这决定了是否在两个不同的数据框之间进行操作。例如,‘combine_frames’ 函数内部执行连接操作,这通常会很昂贵。因此,如果 compute.ops_on_diff_frames 变量不为 True,则该方法会抛出异常。

compute.default_index_type

‘distributed-sequence’

这设置了默认索引类型:sequence、distributed 和 distributed-sequence。

compute.default_index_cache

‘MEMORY_AND_DISK_SER’

这设置了分布式序列索引中缓存的临时 RDD 的默认存储级别:‘NONE’、‘DISK_ONLY’、‘DISK_ONLY_2’、‘DISK_ONLY_3’、‘MEMORY_ONLY’、‘MEMORY_ONLY_2’、‘MEMORY_ONLY_SER’、‘MEMORY_ONLY_SER_2’、‘MEMORY_AND_DISK’、‘MEMORY_AND_DISK_2’、‘MEMORY_AND_DISK_SER’、‘MEMORY_AND_DISK_SER_2’、‘OFF_HEAP’、‘LOCAL_CHECKPOINT’。

compute.ordered_head

‘compute.ordered_head’ 设置是否以自然排序操作 head。Spark 上的 pandas 不保证行顺序,因此 head 可能会从分布式分区返回一些行。如果将 ‘compute.ordered_head’ 设置为 True,Spark 上的 pandas 会预先执行自然排序,但这会带来性能开销。

compute.eager_check

‘compute.eager_check’ 设置是否仅为了验证而启动某些 Spark 作业。如果将 ‘compute.eager_check’ 设置为 True,Spark 上的 pandas 会预先执行验证,但这会带来性能开销。否则,Spark 上的 pandas 将跳过验证,并且与 pandas 略有不同。受影响的 API 包括:Series.dotSeries.asofSeries.compareFractionalExtensionOps.astypeIntegralExtensionOps.astypeFractionalOps.astypeDecimalOps.astype统计函数中的 skipna

compute.isin_limit

80

‘compute.isin_limit’ 设置通过 ‘Column.isin(list)’ 进行过滤的限制。如果 ‘list’ 的长度超过限制,将改用广播连接以获得更好的性能。

compute.pandas_fallback

‘compute.pandas_fallback’ 设置是否自动回退到 Pandas 的实现。

compute.fail_on_ansi_mode

‘compute.fail_on_ansi_mode’ 设置是否与 ANSI 模式一起工作。如果为 True,如果底层 Spark 在启用 ANSI 模式下工作,Spark 上的 pandas API 将引发异常;否则,它会强制工作,尽管可能导致意外行为。

plotting.max_rows

1000

‘plotting.max_rows’ 设置基于 top-n 的图表(例如 plot.barplot.pie)的视觉限制。如果设置为 1000,则前 1000 个数据点将用于绘图。默认值为 1000。

plotting.sample_ratio

‘plotting.sample_ratio’ 设置用于基于样本的图表(例如 plot.lineplot.area)的数据比例。如果未设置,则通过计算 ‘plotting.max_rows’ 与总数据大小的比率,从 ‘plotting.max_rows’ 派生。

plotting.backend

‘plotly’

用于绘图的后端。默认是 plotly。支持任何具有顶级 .plot 方法的包。已知选项包括:[matplotlib, plotly]。