Spark Connect 概述

构建客户端 Spark 应用程序

在 Apache Spark 3.4 中,Spark Connect 引入了一种解耦的客户端-服务器架构,允许使用 DataFrame API 和未解析的逻辑计划作为协议远程连接到 Spark 集群。客户端和服务器之间的分离允许从任何地方利用 Spark 及其开放生态系统。它可以嵌入到现代数据应用程序、IDE、笔记本和编程语言中。

要开始使用,请参阅 快速入门:Spark Connect

Spark Connect API Diagram

Spark Connect 的工作原理

Spark Connect 客户端库旨在简化 Spark 应用程序开发。它是一个轻量级的 API,可以嵌入到任何地方:应用程序服务器、IDE、笔记本和编程语言中。Spark Connect API 基于 Spark 的 DataFrame API,使用未解析的逻辑计划作为客户端和 Spark 驱动程序之间与语言无关的协议。

Spark Connect 客户端将 DataFrame 操作转换为未解析的逻辑查询计划,这些计划使用协议缓冲区进行编码。这些计划使用 gRPC 框架发送到服务器。

嵌入在 Spark 服务器上的 Spark Connect 端点接收并转换未解析的逻辑计划为 Spark 的逻辑计划操作符。这类似于解析 SQL 查询,其中属性和关系被解析并构建初始解析计划。从那里开始,标准的 Spark 执行过程开始,确保 Spark Connect 利用了 Spark 的所有优化和增强功能。结果通过 gRPC 作为 Apache Arrow 编码的行批次流回客户端。

Spark Connect communication

Spark Connect 的操作优势

凭借这种新的架构,Spark Connect 缓解了多个租户操作中的几个问题

稳定性:使用过多内存的应用程序现在只会影响它们自己的环境,因为它们可以在自己的进程中运行。用户可以在客户端定义自己的依赖项,并且无需担心与 Spark 驱动程序的潜在冲突。

可升级性:Spark 驱动程序现在可以与应用程序独立地无缝升级,例如,以利用性能改进和安全修复。这意味着应用程序可以向前兼容,只要服务器端 RPC 定义被设计为向后兼容。

可调试性和可观察性:Spark Connect 允许在开发过程中直接从您最喜欢的 IDE 进行交互式调试。类似地,可以使用应用程序框架的本机指标和日志记录库来监控应用程序。

如何使用 Spark Connect

从 Spark 3.4 开始,Spark Connect 可用并支持 PySpark 和 Scala 应用程序。我们将逐步介绍如何运行带有 Spark Connect 的 Apache Spark 服务器,并使用 Spark Connect 客户端库从客户端应用程序连接到它。

下载并启动带有 Spark Connect 的 Spark 服务器

首先,从 下载 Apache Spark 页面下载 Spark。Spark Connect 在 Apache Spark 3.4 版本中引入,因此请确保在页面顶部的版本下拉菜单中选择 3.4.0 或更高版本。然后选择您的软件包类型,通常是“为 Apache Hadoop 3.3 及更高版本预构建”,然后单击链接下载。

现在,将您刚刚下载的 Spark 软件包解压缩到您的计算机上,例如

tar -xvf spark-3.5.1-bin-hadoop3.tgz

在终端窗口中,转到您之前解压缩 Spark 的位置的 spark 文件夹,然后运行 start-connect-server.sh 脚本以启动带有 Spark Connect 的 Spark 服务器,例如

./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.1

请注意,我们在启动 Spark 服务器时包含了一个 Spark Connect 软件包 (spark-connect_2.12:3.5.1)。这是使用 Spark Connect 所必需的。确保使用与您之前下载的 Spark 版本相同的软件包版本。在本例中,Spark 3.5.1 与 Scala 2.12。

现在 Spark 服务器正在运行,并准备接受来自客户端应用程序的 Spark Connect 会话。在下一节中,我们将逐步介绍如何在编写客户端应用程序时使用 Spark Connect。

使用 Spark Connect 进行交互式分析

在创建 Spark 会话时,您可以指定要使用 Spark Connect,并且有几种方法可以做到这一点,如下所述。

