选项和设置

Spark 上的 Pandas API 具有一个选项系统,可用于自定义其某些方面的行为,其中显示相关的选项是用户最有可能调整的选项。

选项具有完整的“点式”大小写不敏感的名称(例如 display.max_rows)。您可以直接作为顶层 options 属性的属性来获取/设置选项

>>> import pyspark.pandas as ps
>>> ps.options.display.max_rows
1000
>>> ps.options.display.max_rows = 10
>>> ps.options.display.max_rows
10

该 API 由 3 个相关函数组成,可直接从 pandas_on_spark 命名空间获得

注意: 开发人员可以查看 pyspark.pandas/config.py 以获取更多信息。

>>> import pyspark.pandas as ps
>>> ps.get_option("display.max_rows")
1000
>>> ps.set_option("display.max_rows", 101)
>>> ps.get_option("display.max_rows")
101

获取和设置选项

如上所述,get_option()set_option() 可从 pandas_on_spark 命名空间获得。要更改选项,请调用 set_option('option name', new_value)

>>> import pyspark.pandas as ps
>>> ps.get_option('compute.max_rows')
1000
>>> ps.set_option('compute.max_rows', 2000)
>>> ps.get_option('compute.max_rows')
2000

所有选项也都有一个默认值,您可以使用 reset_option 来执行此操作

>>> import pyspark.pandas as ps
>>> ps.reset_option("display.max_rows")
>>> import pyspark.pandas as ps
>>> ps.get_option("display.max_rows")
1000
>>> ps.set_option("display.max_rows", 999)
>>> ps.get_option("display.max_rows")
999
>>> ps.reset_option("display.max_rows")
>>> ps.get_option("display.max_rows")
1000

option_context 上下文管理器已通过顶层 API 公开,允许您使用给定的选项值执行代码。当您退出 with 块时,选项值会自动恢复

>>> with ps.option_context("display.max_rows", 10, "compute.max_rows", 5):
...    print(ps.get_option("display.max_rows"))
...    print(ps.get_option("compute.max_rows"))
10
5
>>> print(ps.get_option("display.max_rows"))
>>> print(ps.get_option("compute.max_rows"))
1000
1000

对不同 DataFrames 的操作

Spark 上的 Pandas API 默认情况下不允许对不同的 DataFrames(或 Series)进行操作,以防止昂贵的操作。它内部执行一个连接操作,通常可能很昂贵。

可以通过将 compute.ops_on_diff_frames 设置为 True 来启用这种情况,以允许此类情况。 请参见下面的示例。

>>> import pyspark.pandas as ps
>>> ps.set_option('compute.ops_on_diff_frames', True)
>>> psdf1 = ps.range(5)
>>> psdf2 = ps.DataFrame({'id': [5, 4, 3]})
>>> (psdf1 - psdf2).sort_index()
    id
0 -5.0
1 -3.0
2 -1.0
3  NaN
4  NaN
>>> ps.reset_option('compute.ops_on_diff_frames')
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.ops_on_diff_frames', True)
>>> psdf = ps.range(5)
>>> psser_a = ps.Series([1, 2, 3, 4])
>>> # 'psser_a' is not from 'psdf' DataFrame. So it is considered as a Series not from 'psdf'.
>>> psdf['new_col'] = psser_a
>>> psdf
   id  new_col
0   0      1.0
1   1      2.0
3   3      4.0
2   2      3.0
4   4      NaN
>>> ps.reset_option('compute.ops_on_diff_frames')

默认索引类型

在 Spark 上的 pandas API 中,默认索引在多种情况下使用,例如,当 Spark DataFrame 转换为 Spark 上的 pandas DataFrame 时。 在这种情况下,Spark 上的 pandas API 在内部将默认索引附加到 Spark 上的 pandas DataFrame。

有几种类型的默认索引,可以通过 compute.default_index_type 进行配置,如下所示

sequence:它实现了一个一个递增的序列,通过 PySpark 的 Window 函数而不指定分区。 因此,它可能最终在一个节点中包含整个分区。 当数据量很大时,应避免使用此索引类型。 请参见下面的示例

>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'sequence')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Int64Index([0, 1, 2], dtype='int64')

这在概念上等效于下面的 PySpark 示例

>>> from pyspark.sql import functions as sf, Window
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> sequential_index = sf.row_number().over(
...    Window.orderBy(sf.monotonically_increasing_id().asc())) - 1
>>> spark_df.select(sequential_index).rdd.map(lambda r: r[0]).collect()
[0, 1, 2]

distributed-sequence (默认):它实现了一个一个递增的序列,通过分组依据和分组映射方法以分布式方式。 它仍然在全局范围内生成顺序索引。 如果默认索引必须是大型数据集中的序列,则必须使用此索引。 请参见下面的示例

>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'distributed-sequence')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Int64Index([0, 1, 2], dtype='int64')

