选项和设置¶
Spark 上的 Pandas API 具有一个选项系统,可用于自定义其某些方面的行为,其中显示相关的选项是用户最有可能调整的选项。
选项具有完整的“点式”大小写不敏感的名称(例如 display.max_rows
)。您可以直接作为顶层 options
属性的属性来获取/设置选项
>>> import pyspark.pandas as ps
>>> ps.options.display.max_rows
1000
>>> ps.options.display.max_rows = 10
>>> ps.options.display.max_rows
10
该 API 由 3 个相关函数组成,可直接从 pandas_on_spark
命名空间获得
get_option()
/set_option()
- 获取/设置单个选项的值。reset_option()
- 将一个或多个选项重置为其默认值。
注意: 开发人员可以查看 pyspark.pandas/config.py 以获取更多信息。
>>> import pyspark.pandas as ps
>>> ps.get_option("display.max_rows")
1000
>>> ps.set_option("display.max_rows", 101)
>>> ps.get_option("display.max_rows")
101
获取和设置选项¶
如上所述,get_option()
和 set_option()
可从 pandas_on_spark
命名空间获得。要更改选项,请调用 set_option('option name', new_value)
。
>>> import pyspark.pandas as ps
>>> ps.get_option('compute.max_rows')
1000
>>> ps.set_option('compute.max_rows', 2000)
>>> ps.get_option('compute.max_rows')
2000
所有选项也都有一个默认值,您可以使用 reset_option
来执行此操作
>>> import pyspark.pandas as ps
>>> ps.reset_option("display.max_rows")
>>> import pyspark.pandas as ps
>>> ps.get_option("display.max_rows")
1000
>>> ps.set_option("display.max_rows", 999)
>>> ps.get_option("display.max_rows")
999
>>> ps.reset_option("display.max_rows")
>>> ps.get_option("display.max_rows")
1000
option_context
上下文管理器已通过顶层 API 公开,允许您使用给定的选项值执行代码。当您退出 with 块时,选项值会自动恢复
>>> with ps.option_context("display.max_rows", 10, "compute.max_rows", 5):
... print(ps.get_option("display.max_rows"))
... print(ps.get_option("compute.max_rows"))
10
5
>>> print(ps.get_option("display.max_rows"))
>>> print(ps.get_option("compute.max_rows"))
1000
1000
对不同 DataFrames 的操作¶
Spark 上的 Pandas API 默认情况下不允许对不同的 DataFrames(或 Series)进行操作,以防止昂贵的操作。它内部执行一个连接操作,通常可能很昂贵。
可以通过将 compute.ops_on_diff_frames 设置为 True 来启用这种情况,以允许此类情况。 请参见下面的示例。
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.ops_on_diff_frames', True)
>>> psdf1 = ps.range(5)
>>> psdf2 = ps.DataFrame({'id': [5, 4, 3]})
>>> (psdf1 - psdf2).sort_index()
id
0 -5.0
1 -3.0
2 -1.0
3 NaN
4 NaN
>>> ps.reset_option('compute.ops_on_diff_frames')
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.ops_on_diff_frames', True)
>>> psdf = ps.range(5)
>>> psser_a = ps.Series([1, 2, 3, 4])
>>> # 'psser_a' is not from 'psdf' DataFrame. So it is considered as a Series not from 'psdf'.
>>> psdf['new_col'] = psser_a
>>> psdf
id new_col
0 0 1.0
1 1 2.0
3 3 4.0
2 2 3.0
4 4 NaN
>>> ps.reset_option('compute.ops_on_diff_frames')
默认索引类型¶
在 Spark 上的 pandas API 中,默认索引在多种情况下使用,例如,当 Spark DataFrame 转换为 Spark 上的 pandas DataFrame 时。 在这种情况下,Spark 上的 pandas API 在内部将默认索引附加到 Spark 上的 pandas DataFrame。
有几种类型的默认索引,可以通过 compute.default_index_type 进行配置,如下所示
sequence:它实现了一个一个递增的序列,通过 PySpark 的 Window 函数而不指定分区。 因此,它可能最终在一个节点中包含整个分区。 当数据量很大时,应避免使用此索引类型。 请参见下面的示例
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'sequence')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Int64Index([0, 1, 2], dtype='int64')
这在概念上等效于下面的 PySpark 示例
>>> from pyspark.sql import functions as sf, Window
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> sequential_index = sf.row_number().over(
... Window.orderBy(sf.monotonically_increasing_id().asc())) - 1
>>> spark_df.select(sequential_index).rdd.map(lambda r: r[0]).collect()
[0, 1, 2]
distributed-sequence (默认):它实现了一个一个递增的序列,通过分组依据和分组映射方法以分布式方式。 它仍然在全局范围内生成顺序索引。 如果默认索引必须是大型数据集中的序列,则必须使用此索引。 请参见下面的示例
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'distributed-sequence')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Int64Index([0, 1, 2], dtype='int64')
这在概念上等效于下面的 PySpark 示例
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> spark_df.rdd.zipWithIndex().map(lambda p: p[1]).collect()
[0, 1, 2]
distributed:它仅通过使用 PySpark 的 monotonically_increasing_id 函数以完全分布式的方式实现单调递增序列。 这些值是不确定的。 如果索引不必是一个一个递增的序列,则应使用此索引。 从性能上讲,与其他索引类型相比,此索引几乎没有任何惩罚。 请参见下面的示例
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'distributed')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Int64Index([25769803776, 60129542144, 94489280512], dtype='int64')
这在概念上等效于下面的 PySpark 示例
>>> from pyspark.sql import functions as sf
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> spark_df.select(sf.monotonically_increasing_id()) \
... .rdd.map(lambda r: r[0]).collect()
[25769803776, 60129542144, 94489280512]
警告
这种类型的索引不太可能用于计算两个不同的数据帧,因为它不能保证在两个数据帧中具有相同的索引。 如果您使用此默认索引并打开 compute.ops_on_diff_frames,由于不确定的索引值,两个不同 DataFrames 之间的操作结果可能是一个意外的输出。
可用选项¶
选项 |
默认值 |
描述 |
---|---|---|
display.max_rows |
1000 |
这设置了 pandas-on-Spark 在打印各种输出时应输出的最大行数。 例如,此值确定要在 dataframe 的 repr() 中显示的行数。 设置 None 以取消限制输入长度。 默认为 1000。 |
compute.max_rows |
1000 |
‘compute.max_rows’ 设置了当前 pandas-on-Spark DataFrame 的限制。 设置 None 以取消限制输入长度。 设置限制后,它通过将数据收集到驱动程序中,然后使用 pandas API 来通过快捷方式执行。 如果未设置限制,则该操作由 PySpark 执行。 默认为 1000。 |
compute.shortcut_limit |
1000 |
‘compute.shortcut_limit’ 设置了快捷方式的限制。 它计算指定的行数并使用其模式。 当 dataframe 长度大于此限制时,pandas-on-Spark 使用 PySpark 进行计算。 |
compute.ops_on_diff_frames |
False |
这确定是否在两个不同的数据帧之间进行操作。 例如,‘combine_frames’ 函数在内部执行一个连接操作,通常可能很昂贵。 因此,如果 compute.ops_on_diff_frames 变量不为 True,则该方法将引发异常。 |
compute.default_index_type |
‘distributed-sequence’ |
这设置了默认索引类型:sequence、distributed 和 distributed-sequence。 |
compute.default_index_cache |
‘MEMORY_AND_DISK_SER’ |
这设置了分布式序列索引中缓存的临时 RDD 的默认存储级别:‘NONE’、‘DISK_ONLY’、‘DISK_ONLY_2’、‘DISK_ONLY_3’、‘MEMORY_ONLY’、‘MEMORY_ONLY_2’、‘MEMORY_ONLY_SER’、‘MEMORY_ONLY_SER_2’、‘MEMORY_AND_DISK’、‘MEMORY_AND_DISK_2’、‘MEMORY_AND_DISK_SER’、‘MEMORY_AND_DISK_SER_2’、‘OFF_HEAP’、‘LOCAL_CHECKPOINT’。 |
compute.ordered_head |
False |
‘compute.ordered_head’ 设置是否使用自然排序来操作 head。 pandas-on-Spark 不保证行排序,因此 head 可能会从分布式分区返回一些行。 如果 ‘compute.ordered_head’ 设置为 True,则 pandas-on-Spark 会预先执行自然排序,但这会导致性能开销。 |
compute.eager_check |
True |
‘compute.eager_check’ 设置是否启动一些 Spark 作业只是为了进行验证。 如果 ‘compute.eager_check’ 设置为 True,则 pandas-on-Spark 会预先执行验证,但这会导致性能开销。 否则,pandas-on-Spark 将跳过验证,并且与 pandas 略有不同。 受影响的 API:Series.dot、Series.asof、Series.compare、FractionalExtensionOps.astype、IntegralExtensionOps.astype、FractionalOps.astype、DecimalOps.astype、statistical functions 的 skipna。 |
compute.isin_limit |
80 |
‘compute.isin_limit’ 设置了通过 ‘Column.isin(list)’ 过滤的限制。 如果 ‘list’ 的长度超过限制,则使用广播连接以获得更好的性能。 |
plotting.max_rows |
1000 |
‘plotting.max_rows’ 设置了基于 top-n 的图(如 plot.bar 和 plot.pie)的视觉限制。 如果设置为 1000,则前 1000 个数据点将用于绘图。 默认为 1000。 |
plotting.sample_ratio |
None |
‘plotting.sample_ratio’ 设置了将要绘制的用于基于样本的图(如 plot.line 和 plot.area)的数据比例。 此选项默认为 ‘plotting.max_rows’ 选项。 |
plotting.backend |
‘plotly’ |
用于绘图的后端。 默认为 plotly。 支持任何具有顶层 .plot 方法的包。 已知选项有:[matplotlib, plotly]。 |