如果您没有使用此处概述的机制之一,您的 Spark 会话将像以前一样工作,而不会利用 Spark Connect。

设置 SPARK_REMOTE 环境变量

如果您在运行 Spark 客户端应用程序的客户端机器上设置了 SPARK_REMOTE 环境变量,并在以下示例中创建了一个新的 Spark 会话,则该会话将成为一个 Spark Connect 会话。使用这种方法,无需更改代码即可开始使用 Spark Connect。

在终端窗口中,将 SPARK_REMOTE 环境变量设置为指向您之前在计算机上启动的本地 Spark 服务器

export SPARK_REMOTE="sc://127.0.0.1"

并像往常一样启动 Spark shell

./bin/pyspark

PySpark shell 现在已使用 Spark Connect 连接到 Spark,如欢迎消息中所示

Client connected to the Spark Connect server at localhost

在创建 Spark 会话时指定 Spark Connect

您也可以在创建 Spark 会话时明确指定要使用 Spark Connect。

例如,您可以使用 Spark Connect 启动 PySpark shell,如这里所示。

要使用 Spark Connect 启动 PySpark shell,只需包含 remote 参数并指定 Spark 服务器的位置。在本例中,我们使用 localhost 连接到我们之前启动的本地 Spark 服务器

./bin/pyspark --remote "sc://127.0.0.1"

您会注意到,PySpark shell 欢迎消息告诉您已使用 Spark Connect 连接到 Spark

Client connected to the Spark Connect server at localhost

您还可以检查 Spark 会话类型。如果它包含 .connect.,则表示您正在使用 Spark Connect,如本例所示

SparkSession available as 'spark'.
>>> type(spark)
<class 'pyspark.sql.connect.session.SparkSession'>

现在,您可以在 shell 中运行 PySpark 代码以查看 Spark Connect 的实际应用

>>> columns = ["id","name"]
>>> data = [(1,"Sarah"),(2,"Maria")]
>>> df = spark.createDataFrame(data).toDF(*columns)
>>> df.show()
+---+-----+
| id| name|
+---+-----+
|  1|Sarah|
|  2|Maria|
+---+-----+

对于 Scala shell,我们使用基于 Ammonite 的 REPL,该 REPL 目前未包含在 Apache Spark 软件包中。

要设置新的 Scala shell,首先下载并安装 Coursier CLI。然后,使用以下命令在终端窗口中安装 REPL

cs install –-contrib spark-connect-repl

现在,您可以启动基于 Ammonite 的 Scala REPL/shell 以连接到您的 Spark 服务器,如下所示

spark-connect-repl

REPL 成功初始化后将显示问候消息

