SparkR (Spark 上的 R)
- 概述
- SparkDataFrame
- 机器学习
- R 与 Spark 之间的数据类型映射
- 结构化流
- SparkR 中的 Apache Arrow
- R 函数名称冲突
- 迁移指南
SparkR 已从 Apache Spark 4.0.0 开始弃用,并将在未来版本中移除。
概述
SparkR 是一个 R 包,它提供了一个轻量级前端,允许从 R 中使用 Apache Spark。在 Spark 4.0.0 中,SparkR 提供了一个分布式数据框实现,支持选择、过滤、聚合等操作(类似于 R 数据框,dplyr),但适用于大型数据集。SparkR 还支持使用 MLlib 进行分布式机器学习。
SparkDataFrame
SparkDataFrame 是一种由命名列组织起来的分布式数据集合。它在概念上等同于关系数据库中的表或 R 中的数据框,但在底层拥有更丰富的优化。SparkDataFrame 可以从多种来源构建,例如:结构化数据文件、Hive 中的表、外部数据库或现有的本地 R 数据框。
本页上的所有示例都使用 R 或 Spark 发行版中包含的示例数据,并且可以使用 ./bin/sparkR
shell 运行。
启动:SparkSession
SparkR 的入口点是 SparkSession
,它将您的 R 程序连接到 Spark 集群。您可以使用 sparkR.session
创建 SparkSession
并传入应用程序名称、任何依赖的 Spark 包等选项。此外,您还可以通过 SparkSession
使用 SparkDataFrame。如果您从 sparkR
shell 工作,则 SparkSession
应该已经为您创建,您无需调用 sparkR.session
。
sparkR.session()
从 RStudio 启动
您也可以从 RStudio 启动 SparkR。您可以将您的 R 程序连接到 Spark 集群,无论是从 RStudio、R shell、Rscript 还是其他 R IDE。要启动,请确保在环境中设置了 SPARK_HOME(您可以通过 Sys.getenv 检查),加载 SparkR 包,然后如下所示调用 sparkR.session
。它将检查 Spark 安装,如果未找到,将自动下载和缓存。或者,您也可以手动运行 install.spark
。
除了调用 sparkR.session
之外,您还可以指定某些 Spark 驱动器属性。通常,这些应用程序属性和运行时环境无法通过编程方式设置,因为驱动器 JVM 进程已经启动。在这种情况下,SparkR 会为您处理。要设置它们,请像设置其他配置属性一样,将它们作为 sparkR.session()
的 sparkConfig
参数传入。
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
以下 Spark 驱动器属性可以在 RStudio 中通过 sparkR.session
的 sparkConfig
参数进行设置
属性名称 | 属性组 | spark-submit 等效参数 |
---|---|---|
spark.master |
应用程序属性 | --master |
spark.kerberos.keytab |
应用程序属性 | --keytab |
spark.kerberos.principal |
应用程序属性 | --principal |
spark.driver.memory |
应用程序属性 | --driver-memory |
spark.driver.extraClassPath |
运行时环境 | --driver-class-path |
spark.driver.extraJavaOptions |
运行时环境 | --driver-java-options |
spark.driver.extraLibraryPath |
运行时环境 | --driver-library-path |
创建 SparkDataFrame
通过 SparkSession
,应用程序可以从本地 R 数据框、Hive 表或其他数据源创建 SparkDataFrame
。
从本地数据框创建
创建数据框最简单的方法是将本地 R 数据框转换为 SparkDataFrame。具体来说,我们可以使用 as.DataFrame
或 createDataFrame
并传入本地 R 数据框来创建一个 SparkDataFrame。例如,以下代码使用 R 中的 faithful
数据集创建了一个 SparkDataFrame
。
df <- as.DataFrame(faithful)
# Displays the first part of the SparkDataFrame
head(df)
## eruptions waiting
##1 3.600 79
##2 1.800 54
##3 3.333 74
从数据源创建
SparkR 通过 SparkDataFrame
接口支持操作各种数据源。本节描述了使用数据源加载和保存数据的通用方法。您可以查看 Spark SQL 编程指南,了解内置数据源可用的更多特定选项。
从数据源创建 SparkDataFrame 的通用方法是 read.df
。此方法接受要加载的文件路径和数据源类型,并且当前活动的 SparkSession 将自动使用。SparkR 原生支持读取 JSON、CSV 和 Parquet 文件,并且通过第三方项目等来源提供的包,您可以找到适用于 Avro 等流行文件格式的数据源连接器。这些包可以通过在 spark-submit
或 sparkR
命令中指定 --packages
来添加,或者如果在交互式 R shell 或 RStudio 中初始化 SparkSession 时使用 sparkPackages
参数。
sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.13:4.0.0")
我们可以通过一个 JSON 输入文件示例了解如何使用数据源。请注意,此处使用的文件并非典型的 JSON 文件。文件中的每一行都必须包含一个独立、自包含的有效 JSON 对象。有关更多信息,请参阅JSON Lines 文本格式(也称为换行符分隔的 JSON)。因此,常规的多行 JSON 文件通常会失败。
people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
## age name
##1 NA Michael
##2 30 Andy
##3 19 Justin
# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))
数据源 API 原生支持 CSV 格式的输入文件。有关更多信息,请参阅 SparkR read.df API 文档。
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")
数据源 API 也可以用于将 SparkDataFrame 保存为多种文件格式。例如,我们可以使用 write.df
将前面示例中的 SparkDataFrame 保存到 Parquet 文件。
write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")
从 Hive 表创建
您还可以从 Hive 表创建 SparkDataFrame。为此,我们需要创建一个支持 Hive 的 SparkSession,它可以访问 Hive MetaStore 中的表。请注意,Spark 应该已经构建了Hive 支持,更多详细信息可以在SQL 编程指南中找到。在 SparkR 中,默认情况下会尝试创建启用 Hive 支持的 SparkSession (enableHiveSupport = TRUE
)。
sparkR.session()
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")
# results is now a SparkDataFrame
head(results)
## key value
## 1 238 val_238
## 2 86 val_86
## 3 311 val_311
SparkDataFrame 操作
SparkDataFrame 支持多种函数来执行结构化数据处理。此处包含一些基本示例,完整列表可在 API 文档中找到。
选择行、列
# Create the SparkDataFrame
df <- as.DataFrame(faithful)
# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]
# Select only the "eruptions" column
head(select(df, df$eruptions))
## eruptions
##1 3.600
##2 1.800
##3 3.333
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
## eruptions waiting
##1 1.750 47
##2 1.750 47
##3 1.867 48
分组、聚合
SparkR 数据框支持多种常用函数,用于分组后聚合数据。例如,我们可以计算 faithful
数据集中 waiting
时间的直方图,如下所示
# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
## waiting count
##1 70 4
##2 67 1
##3 69 2
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
## waiting count
##1 78 15
##2 83 14
##3 81 13
除了标准聚合之外,SparkR 还支持 OLAP 立方体运算符 cube
head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 NA 140.8 4 22.8
##2 4 75.7 4 30.4
##3 8 400.0 3 19.2
##4 8 318.0 3 15.5
##5 NA 351.0 NA 15.8
##6 NA 275.8 NA 16.3
和 rollup
head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 4 75.7 4 30.4
##2 8 400.0 3 19.2
##3 8 318.0 3 15.5
##4 4 78.7 NA 32.4
##5 8 304.0 3 15.2
##6 4 79.0 NA 27.3
列操作
SparkR 还提供了许多可以直接应用于列的函数,用于数据处理和聚合。以下示例展示了基本算术函数的使用。
# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
应用用户定义函数
在 SparkR 中,我们支持多种用户定义函数(UDF)
使用 dapply
或 dapplyCollect
在大型数据集上运行给定函数
dapply
将一个函数应用于 SparkDataFrame
的每个分区。应用于 SparkDataFrame
每个分区的函数应只有一个参数,该参数将传入对应每个分区的 data.frame
。函数的输出应为一个 data.frame
。Schema 指定了结果 SparkDataFrame
的行格式。它必须与返回值的数据类型匹配。
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
##4 2.283 62 3720
##5 4.533 85 5100
##6 2.883 55 3300
dapplyCollect
与 dapply
类似,将一个函数应用于 SparkDataFrame
的每个分区并收集结果。函数的输出应为一个 data.frame
。但不需要传入 Schema。请注意,如果 UDF 在所有分区上运行的输出无法拉取到驱动器并适应驱动器内存,则 dapplyCollect
可能会失败。
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
通过输入列分组,使用 gapply
或 gapplyCollect
在大型数据集上运行给定函数
gapply
将一个函数应用于 SparkDataFrame
的每个分组。应用于 SparkDataFrame
每个分组的函数应只有两个参数:分组键和对应此键的 R data.frame
。分组从 SparkDataFrame
的列中选择。函数的输出应为一个 data.frame
。Schema 指定了结果 SparkDataFrame
的行格式。它必须基于 Spark 数据类型表示 R 函数的输出 Schema。返回的 data.frame
的列名由用户设置。
# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900
gapplyCollect
与 gapply
类似,将一个函数应用于 SparkDataFrame
的每个分区,并将结果收集回 R data.frame。函数的输出应为一个 data.frame
。但不需要传入 Schema。请注意,如果 UDF 在所有分区上运行的输出无法拉取到驱动器并适应驱动器内存,则 gapplyCollect
可能会失败。
# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900
使用 spark.lapply
分布式运行本地 R 函数
spark.lapply
类似于原生 R 中的 lapply
,spark.lapply
对元素列表运行一个函数,并使用 Spark 分布式计算。以类似于 doParallel
或 lapply
的方式将函数应用于列表元素。所有计算的结果应能适应单台机器。如果不是这种情况,他们可以执行类似 df <- createDataFrame(list)
的操作,然后使用 dapply
。
# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# Print the summary of each model
print(model.summaries)
即时执行
如果启用了即时执行,当 SparkDataFrame
创建时,数据将立即返回到 R 客户端。默认情况下,即时执行未启用,但可以在启动 SparkSession
时将配置属性 spark.sql.repl.eagerEval.enabled
设置为 true
来启用。
要显示的最大行数和每列最大字符数可以分别通过 spark.sql.repl.eagerEval.maxNumRows
和 spark.sql.repl.eagerEval.truncate
配置属性控制。这些属性仅在启用即时执行时有效。如果未显式设置这些属性,默认情况下将显示最多 20 行数据和每列最多 20 个字符。
# Start up spark session with eager execution enabled
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))
# Create a grouped and sorted SparkDataFrame
df <- createDataFrame(faithful)
df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")
# Similar to R data.frame, displays the data returned, instead of SparkDataFrame class string
df2
##+-------+-----+
##|waiting|count|
##+-------+-----+
##| 43.0| 1|
##| 45.0| 3|
##| 46.0| 5|
##| 47.0| 4|
##| 48.0| 3|
##| 49.0| 5|
##| 50.0| 5|
##| 51.0| 6|
##| 52.0| 5|
##| 53.0| 7|
##+-------+-----+
##only showing top 10 rows
请注意,要在 sparkR
shell 中启用即时执行,请将 spark.sql.repl.eagerEval.enabled=true
配置属性添加到 --conf
选项。
从 SparkR 运行 SQL 查询
SparkDataFrame 也可以在 Spark SQL 中注册为临时视图,这允许您对其数据运行 SQL 查询。sql
函数使应用程序能够以编程方式运行 SQL 查询,并以 SparkDataFrame
的形式返回结果。
# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")
# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")
# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
##1 Justin
机器学习
算法
SparkR 目前支持以下机器学习算法
分类
spark.logit
:逻辑回归
spark.mlp
:多层感知机 (MLP)
spark.naiveBayes
:朴素贝叶斯
spark.svmLinear
:线性支持向量机
spark.fmClassifier
:分解机分类器
回归
spark.survreg
:加速失效时间 (AFT) 生存模型
spark.glm
或glm
:广义线性模型 (GLM)
spark.isoreg
:保序回归
spark.lm
:线性回归
spark.fmRegressor
:分解机回归器
树
spark.decisionTree
: 用于回归
和分类
的决策树
spark.gbt
: 用于回归
和分类
的梯度提升树
spark.randomForest
: 用于回归
和分类
的随机森林
聚类
spark.bisectingKmeans
:二分 K 均值
spark.gaussianMixture
:高斯混合模型 (GMM)
spark.kmeans
:K-Means
spark.lda
:潜在狄利克雷分配 (LDA)
spark.powerIterationClustering (PIC)
:幂迭代聚类 (PIC)
协同过滤
频繁模式挖掘
统计
spark.kstest
:Kolmogorov-Smirnov 检验
在底层,SparkR 使用 MLlib 训练模型。有关示例代码,请参阅 MLlib 用户指南的相应部分。用户可以调用 summary
打印拟合模型的摘要,调用 predict 对新数据进行预测,以及调用 write.ml/read.ml 保存/加载拟合模型。SparkR 支持一部分可用的 R 公式运算符用于模型拟合,包括 ‘~’、‘.’、‘:’、‘+’ 和 ‘-‘。
模型持久化
以下示例展示了如何通过 SparkR 保存/加载 MLlib 模型。
training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)
# Check model summary
summary(gaussianGLM2)
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)
unlink(modelPath)
R 与 Spark 之间的数据类型映射
R | Spark |
---|---|
字节 | 字节 |
整型 | 整型 |
浮点型 | 浮点型 |
双精度浮点型 | 双精度浮点型 |
数值型 | 双精度浮点型 |
字符型 | 字符串 |
字符串 | 字符串 |
二进制 | 二进制 |
原始字节 | 二进制 |
逻辑型 | 布尔型 |
POSIXct | 时间戳 |
POSIXlt | 时间戳 |
日期 | 日期 |
数组 | 数组 |
列表 | 数组 |
环境 | 映射 |
结构化流
SparkR 支持结构化流 API。结构化流是一个可伸缩且容错的流处理引擎,构建于 Spark SQL 引擎之上。有关更多信息,请参阅结构化流编程指南中的 R API。
SparkR 中的 Apache Arrow
Apache Arrow 是一种内存列式数据格式,在 Spark 中用于在 JVM 和 R 进程之间高效传输数据。另请参阅 PySpark 优化,PySpark 使用 Apache Arrow 配合 Pandas 的指南。本指南旨在解释如何在 SparkR 中使用 Arrow 优化,并提供一些关键点。
确保 Arrow 已安装
Arrow R 库在 CRAN 上可用,可以如下安装。
Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'
有关更多详细信息,请参阅Apache Arrow 的官方文档。
请注意,您必须确保 Arrow R 包已安装并在所有集群节点上可用。当前支持的最低版本是 1.0.0;但是,由于 SparkR 中的 Arrow 优化仍处于实验阶段,此版本可能会在次要版本之间发生变化。
启用 R DataFrame 之间的转换,以及 dapply
和 gapply
Arrow 优化在以下情况下可用:使用 collect(spark_df)
将 Spark DataFrame 转换为 R DataFrame 时,使用 createDataFrame(r_df)
从 R DataFrame 创建 Spark DataFrame 时,通过 dapply(...)
将 R 原生函数应用于每个分区时,以及通过 gapply(...)
将 R 原生函数应用于分组数据时。要在执行这些操作时使用 Arrow,用户首先需要将 Spark 配置 'spark.sql.execution.arrow.sparkr.enabled' 设置为 'true'。默认情况下此功能是禁用的。
无论是否启用优化,SparkR 都会产生相同的结果。此外,当在实际计算之前因任何原因导致优化失败时,Spark DataFrame 和 R DataFrame 之间的转换会自动回退到非 Arrow 优化实现。
# Start up spark session with Arrow optimization enabled
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))
# Converts Spark DataFrame from an R DataFrame
spark_df <- createDataFrame(mtcars)
# Converts Spark DataFrame to an R DataFrame
collect(spark_df)
# Apply an R native function to each partition.
collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
# Apply an R native function to grouped data.
collect(gapply(spark_df,
"gear",
function(key, group) {
data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
},
structType("gear double, disp boolean")))
请注意,即使使用 Arrow,collect(spark_df)
也会导致将 DataFrame 中的所有记录收集到驱动程序中,因此应在数据的少量子集上执行此操作。此外,gapply(...)
和 dapply(...)
中指定的输出 Schema 应与给定函数返回的 R DataFrame 相匹配。
支持的 SQL 类型
目前,除了 FloatType
、BinaryType
、ArrayType
、StructType
和 MapType
之外,所有 Spark SQL 数据类型都支持基于 Arrow 的转换。
R 函数名称冲突
在 R 中加载和附加新包时,可能会出现名称冲突,即一个函数掩盖了另一个函数。
以下函数被 SparkR 包掩盖
被掩盖的函数 | 如何访问 |
---|---|
package:stats 中的 cov |
|
package:stats 中的 filter |
|
package:base 中的 sample |
base::sample(x, size, replace = FALSE, prob = NULL) |
由于 SparkR 的部分功能是基于 dplyr
包建模的,因此 SparkR 中的某些函数与 dplyr
中的函数同名。根据两个包的加载顺序,先加载的包中的某些函数可能会被后加载的包中的同名函数掩盖。在这种情况下,请使用包名作为前缀来调用这些函数,例如 SparkR::cume_dist(x)
或 dplyr::cume_dist(x)
。
您可以使用 search()
检查 R 中的搜索路径。
迁移指南
迁移指南现已存档在此页面上。