贡献 PySpark¶
有很多类型的贡献,例如,帮助其他用户、测试版本、审查更改、文档贡献、错误报告、JIRA 维护、代码更改等。这些都记录在通用指南中。此页面专注于 PySpark,并包括专门针对 PySpark 的其他详细信息。
通过测试版本贡献¶
在正式发布之前,PySpark 的候选版本会在 dev@spark.apache.org 邮件列表中共享以供投票。这些候选版本可以通过 pip 轻松安装。例如,在 Spark 3.0.0 RC1 的情况下,您可以如下安装
pip install https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc1-bin/pyspark-3.0.0.tar.gz
发行版文件的链接(例如 https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc1-bin
)可以在投票线程中找到。
测试和验证用户现有的工作负载与候选版本的兼容性是对 PySpark 的重要贡献之一。它可以在正式发布之前防止破坏用户现有的工作负载。当出现诸如回归、正确性问题或性能下降等问题,足以放弃候选版本时,通常会放弃该候选版本,并且社区专注于修复它,以便包含在下一个候选版本中。
贡献文档更改¶
发布文档位于 Spark 的 docs 目录下。README.md 描述了生成文档所需的依赖项和步骤。通常,PySpark 文档使用以下命令在 docs 目录下进行测试
SKIP_SCALADOC=1 SKIP_RDOC=1 SKIP_SQLDOC=1 bundle exec jekyll serve --watch
PySpark 使用 Sphinx 生成其发布的 PySpark 文档。因此,如果您只想单独构建 PySpark 文档,则可以通过以下方式在 python/docs 目录下进行构建
make html
它会在 python/docs/build/html
下生成相应的 HTML。
最后,请确保通过在 python/docs/source/reference
下的相应 RST 文件中手动添加方法和/或类来记录新的 API。否则,它们将不会记录在 PySpark 文档中。
准备贡献代码更改¶
在开始处理 PySpark 中的代码之前,建议阅读通用指南。此外,在贡献 PySpark 中的代码时,还需要注意一些其他事项
- 遵循 Pythonic 风格
请参阅 Python 之禅。
- API 与 Scala 和 Java 侧保持一致
Apache Spark 是一个统一的引擎,提供一致的 API 层。通常,这些 API 在其他语言中得到一致的支持。
- 可以接受特定于 PySpark 的 API
只要它们是 Pythonic 风格并且不与其他现有 API 冲突,就可以提出 API 请求,例如,UDF 的装饰器用法。
- 如果您扩展或修改公共 API,请调整相应的类型提示
有关详细信息,请参阅贡献和维护类型提示。
如果您正在修复 Spark 上 pandas API (pyspark.pandas
) 包,请考虑以下设计原则
- 对于大数据,返回 pandas-on-Spark 数据结构;对于小数据,返回 pandas 数据结构
开发人员经常面临一个问题:某个特定函数应该返回 pandas-on-Spark DataFrame/Series,还是 pandas DataFrame/Series。 原则是:如果返回的对象可能很大,则使用 pandas-on-Spark DataFrame/Series。如果数据注定很小,则使用 pandas DataFrame/Series。例如,
DataFrame.dtypes
返回一个 pandas Series,因为 DataFrame 中的列数是有限且较小的,而DataFrame.head()
或Series.unique()
返回一个 pandas-on-Spark DataFrame/Series,因为结果对象可能很大。
- 为常见的数据科学任务提供可发现的 API
在过度概括的风险下,有两种 API 设计方法:第一种侧重于为常见任务提供 API;第二种从抽象开始,并使用户能够通过组合原语来完成其任务。虽然世界不是非黑即白的,但 pandas 更多地采用前一种方法,而 Spark 更多地采用后一种方法。
一个例子是值计数(按某些键列计数),这是数据科学中最常见的操作之一。 pandas
DataFrame.value_counts()
以排序的顺序返回结果,这在 90% 的情况下是用户在探索数据时更喜欢的,而 Spark 的则不排序,这在构建数据管道时更可取,因为用户可以通过添加显式的orderBy
来完成 pandas 的行为。与 pandas 类似,Spark 上的 pandas API 也应该更倾向于前者,为常见的数据科学任务提供可发现的 API。在大多数情况下,只需实现 pandas 的 API 就可以很好地解决此问题。但是,在某些情况下,pandas 的 API 无法满足特定需求,例如大数据绘图。
- 保护措施,防止用户搬起石头砸自己的脚
pandas 中的某些操作随着数据规模的扩大而变得非常昂贵,我们不希望给用户一种他们可以在 Spark 上的 pandas API 中依赖这些操作的错觉。也就是说,在 Spark 上的 pandas API 中实现的方法应该默认在大型数据集上安全执行。因此,Spark 上的 pandas API 中未实现以下功能
从根本上无法并行化的功能:例如,命令式地循环遍历每个元素
需要在单个节点的内存中实现整个工作集的功能。这就是我们不实现 pandas.DataFrame.to_xarray 的原因。另一个例子是
_repr_html_
调用将显示的记录总数限制为最多 1000 条,以防止用户仅仅通过在笔记本中键入 DataFrame 的名称就炸毁他们的驱动程序节点。
但是,存在一些例外。 “大数据科学”的一种常见模式是,虽然初始数据集很大,但随着分析的深入,工作集会变小。例如,数据科学家经常对数据集执行聚合,然后希望将聚合的数据集转换为某些本地数据结构。为了帮助数据科学家,我们提供以下内容
DataFrame.to_pandas
:返回 pandas DataFrame(仅适用于 pandas-on-Spark)DataFrame.to_numpy
:返回一个 numpy 数组,适用于 pandas 和 Spark 上的 pandas API
请注意,从名称可以清楚地看出,这些函数返回一些需要在单个节点的内存中实现数据的本地数据结构。 对于这些函数,我们还明确地用警告说明最终的数据结构必须很小。
环境设置¶
先决条件¶
PySpark 开发需要构建 Spark,这需要安装适当的 JDK 等。有关更多详细信息,请参阅 构建 Spark。
请注意,如果您打算为 Python 中的 Spark Connect 做出贡献,则需要 buf
版本 1.24.0
,有关更多详细信息,请参阅 Buf 安装。
贡献和维护类型提示¶
PySpark 类型提示是内联的,以便利用静态类型检查。
根据经验,只有公共 API 才会被注释。
注释应该尽可能
反映底层 JVM API 的预期,以帮助避免 Python 解释器之外的类型相关故障。
如果过于宽泛 (
Any
) 和过于狭窄的参数注释之间存在冲突,则首选后者,只要它涵盖了大多数典型用例。使用
@overload
注释指示无意义的参数组合。 例如,指示*Col
和*Cols
参数互斥@overload def __init__( self, *, threshold: float = ..., inputCol: Optional[str] = ..., outputCol: Optional[str] = ... ) -> None: ... @overload def __init__( self, *, thresholds: Optional[List[float]] = ..., inputCols: Optional[List[str]] = ..., outputCols: Optional[List[str]] = ... ) -> None: ...
与当前稳定的 MyPy 版本兼容。
复杂的支持类型定义应放置在专用的 _typing.pyi
存根中。 例如,请参阅 pyspark.sql._typing.pyi。
可以使用 dev/lint-python
脚本或直接调用 mypy 来验证注释
mypy --config python/mypy.ini python/pyspark
代码和文档字符串指南¶
代码约定¶
请遵循现有代码库的风格,实际上是 PEP 8,但有一个例外:行长可以达到 100 个字符,而不是 79 个字符。
请注意
PySpark 中的方法和变量命名与 Python 本身的
threading
库类似,因为它们的 API 灵感来源于 Java。PySpark 还遵循 camelCase 命名规范,以便与 Scala 和 Java 保持一致。相反,
functions.py
使用 snake_case 命名规范,以便 API 更易于 SQL (和 Python) 使用。此外,pandas-on-Spark (
pyspark.pandas
) 也使用 snake_case 命名规范,因为此包不受与其他语言 API 一致性的约束。
PySpark 利用诸如 pycodestyle 和 flake8 之类的 lint 工具,dev/lint-python
脚本会运行这些工具。因此,请务必运行该脚本进行双重检查。
Docstring 规范¶
PySpark 遵循 NumPy 文档风格。
Doctest 规范¶
通常,doctest 应通过换行符按逻辑分组。
例如,第一个块用于准备语句,第二个块用于使用带有特定参数的函数,第三个块用于另一个参数。例如,请参阅 pandas 中的 DataFrame.rsub。
这些块应在 PySpark doctest 中保持一致的分隔,如果 doctest 的覆盖范围或要显示的示例数量不足,则应添加更多 doctest。
贡献错误和异常¶
为了抛出标准化的面向用户的错误或异常,开发人员应指定错误类和消息参数,而不是任意错误消息。
用法¶
检查 error_classes.py 中是否已存在适当的错误类。如果存在,则使用该错误类并跳到步骤 3。
将新类添加到 error_classes.py;请记住以下不变式。
检查异常类型是否已扩展 PySparkException。如果存在,则跳到步骤 5。
将 PySparkException 混入到异常中。
使用错误类和消息参数抛出异常。
之前
抛出带有任意错误消息
raise ValueError("Problem A because B")
之后
error_classes.py
"PROBLEM_BECAUSE": {
"message": ["Problem <problem> because <cause>"]
}
exceptions.py
class PySparkTestError(PySparkException):
def __init__(self, error_class: str, message_parameters: Dict[str, str]):
super().__init__(error_class=error_class, message_parameters=message_parameters)
def getMessageParameters(self) -> Optional[Dict[str, str]]:
return super().getMessageParameters()
使用错误类和消息参数抛出
raise PySparkTestError("PROBLEM_BECAUSE", {"problem": "A", "cause": "B"})
访问字段¶
要访问错误字段,请捕获扩展 PySparkException
的异常,并通过 PySparkException.getErrorClass()
访问错误类。
try:
...
except PySparkException as pe:
if pe.getErrorClass() == "PROBLEM_BECAUSE":
...
字段¶
错误类
错误类是对错误类别的一种简洁、易于理解的表示。
未分类的错误可以分配给带有前缀 _LEGACY_ERROR_TEMP_ 和未使用的序列号的旧错误类,例如 _LEGACY_ERROR_TEMP_0053。
不变式
唯一
跨版本一致
按字母顺序排序
消息
错误消息提供了对错误的描述性、易于理解的表示。消息格式通过 C 风格的 printf 语法接受字符串参数。
错误消息的质量应符合 Apache Spark 错误消息指南
不变式
唯一