Spark session available as 'spark'.
   _____                  __      ______                            __
  / ___/____  ____ ______/ /__   / ____/___  ____  ____  ___  _____/ /_
  \__ \/ __ \/ __ `/ ___/ //_/  / /   / __ \/ __ \/ __ \/ _ \/ ___/ __/
 ___/ / /_/ / /_/ / /  / ,<    / /___/ /_/ / / / / / / /  __/ /__/ /_
/____/ .___/\__,_/_/  /_/|_|   \____/\____/_/ /_/_/ /_/\___/\___/\__/
    /_/

默认情况下,REPL 将尝试连接到本地 Spark 服务器。在 shell 中运行以下 Scala 代码以查看 Spark Connect 的实际应用

@ spark.range(10).count
res0: Long = 10L

配置客户端-服务器连接

默认情况下,REPL 将尝试连接到端口 15002 上的本地 Spark 服务器。但是,连接可以通过多种方式进行配置,如本配置 参考 中所述。

设置 SPARK_REMOTE 环境变量

可以在客户端机器上设置 SPARK_REMOTE 环境变量以自定义在 REPL 启动时初始化的客户端-服务器连接。

export SPARK_REMOTE="sc://myhost.com:443/;token=ABCDEFG"
spark-connect-repl

SPARK_REMOTE="sc://myhost.com:443/;token=ABCDEFG" spark-connect-repl

使用 CLI 参数

自定义也可以通过 CLI 参数传递,如下所示

spark-connect-repl --host myhost.com --port 443 --token ABCDEFG

支持的 CLI 参数列表可以在 此处 找到。

使用连接字符串以编程方式配置

连接也可以使用 SparkSession#builder 以编程方式创建,如本例所示

@ import org.apache.spark.sql.SparkSession
@ val spark = SparkSession.builder.remote("sc://127.0.0.1:443/;token=ABCDEFG").build()

在独立应用程序中使用 Spark Connect

首先,使用 pip install pyspark[connect]==3.5.0 安装 PySpark,或者如果要构建打包的 PySpark 应用程序/库,请将其添加到您的 setup.py 文件中,如下所示

install_requires=[
'pyspark[connect]==3.5.0'
]

在编写自己的代码时,在创建 Spark 会话时,包含带有对 Spark 服务器的引用的 remote 函数,如本例所示

from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://127.0.0.1").getOrCreate()

为了说明目的,我们将创建一个简单的 Spark Connect 应用程序 SimpleApp.py

"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.remote("sc://127.0.0.1").appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

该程序只是计算文本文件中包含“a”的行数和包含“b”的行数。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 安装的位置。

我们可以使用常规的 Python 解释器运行此应用程序,如下所示

# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 72, lines with b: 39

要将 Spark Connect 用作 Scala 应用程序/项目的组成部分,我们首先需要包含正确的依赖项。以 sbt 构建系统为例,我们将以下依赖项添加到 build.sbt 文件中

libraryDependencies += "org.apache.spark" %% "spark-sql-api" % "3.5.0"
libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.0"

在编写自己的代码时,在创建 Spark 会话时,包含带有对 Spark 服务器的引用的 remote 函数,如本例所示

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().remote("sc://127.0.0.1").build()

注意:引用用户定义代码的操作(如 UDF、filter、map 等)需要注册一个 ClassFinder 来获取并上传任何必需的类文件。此外,任何 JAR 依赖项都必须使用 SparkSession#AddArtifact 上传到服务器。

示例

import org.apache.spark.sql.connect.client.REPLClassDirMonitor
// Register a ClassFinder to monitor and upload the classfiles from the build output.
val classFinder = new REPLClassDirMonitor(<ABSOLUTE_PATH_TO_BUILD_OUTPUT_DIR>)
spark.registerClassFinder(classfinder)

// Upload JAR dependencies
spark.addArtifact(<ABSOLUTE_PATH_JAR_DEP>)

这里,ABSOLUTE_PATH_TO_BUILD_OUTPUT_DIR 是构建系统将类文件写入的输出目录,而 ABSOLUTE_PATH_JAR_DEP 是 JAR 在本地文件系统中的位置。

REPLClassDirMonitorClassFinder 的一个提供的实现,它监控特定目录,但您可以实现自己的扩展 ClassFinder 的类以进行自定义搜索和监控。

客户端应用程序身份验证

虽然 Spark Connect 没有内置身份验证,但它被设计为与您现有的身份验证基础设施无缝协作。它的 gRPC HTTP/2 接口允许使用身份验证代理,这使得在不直接在 Spark 中实现身份验证逻辑的情况下保护 Spark Connect 成为可能。

Spark 3.4 中支持的功能

PySpark:在 Spark 3.4 中,Spark Connect 支持大多数 PySpark API,包括 DataFrameFunctionsColumn。但是,一些 API(例如 SparkContextRDD)不受支持。您可以在 API 参考 文档中查看当前支持的 API。支持的 API 被标记为“支持 Spark Connect”,因此您可以在将现有代码迁移到 Spark Connect 之前检查您正在使用的 API 是否可用。

Scala:在 Spark 3.5 中,Spark Connect 支持大多数 Scala API,包括 DatasetfunctionsColumnCatalogKeyValueGroupedDataset

用户定义函数 (UDF) 受支持,默认情况下适用于 shell 和在具有额外设置要求的独立应用程序中。

大多数流式 API 受支持,包括 DataStreamReaderDataStreamWriterStreamingQueryStreamingQueryListener

在所有 Spark Connect 版本中,SparkContextRDD 等 API 已弃用。

计划在即将发布的 Spark 版本中支持更多 API。