这在概念上等效于下面的 PySpark 示例

>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> spark_df.rdd.zipWithIndex().map(lambda p: p[1]).collect()
[0, 1, 2]

distributed:它仅通过使用 PySpark 的 monotonically_increasing_id 函数以完全分布式的方式实现单调递增序列。 这些值是不确定的。 如果索引不必是一个一个递增的序列,则应使用此索引。 从性能上讲,与其他索引类型相比,此索引几乎没有任何惩罚。 请参见下面的示例

>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'distributed')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Int64Index([25769803776, 60129542144, 94489280512], dtype='int64')

这在概念上等效于下面的 PySpark 示例

>>> from pyspark.sql import functions as sf
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> spark_df.select(sf.monotonically_increasing_id()) \
...     .rdd.map(lambda r: r[0]).collect()
[25769803776, 60129542144, 94489280512]

警告

这种类型的索引不太可能用于计算两个不同的数据帧,因为它不能保证在两个数据帧中具有相同的索引。 如果您使用此默认索引并打开 compute.ops_on_diff_frames,由于不确定的索引值,两个不同 DataFrames 之间的操作结果可能是一个意外的输出。

可用选项

选项

默认值

描述

display.max_rows

1000

这设置了 pandas-on-Spark 在打印各种输出时应输出的最大行数。 例如,此值确定要在 dataframe 的 repr() 中显示的行数。 设置 None 以取消限制输入长度。 默认为 1000。

compute.max_rows

1000

‘compute.max_rows’ 设置了当前 pandas-on-Spark DataFrame 的限制。 设置 None 以取消限制输入长度。 设置限制后,它通过将数据收集到驱动程序中,然后使用 pandas API 来通过快捷方式执行。 如果未设置限制,则该操作由 PySpark 执行。 默认为 1000。

compute.shortcut_limit

1000

‘compute.shortcut_limit’ 设置了快捷方式的限制。 它计算指定的行数并使用其模式。 当 dataframe 长度大于此限制时,pandas-on-Spark 使用 PySpark 进行计算。

compute.ops_on_diff_frames

False

这确定是否在两个不同的数据帧之间进行操作。 例如,‘combine_frames’ 函数在内部执行一个连接操作,通常可能很昂贵。 因此,如果 compute.ops_on_diff_frames 变量不为 True,则该方法将引发异常。

compute.default_index_type

‘distributed-sequence’

这设置了默认索引类型:sequence、distributed 和 distributed-sequence。

compute.default_index_cache

‘MEMORY_AND_DISK_SER’

这设置了分布式序列索引中缓存的临时 RDD 的默认存储级别:‘NONE’、‘DISK_ONLY’、‘DISK_ONLY_2’、‘DISK_ONLY_3’、‘MEMORY_ONLY’、‘MEMORY_ONLY_2’、‘MEMORY_ONLY_SER’、‘MEMORY_ONLY_SER_2’、‘MEMORY_AND_DISK’、‘MEMORY_AND_DISK_2’、‘MEMORY_AND_DISK_SER’、‘MEMORY_AND_DISK_SER_2’、‘OFF_HEAP’、‘LOCAL_CHECKPOINT’。

compute.ordered_head

False

‘compute.ordered_head’ 设置是否使用自然排序来操作 head。 pandas-on-Spark 不保证行排序,因此 head 可能会从分布式分区返回一些行。 如果 ‘compute.ordered_head’ 设置为 True,则 pandas-on-Spark 会预先执行自然排序,但这会导致性能开销。

compute.eager_check

True

‘compute.eager_check’ 设置是否启动一些 Spark 作业只是为了进行验证。 如果 ‘compute.eager_check’ 设置为 True,则 pandas-on-Spark 会预先执行验证,但这会导致性能开销。 否则,pandas-on-Spark 将跳过验证,并且与 pandas 略有不同。 受影响的 API:Series.dotSeries.asofSeries.compareFractionalExtensionOps.astypeIntegralExtensionOps.astypeFractionalOps.astypeDecimalOps.astypestatistical functions 的 skipna

compute.isin_limit

80

‘compute.isin_limit’ 设置了通过 ‘Column.isin(list)’ 过滤的限制。 如果 ‘list’ 的长度超过限制,则使用广播连接以获得更好的性能。

plotting.max_rows

1000

‘plotting.max_rows’ 设置了基于 top-n 的图(如 plot.barplot.pie)的视觉限制。 如果设置为 1000,则前 1000 个数据点将用于绘图。 默认为 1000。

plotting.sample_ratio

None

‘plotting.sample_ratio’ 设置了将要绘制的用于基于样本的图(如 plot.lineplot.area)的数据比例。 此选项默认为 ‘plotting.max_rows’ 选项。

plotting.backend

‘plotly’

用于绘图的后端。 默认为 plotly。 支持任何具有顶层 .plot 方法的包。 已知选项有:[matplotlib, plotly]。