Spark Connect 概述
构建客户端 Spark 应用程序
在 Apache Spark 3.4 中,Spark Connect 引入了一种解耦的客户端-服务器架构,允许使用 DataFrame API 和未解析的逻辑计划作为协议,远程连接到 Spark 集群。客户端和服务器之间的分离使得 Spark 及其开放生态系统能够从任何地方被利用。它可以嵌入到现代数据应用程序、IDE、Notebook 和编程语言中。
要开始使用,请参阅快速入门:Spark Connect。
Spark Connect 工作原理
Spark Connect 客户端库旨在简化 Spark 应用程序开发。它是一个轻量级 API,可以嵌入到任何地方:应用程序服务器、IDE、Notebook 和编程语言中。Spark Connect API 基于 Spark 的 DataFrame API,使用未解析的逻辑计划作为客户端和 Spark 驱动程序之间的语言无关协议。
Spark Connect 客户端将 DataFrame 操作转换为未解析的逻辑查询计划,这些计划使用 protocol buffers 进行编码。然后通过 gRPC 框架将它们发送到服务器。
嵌入在 Spark 服务器上的 Spark Connect 端点接收未解析的逻辑计划并将其转换为 Spark 的逻辑计划操作符。这类似于解析 SQL 查询,其中解析属性和关系并构建初始解析计划。从那时起,标准的 Spark 执行过程启动,确保 Spark Connect 利用 Spark 的所有优化和增强。结果通过 gRPC 作为 Apache Arrow 编码的行批次流式传输回客户端。
Spark Connect 客户端应用程序与经典 Spark 应用程序有何不同
Spark Connect 的主要设计目标之一是实现客户端与服务器的完全分离和隔离。因此,开发人员在使用 Spark Connect 时需要注意一些变化:
- 客户端不与 Spark 驱动程序在同一进程中运行。这意味着客户端无法直接访问驱动程序 JVM 并与之交互以操作执行环境。特别是在 PySpark 中,客户端不使用 Py4J,因此无法访问持有 JVM 实现的
DataFrame
、Column
、SparkSession
等的私有字段(例如df._jdf
)。 - 从设计上讲,Spark Connect 协议使用 Spark 逻辑计划作为抽象,以便声明性地描述要在服务器上执行的操作。因此,Spark Connect 协议不支持 Spark 的所有执行 API,最重要的是 RDD。
- Spark Connect 为其消费者提供了基于会话的客户端。这意味着客户端无法访问集群中用于操作所有已连接客户端环境的属性。最重要的是,客户端无法访问静态 Spark 配置或 SparkContext。
Spark Connect 的操作优势
通过这种新架构,Spark Connect 缓解了多个多租户操作问题:
稳定性:现在,使用过多内存的应用程序只会影响它们自己的环境,因为它们可以在自己的进程中运行。用户可以在客户端定义自己的依赖,无需担心与 Spark 驱动程序的潜在冲突。
可升级性:Spark 驱动程序现在可以独立于应用程序无缝升级,例如受益于性能改进和安全修复。这意味着只要服务器端 RPC 定义设计为向后兼容,应用程序就可以向前兼容。
可调试性和可观测性:Spark Connect 使得在开发期间可以直接从您喜欢的 IDE 进行交互式调试。类似地,可以使用应用程序框架的原生指标和日志库来监控应用程序。
如何使用 Spark Connect
Spark Connect 可用并支持 PySpark 和 Scala 应用程序。我们将逐步介绍如何运行带有 Spark Connect 的 Apache Spark 服务器,以及如何使用 Spark Connect 客户端库从客户端应用程序连接到它。
下载并启动带有 Spark Connect 的 Spark 服务器
首先,从下载 Apache Spark 页面下载 Spark。在页面顶部的版本下拉菜单中选择最新版本。然后选择您的包类型,通常是“Pre-built for Apache Hadoop 3.3 and later”,然后点击链接下载。
现在,在您的计算机上解压您刚刚下载的 Spark 包,例如:
tar -xvf spark-4.0.0-bin-hadoop3.tgz
在终端窗口中,进入您之前解压 Spark 的位置下的 spark
文件夹,然后运行 start-connect-server.sh
脚本以启动带有 Spark Connect 的 Spark 服务器,如本示例所示:
./sbin/start-connect-server.sh
请确保使用的包版本与您之前下载的 Spark 版本相同。在此示例中,Spark 4.0.0 与 Scala 2.13。
现在 Spark 服务器正在运行,并准备好接受来自客户端应用程序的 Spark Connect 会话。在下一节中,我们将逐步介绍如何在编写客户端应用程序时使用 Spark Connect。
使用 Spark Connect 进行交互式分析
创建 Spark 会话时,您可以指定要使用 Spark Connect,有以下几种方法:
如果您不使用此处概述的任何机制,您的 Spark 会话将像以前一样工作,不利用 Spark Connect。
设置 SPARK_REMOTE 环境变量
如果您在运行 Spark 客户端应用程序的客户端机器上设置 SPARK_REMOTE
环境变量,并按照以下示例创建新的 Spark 会话,则该会话将成为一个 Spark Connect 会话。通过这种方法,无需更改代码即可开始使用 Spark Connect。
在终端窗口中,将 SPARK_REMOTE
环境变量设置为指向您之前在计算机上启动的本地 Spark 服务器:
export SPARK_REMOTE="sc://localhost"
并像往常一样启动 Spark shell:
./bin/pyspark
PySpark shell 现在已使用 Spark Connect 连接到 Spark,如欢迎消息中所示:
Client connected to the Spark Connect server at localhost
创建 Spark 会话时指定 Spark Connect
在创建 Spark 会话时,您也可以明确指定要使用 Spark Connect。
例如,您可以按照此处所示,使用 Spark Connect 启动 PySpark shell。
要使用 Spark Connect 启动 PySpark shell,只需包含 remote
参数并指定 Spark 服务器的位置。在此示例中,我们使用 localhost
连接到之前启动的本地 Spark 服务器:
./bin/pyspark --remote "sc://localhost"
您会注意到 PySpark shell 的欢迎消息会告诉您,您已使用 Spark Connect 连接到 Spark:
Client connected to the Spark Connect server at localhost
您还可以检查 Spark 会话类型。如果它包含 .connect.
,则表示您正在使用 Spark Connect,如本示例所示:
SparkSession available as 'spark'.
>>> type(spark)
<class 'pyspark.sql.connect.session.SparkSession'>
现在您可以在 shell 中运行 PySpark 代码,查看 Spark Connect 的实际运行情况:
>>> columns = ["id", "name"]
>>> data = [(1,"Sarah"), (2,"Maria")]
>>> df = spark.createDataFrame(data).toDF(*columns)
>>> df.show()
+---+-----+
| id| name|
+---+-----+
| 1|Sarah|
| 2|Maria|
+---+-----+
对于 Scala shell,我们使用基于 Ammonite 的 REPL。否则,它与 PySpark shell 非常相似。
./bin/spark-shell --remote "sc://localhost"
当 REPL 成功初始化时,将出现一条问候消息:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 4.0.0-SNAPSHOT
/_/
Type in expressions to have them evaluated.
Spark session available as 'spark'.
默认情况下,REPL 将尝试连接到本地 Spark 服务器。在 shell 中运行以下 Scala 代码,查看 Spark Connect 的实际运行情况:
@ spark.range(10).count
res0: Long = 10L
配置客户端-服务器连接
默认情况下,REPL 将尝试连接到端口 15002 上的本地 Spark 服务器。但是,连接可以通过多种方式进行配置,如本配置参考中所述。
设置 SPARK_REMOTE 环境变量
可以在客户端机器上设置 SPARK_REMOTE 环境变量,以自定义在 REPL 启动时初始化的客户端-服务器连接。
export SPARK_REMOTE="sc://myhost.com:443/;token=ABCDEFG"
./bin/spark-shell
或者
SPARK_REMOTE="sc://myhost.com:443/;token=ABCDEFG" spark-connect-repl
使用连接字符串进行编程配置
连接也可以使用 SparkSession#builder 进行编程创建,如本示例所示:
@ import org.apache.spark.sql.SparkSession
@ val spark = SparkSession.builder.remote("sc://localhost:443/;token=ABCDEFG").getOrCreate()
在独立应用程序中使用 Spark Connect
首先,使用 pip install pyspark[connect]==4.0.0
安装 PySpark,或者如果构建打包的 PySpark 应用程序/库,将其添加到您的 setup.py 文件中,例如:
install_requires=[
'pyspark[connect]==4.0.0'
]
在编写自己的代码时,当您创建 Spark 会话时,请包含 remote
函数并引用您的 Spark 服务器,如本示例所示:
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
为了说明目的,我们将创建一个简单的 Spark Connect 应用程序 SimpleApp.py:
"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
spark = SparkSession.builder.remote("sc://localhost").appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop()
此程序仅计算文本文件中包含“a”的行数和包含“b”的行数。请注意,您需要将 YOUR_SPARK_HOME 替换为 Spark 的安装位置。
我们可以使用常规 Python 解释器运行此应用程序,如下所示:
# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 72, lines with b: 39
要在 Scala 应用程序/项目中将 Spark Connect 作为一部分使用,我们首先需要包含正确的依赖项。以 sbt
构建系统为例,我们将以下依赖项添加到 build.sbt
文件中:
libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "4.0.0"
在编写自己的代码时,当您创建 Spark 会话时,请包含 remote
函数并引用您的 Spark 服务器,如本示例所示:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().remote("sc://localhost").getOrCreate()
注意:引用用户定义代码(例如 UDF、filter、map 等)的操作需要注册一个 ClassFinder,以便获取并上传任何必需的类文件。此外,任何 JAR 依赖项都必须使用 SparkSession#AddArtifact
上传到服务器。
示例
import org.apache.spark.sql.connect.client.REPLClassDirMonitor
// Register a ClassFinder to monitor and upload the classfiles from the build output.
val classFinder = new REPLClassDirMonitor(<ABSOLUTE_PATH_TO_BUILD_OUTPUT_DIR>)
spark.registerClassFinder(classFinder)
// Upload JAR dependencies
spark.addArtifact(<ABSOLUTE_PATH_JAR_DEP>)
这里,ABSOLUTE_PATH_TO_BUILD_OUTPUT_DIR
是构建系统写入类文件的输出目录,ABSOLUTE_PATH_JAR_DEP
是 JAR 在本地文件系统上的位置。
REPLClassDirMonitor
是 ClassFinder
的一个实现,用于监控特定目录,但也可以实现自己的类来扩展 ClassFinder
以进行自定义搜索和监控。
有关 Spark Connect 应用程序开发以及使用自定义功能扩展 Spark Connect 的更多信息,请参阅Spark Connect 应用程序开发。
客户端应用程序身份验证
虽然 Spark Connect 没有内置身份验证,但它旨在与您现有的身份验证基础设施无缝协作。其 gRPC HTTP/2 接口允许使用身份验证代理,这使得无需直接在 Spark 中实现身份验证逻辑即可保护 Spark Connect 的安全。
支持内容
PySpark:自 Spark 3.4 起,Spark Connect 支持大多数 PySpark API,包括 DataFrame、Functions 和 Column。但是,某些 API(例如 SparkContext 和 RDD)不受支持。您可以在 API 参考文档中查看目前支持哪些 API。支持的 API 标记为“Supports Spark Connect”,因此在将现有代码迁移到 Spark Connect 之前,您可以检查您正在使用的 API 是否可用。
Scala:自 Spark 3.5 起,Spark Connect 支持大多数 Scala API,包括 Dataset、functions、Column、Catalog 和 KeyValueGroupedDataset。
用户定义函数(UDF)支持,默认情况下对 shell 支持,在独立应用程序中需要额外设置。
大多数 Streaming API 都支持,包括 DataStreamReader、DataStreamWriter、StreamingQuery 和 StreamingQueryListener。
诸如 SparkContext 和 RDD 的 API 在 Spark Connect 中不受支持。
计划在未来的 Spark 版本中支持更多 API。