SparkR (R on Spark)
- 概述
- SparkDataFrame
- 机器学习
- R 和 Spark 之间的数据类型映射
- 结构化流
- SparkR 中的 Apache Arrow
- R 函数名称冲突
- 迁移指南
概述
SparkR 是一个 R 包,它提供了一个轻量级的前端,用于从 R 使用 Apache Spark。在 Spark 3.5.1 中,SparkR 提供了一个分布式数据框实现,支持选择、过滤、聚合等操作(类似于 R 数据框,dplyr),但适用于大型数据集。SparkR 还支持使用 MLlib 进行分布式机器学习。
SparkDataFrame
SparkDataFrame 是一个分布式数据集合,组织成命名列。从概念上讲,它等同于关系数据库中的表或 R 中的数据框,但在幕后具有更丰富的优化。SparkDataFrame 可以从各种来源构建,例如:结构化数据文件、Hive 中的表、外部数据库或现有的本地 R 数据框。
本页上的所有示例都使用 R 或 Spark 分发版中包含的示例数据,可以使用 ./bin/sparkR
shell 运行。
启动:SparkSession
SparkR 的入口点是 SparkSession
,它将您的 R 程序连接到 Spark 集群。您可以使用 sparkR.session
创建 SparkSession
,并传入选项,例如应用程序名称、任何依赖的 Spark 包等。此外,您还可以通过 SparkSession
使用 SparkDataFrame。如果您在 sparkR
shell 中工作,则 SparkSession
应该已经为您创建,您不需要调用 sparkR.session
。
sparkR.session()
从 RStudio 启动
您也可以从 RStudio 启动 SparkR。您可以从 RStudio、R shell、Rscript 或其他 R IDE 将您的 R 程序连接到 Spark 集群。要开始,请确保 SPARK_HOME 在环境中设置(您可以检查 Sys.getenv),加载 SparkR 包,并按如下所示调用 sparkR.session
。它将检查 Spark 安装,如果未找到,它将自动下载并缓存。或者,您也可以手动运行 install.spark
。
除了调用 sparkR.session
之外,您还可以指定某些 Spark 驱动程序属性。通常,这些 应用程序属性 和 运行时环境 无法以编程方式设置,因为驱动程序 JVM 进程已经启动,在这种情况下,SparkR 会为您处理。要设置它们,请将它们作为其他配置属性传递给 sparkR.session()
中的 sparkConfig
参数。
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
以下 Spark 驱动程序属性可以在 sparkConfig
中使用 sparkR.session
从 RStudio 设置
属性名称 | 属性组 | spark-submit 等效项 |
---|---|---|
spark.master |
应用程序属性 | --master |
spark.kerberos.keytab |
应用程序属性 | --keytab |
spark.kerberos.principal |
应用程序属性 | --principal |
spark.driver.memory |
应用程序属性 | --driver-memory |
spark.driver.extraClassPath |
运行时环境 | --driver-class-path |
spark.driver.extraJavaOptions |
运行时环境 | --driver-java-options |
spark.driver.extraLibraryPath |
运行时环境 | --driver-library-path |
创建 SparkDataFrame
使用 SparkSession
,应用程序可以从本地 R 数据框、从 Hive 表 或从其他 数据源 创建 SparkDataFrame
。
从本地数据框
创建数据框最简单的方法是将本地 R 数据框转换为 SparkDataFrame。具体来说,我们可以使用 as.DataFrame
或 createDataFrame
并传入本地 R 数据框以创建 SparkDataFrame。例如,以下代码使用 R 中的 faithful
数据集创建了一个 SparkDataFrame
。
df <- as.DataFrame(faithful)
# Displays the first part of the SparkDataFrame
head(df)
## eruptions waiting
##1 3.600 79
##2 1.800 54
##3 3.333 74
从数据源
SparkR 支持通过 SparkDataFrame
接口对各种数据源进行操作。本节介绍使用数据源加载和保存数据的通用方法。您可以查看 Spark SQL 编程指南以了解有关 特定选项 的更多信息,这些选项可用于内置数据源。
从数据源创建 SparkDataFrame 的通用方法是 read.df
。此方法接受要加载的文件的路径和数据源类型,并且当前活动的 SparkSession 将自动使用。SparkR 本机支持读取 JSON、CSV 和 Parquet 文件,并且通过来自 第三方项目 等来源的可用包,您可以找到针对流行文件格式(如 Avro)的数据源连接器。这些包可以通过使用 spark-submit
或 sparkR
命令指定 --packages
来添加,或者如果在交互式 R shell 或从 RStudio 中使用 sparkPackages
参数初始化 SparkSession。
sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.12:3.5.1")
我们可以看到如何使用示例 JSON 输入文件使用数据源。请注意,此处使用的文件 *不是* 典型的 JSON 文件。文件中的每一行都必须包含一个单独的、自包含的有效 JSON 对象。有关更多信息,请参阅 JSON Lines 文本格式,也称为换行符分隔的 JSON。因此,常规的多行 JSON 文件通常会失败。
people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
## age name
##1 NA Michael
##2 30 Andy
##3 19 Justin
# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))
数据源 API 本机支持 CSV 格式的输入文件。有关更多信息,请参阅 SparkR read.df API 文档。
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")
数据源 API 也可以用于将 SparkDataFrame 保存到多种文件格式。例如,我们可以使用 write.df
将上一个示例中的 SparkDataFrame 保存到 Parquet 文件。
write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")
从 Hive 表
您还可以从 Hive 表创建 SparkDataFrame。为此,我们需要创建一个具有 Hive 支持的 SparkSession,它可以访问 Hive MetaStore 中的表。请注意,Spark 应该使用 Hive 支持 构建,有关更多详细信息,请参阅 SQL 编程指南。在 SparkR 中,默认情况下,它将尝试创建一个启用了 Hive 支持的 SparkSession (enableHiveSupport = TRUE
)。
sparkR.session()
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")
# results is now a SparkDataFrame
head(results)
## key value
## 1 238 val_238
## 2 86 val_86
## 3 311 val_311
SparkDataFrame 操作
SparkDataFrame 支持许多函数来执行结构化数据处理。这里我们包含一些基本示例,完整的列表可以在 API 文档中找到
选择行、列
# Create the SparkDataFrame
df <- as.DataFrame(faithful)
# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]
# Select only the "eruptions" column
head(select(df, df$eruptions))
## eruptions
##1 3.600
##2 1.800
##3 3.333
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
## eruptions waiting
##1 1.750 47
##2 1.750 47
##3 1.867 48
分组、聚合
SparkR 数据框支持许多常用的函数,用于在分组后聚合数据。例如,我们可以计算 faithful
数据集中 waiting
时间的直方图,如下所示
# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
## waiting count
##1 70 4
##2 67 1
##3 69 2
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
## waiting count
##1 78 15
##2 83 14
##3 81 13
除了标准聚合之外,SparkR 还支持 OLAP 立方体 运算符 cube
head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 NA 140.8 4 22.8
##2 4 75.7 4 30.4
##3 8 400.0 3 19.2
##4 8 318.0 3 15.5
##5 NA 351.0 NA 15.8
##6 NA 275.8 NA 16.3
和 rollup
head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 4 75.7 4 30.4
##2 8 400.0 3 19.2
##3 8 318.0 3 15.5
##4 4 78.7 NA 32.4
##5 8 304.0 3 15.2
##6 4 79.0 NA 27.3
对列进行操作
SparkR 还提供了一些函数,可以直接应用于列,用于数据处理和聚合期间。以下示例展示了基本算术函数的使用。
# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
应用用户定义函数
在 SparkR 中,我们支持几种用户定义函数
使用 dapply
或 dapplyCollect
在大型数据集上运行给定函数
dapply
将函数应用于 SparkDataFrame
的每个分区。应用于 SparkDataFrame
每个分区的函数应该只有一个参数,该参数将传递一个对应于每个分区的 data.frame
。函数的输出应该是一个 data.frame
。模式指定了结果 SparkDataFrame
的行格式。它必须与返回值的 数据类型 匹配。
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
##4 2.283 62 3720
##5 4.533 85 5100
##6 2.883 55 3300
dapplyCollect
类似于 dapply
,将函数应用于 SparkDataFrame
的每个分区,并将结果收集回来。函数的输出应该是一个 data.frame
。但是,不需要传递模式。请注意,如果在所有分区上运行的 UDF 的输出无法被拉到驱动程序并适合驱动程序内存,则 dapplyCollect
可能会失败。
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
使用 gapply
或 gapplyCollect
在大型数据集上运行给定函数,并按输入列分组
gapply
将函数应用于 SparkDataFrame
的每个组。该函数将应用于 SparkDataFrame
的每个组,并且应该只有两个参数:分组键和对应于该键的 R data.frame
。组是从 SparkDataFrame
的列中选择的。函数的输出应该是一个 data.frame
。模式指定了结果 SparkDataFrame
的行格式。它必须根据 Spark 数据类型 表示 R 函数的输出模式。返回的 data.frame
的列名由用户设置。
# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900
gapplyCollect
类似于 gapply
,将函数应用于 SparkDataFrame
的每个分区,并将结果收集回 R data.frame。函数的输出应该是一个 data.frame
。但是,不需要传递模式。请注意,如果在所有分区上运行的 UDF 的输出无法被拉到驱动程序并适合驱动程序内存,则 gapplyCollect
可能会失败。
# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900
使用 spark.lapply
运行分布式本地 R 函数
spark.lapply
类似于原生 R 中的 lapply
,spark.lapply
在元素列表上运行函数,并使用 Spark 分发计算。以类似于 doParallel
或 lapply
的方式将函数应用于列表的元素。所有计算的结果应该适合一台机器。如果不是这种情况,他们可以做类似于 df <- createDataFrame(list)
的事情,然后使用 dapply
# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# Print the summary of each model
print(model.summaries)
急切执行
如果启用了急切执行,则在创建 SparkDataFrame
时,数据将立即返回到 R 客户端。默认情况下,急切执行未启用,可以通过在启动 SparkSession
时将配置属性 spark.sql.repl.eagerEval.enabled
设置为 true
来启用它。
要显示的数据的最大行数和每列的最大字符数可以通过 spark.sql.repl.eagerEval.maxNumRows
和 spark.sql.repl.eagerEval.truncate
配置属性分别控制。这些属性仅在启用急切执行时有效。如果这些属性没有显式设置,默认情况下,将显示最多 20 行和每列最多 20 个字符的数据。
# Start up spark session with eager execution enabled
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))
# Create a grouped and sorted SparkDataFrame
df <- createDataFrame(faithful)
df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")
# Similar to R data.frame, displays the data returned, instead of SparkDataFrame class string
df2
##+-------+-----+
##|waiting|count|
##+-------+-----+
##| 43.0| 1|
##| 45.0| 3|
##| 46.0| 5|
##| 47.0| 4|
##| 48.0| 3|
##| 49.0| 5|
##| 50.0| 5|
##| 51.0| 6|
##| 52.0| 5|
##| 53.0| 7|
##+-------+-----+
##only showing top 10 rows
请注意,要在 sparkR
shell 中启用急切执行,请将 spark.sql.repl.eagerEval.enabled=true
配置属性添加到 --conf
选项。
从 SparkR 运行 SQL 查询
SparkDataFrame 也可以在 Spark SQL 中注册为临时视图,这允许您在其数据上运行 SQL 查询。 sql
函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 SparkDataFrame
返回。
# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")
# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")
# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
##1 Justin
机器学习
算法
SparkR 目前支持以下机器学习算法
分类
spark.logit
:逻辑回归
spark.mlp
:多层感知器 (MLP)
spark.naiveBayes
:朴素贝叶斯
spark.svmLinear
:线性支持向量机
spark.fmClassifier
:分解机分类器
回归
spark.survreg
:加速失效时间 (AFT) 生存模型
spark.glm
或glm
:广义线性模型 (GLM)
spark.isoreg
:等距回归
spark.lm
:线性回归
spark.fmRegressor
:分解机回归器
树
spark.decisionTree
:决策树用于
回归
和
分类
spark.gbt
:梯度提升树用于
回归
和
分类
spark.randomForest
:随机森林用于
回归
和
分类
聚类
spark.bisectingKmeans
:二分 k 均值
spark.gaussianMixture
:高斯混合模型 (GMM)
spark.kmeans
:K 均值
spark.lda
:潜在狄利克雷分配 (LDA)
spark.powerIterationClustering (PIC)
:幂迭代聚类 (PIC)
协同过滤
频繁模式挖掘
统计
spark.kstest
:柯尔莫哥洛夫-斯米尔诺夫检验
在幕后,SparkR 使用 MLlib 训练模型。有关示例代码,请参阅 MLlib 用户指南的相应部分。用户可以调用 summary
打印拟合模型的摘要,predict 对新数据进行预测,以及 write.ml/read.ml 保存/加载拟合模型。SparkR 支持模型拟合中可用的 R 公式运算符的子集,包括“~”、“.”、“:”、“+”和“-”。
模型持久化
以下示例展示了如何通过 SparkR 保存/加载 MLlib 模型。
training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)
# Check model summary
summary(gaussianGLM2)
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)
unlink(modelPath)
R 和 Spark 之间的数据类型映射
R | Spark |
---|---|
byte | byte |
integer | integer |
float | float |
double | double |
numeric | double |
character | string |
string | string |
binary | binary |
raw | binary |
logical | boolean |
POSIXct | timestamp |
POSIXlt | timestamp |
Date | date |
array | array |
list | array |
env | map |
结构化流
SparkR 支持结构化流式 API。结构化流式是一种可扩展且容错的流式处理引擎,构建在 Spark SQL 引擎之上。有关更多信息,请参阅 结构化流式编程指南 上的 R API
SparkR 中的 Apache Arrow
Apache Arrow 是一种内存中列式数据格式,用于在 Spark 中在 JVM 和 R 进程之间高效地传输数据。另请参阅 PySpark 完成的优化,PySpark 使用指南,用于使用 Apache Arrow 的 Pandas。本指南旨在解释如何在 SparkR 中使用 Arrow 优化,并提供一些关键要点。
确保安装 Arrow
Arrow R 库在 CRAN 上可用,可以按如下方式安装。
Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'
有关更多详细信息,请参阅 Apache Arrow 的官方文档。
请注意,您必须确保 Arrow R 包已安装并在所有集群节点上可用。当前支持的最低版本是 1.0.0;但是,由于 SparkR 中的 Arrow 优化是实验性的,因此这可能会在次要版本之间发生变化。
启用转换为/从 R DataFrame、dapply
和 gapply
当使用 collect(spark_df)
将 Spark DataFrame 转换为 R DataFrame,使用 createDataFrame(r_df)
从 R DataFrame 创建 Spark DataFrame,使用 dapply(...)
将 R 原生函数应用于每个分区,以及使用 gapply(...)
将 R 原生函数应用于分组数据时,可以使用 Arrow 优化。要使用这些功能中的 Arrow,用户需要先将 Spark 配置 ‘spark.sql.execution.arrow.sparkr.enabled’ 设置为 ‘true’。默认情况下,此功能处于禁用状态。
无论优化是否启用,SparkR 都能产生相同的结果。此外,当优化因任何原因在实际计算之前失败时,Spark DataFrame 和 R DataFrame 之间的转换会自动回退到非 Arrow 优化实现。
# Start up spark session with Arrow optimization enabled
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))
# Converts Spark DataFrame from an R DataFrame
spark_df <- createDataFrame(mtcars)
# Converts Spark DataFrame to an R DataFrame
collect(spark_df)
# Apply an R native function to each partition.
collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
# Apply an R native function to grouped data.
collect(gapply(spark_df,
"gear",
function(key, group) {
data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
},
structType("gear double, disp boolean")))
请注意,即使使用 Arrow,collect(spark_df)
也会导致将 DataFrame 中的所有记录收集到驱动程序程序中,因此应该只对数据的小子集执行此操作。此外,在 gapply(...)
和 dapply(...)
中指定的输出模式应与给定函数返回的 R DataFrame 相匹配。
支持的 SQL 类型
目前,Arrow 基于的转换支持所有 Spark SQL 数据类型,除了 FloatType
、BinaryType
、ArrayType
、StructType
和 MapType
。
R 函数名称冲突
在 R 中加载并附加新包时,可能会出现名称 冲突,其中一个函数会屏蔽另一个函数。
以下函数被 SparkR 包屏蔽
被屏蔽的函数 | 如何访问 |
---|---|
cov in package:stats |
|
filter in package:stats |
|
sample in package:base |
base::sample(x, size, replace = FALSE, prob = NULL) |
由于 SparkR 的一部分是根据 dplyr
包建模的,因此 SparkR 中的某些函数与 dplyr
中的函数具有相同的名称。根据这两个包的加载顺序,第一个加载的包中的某些函数会被第二个加载的包中的函数屏蔽。在这种情况下,请在这些调用之前加上包名称,例如 SparkR::cume_dist(x)
或 dplyr::cume_dist(x)
。
可以使用 search()
检查 R 中的搜索路径。
迁移指南
迁移指南现已存档 在此页面上。