为 PySpark 做贡献#
贡献有多种类型,例如,帮助其他用户、测试发布版本、审查更改、贡献文档、报告错误、维护 JIRA、代码更改等。这些内容已在一般指南中记录。本页面重点介绍 PySpark,并包含专门针对 PySpark 的更多详细信息。
通过测试发布版本做出贡献#
在正式发布之前,PySpark 发布候选版本会在 dev@spark.apache.org 邮件列表中分享以供投票。这些发布候选版本可以通过 pip 轻松安装。例如,对于 Spark 3.0.0 RC1,您可以按如下方式安装
pip install https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc1-bin/pyspark-3.0.0.tar.gz
发布文件的链接,例如 https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc1-bin
,可以在投票帖中找到。
根据发布候选版本测试和验证用户现有的工作负载是对 PySpark 至关重要的贡献之一。它可以在正式发布之前防止破坏用户现有的工作负载。当出现足以导致放弃发布候选版本的问题时,例如回归、正确性问题或性能下降,通常会放弃该发布候选版本,社区会集中精力修复它,以便将其包含在下一个发布候选版本中。
贡献文档更改#
发布文档位于 Spark 的 docs 目录下。README.md 描述了生成文档所需的依赖项和步骤。通常,PySpark 文档会在 docs 目录下通过以下命令进行测试
SKIP_SCALADOC=1 SKIP_RDOC=1 SKIP_SQLDOC=1 bundle exec jekyll serve --watch
PySpark 使用 Sphinx 生成其发布的 PySpark 文档。因此,如果您只想单独构建 PySpark 文档,可以在 python/docs 目录下通过以下方式构建
make html
它会在 python/docs/build/html
下生成相应的 HTML 文件。
最后,请确保通过在 python/docs/source/reference
下的相应 RST 文件中手动添加方法和/或类来记录新的 API。否则,它们将不会在 PySpark 文档中记录。
准备贡献代码更改#
在开始在 PySpark 中编写代码之前,建议阅读一般指南。此外,在贡献 PySpark 代码时,还有一些额外的注意事项需要记住
- 遵循 Python 风格
请参阅Python 之禅。
- 与 Scala 和 Java 侧的 API 保持一致
Apache Spark 是一个统一的引擎,提供一致的 API 层。通常,API 在其他语言中也得到一致的支持。
- 可以接受 PySpark 特有的 API
只要它们符合 Python 风格且不与现有 API 冲突,就可以提出 API 请求,例如 UDF 的装饰器用法。
- 如果您扩展或修改公共 API,请调整相应的类型提示
有关详细信息,请参阅贡献和维护类型提示。
如果您正在修复 Spark 上的 pandas API (pyspark.pandas
) 包,请考虑以下设计原则
- 对于大数据返回 pandas-on-Spark 数据结构,对于小数据返回 pandas 数据结构
开发人员经常面临一个问题,即特定函数应该返回 pandas-on-Spark DataFrame/Series 还是 pandas DataFrame/Series。原则是:如果返回的对象可能很大,则使用 pandas-on-Spark DataFrame/Series。如果数据注定很小,则使用 pandas DataFrame/Series。例如,
DataFrame.dtypes
返回一个 pandas Series,因为 DataFrame 中的列数是有限且小的,而DataFrame.head()
或Series.unique()
返回一个 pandas-on-Spark DataFrame/Series,因为结果对象可能很大。
- 为常见数据科学任务提供可发现的 API
冒着过度概括的风险,API 设计有两种方法:第一种侧重于为常见任务提供 API;第二种从抽象开始,通过组合原语使用户能够完成任务。虽然世界并非非黑即白,但 pandas 更多地采用了前一种方法,而 Spark 更多地采用了后一种方法。
一个例子是值计数(按某个键列计数),这是数据科学中最常见的操作之一。pandas 的
DataFrame.value_counts()
以排序顺序返回结果,在 90% 的情况下,这是用户在探索数据时更喜欢的方式,而 Spark 的不进行排序,这在构建数据管道时更可取,因为用户可以通过添加显式的orderBy
来实现 pandas 的行为。类似于 pandas,Spark 上的 pandas API 也应更倾向于前一种方法,为常见数据科学任务提供可发现的 API。在大多数情况下,只需实现 pandas 的 API 即可很好地遵循这一原则。但是,在某些情况下,pandas 的 API 无法满足特定需求,例如大数据绘图。
- 防止用户自作自受的护栏
随着数据规模的扩大,pandas 中的某些操作会非常昂贵,我们不希望让用户误以为他们可以在 Spark 上的 pandas API 中依赖此类操作。也就是说,Spark 上的 pandas API 中实现的方法默认情况下应在大型数据集上安全执行。因此,Spark 上的 pandas API 中未实现以下功能
从根本上不可并行化的功能:例如命令式地遍历每个元素
需要将整个工作集具体化到单个节点内存中的功能。这就是我们不实现 pandas.DataFrame.to_xarray 的原因。另一个例子是
_repr_html_
调用将显示的总记录数限制为最多 1000 条,以防止用户在笔记本中简单地输入 DataFrame 的名称就导致其驱动程序节点崩溃。
然而,也存在一些例外。大数据科学的一个常见模式是,虽然初始数据集很大,但随着分析的深入,工作集会变小。例如,数据科学家经常对数据集执行聚合操作,然后希望将聚合后的数据集转换为某些本地数据结构。为了帮助数据科学家,我们提供了以下功能
DataFrame.to_pandas
: 返回一个 pandas DataFrame(仅限 pandas-on-Spark)DataFrame.to_numpy
: 返回一个 numpy 数组,适用于 pandas 和 Spark 上的 pandas API
请注意,从名称中可以清楚地看出,这些函数返回的本地数据结构需要将数据具体化到单个节点的内存中。对于这些函数,我们还明确地用警告注释进行了说明,指出结果数据结构必须很小。
环境设置#
先决条件#
PySpark 开发需要构建 Spark,而这需要安装合适的 JDK 等。有关更多详细信息,请参阅构建 Spark。
请注意,如果您打算为 Python 中的 Spark Connect 做出贡献,则需要 buf
,有关更多详细信息,请参阅Buf 安装。
Conda#
如果您正在使用 Conda,可以按如下方式设置开发环境。
# Python 3.9+ is required
conda create --name pyspark-dev-env python=3.9
conda activate pyspark-dev-env
pip install --upgrade -r dev/requirements.txt
设置完成后,请确保在开始开发之前切换到 pyspark-dev-env
conda activate pyspark-dev-env
现在,您可以开始开发并运行测试了。
pip#
对于 Python 3.9+,可以使用 pip 如下所示安装和设置开发环境。
pip install --upgrade -r dev/requirements.txt
现在,您可以开始开发并运行测试了。
贡献和维护类型提示#
PySpark 类型提示是内联的,以利用静态类型检查。
经验法则:只有公共 API 才会被标注。
在可能的情况下,注解应
反映底层 JVM API 的预期,以帮助避免 Python 解释器外部的类型相关故障。
如果出现过宽(
Any
)和过窄的参数注解之间的冲突,只要能覆盖大多数典型用例,就优先选择后者。使用
@overload
注解指示不合理的参数组合。例如,指示*Col
和*Cols
参数是互斥的@overload def __init__( self, *, threshold: float = ..., inputCol: Optional[str] = ..., outputCol: Optional[str] = ... ) -> None: ... @overload def __init__( self, *, thresholds: Optional[List[float]] = ..., inputCols: Optional[List[str]] = ..., outputCols: Optional[List[str]] = ... ) -> None: ...
与当前稳定的 MyPy 版本兼容。
复杂的辅助类型定义应放置在专门的 _typing.pyi
存根文件中。例如,请参阅 pyspark.sql._typing.pyi。
可以使用 dev/lint-python
脚本或直接调用 mypy 来验证注解
mypy --config python/mypy.ini python/pyspark
代码和文档字符串指南#
代码约定#
请遵循现有代码库的风格,这实际上是 PEP 8,但有一个例外:行长可以达到 100 个字符,而不是 79 个。
请注意
PySpark 中的方法和变量命名与 Python 自身的
threading
库类似,其 API 受 Java 启发。PySpark 也为与 Scala 和 Java 匹配的公开 API 遵循 camelCase 命名约定。相比之下,
functions.py
使用 snake_case 以使 API 对 SQL(和 Python)更友好。此外,Spark 上的 pandas (
pyspark.pandas
) 也使用 snake_case,因为此包不受与其他语言 API 一致性的限制。
PySpark 利用了诸如 pycodestyle 和 flake8 等代码检查工具,dev/lint-python
会运行这些工具。因此,请务必运行该脚本进行双重检查。
文档字符串约定#
PySpark 遵循 NumPy 文档风格。
Doctest 约定#
通常,doctest 应通过换行符进行逻辑分组。
例如,第一个块用于准备语句,第二个块用于使用特定参数的函数,第三个块用于另一个参数。作为示例,请参阅 pandas 中的 DataFrame.rsub。
这些块在 PySpark doctest 中应始终保持分离,如果 doctest 的覆盖范围或要显示的示例数量不足,则应添加更多 doctest。
贡献错误和异常#
为了抛出标准化的面向用户的错误或异常,开发人员应指定错误类和消息参数,而不是任意的错误消息。
用法#
检查 PySpark 中的错误类中是否已存在合适的错误类。如果存在,请使用该错误类并跳到步骤 3。
将新类添加到 error-conditions.json;请记住下面的不变量。
检查异常类型是否已经扩展了 PySparkException。如果为 true,请跳到步骤 5。
将 PySparkException 混合到异常中。
使用错误类和消息参数抛出异常。
之前
抛出任意错误消息
raise ValueError("Problem A because B")
之后
error-conditions.json
"PROBLEM_BECAUSE": {
"message": ["Problem <problem> because <cause>"]
}
exceptions.py
class PySparkTestError(PySparkException):
def __init__(self, errorClass: str, messageParameters: Dict[str, str]):
super().__init__(errorClass=errorClass, messageParameters=messageParameters)
def getMessageParameters(self) -> Optional[Dict[str, str]]:
return super().getMessageParameters()
使用错误类和消息参数抛出
raise PySparkTestError("PROBLEM_BECAUSE", {"problem": "A", "cause": "B"})
访问字段#
要访问错误字段,请捕获扩展 PySparkException
的异常,并使用 PySparkException.getCondition()
访问错误类。
try:
...
except PySparkException as pe:
if pe.getCondition() == "PROBLEM_BECAUSE":
...
字段#
错误类
错误类是错误类别的简洁、人类可读的表示形式。
未分类的错误可以分配给以 _LEGACY_ERROR_TEMP_ 为前缀和未使用的序列号的传统错误类,例如 _LEGACY_ERROR_TEMP_0053。
不变量
唯一性
跨发布版本一致
按字母顺序排序
消息
错误消息提供了错误的描述性、人类可读的表示形式。消息格式通过 C 风格的 printf 语法接受字符串参数。
错误消息的质量应符合 Apache Spark 错误消息指南
不变量
唯一性