测试 PySpark

本指南是为 PySpark 代码编写可靠测试的参考。

要查看 PySpark 测试工具的文档,请参见此处。 要查看 PySpark 内置测试工具的代码,请在此处查看 Spark 仓库。 要查看 PySpark 测试框架的 JIRA 看板票据,请参见此处。

构建 PySpark 应用程序

以下是如何启动 PySpark 应用程序的示例。 如果你已经有一个准备好测试的应用程序,请随意跳到下一节“测试你的 PySpark 应用程序”。

首先,启动你的 Spark Session。

[3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a SparkSession
spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()

接下来,创建一个 DataFrame。

[5]:
sample_data = [{"name": "John    D.", "age": 30},
  {"name": "Alice   G.", "age": 25},
  {"name": "Bob  T.", "age": 35},
  {"name": "Eve   A.", "age": 28}]

df = spark.createDataFrame(sample_data)

现在,让我们定义一个转换函数并将其应用于我们的 DataFrame。

[7]:
from pyspark.sql.functions import col, regexp_replace

# Remove additional spaces in name
def remove_extra_spaces(df, column_name):
    # Remove extra spaces from the specified column
    df_transformed = df.withColumn(column_name, regexp_replace(col(column_name), "\\s+", " "))

    return df_transformed

transformed_df = remove_extra_spaces(df, "name")

transformed_df.show()
+---+--------+
|age|    name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35|  Bob T.|
| 28|  Eve A.|
+---+--------+

测试你的 PySpark 应用程序

现在让我们测试我们的 PySpark 转换函数。

一种选择是简单地目测结果 DataFrame。 但是,对于大型 DataFrame 或输入大小,这可能不切实际。

更好的方法是编写测试。 以下是如何测试我们的代码的一些示例。 以下示例适用于 Spark 3.5 及以上版本。

请注意,这些示例并非详尽无遗,因为你可以使用许多其他测试框架来代替 unittestpytest。 内置的 PySpark 测试实用程序函数是独立的,这意味着它们可以与任何测试框架或 CI 测试管道兼容。

选项 1:仅使用 PySpark 内置测试实用程序函数

对于简单的临时验证用例,可以在独立上下文中使用 PySpark 测试实用程序,如 assertDataFrameEqualassertSchemaEqual。 你可以轻松地在 notebook 会话中测试 PySpark 代码。 例如,假设你想断言两个 DataFrame 之间相等

[10]:
import pyspark.testing
from pyspark.testing.utils import assertDataFrameEqual

# Example 1
df1 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
df2 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
assertDataFrameEqual(df1, df2)  # pass, DataFrames are identical
[11]:
# Example 2
df1 = spark.createDataFrame(data=[("1", 0.1), ("2", 3.23)], schema=["id", "amount"])
df2 = spark.createDataFrame(data=[("1", 0.109), ("2", 3.23)], schema=["id", "amount"])
assertDataFrameEqual(df1, df2, rtol=1e-1)  # pass, DataFrames are approx equal by rtol

你也可以简单地比较两个 DataFrame 模式

[13]:
from pyspark.testing.utils import assertSchemaEqual
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType

s1 = StructType([StructField("names", ArrayType(DoubleType(), True), True)])
s2 = StructType([StructField("names", ArrayType(DoubleType(), True), True)])

assertSchemaEqual(s1, s2)  # pass, schemas are identical

选项 2:使用 单元测试

对于更复杂的测试场景,你可能需要使用测试框架。

最流行的测试框架选项之一是单元测试。 让我们逐步介绍如何使用内置的 Python unittest 库来编写 PySpark 测试。 有关 unittest 库的更多信息,请参见此处:https://docs.pythonlang.cn/3/library/unittest.html

首先,你需要一个 Spark 会话。 你可以使用 unittest 包中的 @classmethod 装饰器来处理设置和拆卸 Spark 会话。

[15]:
import unittest

class PySparkTestCase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()


    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

现在让我们编写一个 unittest 类。

[17]:
from pyspark.testing.utils import assertDataFrameEqual

class TestTranformation(PySparkTestCase):
    def test_single_space(self):
        sample_data = [{"name": "John    D.", "age": 30},
                       {"name": "Alice   G.", "age": 25},
                       {"name": "Bob  T.", "age": 35},
                       {"name": "Eve   A.", "age": 28}]

        # Create a Spark DataFrame
        original_df = spark.createDataFrame(sample_data)

        # Apply the transformation function from before
        transformed_df = remove_extra_spaces(original_df, "name")

        expected_data = [{"name": "John D.", "age": 30},
        {"name": "Alice G.", "age": 25},
        {"name": "Bob T.", "age": 35},
        {"name": "Eve A.", "age": 28}]

        expected_df = spark.createDataFrame(expected_data)

        assertDataFrameEqual(transformed_df, expected_df)

运行时,unittest 将选择所有名称以“test”开头的函数。

选项 3:使用 Pytest

我们也可以使用 pytest 编写测试,它是最流行的 Python 测试框架之一。 有关 pytest 的更多信息,请在此处查看文档:https://pytest.cn/en/7.1.x/contents.html

使用 pytest fixture 允许我们在测试之间共享一个 spark 会话,并在测试完成后将其拆除。

[20]:
import pytest

@pytest.fixture
def spark_fixture():
    spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()
    yield spark

然后我们可以像这样定义我们的测试

[22]:
import pytest
from pyspark.testing.utils import assertDataFrameEqual

def test_single_space(spark_fixture):
    sample_data = [{"name": "John    D.", "age": 30},
                   {"name": "Alice   G.", "age": 25},
                   {"name": "Bob  T.", "age": 35},
                   {"name": "Eve   A.", "age": 28}]

    # Create a Spark DataFrame
    original_df = spark.createDataFrame(sample_data)

    # Apply the transformation function from before
    transformed_df = remove_extra_spaces(original_df, "name")

    expected_data = [{"name": "John D.", "age": 30},
    {"name": "Alice G.", "age": 25},
    {"name": "Bob T.", "age": 35},
    {"name": "Eve A.", "age": 28}]

    expected_df = spark.createDataFrame(expected_data)

    assertDataFrameEqual(transformed_df, expected_df)

当你使用 pytest 命令运行测试文件时,它将选择所有名称以“test”开头的函数。

整合所有内容!

让我们一起看看所有步骤,在一个单元测试示例中。

[25]:
# pkg/etl.py
import unittest

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_replace
from pyspark.testing.utils import assertDataFrameEqual

# Create a SparkSession
spark = SparkSession.builder.appName("Sample PySpark ETL").getOrCreate()

sample_data = [{"name": "John    D.", "age": 30},
  {"name": "Alice   G.", "age": 25},
  {"name": "Bob  T.", "age": 35},
  {"name": "Eve   A.", "age": 28}]

df = spark.createDataFrame(sample_data)

# Define DataFrame transformation function
def remove_extra_spaces(df, column_name):
    # Remove extra spaces from the specified column using regexp_replace
    df_transformed = df.withColumn(column_name, regexp_replace(col(column_name), "\\s+", " "))

    return df_transformed
[26]:
# pkg/test_etl.py
import unittest

from pyspark.sql import SparkSession

# Define unit test base class
class PySparkTestCase(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.appName("Sample PySpark ETL").getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

# Define unit test
class TestTranformation(PySparkTestCase):
    def test_single_space(self):
        sample_data = [{"name": "John    D.", "age": 30},
                        {"name": "Alice   G.", "age": 25},
                        {"name": "Bob  T.", "age": 35},
                        {"name": "Eve   A.", "age": 28}]

        # Create a Spark DataFrame
        original_df = spark.createDataFrame(sample_data)

        # Apply the transformation function from before
        transformed_df = remove_extra_spaces(original_df, "name")

        expected_data = [{"name": "John D.", "age": 30},
        {"name": "Alice G.", "age": 25},
        {"name": "Bob T.", "age": 35},
        {"name": "Eve A.", "age": 28}]

        expected_df = spark.createDataFrame(expected_data)

        assertDataFrameEqual(transformed_df, expected_df)
[27]:
unittest.main(argv=[''], verbosity=0, exit=False)
Ran 1 test in 1.734s

OK
[27]:
<unittest.main.TestProgram at 0x174539db0>