选项和设置#
Spark 上的 Pandas API 具有一个选项系统,允许您自定义其行为的某些方面,其中与显示相关的选项是用户最可能调整的。
选项具有完整的“点分样式”名称,不区分大小写(例如 display.max_rows
)。您可以直接将选项作为顶级 options
属性的属性来获取/设置
>>> import pyspark.pandas as ps
>>> ps.options.display.max_rows
1000
>>> ps.options.display.max_rows = 10
>>> ps.options.display.max_rows
10
该 API 由 3 个相关函数组成,可直接从 pandas_on_spark
命名空间中获取
get_option()
/set_option()
- 获取/设置单个选项的值。reset_option()
- 将一个或多个选项重置为其默认值。
注意:开发者可以查看 pyspark.pandas/config.py 获取更多信息。
>>> import pyspark.pandas as ps
>>> ps.get_option("display.max_rows")
1000
>>> ps.set_option("display.max_rows", 101)
>>> ps.get_option("display.max_rows")
101
获取和设置选项#
如上所述,get_option()
和 set_option()
可从 pandas_on_spark
命名空间获取。要更改选项,请调用 set_option('option name', new_value)
。
>>> import pyspark.pandas as ps
>>> ps.get_option('compute.max_rows')
1000
>>> ps.set_option('compute.max_rows', 2000)
>>> ps.get_option('compute.max_rows')
2000
所有选项都有一个默认值,您可以使用 reset_option
来恢复默认值
>>> import pyspark.pandas as ps
>>> ps.reset_option("display.max_rows")
>>> import pyspark.pandas as ps
>>> ps.get_option("display.max_rows")
1000
>>> ps.set_option("display.max_rows", 999)
>>> ps.get_option("display.max_rows")
999
>>> ps.reset_option("display.max_rows")
>>> ps.get_option("display.max_rows")
1000
option_context
上下文管理器已通过顶层 API 公开,允许您使用给定选项值执行代码。当您退出 with 块时,选项值将自动恢复。
>>> with ps.option_context("display.max_rows", 10, "compute.max_rows", 5):
... print(ps.get_option("display.max_rows"))
... print(ps.get_option("compute.max_rows"))
10
5
>>> print(ps.get_option("display.max_rows"))
>>> print(ps.get_option("compute.max_rows"))
1000
1000
对不同 DataFrames 的操作#
Spark 上的 Pandas API 默认不允许对不同 DataFrames(或 Series)进行操作,以防止昂贵的操作。它内部执行连接操作,这通常会很昂贵。
可以通过将 compute.ops_on_diff_frames 设置为 True 来启用这种情况。请参阅下面的示例。
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.ops_on_diff_frames', True)
>>> psdf1 = ps.range(5)
>>> psdf2 = ps.DataFrame({'id': [5, 4, 3]})
>>> (psdf1 - psdf2).sort_index()
id
0 -5.0
1 -3.0
2 -1.0
3 NaN
4 NaN
>>> ps.reset_option('compute.ops_on_diff_frames')
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.ops_on_diff_frames', True)
>>> psdf = ps.range(5)
>>> psser_a = ps.Series([1, 2, 3, 4])
>>> # 'psser_a' is not from 'psdf' DataFrame. So it is considered as a Series not from 'psdf'.
>>> psdf['new_col'] = psser_a
>>> psdf
id new_col
0 0 1.0
1 1 2.0
3 3 4.0
2 2 3.0
4 4 NaN
>>> ps.reset_option('compute.ops_on_diff_frames')
默认索引类型#
在 Spark 上的 pandas API 中,默认索引在多种情况下使用,例如,当 Spark DataFrame 转换为 Spark 上的 pandas DataFrame 时。在这种情况下,Spark 上的 pandas API 内部会将一个默认索引附加到 Spark 上的 pandas DataFrame 中。
默认索引有几种类型,可以通过 compute.default_index_type 配置,如下所示
sequence(序列):它通过 PySpark 的 Window 函数实现一个递增序列,且不指定分区。因此,它可能导致整个分区都在单个节点上。当数据量大时应避免使用此索引类型。请参阅下面的示例
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'sequence')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Index([0, 1, 2], dtype='int64')
这在概念上等同于下面的 PySpark 示例
>>> from pyspark.sql import functions as sf, Window
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> sequential_index = sf.row_number().over(
... Window.orderBy(sf.monotonically_increasing_id().asc())) - 1
>>> spark_df.select(sequential_index).rdd.map(lambda r: r[0]).collect()
[0, 1, 2]
distributed-sequence(分布式序列,默认):它通过 group-by 和 group-map 方法以分布式方式实现一个递增序列。它仍然全局生成顺序索引。如果默认索引在大型数据集中必须是顺序的,则必须使用此索引。请参阅下面的示例
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'distributed-sequence')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Index([0, 1, 2], dtype='int64')
这在概念上等同于下面的 PySpark 示例
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> spark_df.rdd.zipWithIndex().map(lambda p: p[1]).collect()
[0, 1, 2]
警告
与 sequence
不同,由于 distributed-sequence
在分布式环境中执行,尽管索引本身仍然是全局顺序的,但每个索引对应的行可能会有所不同。
发生这种情况是因为行分布在多个分区和节点中,导致数据加载时行到索引的映射不确定。
此外,当使用诸如 apply()
、groupby()
或 transform()
等操作时,可能会生成新的 distributed-sequence
索引,它不一定与 DataFrame 的原始索引匹配。这可能导致行到索引的映射错位,从而导致计算不正确。
为避免此问题,请参阅 处理分布式序列中的索引错位
distributed(分布式):它通过使用 PySpark 的 monotonically_increasing_id 函数以完全分布式方式实现单调递增序列。值是不确定的。如果索引不必是逐个递增的序列,则应使用此索引。在性能方面,与其他索引类型相比,此索引几乎没有开销。请参阅下面的示例
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'distributed')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Index([25769803776, 60129542144, 94489280512], dtype='int64')
这在概念上等同于下面的 PySpark 示例
>>> from pyspark.sql import functions as sf
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> spark_df.select(sf.monotonically_increasing_id()) \
... .rdd.map(lambda r: r[0]).collect()
[25769803776, 60129542144, 94489280512]
警告
这种索引类型不太可能用于计算两个不同的数据框,因为它不能保证在两个数据框中具有相同的索引。如果您使用此默认索引并打开 compute.ops_on_diff_frames,则由于索引值不确定,两个不同 DataFrame 之间的操作结果可能会是意外的输出。
可用选项#
选项 |
默认值 |
描述 |
---|---|---|
display.max_rows |
1000 |
这设置了 Spark 上的 pandas 在打印各种输出时应输出的最大行数。例如,此值决定了数据框中 repr() 显示的行数。设置为 None 以不限制输入长度。默认值为 1000。 |
compute.max_rows |
1000 |
‘compute.max_rows’ 设置当前 Spark 上的 pandas DataFrame 的限制。设置为 None 以不限制输入长度。设置限制时,通过将数据收集到驱动程序,然后使用 pandas API 来执行快捷操作。如果未设置限制,则操作由 PySpark 执行。默认值为 1000。 |
compute.shortcut_limit |
1000 |
‘compute.shortcut_limit’ 设置快捷方式的限制。它计算指定行数并使用其模式。当数据框长度大于此限制时,Spark 上的 pandas 使用 PySpark 进行计算。 |
compute.ops_on_diff_frames |
真 |
这决定了是否在两个不同的数据框之间进行操作。例如,‘combine_frames’ 函数内部执行连接操作,这通常会很昂贵。因此,如果 compute.ops_on_diff_frames 变量不为 True,则该方法会抛出异常。 |
compute.default_index_type |
‘distributed-sequence’ |
这设置了默认索引类型:sequence、distributed 和 distributed-sequence。 |
compute.default_index_cache |
‘MEMORY_AND_DISK_SER’ |
这设置了分布式序列索引中缓存的临时 RDD 的默认存储级别:‘NONE’、‘DISK_ONLY’、‘DISK_ONLY_2’、‘DISK_ONLY_3’、‘MEMORY_ONLY’、‘MEMORY_ONLY_2’、‘MEMORY_ONLY_SER’、‘MEMORY_ONLY_SER_2’、‘MEMORY_AND_DISK’、‘MEMORY_AND_DISK_2’、‘MEMORY_AND_DISK_SER’、‘MEMORY_AND_DISK_SER_2’、‘OFF_HEAP’、‘LOCAL_CHECKPOINT’。 |
compute.ordered_head |
假 |
‘compute.ordered_head’ 设置是否以自然排序操作 head。Spark 上的 pandas 不保证行顺序,因此 head 可能会从分布式分区返回一些行。如果将 ‘compute.ordered_head’ 设置为 True,Spark 上的 pandas 会预先执行自然排序,但这会带来性能开销。 |
compute.eager_check |
真 |
‘compute.eager_check’ 设置是否仅为了验证而启动某些 Spark 作业。如果将 ‘compute.eager_check’ 设置为 True,Spark 上的 pandas 会预先执行验证,但这会带来性能开销。否则,Spark 上的 pandas 将跳过验证,并且与 pandas 略有不同。受影响的 API 包括:Series.dot、Series.asof、Series.compare、FractionalExtensionOps.astype、IntegralExtensionOps.astype、FractionalOps.astype、DecimalOps.astype、统计函数中的 skipna。 |
compute.isin_limit |
80 |
‘compute.isin_limit’ 设置通过 ‘Column.isin(list)’ 进行过滤的限制。如果 ‘list’ 的长度超过限制,将改用广播连接以获得更好的性能。 |
compute.pandas_fallback |
假 |
‘compute.pandas_fallback’ 设置是否自动回退到 Pandas 的实现。 |
compute.fail_on_ansi_mode |
真 |
‘compute.fail_on_ansi_mode’ 设置是否与 ANSI 模式一起工作。如果为 True,如果底层 Spark 在启用 ANSI 模式下工作,Spark 上的 pandas API 将引发异常;否则,它会强制工作,尽管可能导致意外行为。 |
plotting.max_rows |
1000 |
‘plotting.max_rows’ 设置基于 top-n 的图表(例如 plot.bar 和 plot.pie)的视觉限制。如果设置为 1000,则前 1000 个数据点将用于绘图。默认值为 1000。 |
plotting.sample_ratio |
无 |
‘plotting.sample_ratio’ 设置用于基于样本的图表(例如 plot.line 和 plot.area)的数据比例。如果未设置,则通过计算 ‘plotting.max_rows’ 与总数据大小的比率,从 ‘plotting.max_rows’ 派生。 |
plotting.backend |
‘plotly’ |
用于绘图的后端。默认是 plotly。支持任何具有顶级 .plot 方法的包。已知选项包括:[matplotlib, plotly]。 |