SparkR (R on Spark)
- 概述
- SparkDataFrame
- 机器学习
- R 和 Spark 之间的数据类型映射
- 结构化流
- SparkR 中的 Apache Arrow
- R 函数名称冲突
- 迁移指南
概述
SparkR 是一个 R 包,它提供了一个轻量级的前端,用于从 R 使用 Apache Spark。在 Spark 3.5.1 中,SparkR 提供了一个分布式数据框实现,支持选择、过滤、聚合等操作(类似于 R 数据框,dplyr),但适用于大型数据集。SparkR 还支持使用 MLlib 进行分布式机器学习。
SparkDataFrame
SparkDataFrame 是一个分布式数据集合,组织成命名列。从概念上讲,它等同于关系数据库中的表或 R 中的数据框,但在幕后具有更丰富的优化。SparkDataFrame 可以从各种来源构建,例如:结构化数据文件、Hive 中的表、外部数据库或现有的本地 R 数据框。
本页上的所有示例都使用 R 或 Spark 分发版中包含的示例数据,可以使用 ./bin/sparkR
shell 运行。
启动:SparkSession
SparkR 的入口点是 SparkSession
,它将您的 R 程序连接到 Spark 集群。您可以使用 sparkR.session
创建 SparkSession
,并传入选项,例如应用程序名称、任何依赖的 Spark 包等。此外,您还可以通过 SparkSession
使用 SparkDataFrame。如果您在 sparkR
shell 中工作,则 SparkSession
应该已经为您创建,您不需要调用 sparkR.session
。
从 RStudio 启动
您也可以从 RStudio 启动 SparkR。您可以从 RStudio、R shell、Rscript 或其他 R IDE 将您的 R 程序连接到 Spark 集群。要开始,请确保 SPARK_HOME 在环境中设置(您可以检查 Sys.getenv),加载 SparkR 包,并按如下所示调用 sparkR.session
。它将检查 Spark 安装,如果未找到,它将自动下载并缓存。或者,您也可以手动运行 install.spark
。
除了调用 sparkR.session
之外,您还可以指定某些 Spark 驱动程序属性。通常,这些 应用程序属性 和 运行时环境 无法以编程方式设置,因为驱动程序 JVM 进程已经启动,在这种情况下,SparkR 会为您处理。要设置它们,请将它们作为其他配置属性传递给 sparkR.session()
中的 sparkConfig
参数。
以下 Spark 驱动程序属性可以在 sparkConfig
中使用 sparkR.session
从 RStudio 设置
属性名称 | 属性组 | spark-submit 等效项 |
---|---|---|
spark.master |
应用程序属性 | --master |
spark.kerberos.keytab |
应用程序属性 | --keytab |
spark.kerberos.principal |
应用程序属性 | --principal |
spark.driver.memory |
应用程序属性 | --driver-memory |
spark.driver.extraClassPath |
运行时环境 | --driver-class-path |
spark.driver.extraJavaOptions |
运行时环境 | --driver-java-options |
spark.driver.extraLibraryPath |
运行时环境 | --driver-library-path |
创建 SparkDataFrame
使用 SparkSession
,应用程序可以从本地 R 数据框、从 Hive 表 或从其他 数据源 创建 SparkDataFrame
。
从本地数据框
创建数据框最简单的方法是将本地 R 数据框转换为 SparkDataFrame。具体来说,我们可以使用 as.DataFrame
或 createDataFrame
并传入本地 R 数据框以创建 SparkDataFrame。例如,以下代码使用 R 中的 faithful
数据集创建了一个 SparkDataFrame
。
从数据源
SparkR 支持通过 SparkDataFrame
接口对各种数据源进行操作。本节介绍使用数据源加载和保存数据的通用方法。您可以查看 Spark SQL 编程指南以了解有关 特定选项 的更多信息,这些选项可用于内置数据源。
从数据源创建 SparkDataFrame 的通用方法是 read.df
。此方法接受要加载的文件的路径和数据源类型,并且当前活动的 SparkSession 将自动使用。SparkR 本机支持读取 JSON、CSV 和 Parquet 文件,并且通过来自 第三方项目 等来源的可用包,您可以找到针对流行文件格式(如 Avro)的数据源连接器。这些包可以通过使用 spark-submit
或 sparkR
命令指定 --packages
来添加,或者如果在交互式 R shell 或从 RStudio 中使用 sparkPackages
参数初始化 SparkSession。
我们可以看到如何使用示例 JSON 输入文件使用数据源。请注意,此处使用的文件 *不是* 典型的 JSON 文件。文件中的每一行都必须包含一个单独的、自包含的有效 JSON 对象。有关更多信息,请参阅 JSON Lines 文本格式,也称为换行符分隔的 JSON。因此,常规的多行 JSON 文件通常会失败。
数据源 API 本机支持 CSV 格式的输入文件。有关更多信息,请参阅 SparkR read.df API 文档。
数据源 API 也可以用于将 SparkDataFrame 保存到多种文件格式。例如,我们可以使用 write.df
将上一个示例中的 SparkDataFrame 保存到 Parquet 文件。
从 Hive 表
您还可以从 Hive 表创建 SparkDataFrame。为此,我们需要创建一个具有 Hive 支持的 SparkSession,它可以访问 Hive MetaStore 中的表。请注意,Spark 应该使用 Hive 支持 构建,有关更多详细信息,请参阅 SQL 编程指南。在 SparkR 中,默认情况下,它将尝试创建一个启用了 Hive 支持的 SparkSession (enableHiveSupport = TRUE
)。
SparkDataFrame 操作
SparkDataFrame 支持许多函数来执行结构化数据处理。这里我们包含一些基本示例,完整的列表可以在 API 文档中找到
选择行、列
分组、聚合
SparkR 数据框支持许多常用的函数,用于在分组后聚合数据。例如,我们可以计算 faithful
数据集中 waiting
时间的直方图,如下所示
除了标准聚合之外,SparkR 还支持 OLAP 立方体 运算符 cube
和 rollup
对列进行操作
SparkR 还提供了一些函数,可以直接应用于列,用于数据处理和聚合期间。以下示例展示了基本算术函数的使用。
应用用户定义函数
在 SparkR 中,我们支持几种用户定义函数
使用 dapply
或 dapplyCollect
在大型数据集上运行给定函数
dapply
将函数应用于 SparkDataFrame
的每个分区。应用于 SparkDataFrame
每个分区的函数应该只有一个参数,该参数将传递一个对应于每个分区的 data.frame
。函数的输出应该是一个 data.frame
。模式指定了结果 SparkDataFrame
的行格式。它必须与返回值的 数据类型 匹配。
dapplyCollect
类似于 dapply
,将函数应用于 SparkDataFrame
的每个分区,并将结果收集回来。函数的输出应该是一个 data.frame
。但是,不需要传递模式。请注意,如果在所有分区上运行的 UDF 的输出无法被拉到驱动程序并适合驱动程序内存,则 dapplyCollect
可能会失败。
使用 gapply
或 gapplyCollect
在大型数据集上运行给定函数,并按输入列分组
gapply
将函数应用于 SparkDataFrame
的每个组。该函数将应用于 SparkDataFrame
的每个组,并且应该只有两个参数:分组键和对应于该键的 R data.frame
。组是从 SparkDataFrame
的列中选择的。函数的输出应该是一个 data.frame
。模式指定了结果 SparkDataFrame
的行格式。它必须根据 Spark 数据类型 表示 R 函数的输出模式。返回的 data.frame
的列名由用户设置。
gapplyCollect
类似于 gapply
,将函数应用于 SparkDataFrame
的每个分区,并将结果收集回 R data.frame。函数的输出应该是一个 data.frame
。但是,不需要传递模式。请注意,如果在所有分区上运行的 UDF 的输出无法被拉到驱动程序并适合驱动程序内存,则 gapplyCollect
可能会失败。
使用 spark.lapply
运行分布式本地 R 函数
spark.lapply
类似于原生 R 中的 lapply
,spark.lapply
在元素列表上运行函数,并使用 Spark 分发计算。以类似于 doParallel
或 lapply
的方式将函数应用于列表的元素。所有计算的结果应该适合一台机器。如果不是这种情况,他们可以做类似于 df <- createDataFrame(list)
的事情,然后使用 dapply
急切执行
如果启用了急切执行,则在创建 SparkDataFrame
时,数据将立即返回到 R 客户端。默认情况下,急切执行未启用,可以通过在启动 SparkSession
时将配置属性 spark.sql.repl.eagerEval.enabled
设置为 true
来启用它。
要显示的数据的最大行数和每列的最大字符数可以通过 spark.sql.repl.eagerEval.maxNumRows
和 spark.sql.repl.eagerEval.truncate
配置属性分别控制。这些属性仅在启用急切执行时有效。如果这些属性没有显式设置,默认情况下,将显示最多 20 行和每列最多 20 个字符的数据。
请注意,要在 sparkR
shell 中启用急切执行,请将 spark.sql.repl.eagerEval.enabled=true
配置属性添加到 --conf
选项。
从 SparkR 运行 SQL 查询
SparkDataFrame 也可以在 Spark SQL 中注册为临时视图,这允许您在其数据上运行 SQL 查询。 sql
函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 SparkDataFrame
返回。
机器学习
算法
SparkR 目前支持以下机器学习算法
分类
spark.logit
:逻辑回归
spark.mlp
:多层感知器 (MLP)
spark.naiveBayes
:朴素贝叶斯
spark.svmLinear
:线性支持向量机
spark.fmClassifier
:分解机分类器
回归
spark.survreg
:加速失效时间 (AFT) 生存模型
spark.glm
或glm
:广义线性模型 (GLM)
spark.isoreg
:等距回归
spark.lm
:线性回归
spark.fmRegressor
:分解机回归器
树
spark.decisionTree
:决策树用于
回归
和
分类
spark.gbt
:梯度提升树用于
回归
和
分类
spark.randomForest
:随机森林用于
回归
和
分类
聚类
spark.bisectingKmeans
:二分 k 均值
spark.gaussianMixture
:高斯混合模型 (GMM)
spark.kmeans
:K 均值
spark.lda
:潜在狄利克雷分配 (LDA)
spark.powerIterationClustering (PIC)
:幂迭代聚类 (PIC)
协同过滤
频繁模式挖掘
统计
spark.kstest
:柯尔莫哥洛夫-斯米尔诺夫检验
在幕后,SparkR 使用 MLlib 训练模型。有关示例代码,请参阅 MLlib 用户指南的相应部分。用户可以调用 summary
打印拟合模型的摘要,predict 对新数据进行预测,以及 write.ml/read.ml 保存/加载拟合模型。SparkR 支持模型拟合中可用的 R 公式运算符的子集,包括“~”、“.”、“:”、“+”和“-”。
模型持久化
以下示例展示了如何通过 SparkR 保存/加载 MLlib 模型。
training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)
# Check model summary
summary(gaussianGLM2)
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)
unlink(modelPath)
R 和 Spark 之间的数据类型映射
R | Spark |
---|---|
byte | byte |
integer | integer |
float | float |
double | double |
numeric | double |
character | string |
string | string |
binary | binary |
raw | binary |
logical | boolean |
POSIXct | timestamp |
POSIXlt | timestamp |
Date | date |
array | array |
list | array |
env | map |
结构化流
SparkR 支持结构化流式 API。结构化流式是一种可扩展且容错的流式处理引擎,构建在 Spark SQL 引擎之上。有关更多信息,请参阅 结构化流式编程指南 上的 R API
SparkR 中的 Apache Arrow
Apache Arrow 是一种内存中列式数据格式,用于在 Spark 中在 JVM 和 R 进程之间高效地传输数据。另请参阅 PySpark 完成的优化,PySpark 使用指南,用于使用 Apache Arrow 的 Pandas。本指南旨在解释如何在 SparkR 中使用 Arrow 优化,并提供一些关键要点。
确保安装 Arrow
Arrow R 库在 CRAN 上可用,可以按如下方式安装。
Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'
有关更多详细信息,请参阅 Apache Arrow 的官方文档。
请注意,您必须确保 Arrow R 包已安装并在所有集群节点上可用。当前支持的最低版本是 1.0.0;但是,由于 SparkR 中的 Arrow 优化是实验性的,因此这可能会在次要版本之间发生变化。
启用转换为/从 R DataFrame、dapply
和 gapply
当使用 collect(spark_df)
将 Spark DataFrame 转换为 R DataFrame,使用 createDataFrame(r_df)
从 R DataFrame 创建 Spark DataFrame,使用 dapply(...)
将 R 原生函数应用于每个分区,以及使用 gapply(...)
将 R 原生函数应用于分组数据时,可以使用 Arrow 优化。要使用这些功能中的 Arrow,用户需要先将 Spark 配置 ‘spark.sql.execution.arrow.sparkr.enabled’ 设置为 ‘true’。默认情况下,此功能处于禁用状态。
无论优化是否启用,SparkR 都能产生相同的结果。此外,当优化因任何原因在实际计算之前失败时,Spark DataFrame 和 R DataFrame 之间的转换会自动回退到非 Arrow 优化实现。
请注意,即使使用 Arrow,collect(spark_df)
也会导致将 DataFrame 中的所有记录收集到驱动程序程序中,因此应该只对数据的小子集执行此操作。此外,在 gapply(...)
和 dapply(...)
中指定的输出模式应与给定函数返回的 R DataFrame 相匹配。
支持的 SQL 类型
目前,Arrow 基于的转换支持所有 Spark SQL 数据类型,除了 FloatType
、BinaryType
、ArrayType
、StructType
和 MapType
。
R 函数名称冲突
在 R 中加载并附加新包时,可能会出现名称 冲突,其中一个函数会屏蔽另一个函数。
以下函数被 SparkR 包屏蔽
被屏蔽的函数 | 如何访问 |
---|---|
cov in package:stats |
|
filter in package:stats |
|
sample in package:base |
base::sample(x, size, replace = FALSE, prob = NULL) |
由于 SparkR 的一部分是根据 dplyr
包建模的,因此 SparkR 中的某些函数与 dplyr
中的函数具有相同的名称。根据这两个包的加载顺序,第一个加载的包中的某些函数会被第二个加载的包中的函数屏蔽。在这种情况下,请在这些调用之前加上包名称,例如 SparkR::cume_dist(x)
或 dplyr::cume_dist(x)
。
可以使用 search()
检查 R 中的搜索路径。
迁移指南
迁移指南现已存档 在此页面上。