使用 Spark Connect 进行应用开发

Spark Connect 概述

在 Apache Spark 3.4 中,Spark Connect 引入了一种解耦的客户端-服务器架构,允许使用 DataFrame API 和未解析的逻辑计划作为协议,远程连接到 Spark 集群。客户端与服务器之间的分离使得 Spark 及其开放生态系统可以从任何地方被利用。它可嵌入到现代数据应用、IDE、Notebooks 和编程语言中。

要了解更多关于 Spark Connect 的信息,请参阅 Spark Connect 概述

使用 Spark Connect 重新定义 Spark 应用程序

凭借其解耦的客户端-服务器架构,Spark Connect 简化了 Spark 应用程序的开发方式。Spark 客户端应用程序 (Spark Client Applications) 和 Spark 服务器库 (Spark Server Libraries) 的概念介绍如下

随着 Spark 3.4 和 Spark Connect 的发布,Spark 客户端应用程序的开发得以简化,并提供了关于如何构建 Spark 服务器库的清晰扩展点和指南,使这两种类型的应用程序都能轻松与 Spark 一同发展。如图1所示,Spark 客户端应用程序使用 Spark Connect API 连接到 Spark,该 API 本质上是 DataFrame API,并且完全是声明式的。

Extending Spark
Connect Diagram

Spark 服务器库扩展了 Spark。它们通常提供与 Spark 集成的额外服务器端逻辑,通过 Spark Connect 扩展点作为 Spark Connect API 的一部分暴露给客户端应用程序。例如,Spark 服务器库包含自定义的服务端逻辑(由标有 Custom Library Plugin 的蓝色框表示),该逻辑通过蓝色框作为 Spark Connect API 的一部分暴露给客户端。客户端使用此 API,例如与 PySpark 或 Spark Scala 客户端一同使用,从而使 Spark 客户端应用程序可以轻松地使用自定义逻辑/库。

Spark API 模式:Spark 客户端和 Spark Classic

Spark 提供了 API 模式,即 spark.api.mode 配置,使 Spark Classic 应用程序能够无缝切换到 Spark Connect。根据 spark.api.mode 的值,应用程序可以在 Spark Classic 模式或 Spark Connect 模式下运行。示例如下

from pyspark.sql import SparkSession

SparkSession.builder.config("spark.api.mode", "connect").master("...").getOrCreate()

在提交应用程序时,您还可以将此配置应用于 Scala 和 PySpark 应用程序

spark-submit --master "..." --conf spark.api.mode=connect

此外,Spark Connect 为本地测试提供了便捷选项。通过将 spark.remote 设置为 local[...]local-cluster[...],您可以启动本地 Spark Connect 服务器并访问 Spark Connect 会话。

这与将 --conf spark.api.mode=connect--master ... 结合使用类似。然而,请注意 spark.remote--remote 仅限于 local* 值,而 --conf spark.api.mode=connect--master ... 结合使用支持额外的集群 URL,例如 spark://,以实现与 Spark Classic 更广泛的兼容性。

Spark 客户端应用程序

Spark 客户端应用程序是 Spark 用户目前开发的常规 Spark 应用程序,例如 ETL 管道、数据准备或模型训练或推理。这些应用程序通常使用 Spark 的声明式 DataFrame 和 DataSet API 构建。使用 Spark Connect 后,核心行为保持不变,但存在一些差异

基于 Spark Connect 的客户端应用程序可以像任何以前的作业一样提交。此外,与使用早期 Spark 版本(3.4 及更早版本)的经典 Spark 应用程序相比,基于 Spark Connect 的 Spark 客户端应用程序具有以下几个优势

Spark 服务器库

在 Spark 3.4 之前,Spark 的扩展(例如 Spark MLSpark-NLP)是像 Spark 客户端应用程序一样构建和部署的。随着 Spark 3.4 和 Spark Connect 的发布,现在提供了显式的扩展点,可以通过 Spark 服务器库来扩展 Spark。这些扩展点提供了可以暴露给客户端的功能,这与 Spark 中现有的扩展点(如 SparkSession 扩展Spark 插件)不同。

入门:使用 Spark 服务器库扩展 Spark

Spark Connect 可用并支持 PySpark 和 Scala 应用程序。我们将逐步介绍如何运行带有 Spark Connect 的 Apache Spark 服务器,并使用 Spark Connect 客户端库从客户端应用程序连接到它。

Spark 服务器库包含以下组件,如图 2 所示

  1. Spark Connect 协议扩展(蓝色框 Proto API)
  2. 一个 Spark Connect 插件。
  3. 扩展 Spark 的应用程序逻辑。
  4. 客户端包,它将 Spark 服务器库应用程序逻辑暴露给 Spark 客户端应用程序,与 PySpark 或 Scala Spark 客户端一同使用。

Extending Spark
Connect Diagram - Labelled Steps

(1) Spark Connect 协议扩展

要使用新的 Spark 服务器库扩展 Spark,开发人员可以扩展 Spark Connect 协议中的三种主要操作类型:RelationExpressionCommand

message Relation {
  oneof rel_type {
    Read read = 1;
    // ...
    google.protobuf.Any extension = 998;
  } 
}

message Expression {
  oneof expr_type {
    Literal literal = 1;
    // ...
    google.protobuf.Any extension = 999;
  } 
}

message Command {
  oneof command_type {
    WriteCommand write_command = 1;
    // ...
    google.protobuf.Any extension = 999;
  } 
} 

它们的扩展字段允许将任意 protobuf 消息序列化为 Spark Connect 协议的一部分。这些消息表示扩展实现的参数或状态。要构建自定义表达式类型,开发人员首先定义表达式的自定义 protobuf 定义。

message ExamplePluginExpression {
  Expression child = 1;
  string custom_field = 2;
}

(2) Spark Connect 插件实现与 (3) 自定义应用程序逻辑

下一步,开发人员根据 protobuf 消息的输入参数,使用自定义应用程序逻辑实现 Spark Connect 的 ExpressionPlugin 类。

class ExampleExpressionPlugin extends ExpressionPlugin {
  override def transform(
      relation: protobuf.Any,
      planner: SparkConnectPlanner): Option[Expression] = {
    // Check if the serialized value of protobuf.Any matches the type
    // of our example expression.
    if (!relation.is(classOf[proto.ExamplePluginExpression])) {
      return None
    }
    val exp = relation.unpack(classOf[proto.ExamplePluginExpression])
    Some(Alias(planner.transformExpression(
        exp.getChild), exp.getCustomField)(explicitMetadata = None))
  }
}

应用程序逻辑开发完成后,代码必须打包为 jar 文件,并且必须配置 Spark 以加载额外的逻辑。相关的 Spark 配置选项如下:

(4) Spark 服务器库客户端包

服务器组件部署后,任何客户端都可以使用正确的 protobuf 消息来使用它。在上面的示例中,发送到 Spark Connect 端点的以下消息负载足以触发扩展机制。

{
  "project": {
    "input": {
      "sql": {
        "query": "select * from samples.nyctaxi.trips"
      }
    },
    "expressions": [
      {
        "extension": {
          "typeUrl": "type.googleapis.com/spark.connect.ExamplePluginExpression",
          "value": "\n\006\022\004\n\002id\022\006testval"
        }
      }
    ]
  }
} 

为了在 Python 中提供此示例,应用程序开发人员提供一个 Python 库,该库封装新表达式并将其嵌入到 PySpark 中。为任何表达式提供函数的最简单方法是接受一个 PySpark 列实例作为参数,并返回一个应用了该表达式的新 Column 实例。

from pyspark.sql.connect.column import Expression
import pyspark.sql.connect.proto as proto

from myxample.proto import ExamplePluginExpression

# Internal class that satisfies the interface by the Python client
# of Spark Connect to generate the protobuf representation from
# an instance of the expression.
class ExampleExpression(Expression):
    def to_plan(self, session) -> proto.Expression:
        fun = proto.Expression()
        plugin = ExamplePluginExpression()
        plugin.child.literal.long = 10
        plugin.custom_field = "example"
        fun.extension.Pack(plugin)
        return fun

# Defining the function to be used from the consumers.
def example_expression(col: Column) -> Column:
    return Column(ExampleExpression())


# Using the expression in the Spark Connect client code.
df = spark.read.table("samples.nyctaxi.trips")
df.select(example_expression(df["fare_amount"])).collect()