SparkR - 实用指南
sparkr-vignettes.Rmd
从 Apache Spark 4.0.0 开始,SparkR 已被弃用,并将在未来版本中移除。
概述
SparkR 是一个 R 包,提供了一个轻量级前端,用于在 R 中使用 Apache Spark。从 Spark 4.0.0 开始,SparkR 提供了一个分布式数据框实现,支持数据处理操作,如选择、过滤、聚合等,以及使用 MLlib 进行分布式机器学习。
开始使用
我们首先从一个在本地机器上运行的示例开始,并概述 SparkR 的使用:数据摄取、数据处理和机器学习。
首先,让我们加载并附加包。
SparkSession
是 SparkR 的入口点,它将您的 R 程序连接到 Spark 集群。您可以使用 sparkR.session
创建一个 SparkSession
并传入选项,例如应用程序名称、任何依赖的 Spark 包等。
我们使用默认设置,它以本地模式运行。如果没有找到之前的安装,它会在后台自动下载 Spark 包。有关设置的更多详细信息,请参阅 Spark 会话。
## Java ref type org.apache.spark.sql.classic.SparkSession id 1
SparkR 中的操作围绕一个名为 SparkDataFrame
的 R 类展开。它是一个分布式数据集合,数据按命名列组织,概念上等同于关系数据库中的表或 R 中的数据框,但在底层具有更丰富的优化。
SparkDataFrame
可以从多种来源构建,例如:结构化数据文件、Hive 中的表、外部数据库或现有的本地 R 数据框。例如,我们从一个本地 R 数据框创建一个 SparkDataFrame
,
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
我们可以通过 head
或 showDF
函数查看 SparkDataFrame
的前几行。
head(carsDF)
## model mpg cyl disp hp drat wt qsec vs am gear carb
## 1 Mazda RX4 21.0 6 160 110 3.90 2.620 16.46 0 1 4 4
## 2 Mazda RX4 Wag 21.0 6 160 110 3.90 2.875 17.02 0 1 4 4
## 3 Datsun 710 22.8 4 108 93 3.85 2.320 18.61 1 1 4 1
## 4 Hornet 4 Drive 21.4 6 258 110 3.08 3.215 19.44 1 0 3 1
## 5 Hornet Sportabout 18.7 8 360 175 3.15 3.440 17.02 0 0 3 2
## 6 Valiant 18.1 6 225 105 2.76 3.460 20.22 1 0 3 1
SparkDataFrame
支持常见的数据处理操作,例如 filter
和 select
。
carsSubDF <- select(carsDF, "model", "mpg", "hp")
carsSubDF <- filter(carsSubDF, carsSubDF$hp >= 200)
head(carsSubDF)
## model mpg hp
## 1 Duster 360 14.3 245
## 2 Cadillac Fleetwood 10.4 205
## 3 Lincoln Continental 10.4 215
## 4 Chrysler Imperial 14.7 230
## 5 Camaro Z28 13.3 245
## 6 Ford Pantera L 15.8 264
SparkR 可以在分组后使用许多常见的聚合函数。
## gear count
## 1 4 12
## 2 3 15
## 3 5 5
结果 carsDF
和 carsSubDF
都是 SparkDataFrame
对象。要转换回 R data.frame
,我们可以使用 collect
。注意:这可能会导致您的交互式环境内存不足,因为 collect()
会将整个分布式 DataFrame
获取到您的客户端,该客户端充当 Spark 驱动程序。
## [1] "data.frame"
SparkR 支持多种常用的机器学习算法。在底层,SparkR 使用 MLlib 训练模型。用户可以调用 summary
打印拟合模型的摘要,使用 predict
对新数据进行预测,以及使用 write.ml
/read.ml
保存/加载拟合模型。
SparkR 支持一部分 R 公式运算符用于模型拟合,包括 ‘~’、‘.’、‘:’、‘+’ 和 ‘-’。我们以线性回归为例。
model <- spark.glm(carsDF, mpg ~ wt + cyl)
该结果与应用于 carsDF
对应的 data.frame
mtcars
的 R glm
函数返回的结果相匹配。事实上,对于广义线性模型 (Generalized Linear Model),我们也专门为 SparkDataFrame
暴露了 glm
,因此上述操作等同于 model <- glm(mpg ~ wt + cyl, data = carsDF)
。
summary(model)
##
## Deviance Residuals:
## (Note: These are approximate quantiles with relative error <= 0.01)
## Min 1Q Median 3Q Max
## -4.2893 -1.7085 -0.4713 1.5729 6.1004
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) 39.6863 1.71498 23.1409 0.00000000
## wt -3.1910 0.75691 -4.2158 0.00022202
## cyl -1.5078 0.41469 -3.6360 0.00106428
##
## (Dispersion parameter for gaussian family taken to be 6.592137)
##
## Null deviance: 1126.05 on 31 degrees of freedom
## Residual deviance: 191.17 on 29 degrees of freedom
## AIC: 156
##
## Number of Fisher Scoring iterations: 1
模型可以通过 write.ml
保存,并使用 read.ml
加载回来。
write.ml(model, path = "/HOME/tmp/mlModel/glmModel")
最后,我们可以通过运行以下命令停止 Spark 会话
设置
安装
与许多其他 R 包不同,要使用 SparkR,您需要额外安装 Apache Spark。Spark 安装将用于运行后端进程,该进程将编译和执行 SparkR 程序。
安装 SparkR 包后,您可以按照上一节的说明调用 sparkR.session
来启动,它将检查 Spark 安装。如果您从交互式 shell(例如 R、RStudio)中使用 SparkR,那么如果未找到 Spark,它将自动下载并缓存。另外,我们提供了一个易于使用的函数 install.spark
用于手动运行此操作。如果您计算机上没有安装 Spark,可以从 Apache Spark 网站下载。
如果您已经安装了 Spark,则无需再次安装,可以将 sparkHome
参数传递给 sparkR.session
,让 SparkR 知道现有 Spark 安装的位置。
sparkR.session(sparkHome = "/HOME/spark")
Spark 会话
除了 sparkHome
,sparkR.session
中还可以指定许多其他选项。有关完整列表,请参阅 启动:SparkSession 和 SparkR API 文档。
特别是,以下 Spark 驱动程序属性可以在 sparkConfig
中设置。
属性名称 | 属性组 | spark-submit 等效项 |
---|---|---|
spark.driver.memory |
应用程序属性 | --driver-memory |
spark.driver.extraClassPath |
运行时环境 | --driver-class-path |
spark.driver.extraJavaOptions |
运行时环境 | --driver-java-options |
spark.driver.extraLibraryPath |
运行时环境 | --driver-library-path |
spark.kerberos.keytab |
应用程序属性 | --keytab |
spark.kerberos.principal |
应用程序属性 | --principal |
对于 Windows 用户:由于不同操作系统之间文件前缀不同,为避免潜在的前缀错误问题,当前的解决方法是在启动 SparkSession
时指定 spark.sql.warehouse.dir
。
spark_warehouse_path <- file.path(path.expand('~'), "spark-warehouse")
sparkR.session(spark.sql.warehouse.dir = spark_warehouse_path)
集群模式
SparkR 可以连接到远程 Spark 集群。集群模式概述 是对不同 Spark 集群模式的良好介绍。
将 SparkR 连接到远程 Spark 集群时,请确保机器上的 Spark 版本和 Hadoop 版本与集群上的对应版本匹配。当前 SparkR 包兼容
## [1] "Spark 4.0.0"
它应该同时用于本地计算机和远程集群。
要连接,请将主节点的 URL 传递给 sparkR.session
。完整列表可在 Spark Master URL 中查看。例如,要连接到本地独立 Spark 主节点,我们可以调用
sparkR.session(master = "spark://local:7077")
对于 YARN 集群,SparkR 支持客户端模式,主节点设置为“yarn”。
sparkR.session(master = "yarn")
当前版本不支持 Yarn 集群模式。
数据导入
本地数据框
最简单的方法是将本地 R 数据框转换为 SparkDataFrame
。具体来说,我们可以使用 as.DataFrame
或 createDataFrame
并传入本地 R 数据框来创建一个 SparkDataFrame
。例如,以下代码使用 R 中的 faithful
数据集创建一个 SparkDataFrame
。
df <- as.DataFrame(faithful)
head(df)
## eruptions waiting
## 1 3.600 79
## 2 1.800 54
## 3 3.333 74
## 4 2.283 62
## 5 4.533 85
## 6 2.883 55
数据源
SparkR 通过 SparkDataFrame
接口支持对各种数据源进行操作。您可以查看 Spark SQL 编程指南,了解内置数据源可用的更多 特定选项。
从数据源创建 SparkDataFrame
的通用方法是 read.df
。此方法接受要加载的文件路径和数据源类型,并且将自动使用当前活动的 Spark 会话。SparkR 原生支持读取 CSV、JSON 和 Parquet 文件,并且通过 Spark Packages 您可以找到 Avro 等流行文件格式的数据源连接器。在使用 sparkR.session
初始化 SparkSession 时,可以使用 sparkPackages
参数添加这些包。
sparkR.session(sparkPackages = "com.databricks:spark-avro_2.12:3.0.0")
我们可以通过一个 CSV 输入文件示例来了解如何使用数据源。欲了解更多信息,请参阅 SparkR read.df API 文档。
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")
数据源 API 原生支持 JSON 格式的输入文件。请注意,此处使用的文件并非典型的 JSON 文件。文件中的每一行都必须包含一个单独的、自包含的有效 JSON 对象。因此,常规的多行 JSON 文件通常会失败。
让我们看一下此处使用的原始 JSON 文件的前两行。
filePath <- paste0(sparkR.conf("spark.home"),
"/examples/src/main/resources/people.json")
readLines(filePath, n = 2L)
## [1] "{\"name\":\"Michael\"}" "{\"name\":\"Andy\", \"age\":30}"
我们使用 read.df
将其读入 SparkDataFrame
。
## [1] 3
head(people)
## age name
## 1 NA Michael
## 2 30 Andy
## 3 19 Justin
SparkR 自动从 JSON 文件推断模式。
printSchema(people)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)
如果我们要读取多个 JSON 文件,可以使用 read.json
。
people <- read.json(paste0(Sys.getenv("SPARK_HOME"),
c("/examples/src/main/resources/people.json",
"/examples/src/main/resources/people.json")))
count(people)
## [1] 6
数据源 API 还可以用于将 SparkDataFrames
保存为多种文件格式。例如,我们可以使用 write.df
将前面示例中的 SparkDataFrame
保存到 Parquet 文件中。
write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")
Hive 表
您还可以从 Hive 表创建 SparkDataFrames。为此,我们需要创建一个支持 Hive 的 SparkSession,它可以访问 Hive MetaStore 中的表。请注意,Spark 应该已经构建了 Hive 支持,更多详细信息可以在 SQL 编程指南中找到。在 SparkR 中,默认情况下它会尝试创建一个启用 Hive 支持的 SparkSession (enableHiveSupport = TRUE
)。
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
txtPath <- paste0(sparkR.conf("spark.home"), "/examples/src/main/resources/kv1.txt")
sqlCMD <- sprintf("LOAD DATA LOCAL INPATH '%s' INTO TABLE src", txtPath)
sql(sqlCMD)
results <- sql("FROM src SELECT key, value")
# results is now a SparkDataFrame
head(results)
数据处理
致 dplyr 用户:SparkR 在数据处理方面与 dplyr 具有相似的接口。然而,首先值得一提的是一些显著差异。我们在这里使用 df
代表 SparkDataFrame
,使用 col
代表列名。
指示列。SparkR 使用列名的字符串或使用
$
构造的 Column 对象来指示列。例如,要在df
中选择col
,我们可以写select(df, "col")
或select(df, df$col)
。描述条件。在 SparkR 中,Column 对象表示可以直接插入到条件中,或者我们可以使用字符串来描述条件,而无需引用所使用的
SparkDataFrame
。例如,要选择值 > 1 的行,我们可以写filter(df, df$col > 1)
或filter(df, "col > 1")
。
这里有更具体的例子。
dplyr | SparkR |
---|---|
select(mtcars, mpg, hp) |
select(carsDF, "mpg", "hp") |
filter(mtcars, mpg > 20, hp > 100) |
filter(carsDF, carsDF$mpg > 20, carsDF$hp > 100) |
其他差异将在具体方法中提及。
我们使用上面创建的 SparkDataFrame
carsDF
。我们可以获取关于 SparkDataFrame
的基本信息。
carsDF
## SparkDataFrame[model:string, mpg:double, cyl:double, disp:double, hp:double, drat:double, wt:double, qsec:double, vs:double, am:double, gear:double, carb:double]
以树状格式打印模式。
printSchema(carsDF)
## root
## |-- model: string (nullable = true)
## |-- mpg: double (nullable = true)
## |-- cyl: double (nullable = true)
## |-- disp: double (nullable = true)
## |-- hp: double (nullable = true)
## |-- drat: double (nullable = true)
## |-- wt: double (nullable = true)
## |-- qsec: double (nullable = true)
## |-- vs: double (nullable = true)
## |-- am: double (nullable = true)
## |-- gear: double (nullable = true)
## |-- carb: double (nullable = true)
SparkDataFrame 操作
选择行、列
SparkDataFrames 支持许多用于结构化数据处理的函数。这里我们包含一些基本示例,完整列表可在 API 文档中找到
您也可以将列名作为字符串传入。
## mpg
## 1 21.0
## 2 21.0
## 3 22.8
## 4 21.4
## 5 18.7
## 6 18.1
过滤 SparkDataFrame,只保留每加仑英里数 (mpg) 小于 20 英里/加仑的行。
## model mpg cyl disp hp drat wt qsec vs am gear carb
## 1 Hornet Sportabout 18.7 8 360.0 175 3.15 3.44 17.02 0 0 3 2
## 2 Valiant 18.1 6 225.0 105 2.76 3.46 20.22 1 0 3 1
## 3 Duster 360 14.3 8 360.0 245 3.21 3.57 15.84 0 0 3 4
## 4 Merc 280 19.2 6 167.6 123 3.92 3.44 18.30 1 0 4 4
## 5 Merc 280C 17.8 6 167.6 123 3.92 3.44 18.90 1 0 4 4
## 6 Merc 450SE 16.4 8 275.8 180 3.07 4.07 17.40 0 0 3 3
分组、聚合
分组和聚合的常见流程是
使用
groupBy
或group_by
并结合一些分组变量来创建一个GroupedData
对象将
GroupedData
对象传递给agg
或summarize
函数,并使用一些提供的聚合函数计算每个组内的数值。
分组后支持许多广泛使用的聚合函数,包括 avg
、count_distinct
、count
、first
、kurtosis
、last
、max
、mean
、min
、sd
、skewness
、stddev_pop
、stddev_samp
、sum_distinct
、sum
、var_pop
、var_samp
、var
。请参阅此处链接的聚合函数 API 文档。
例如,我们可以计算 mtcars
数据集中汽缸数量的直方图,如下所示。
## cyl count
## 1 8 14
## 2 4 11
## 3 6 7
使用 cube
或 rollup
计算多维度的小计。
## SparkDataFrame[cyl:double, gear:double, am:double, avg(mpg):double]
生成 {(cyl
, gear
, am
), (cyl
, gear
), (cyl
), ()} 的分组,而
## SparkDataFrame[cyl:double, gear:double, am:double, avg(mpg):double]
生成分组列的所有可能组合的分组。
列操作
SparkR 还提供了许多可以直接应用于列的函数,用于数据处理和聚合。以下示例展示了基本算术函数的使用。
carsDF_km <- carsDF
carsDF_km$kmpg <- carsDF_km$mpg * 1.61
head(select(carsDF_km, "model", "mpg", "kmpg"))
## model mpg kmpg
## 1 Mazda RX4 21.0 33.810
## 2 Mazda RX4 Wag 21.0 33.810
## 3 Datsun 710 22.8 36.708
## 4 Hornet 4 Drive 21.4 34.454
## 5 Hornet Sportabout 18.7 30.107
## 6 Valiant 18.1 29.141
窗口函数
窗口函数是聚合函数的一种变体。简单来说,
聚合函数:
n
到1
的映射 - 为一组条目返回单个值。示例包括sum
、count
、max
。窗口函数:
n
到n
的映射 - 为组中的每个条目返回一个值,但该值可能取决于组中的所有条目。示例包括rank
、lead
、lag
。
形式上,上面提到的组被称为帧。每个输入行都可以有一个与之关联的唯一帧,并且该行上的窗口函数的输出基于该帧中包含的行。
窗口函数通常与以下函数结合使用:windowPartitionBy
、windowOrderBy
、partitionBy
、orderBy
、over
。为了说明这一点,我们接下来看一个示例。
我们仍然使用 mtcars
数据集。对应的 SparkDataFrame
是 carsDF
。假设对于每个汽缸数量,我们想计算组内每辆车在 mpg
中的排名。
carsSubDF <- select(carsDF, "model", "mpg", "cyl")
ws <- orderBy(windowPartitionBy("cyl"), "mpg")
carsRank <- withColumn(carsSubDF, "rank", over(rank(), ws))
head(carsRank, n = 20L)
## model mpg cyl rank
## 1 Volvo 142E 21.4 4 1
## 2 Toyota Corona 21.5 4 2
## 3 Datsun 710 22.8 4 3
## 4 Merc 230 22.8 4 3
## 5 Merc 240D 24.4 4 5
## 6 Porsche 914-2 26.0 4 6
## 7 Fiat X1-9 27.3 4 7
## 8 Honda Civic 30.4 4 8
## 9 Lotus Europa 30.4 4 8
## 10 Fiat 128 32.4 4 10
## 11 Toyota Corolla 33.9 4 11
## 12 Merc 280C 17.8 6 1
## 13 Valiant 18.1 6 2
## 14 Merc 280 19.2 6 3
## 15 Ferrari Dino 19.7 6 4
## 16 Mazda RX4 21.0 6 5
## 17 Mazda RX4 Wag 21.0 6 5
## 18 Hornet 4 Drive 21.4 6 7
## 19 Cadillac Fleetwood 10.4 8 1
## 20 Lincoln Continental 10.4 8 1
我们详细解释上述步骤。
-
windowPartitionBy
创建一个定义分区的窗口规范对象WindowSpec
。它控制哪些行将与给定行在同一分区中。在这种情况下,cyl
值相同的行将被放入同一分区中。orderBy
进一步定义了排序 - 给定行在分区中的位置。结果WindowSpec
将作为ws
返回。
更多的窗口规范方法包括 rangeBetween
(按值定义帧边界)和 rowsBetween
(按行索引定义边界)。
-
withColumn
将一个名为rank
的 Column 附加到SparkDataFrame
。over
返回一个窗口列。第一个参数通常是窗口函数(例如rank()
,lead(carsDF$wt)
)返回的 Column。它根据分区和排序后的表计算相应的值。
用户定义函数
在 SparkR 中,我们支持多种用户定义函数 (UDF)。
按分区应用
dapply
可以将函数应用于 SparkDataFrame
的每个分区。应用于 SparkDataFrame
每个分区的函数应该只有一个参数,即对应于一个分区的 data.frame
,并且输出也应该是一个 data.frame
。模式(Schema)指定了结果 SparkDataFrame
的行格式。它必须与返回值的 DataTypes 匹配。有关 R 和 Spark 之间映射的信息,请参阅此处。
我们将 mpg
转换为 kmpg
(每加仑公里数)。carsSubDF
是一个包含 carsDF
列子集的 SparkDataFrame
。
carsSubDF <- select(carsDF, "model", "mpg")
schema <- "model STRING, mpg DOUBLE, kmpg DOUBLE"
out <- dapply(carsSubDF, function(x) { x <- cbind(x, x$mpg * 1.61) }, schema)
head(collect(out))
## model mpg kmpg
## 1 Mazda RX4 21.0 33.810
## 2 Mazda RX4 Wag 21.0 33.810
## 3 Datsun 710 22.8 36.708
## 4 Hornet 4 Drive 21.4 34.454
## 5 Hornet Sportabout 18.7 30.107
## 6 Valiant 18.1 29.141
与 dapply
类似,dapplyCollect
可以将函数应用于 SparkDataFrame
的每个分区并收集结果。函数的输出应该是一个 data.frame
,但在此情况下不需要模式。请注意,如果 UDF 在所有分区上的输出无法拉取到驱动程序内存中,则 dapplyCollect
可能会失败。
out <- dapplyCollect(
carsSubDF,
function(x) {
x <- cbind(x, "kmpg" = x$mpg * 1.61)
})
head(out, 3)
## model mpg kmpg
## 1 Mazda RX4 21.0 33.810
## 2 Mazda RX4 Wag 21.0 33.810
## 3 Datsun 710 22.8 36.708
按组应用
gapply
可以将函数应用于 SparkDataFrame
的每个组。要应用于 SparkDataFrame
每个组的函数应该只有两个参数:分组键和与该键对应的 R data.frame
。组从 SparkDataFrames
的列中选择。函数的输出应该是一个 data.frame
。模式(Schema)指定了结果 SparkDataFrame
的行格式。它必须根据 Spark 数据类型表示 R 函数的输出模式。返回的 data.frame
的列名由用户设置。有关 R 和 Spark 之间映射的信息,请参阅此处。
schema <- structType(structField("cyl", "double"), structField("max_mpg", "double"))
result <- gapply(
carsDF,
"cyl",
function(key, x) {
y <- data.frame(key, max(x$mpg))
},
schema)
head(arrange(result, "max_mpg", decreasing = TRUE))
## cyl max_mpg
## 1 4 33.9
## 2 6 21.4
## 3 8 19.2
与 gapply
类似,gapplyCollect
可以将函数应用于 SparkDataFrame
的每个分区并将结果收集回 R data.frame
。函数的输出应该是一个 data.frame
,但在此情况下不需要模式。请注意,如果 UDF 在所有分区上的输出无法拉取到驱动程序内存中,则 gapplyCollect
可能会失败。
result <- gapplyCollect(
carsDF,
"cyl",
function(key, x) {
y <- data.frame(key, max(x$mpg))
colnames(y) <- c("cyl", "max_mpg")
y
})
head(result[order(result$max_mpg, decreasing = TRUE), ])
## cyl max_mpg
## 1 4 33.9
## 2 6 21.4
## 3 8 19.2
分发本地函数
与原生 R 中的 lapply
类似,spark.lapply
对元素列表运行函数,并使用 Spark 分布计算。spark.lapply
的工作方式类似于 doParallel
或 lapply
对列表元素的操作。所有计算结果应该适合单台机器。如果不是这种情况,您可以执行类似 df <- createDataFrame(list)
的操作,然后使用 dapply
。
我们以 e1071
包中的 svm
为例。我们使用所有默认设置,除了改变约束违反的成本。spark.lapply
可以并行训练这些不同的模型。
costs <- exp(seq(from = log(1), to = log(1000), length.out = 5))
train <- function(cost) {
stopifnot(requireNamespace("e1071", quietly = TRUE))
model <- e1071::svm(Species ~ ., data = iris, cost = cost)
summary(model)
}
返回模型摘要的列表。
model.summaries <- spark.lapply(costs, train)
class(model.summaries)
## [1] "list"
为避免显示过长,我们只展示第二个拟合模型的部分结果。您也可以随意检查其他模型。
print(model.summaries[[2]])
## $call
## svm(formula = Species ~ ., data = iris, cost = cost)
##
## $type
## [1] 0
##
## $kernel
## [1] 2
##
## $cost
## [1] 5.623413
##
## $degree
## [1] 3
##
## $gamma
## [1] 0.25
##
## $coef0
## [1] 0
##
## $nu
## [1] 0.5
##
## $epsilon
## [1] 0.1
##
## $sparse
## [1] FALSE
##
## $scaled
## [1] TRUE TRUE TRUE TRUE
##
## $x.scale
## $x.scale$`scaled:center`
## Sepal.Length Sepal.Width Petal.Length Petal.Width
## 5.843333 3.057333 3.758000 1.199333
##
## $x.scale$`scaled:scale`
## Sepal.Length Sepal.Width Petal.Length Petal.Width
## 0.8280661 0.4358663 1.7652982 0.7622377
##
##
## $y.scale
## NULL
##
## $nclasses
## [1] 3
##
## $levels
## [1] "setosa" "versicolor" "virginica"
##
## $tot.nSV
## [1] 35
##
## $nSV
## [1] 6 15 14
##
## $labels
## [1] 1 2 3
##
## $SV
## Sepal.Length Sepal.Width Petal.Length Petal.Width
## 14 -1.86378030 -0.13153881 -1.5056946 -1.4422448
## 16 -0.17309407 3.08045544 -1.2791040 -1.0486668
## 21 -0.53538397 0.78617383 -1.1658087 -1.3110521
## 23 -1.50149039 1.24503015 -1.5623422 -1.3110521
## 24 -0.89767388 0.55674567 -1.1658087 -0.9174741
## 42 -1.62225369 -1.73753594 -1.3923993 -1.1798595
## 51 1.39682886 0.32731751 0.5336209 0.2632600
## 53 1.27606556 0.09788935 0.6469162 0.3944526
## 54 -0.41462067 -1.73753594 0.1370873 0.1320673
## 55 0.79301235 -0.59039513 0.4769732 0.3944526
## [ reached getOption("max.print") -- omitted 25 rows ]
##
## $index
## [1] 14 16 21 23 24 42 51 53 54 55 58 61 69 71 73 78 79 84 85
## [20] 86 99 107 111 119 120 124 127 128 130 132 134 135 139 149 150
##
## $rho
## [1] -0.10346530 0.12160294 -0.09540346
##
## $compprob
## [1] FALSE
##
## $probA
## NULL
##
## $probB
## NULL
##
## $sigma
## NULL
##
## $coefs
## [,1] [,2]
## [1,] 0.00000000 0.06561739
## [2,] 0.76813720 0.93378721
## [3,] 0.00000000 0.12123270
## [4,] 0.00000000 0.31170741
## [5,] 1.11614066 0.46397392
## [6,] 1.88141600 1.10392128
## [7,] -0.55872622 0.00000000
## [8,] 0.00000000 5.62341325
## [9,] 0.00000000 0.27711792
## [10,] 0.00000000 5.28440007
## [11,] -1.06596713 0.00000000
## [12,] -0.57076709 1.09019756
## [13,] -0.03365904 5.62341325
## [14,] 0.00000000 5.62341325
## [15,] 0.00000000 5.62341325
## [16,] 0.00000000 5.62341325
## [17,] 0.00000000 4.70398738
## [18,] 0.00000000 5.62341325
## [19,] 0.00000000 4.97981371
## [20,] -0.77497987 0.00000000
## [ reached getOption("max.print") -- omitted 15 rows ]
##
## $na.action
## NULL
##
## $xlevels
## named list()
##
## $fitted
## 1 2 3 4 5 6 7 8 9 10 11
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa
## 12 13 14 15 16 17 18 19 20 21 22
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa
## 23 24 25 26 27 28 29 30 31 32 33
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa
## 34 35 36 37 38 39 40
## setosa setosa setosa setosa setosa setosa setosa
## [ reached getOption("max.print") -- omitted 110 entries ]
## Levels: setosa versicolor virginica
##
## $decision.values
## setosa/versicolor setosa/virginica versicolor/virginica
## 1 1.1911739 1.0908424 1.1275805
## 2 1.1336557 1.0619543 1.3260964
## 3 1.2085065 1.0698101 1.0511345
## 4 1.1646153 1.0505915 1.0806874
## 5 1.1880814 1.0950348 0.9542815
## 6 1.0990761 1.0984626 0.9326361
## 7 1.1573474 1.0343287 0.9726843
## 8 1.1851598 1.0815750 1.2206802
## 9 1.1673499 1.0406734 0.8837945
## 10 1.1629911 1.0560925 1.2430067
## 11 1.1339282 1.0803946 1.0338357
## 12 1.1724182 1.0641469 1.1190423
## 13 1.1827355 1.0667956 1.1414844
## [ reached getOption("max.print") -- omitted 137 rows ]
##
## $terms
## Species ~ Sepal.Length + Sepal.Width + Petal.Length + Petal.Width
## attr(,"variables")
## list(Species, Sepal.Length, Sepal.Width, Petal.Length, Petal.Width)
## attr(,"factors")
## Sepal.Length Sepal.Width Petal.Length Petal.Width
## Species 0 0 0 0
## Sepal.Length 1 0 0 0
## Sepal.Width 0 1 0 0
## Petal.Length 0 0 1 0
## Petal.Width 0 0 0 1
## attr(,"term.labels")
## [1] "Sepal.Length" "Sepal.Width" "Petal.Length" "Petal.Width"
## attr(,"order")
## [1] 1 1 1 1
## attr(,"intercept")
## [1] 0
## attr(,"response")
## [1] 1
## attr(,".Environment")
## <environment: 0x5580e90fd1d8>
## attr(,"predvars")
## list(Species, Sepal.Length, Sepal.Width, Petal.Length, Petal.Width)
## attr(,"dataClasses")
## Species Sepal.Length Sepal.Width Petal.Length Petal.Width
## "factor" "numeric" "numeric" "numeric" "numeric"
##
## attr(,"class")
## [1] "summary.svm"
SQL 查询
SparkDataFrame
也可以在 Spark SQL 中注册为临时视图,以便可以对其数据运行 SQL 查询。sql 函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 SparkDataFrame
返回。
people <- read.df(paste0(sparkR.conf("spark.home"),
"/examples/src/main/resources/people.json"), "json")
将此 SparkDataFrame
注册为临时视图。
createOrReplaceTempView(people, "people")
SQL 语句可以使用 sql 方法运行。
## name
## 1 Justin
机器学习
SparkR 支持以下机器学习模型和算法。
训练集和测试集
我们可以通过 randomSplit
函数轻松地将 SparkDataFrame
分割成随机的训练集和测试集。它返回一个包含给定 weights
的分割 SparkDataFrames
列表。我们以 carsDF
为例,希望大约有训练数据和测试数据。
splitDF_list <- randomSplit(carsDF, c(0.7, 0.3), seed = 0)
carsDF_train <- splitDF_list[[1]]
carsDF_test <- splitDF_list[[2]]
count(carsDF_train)
## [1] 24
head(carsDF_train)
## model mpg cyl disp hp drat wt qsec vs am gear carb
## 1 Cadillac Fleetwood 10.4 8 472 205 2.93 5.250 17.98 0 0 3 4
## 2 Camaro Z28 13.3 8 350 245 3.73 3.840 15.41 0 0 3 4
## 3 Chrysler Imperial 14.7 8 440 230 3.23 5.345 17.42 0 0 3 4
## 4 Dodge Challenger 15.5 8 318 150 2.76 3.520 16.87 0 0 3 2
## 5 Duster 360 14.3 8 360 245 3.21 3.570 15.84 0 0 3 4
## 6 Ferrari Dino 19.7 6 145 175 3.62 2.770 15.50 0 1 5 6
count(carsDF_test)
## [1] 8
head(carsDF_test)
## model mpg cyl disp hp drat wt qsec vs am gear carb
## 1 AMC Javelin 15.2 8 304.0 150 3.15 3.435 17.30 0 0 3 2
## 2 Datsun 710 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1
## 3 Fiat 128 32.4 4 78.7 66 4.08 2.200 19.47 1 1 4 1
## 4 Merc 240D 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2
## 5 Merc 280 19.2 6 167.6 123 3.92 3.440 18.30 1 0 4 4
## 6 Toyota Corolla 33.9 4 71.1 65 4.22 1.835 19.90 1 1 4 1
模型和算法
线性支持向量机 (SVM) 分类器
线性支持向量机 (SVM) 分类器是一种具有线性核的 SVM 分类器。这是一种二元分类器。我们用一个简单的例子来展示如何使用 spark.svmLinear
进行二元分类。
# load training data and create a DataFrame
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# fit a Linear SVM classifier model
model <- spark.svmLinear(training, Survived ~ ., regParam = 0.01, maxIter = 10)
summary(model)
## $coefficients
## Estimate
## (Intercept) 0.993131388
## Class_1st -0.386500359
## Class_2nd -0.622627816
## Class_3rd -0.204446602
## Sex_Female -0.589950309
## Age_Adult 0.741676902
## Freq -0.006582887
##
## $numClasses
## [1] 2
##
## $numFeatures
## [1] 6
在训练数据上预测值
prediction <- predict(model, training)
head(select(prediction, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
## Class Sex Age Freq Survived prediction
## 1 1st Male Child 0 No Yes
## 2 2nd Male Child 0 No Yes
## 3 3rd Male Child 35 No Yes
## 4 Crew Male Child 0 No Yes
## 5 1st Female Child 0 No Yes
## 6 2nd Female Child 0 No No
逻辑回归
逻辑回归 是一种在响应变量为分类变量时广泛使用的模型。它可以被视为 广义线性预测模型 的特例。我们在 spark.glm
之上提供了 spark.logit
,以支持具有高级超参数的逻辑回归。它支持带弹性网络正则化和特征标准化的二元和多类分类,类似于 glmnet
。
我们使用一个简单的例子来演示 spark.logit
的用法。通常,使用 spark.logit
有三个步骤:1). 从适当的数据源创建数据框;2). 使用 spark.logit
和适当的参数设置拟合逻辑回归模型;3). 使用 summary
获取拟合模型的系数矩阵,并使用 predict
进行预测。
二项逻辑回归
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
model <- spark.logit(training, Survived ~ ., regParam = 0.04741301)
summary(model)
## $coefficients
## Estimate
## (Intercept) 0.2255014282
## Class_1st -0.1338856652
## Class_2nd -0.1479826947
## Class_3rd 0.0005674937
## Sex_Female -0.2011183871
## Age_Adult 0.3263186885
## Freq -0.0033111157
在训练数据上预测值
fitted <- predict(model, training)
head(select(fitted, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
## Class Sex Age Freq Survived prediction
## 1 1st Male Child 0 No Yes
## 2 2nd Male Child 0 No Yes
## 3 3rd Male Child 35 No Yes
## 4 Crew Male Child 0 No Yes
## 5 1st Female Child 0 No No
## 6 2nd Female Child 0 No No
针对三个类别的多项逻辑回归
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# Note in this case, Spark infers it is multinomial logistic regression, so family = "multinomial" is optional.
model <- spark.logit(training, Class ~ ., regParam = 0.07815179)
summary(model)
## $coefficients
## 1st 2nd 3rd Crew
## (Intercept) 0.051662845 0.062998145 -0.039083689 -0.075577300
## Sex_Female -0.088030587 -0.102528148 0.059233106 0.131325629
## Age_Adult 0.141935316 0.169492058 -0.102562719 -0.208864654
## Survived_No 0.052721020 0.057980057 -0.029408423 -0.081292653
## Freq -0.001555912 -0.001970377 0.001303836 0.002222453
多层感知器
多层感知器分类器 (MLPC) 是一种基于前馈人工神经网络的分类器。MLPC 由多层节点组成。网络中的每一层都与下一层完全连接。输入层中的节点代表输入数据。所有其他节点通过输入与节点权重的线性组合将输入映射到输出和偏差并应用激活函数。这可以以矩阵形式写出,对于 MLPC 具有层,如下所示
中间层中的节点使用 Sigmoid (逻辑) 函数
输出层中的节点使用 Softmax 函数
节点的数量在输出层中对应于类别的数量。
MLPC 采用反向传播来学习模型。我们使用逻辑损失函数进行优化,并使用 L-BFGS 作为优化例程。
spark.mlp
需要 data
中至少有两列:一列名为 "label"
,另一列名为 "features"
。"features"
列应为 libSVM 格式。
我们使用 Titanic 数据集来展示如何在分类中使用 spark.mlp
。
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# fit a Multilayer Perceptron Classification Model
model <- spark.mlp(training, Survived ~ Age + Sex, blockSize = 128, layers = c(2, 2), solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1, initialWeights = c( 0, 0, 5, 5, 9, 9))
为避免显示过长,我们只展示模型摘要的部分结果。您可以在 SparkR shell 中查看完整结果。
# check the summary of the fitted model
summary(model)
## $numOfInputs
## [1] 2
##
## $numOfOutputs
## [1] 2
##
## $layers
## [1] 2 2
##
## $weights
## $weights[[1]]
## [1] 0
##
## $weights[[2]]
## [1] 0
##
## $weights[[3]]
## [1] 5
##
## $weights[[4]]
## [1] 5
##
## $weights[[5]]
## [1] 9
##
## $weights[[6]]
## [1] 9
# make predictions use the fitted model
predictions <- predict(model, training)
head(select(predictions, predictions$prediction))
## prediction
## 1 No
## 2 No
## 3 No
## 4 No
## 5 No
## 6 No
朴素贝叶斯
朴素贝叶斯模型假设特征之间相互独立。spark.naiveBayes
对 SparkDataFrame 拟合 伯努利朴素贝叶斯模型。数据应全部是分类的。这些模型常用于文档分类。
titanic <- as.data.frame(Titanic)
titanicDF <- createDataFrame(titanic[titanic$Freq > 0, -5])
naiveBayesModel <- spark.naiveBayes(titanicDF, Survived ~ Class + Sex + Age)
summary(naiveBayesModel)
## $apriori
## Yes No
## [1,] 0.5769231 0.4230769
##
## $tables
## Class_3rd Class_1st Class_2nd Sex_Female Age_Adult
## Yes 0.3125 0.3125 0.3125 0.5 0.5625
## No 0.4166667 0.25 0.25 0.5 0.75
naiveBayesPrediction <- predict(naiveBayesModel, titanicDF)
head(select(naiveBayesPrediction, "Class", "Sex", "Age", "Survived", "prediction"))
## Class Sex Age Survived prediction
## 1 3rd Male Child No Yes
## 2 3rd Female Child No Yes
## 3 1st Male Adult No Yes
## 4 2nd Male Adult No Yes
## 5 3rd Male Adult No No
## 6 Crew Male Adult No Yes
因子分解机分类器
用于分类问题的因子分解机。
有关因子分解机实现背景和详细信息,请参阅因子分解机部分。
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
model <- spark.fmClassifier(training, Survived ~ Age + Sex)
summary(model)
## $coefficients
## Estimate
## (Intercept) 0.0064275991
## Age_Adult 0.0001294448
## Sex_Female 0.0001294448
##
## $factors
## [,1] [,2] [,3] [,4] [,5] [,6]
## [1,] -0.3256224 0.11912568 0.1460235 0.1620567 0.13153516 0.06403695
## [2,] -0.1382155 -0.03658261 0.1717808 -0.1602241 -0.08446129 -0.19287098
## [,7] [,8]
## [1,] -0.03292446 -0.05166818
## [2,] 0.19252571 0.06237194
##
## $numClasses
## [1] 2
##
## $numFeatures
## [1] 2
##
## $factorSize
## [1] 8
## prediction
## 1 Yes
## 2 Yes
## 3 Yes
## 4 Yes
## 5 Yes
## 6 Yes
加速失效时间生存模型
生存分析研究事件发生前预期的时间长度,以及通常与风险因素或受试者所接受治疗的关系。与标准回归分析不同,生存建模必须处理数据中的特殊特征,包括非负生存时间和删失。
加速失效时间 (AFT) 模型是一种用于删失数据的参数生存模型,它假设协变量的作用是以某个常数加速或减速事件的生命历程。欲了解更多信息,请参阅维基百科页面 AFT 模型及其中引用的文献。与用于相同目的的 比例风险模型 不同,AFT 模型更容易并行化,因为每个实例独立地贡献于目标函数。
library(survival)
ovarianDF <- createDataFrame(ovarian)
aftModel <- spark.survreg(ovarianDF, Surv(futime, fustat) ~ ecog_ps + rx)
summary(aftModel)
## $coefficients
## Value
## (Intercept) 6.8966910
## ecog_ps -0.3850414
## rx 0.5286455
## Log(scale) -0.1234429
## futime fustat age resid_ds rx ecog_ps label prediction
## 1 59 1 72.3315 2 1 1 59 1141.724
## 2 115 1 74.4932 2 1 1 115 1141.724
## 3 156 1 66.4658 2 1 2 156 776.855
## 4 421 0 53.3644 2 2 1 421 1937.087
## 5 431 1 50.3397 2 1 1 431 1141.724
## 6 448 0 56.4301 1 1 2 448 776.855
广义线性模型
主要函数是 spark.glm
。支持以下族和链接函数。默认是高斯 (gaussian)。
族 (Family) | 链接函数 (Link Function) |
---|---|
高斯 (gaussian) | 恒等 (identity), 对数 (log), 逆 (inverse) |
二项 (binomial) | logit, probit, cloglog (互补对数-对数) |
泊松 (poisson) | 对数 (log), 恒等 (identity), 平方根 (sqrt) |
伽马 (gamma) | 逆 (inverse), 恒等 (identity), 对数 (log) |
tweedie | 幂链接函数 |
有三种方法来指定 family
参数。
族名作为字符串,例如
family = "gaussian"
。族函数,例如
family = binomial
。族函数返回的结果,例如
family = poisson(link = log)
。-
请注意,有两种方式来指定 tweedie 族
- 设置
family = "tweedie"
并指定var.power
和link.power
- 当加载
statmod
包时,tweedie 族使用其中的族定义指定,即tweedie()
。
- 设置
有关族及其链接函数的更多信息,请参阅维基百科页面 广义线性模型。
我们使用 mtcars
数据集作为说明。对应的 SparkDataFrame
是 carsDF
。拟合模型后,我们打印出摘要并通过对原始数据集进行预测来查看拟合值。我们还可以传入具有相同模式的新 SparkDataFrame
来预测新数据。
##
## Deviance Residuals:
## (Note: These are approximate quantiles with relative error <= 0.01)
## Min 1Q Median 3Q Max
## -3.9410 -1.6499 -0.3267 1.0373 5.8538
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) 37.227270 1.5987875 23.2847 0.0000e+00
## wt -3.877831 0.6327335 -6.1287 1.1196e-06
## hp -0.031773 0.0090297 -3.5187 1.4512e-03
##
## (Dispersion parameter for gaussian family taken to be 6.725785)
##
## Null deviance: 1126.05 on 31 degrees of freedom
## Residual deviance: 195.05 on 29 degrees of freedom
## AIC: 156.7
##
## Number of Fisher Scoring iterations: 1
进行预测时,将附加一个名为 prediction
的新列。我们这里只看一部分列。
gaussianFitted <- predict(gaussianGLM, carsDF)
head(select(gaussianFitted, "model", "prediction", "mpg", "wt", "hp"))
## model prediction mpg wt hp
## 1 Mazda RX4 23.57233 21.0 2.620 110
## 2 Mazda RX4 Wag 22.58348 21.0 2.875 110
## 3 Datsun 710 25.27582 22.8 2.320 93
## 4 Hornet 4 Drive 21.26502 21.4 3.215 110
## 5 Hornet Sportabout 18.32727 18.7 3.440 175
## 6 Valiant 20.47382 18.1 3.460 105
以下是使用 tweedie 族进行相同拟合的结果
tweedieGLM1 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie", var.power = 0.0)
summary(tweedieGLM1)
##
## Deviance Residuals:
## (Note: These are approximate quantiles with relative error <= 0.01)
## Min 1Q Median 3Q Max
## -3.9410 -1.6499 -0.3267 1.0373 5.8538
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) 37.227270 1.5987875 23.2847 0.0000e+00
## wt -3.877831 0.6327335 -6.1287 1.1196e-06
## hp -0.031773 0.0090297 -3.5187 1.4512e-03
##
## (Dispersion parameter for tweedie family taken to be 6.725785)
##
## Null deviance: 1126.05 on 31 degrees of freedom
## Residual deviance: 195.05 on 29 degrees of freedom
## AIC: 156.7
##
## Number of Fisher Scoring iterations: 1
我们可以尝试 tweedie 族中的其他分布,例如,具有对数链接的复合泊松分布
tweedieGLM2 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie",
var.power = 1.2, link.power = 0.0)
summary(tweedieGLM2)
##
## Deviance Residuals:
## (Note: These are approximate quantiles with relative error <= 0.01)
## Min 1Q Median 3Q Max
## -0.58074 -0.25335 -0.09892 0.18608 0.82717
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) 3.8500849 0.06698272 57.4788 0.0000e+00
## wt -0.2018426 0.02897283 -6.9666 1.1691e-07
## hp -0.0016248 0.00041603 -3.9054 5.1697e-04
##
## (Dispersion parameter for tweedie family taken to be 0.1340111)
##
## Null deviance: 29.8820 on 31 degrees of freedom
## Residual deviance: 3.7739 on 29 degrees of freedom
## AIC: NA
##
## Number of Fisher Scoring iterations: 4
等渗回归
spark.isoreg
对 SparkDataFrame
拟合 等渗回归 模型。它在完全顺序约束下解决加权单变量回归问题。具体来说,给定一组实际观测响应,对应的实际特征,以及可选的正权重,我们希望找到一个单调(分段线性)函数以最小化
还有一些可能有用的参数。
weightCol
:一个字符串,指定权重列。isotonic
:逻辑值,指示输出序列应为等渗/递增 (TRUE
) 还是反等渗/递减 (FALSE
)。featureIndex
:如果公式右侧的特征是向量列(默认值:0),则为该特征的索引,否则无影响。
我们使用一个人工示例来展示其用法。
y <- c(3.0, 6.0, 8.0, 5.0, 7.0)
x <- c(1.0, 2.0, 3.5, 3.0, 4.0)
w <- rep(1.0, 5)
data <- data.frame(y = y, x = x, w = w)
df <- createDataFrame(data)
isoregModel <- spark.isoreg(df, y ~ x, weightCol = "w")
isoregFitted <- predict(isoregModel, df)
head(select(isoregFitted, "x", "y", "prediction"))
## x y prediction
## 1 1.0 3 3.0
## 2 2.0 6 5.5
## 3 3.5 8 7.5
## 4 3.0 5 5.5
## 5 4.0 7 7.5
在预测阶段,根据拟合的单调分段函数,规则如下:
如果预测输入与训练特征完全匹配,则返回相关的预测值。如果具有相同特征的预测值有多个,则返回其中一个。具体是哪一个则未定义。
如果预测输入低于或高于所有训练特征,则分别返回具有最低或最高特征的预测值。如果具有相同特征的预测值有多个,则分别返回最低或最高值。
如果预测输入落在两个训练特征之间,则预测被视为分段线性函数,并根据两个最近特征的预测值计算插值。如果具有相同特征的预测值有多个,则使用与前一点相同的规则。
例如,当输入为时,两个最接近的特征值为和,则预测值将是预测值之间的线性插值,位于和.
newDF <- createDataFrame(data.frame(x = c(1.5, 3.2)))
head(predict(isoregModel, newDF))
## x prediction
## 1 1.5 4.25
## 2 3.2 6.30
线性回归
线性回归模型。
## $coefficients
## Estimate
## (Intercept) 37.22727012
## wt -3.87783074
## hp -0.03177295
##
## $numFeatures
## [1] 2
## prediction
## 1 23.57233
## 2 22.58348
## 3 25.27582
## 4 21.26502
## 5 18.32727
## 6 20.47382
因子分解机回归器
用于回归问题的因子分解机。
有关因子分解机实现背景和详细信息,请参阅因子分解机部分。
model <- spark.fmRegressor(carsDF, mpg ~ wt + hp)
summary(model)
## $coefficients
## Estimate
## (Intercept) 0.1518559
## wt 3.6472555
## hp 2.8026828
##
## $factors
## [,1] [,2] [,3] [,4] [,5] [,6]
## [1,] 0.1424420 -0.1178110 -0.3970272 -0.4696695 0.400288 0.3690930
## [2,] -0.1626185 0.1512138 0.3690435 0.4076975 -0.625752 -0.3715109
## [,7] [,8]
## [1,] 0.03472468 -0.1703219
## [2,] -0.02109148 -0.2006249
##
## $numFeatures
## [1] 2
##
## $factorSize
## [1] 8
## prediction
## 1 106.70996
## 2 87.07526
## 3 111.07931
## 4 60.89565
## 5 61.81374
## 6 40.70095
决策树
spark.decisionTree
对 SparkDataFrame
拟合 决策树 分类或回归模型。用户可以调用 summary
获取拟合模型的摘要,使用 predict
进行预测,以及使用 write.ml
/read.ml
保存/加载拟合模型。
我们使用 Titanic
数据集训练决策树并进行预测
t <- as.data.frame(Titanic)
df <- createDataFrame(t)
dtModel <- spark.decisionTree(df, Survived ~ ., type = "classification", maxDepth = 2)
summary(dtModel)
## Formula: Survived ~ .
## Number of features: 6
## Features: Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances: (6,[5],[1.0])
## Max Depth: 2
## DecisionTreeClassificationModel: uid=dtc_3b470a52c3a2, depth=2, numNodes=5, numClasses=2, numFeatures=6
## If (feature 5 <= 4.5)
## Predict: 0.0
## Else (feature 5 > 4.5)
## If (feature 5 <= 84.5)
## Predict: 1.0
## Else (feature 5 > 84.5)
## Predict: 0.0
##
predictions <- predict(dtModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
## Class Sex Age Freq Survived prediction
## 1 1st Male Child 0 No No
## 2 2nd Male Child 0 No No
## 3 3rd Male Child 35 No Yes
## 4 Crew Male Child 0 No No
## 5 1st Female Child 0 No No
## 6 2nd Female Child 0 No No
梯度提升树
spark.gbt
对 SparkDataFrame
拟合 梯度提升树 分类或回归模型。用户可以调用 summary
获取拟合模型的摘要,使用 predict
进行预测,以及使用 write.ml
/read.ml
保存/加载拟合模型。
我们使用 Titanic
数据集训练梯度提升树并进行预测
t <- as.data.frame(Titanic)
df <- createDataFrame(t)
gbtModel <- spark.gbt(df, Survived ~ ., type = "classification", maxDepth = 2, maxIter = 2)
summary(gbtModel)
## Formula: Survived ~ .
## Number of features: 6
## Features: Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances: (6,[1,2,5],[0.03336902858878361,0.16099525743106016,0.8056357139801562])
## Max Depth: 2
## Number of trees: 2
## Tree weights: 1 0.1
## GBTClassificationModel: uid = gbtc_474511659869, numTrees=2, numClasses=2, numFeatures=6
## Tree 0 (weight 1.0):
## If (feature 5 <= 4.5)
## If (feature 1 in {1.0})
## Predict: -1.0
## Else (feature 1 not in {1.0})
## Predict: -0.3333333333333333
## Else (feature 5 > 4.5)
## If (feature 5 <= 84.5)
## Predict: 0.5714285714285714
## Else (feature 5 > 84.5)
## Predict: -0.42857142857142855
## Tree 1 (weight 0.1):
## If (feature 2 in {1.0})
## If (feature 5 <= 15.5)
## Predict: 0.9671846896296403
## Else (feature 5 > 15.5)
## Predict: -1.0857923804083338
## Else (feature 2 not in {1.0})
## If (feature 5 <= 13.5)
## Predict: -0.08651035613926407
## Else (feature 5 > 13.5)
## Predict: 0.6566673506774614
##
predictions <- predict(gbtModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
## Class Sex Age Freq Survived prediction
## 1 1st Male Child 0 No No
## 2 2nd Male Child 0 No No
## 3 3rd Male Child 35 No Yes
## 4 Crew Male Child 0 No No
## 5 1st Female Child 0 No No
## 6 2nd Female Child 0 No No
随机森林
spark.randomForest
对 SparkDataFrame
拟合 随机森林 分类或回归模型。用户可以调用 summary
获取拟合模型的摘要,使用 predict
进行预测,以及使用 write.ml
/read.ml
保存/加载拟合模型。
在以下示例中,我们使用 Titanic
数据集训练随机森林并进行预测
t <- as.data.frame(Titanic)
df <- createDataFrame(t)
rfModel <- spark.randomForest(df, Survived ~ ., type = "classification", maxDepth = 2, numTrees = 2)
summary(rfModel)
## Formula: Survived ~ .
## Number of features: 6
## Features: Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances: (6,[3,4,5],[0.09849498327759101,0.401505016722409,0.5])
## Max Depth: 2
## Number of trees: 2
## Tree weights: 1 1
## RandomForestClassificationModel: uid=rfc_d8fbf7a89562, numTrees=2, numClasses=2, numFeatures=6
## Tree 0 (weight 1.0):
## If (feature 4 in {0.0})
## Predict: 0.0
## Else (feature 4 not in {0.0})
## If (feature 3 in {1.0})
## Predict: 0.0
## Else (feature 3 not in {1.0})
## Predict: 1.0
## Tree 1 (weight 1.0):
## If (feature 5 <= 84.5)
## If (feature 5 <= 4.5)
## Predict: 0.0
## Else (feature 5 > 4.5)
## Predict: 1.0
## Else (feature 5 > 84.5)
## Predict: 0.0
##
predictions <- predict(rfModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
## Class Sex Age Freq Survived prediction
## 1 1st Male Child 0 No No
## 2 2nd Male Child 0 No No
## 3 3rd Male Child 35 No Yes
## 4 Crew Male Child 0 No No
## 5 1st Female Child 0 No No
## 6 2nd Female Child 0 No No
二分 k-均值
spark.bisectingKmeans
是一种使用分裂(或“自顶向下”)方法的层次聚类:所有观测值从一个簇开始,然后随着向下移动层级,递归地执行分裂。
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
model <- spark.bisectingKmeans(training, Class ~ Survived, k = 4)
summary(model)
## $k
## [1] 4
##
## $coefficients
## Survived_No
## 1 0
## 2 1
## 3 0
## 4 1
##
## $size
## $size[[1]]
## [1] 16
##
## $size[[2]]
## [1] 16
##
## $size[[3]]
## [1] 0
##
## $size[[4]]
## [1] 0
##
##
## $cluster
## SparkDataFrame[prediction:int]
##
## $is.loaded
## [1] FALSE
## Class prediction
## 1 1st 1
## 2 2nd 1
## 3 3rd 1
## 4 Crew 1
## 5 1st 1
## 6 2nd 1
高斯混合模型
spark.gaussianMixture
对 SparkDataFrame
拟合多元 高斯混合模型 (GMM)。期望最大化 (EM) 用于近似模型的最大似然估计器 (MLE)。
我们使用一个模拟示例来演示用法。
X1 <- data.frame(V1 = rnorm(4), V2 = rnorm(4))
X2 <- data.frame(V1 = rnorm(6, 3), V2 = rnorm(6, 4))
data <- rbind(X1, X2)
df <- createDataFrame(data)
gmmModel <- spark.gaussianMixture(df, ~ V1 + V2, k = 2)
summary(gmmModel)
## $lambda
## [1] 0.408879 0.591121
##
## $mu
## $mu[[1]]
## [1] -0.8254152973 0.0009888204
##
## $mu[[2]]
## [1] 3.006119 3.620325
##
##
## $sigma
## $sigma[[1]]
## [,1] [,2]
## [1,] 1.377944 1.092401
## [2,] 1.092401 1.477489
##
## $sigma[[2]]
## [,1] [,2]
## [1,] 1.317545 0.5716919
## [2,] 0.5716919 0.7335671
##
##
## $loglik
## [1] -32.8689
##
## $posterior
## SparkDataFrame[posterior:array<double>]
##
## $is.loaded
## [1] FALSE
## V1 V2 prediction
## 1 -1.400043517 0.6215527 0
## 2 0.255317055 1.1484116 0
## 3 -2.437263611 -1.8218177 0
## 4 -0.005571287 -0.2473253 0
## 5 2.755800393 4.5124269 1
## 6 2.717294551 2.1369885 1
k-均值聚类
spark.kmeans
对-均值聚类模型对 SparkDataFrame
进行聚类。作为一种无监督学习方法,我们不需要响应变量。因此,R 公式中的左侧应留空。聚类仅基于右侧的变量。
kmeansModel <- spark.kmeans(carsDF, ~ mpg + hp + wt, k = 3)
summary(kmeansModel)
## $k
## [1] 3
##
## $coefficients
## mpg hp wt
## 1 24.22353 93.52941 2.599588
## 2 15.80000 178.50000 3.926400
## 3 14.62000 263.80000 3.899000
##
## $size
## $size[[1]]
## [1] 17
##
## $size[[2]]
## [1] 10
##
## $size[[3]]
## [1] 5
##
##
## $cluster
## SparkDataFrame[prediction:int]
##
## $is.loaded
## [1] FALSE
##
## $clusterSize
## [1] 3
kmeansPredictions <- predict(kmeansModel, carsDF)
head(select(kmeansPredictions, "model", "mpg", "hp", "wt", "prediction"), n = 20L)
## model mpg hp wt prediction
## 1 Mazda RX4 21.0 110 2.620 0
## 2 Mazda RX4 Wag 21.0 110 2.875 0
## 3 Datsun 710 22.8 93 2.320 0
## 4 Hornet 4 Drive 21.4 110 3.215 0
## 5 Hornet Sportabout 18.7 175 3.440 1
## 6 Valiant 18.1 105 3.460 0
## 7 Duster 360 14.3 245 3.570 2
## 8 Merc 240D 24.4 62 3.190 0
## 9 Merc 230 22.8 95 3.150 0
## 10 Merc 280 19.2 123 3.440 0
## 11 Merc 280C 17.8 123 3.440 0
## 12 Merc 450SE 16.4 180 4.070 1
## 13 Merc 450SL 17.3 180 3.730 1
## 14 Merc 450SLC 15.2 180 3.780 1
## 15 Cadillac Fleetwood 10.4 205 5.250 1
## 16 Lincoln Continental 10.4 215 5.424 1
## 17 Chrysler Imperial 14.7 230 5.345 2
## 18 Fiat 128 32.4 66 2.200 0
## 19 Honda Civic 30.4 52 1.615 0
## 20 Toyota Corolla 33.9 65 1.835 0
潜在狄利克雷分配
spark.lda
对 SparkDataFrame
拟合 潜在狄利克雷分配 模型。它常用于主题建模,其中主题从文本文档集合中推断出来。LDA 可以被视为一种聚类算法,如下所示:
主题对应于簇中心,文档对应于数据集中的示例(行)。
主题和文档都存在于特征空间中,其中特征向量是词频向量(词袋模型)。
LDA 不使用传统的距离进行聚类,而是使用基于文本文档生成方式的统计模型函数。
要使用 LDA,我们需要在 data
中指定一个 features
列,其中每个条目代表一个文档。该列有两种选项:
字符串:这可以是整个文档的字符串。它将自动解析。额外的停用词可以在
customizedStopWords
中添加。libSVM:每个条目是一个词的集合,将直接处理。
拟合模型还提供了两个函数。
spark.posterior
返回一个SparkDataFrame
,其中包含一个名为“topicDistribution”的后验概率向量列。spark.perplexity
返回给定SparkDataFrame
的对数困惑度,如果缺少参数data
,则返回训练数据的对数困惑度。
欲了解更多信息,请参阅帮助文档 ?spark.lda
。
让我们看一个人工示例。
corpus <- data.frame(features = c(
"1 2 6 0 2 3 1 1 0 0 3",
"1 3 0 1 3 0 0 2 0 0 1",
"1 4 1 0 0 4 9 0 1 2 0",
"2 1 0 3 0 0 5 0 2 3 9",
"3 1 1 9 3 0 2 0 0 1 3",
"4 2 0 3 4 5 1 1 1 4 0",
"2 1 0 3 0 0 5 0 2 2 9",
"1 1 1 9 2 1 2 0 0 1 3",
"4 4 0 3 4 2 1 3 0 0 0",
"2 8 2 0 3 0 2 0 2 7 2",
"1 1 1 9 0 2 2 0 0 3 3",
"4 1 0 0 4 5 1 3 0 1 0"))
corpusDF <- createDataFrame(corpus)
model <- spark.lda(data = corpusDF, k = 5, optimizer = "em")
summary(model)
## $docConcentration
## [1] 11 11 11 11 11
##
## $topicConcentration
## [1] 1.1
##
## $logLikelihood
## [1] -353.2948
##
## $logPerplexity
## [1] 2.676476
##
## $isDistributed
## [1] TRUE
##
## $vocabSize
## [1] 10
##
## $topics
## SparkDataFrame[topic:int, term:array<string>, termWeights:array<double>]
##
## $vocabulary
## [1] "0" "1" "2" "3" "4" "9" "5" "8" "7" "6"
##
## $trainingLogLikelihood
## [1] -239.5629
##
## $logPrior
## [1] -980.2974
posterior <- spark.posterior(model, corpusDF)
head(posterior)
## features topicDistribution
## 1 1 2 6 0 2 3 1 1 0 0 3 0.1972183, 0.1986639, 0.2022022, 0.2006584, 0.2012571
## 2 1 3 0 1 3 0 0 2 0 0 1 0.1989991, 0.1988732, 0.2015973, 0.2006385, 0.1998919
## 3 1 4 1 0 0 4 9 0 1 2 0 0.2020625, 0.2026072, 0.1968842, 0.1987298, 0.1997163
## 4 2 1 0 3 0 0 5 0 2 3 9 0.2004088, 0.1981928, 0.2013018, 0.2006318, 0.1994647
## 5 3 1 1 9 3 0 2 0 0 1 3 0.1971488, 0.1983950, 0.2023585, 0.2011592, 0.2009385
## 6 4 2 0 3 4 5 1 1 1 4 0 0.2020255, 0.2041807, 0.1955397, 0.1997236, 0.1985306
perplexity <- spark.perplexity(model, corpusDF)
perplexity
## [1] 2.676476
交替最小二乘
spark.als
通过 交替最小二乘 在 协同过滤 中学习潜在因子。
spark.als
中可以配置多个选项,包括 rank
、reg
和 nonnegative
。有关完整列表,请参阅帮助文件。
ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(ratings, c("user", "item", "rating"))
model <- spark.als(df, "rating", "user", "item", rank = 10, reg = 0.1, nonnegative = TRUE)
提取潜在因子。
stats <- summary(model)
userFactors <- stats$userFactors
itemFactors <- stats$itemFactors
head(userFactors)
head(itemFactors)
进行预测。
幂迭代聚类
幂迭代聚类 (PIC) 是一种可扩展的图聚类算法。spark.assignClusters
方法运行 PIC 算法并返回每个输入顶点的簇分配。
df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
list(1L, 2L, 1.0), list(3L, 4L, 1.0),
list(4L, 0L, 0.1)),
schema = c("src", "dst", "weight"))
head(spark.assignClusters(df, initMode = "degree", weightCol = "weight"))
## id cluster
## 1 4 1
## 2 0 0
## 3 1 0
## 4 3 1
## 5 2 0
FP-growth
spark.fpGrowth
执行 FP-growth 算法以在 SparkDataFrame
上挖掘频繁项集。itemsCol
应该是一个值数组。
df <- selectExpr(createDataFrame(data.frame(rawItems = c(
"T,R,U", "T,S", "V,R", "R,U,T,V", "R,S", "V,S,U", "U,R", "S,T", "V,R", "V,U,S",
"T,V,U", "R,V", "T,S", "T,S", "S,T", "S,U", "T,R", "V,R", "S,V", "T,S,U"
))), "split(rawItems, ',') AS items")
fpm <- spark.fpGrowth(df, minSupport = 0.2, minConfidence = 0.5)
spark.freqItemsets
方法可用于检索包含频繁项集的 SparkDataFrame
。
head(spark.freqItemsets(fpm))
## items freq
## 1 S 11
## 2 T 10
## 3 T, S 6
## 4 R 9
## 5 V 9
## 6 V, R 5
spark.associationRules
返回一个包含关联规则的 SparkDataFrame
。
head(spark.associationRules(fpm))
## antecedent consequent confidence lift support
## 1 V R 0.5555556 1.2345679 0.25
## 2 S T 0.5454545 1.0909091 0.30
## 3 T S 0.6000000 1.0909091 0.30
## 4 R V 0.5555556 1.2345679 0.25
## 5 U S 0.5000000 0.9090909 0.20
## 6 U T 0.5000000 1.0000000 0.20
我们可以根据 antecedent
进行预测。
## items prediction
## 1 T, R, U S, V
## 2 T, S NULL
## 3 V, R NULL
## 4 R, U, T, V S
## 5 R, S T, V
## 6 V, S, U R, T
PrefixSpan
spark.findFrequentSequentialPatterns
方法可用于在项集输入序列中查找完整的频繁序列模式集。
df <- createDataFrame(list(list(list(list(1L, 2L), list(3L))),
list(list(list(1L), list(3L, 2L), list(1L, 2L))),
list(list(list(1L, 2L), list(5L))),
list(list(list(6L)))),
schema = c("sequence"))
head(spark.findFrequentSequentialPatterns(df, minSupport = 0.5, maxPatternLength = 5L))
## sequence freq
## 1 1 3
## 2 3 2
## 3 2 3
## 4 1, 2 3
## 5 1, 3 2
柯尔莫哥洛夫-斯米尔诺夫检验
spark.kstest
运行双侧、单样本 柯尔莫哥洛夫-斯米尔诺夫 (KS) 检验。给定一个 SparkDataFrame
,该检验将给定列 testCol
中的连续数据与参数 nullHypothesis
指定的理论分布进行比较。用户可以调用 summary
获取检验结果的摘要。
在以下示例中,我们测试 Titanic
数据集的 Freq
列是否服从正态分布。我们使用样本的均值和标准差设置正态分布的参数。
t <- as.data.frame(Titanic)
df <- createDataFrame(t)
freqStats <- head(select(df, mean(df$Freq), sd(df$Freq)))
freqMean <- freqStats[1]
freqStd <- freqStats[2]
test <- spark.kstest(df, "Freq", "norm", c(freqMean, freqStd))
testSummary <- summary(test)
testSummary
## Kolmogorov-Smirnov test summary:
## degrees of freedom = 0
## statistic = 0.3065126710255011
## pValue = 0.0036336792155329256
## Very strong presumption against null hypothesis: Sample follows theoretical distribution.
模型持久化
以下示例展示了如何在 SparkR 中保存/加载 ML 模型。
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
gaussianGLM <- spark.glm(training, Freq ~ Sex + Age, family = "gaussian")
# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)
# Check model summary
summary(gaussianGLM2)
##
## Saved-loaded model does not support output 'Deviance Residuals'.
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) 46.219 35.994 1.2841 0.2092846
## Sex_Female -78.812 41.562 -1.8962 0.0679311
## Age_Adult 123.938 41.562 2.9820 0.0057522
##
## (Dispersion parameter for gaussian family taken to be 13819.52)
##
## Null deviance: 573341 on 31 degrees of freedom
## Residual deviance: 400766 on 29 degrees of freedom
## AIC: 400.7
##
## Number of Fisher Scoring iterations: 1
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, training)
head(gaussianPredictions)
## Class Sex Age Survived Freq label prediction
## 1 1st Male Child No 0 0 46.21875
## 2 2nd Male Child No 0 0 46.21875
## 3 3rd Male Child No 35 35 46.21875
## 4 Crew Male Child No 0 0 46.21875
## 5 1st Female Child No 0 0 -32.59375
## 6 2nd Female Child No 0 0 -32.59375
unlink(modelPath)
结构化流
SparkR 支持结构化流 API。
您可以查阅结构化流编程指南,了解其编程模型和基本概念的介绍。
简单源和槽
Spark 有一些内置的输入源。例如,为了测试一个套接字源,该源将文本读取为单词并显示计算出的单词计数
# Create DataFrame representing the stream of input lines from connection
lines <- read.stream("socket", host = hostname, port = port)
# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
# Generate running word count
wordCounts <- count(groupBy(words, "word"))
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")
Kafka 源
从 Kafka 读取数据很简单。欲了解更多信息,请参阅结构化流支持的输入源。
topic <- read.stream("kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
subscribe = "topic1")
keyvalue <- selectExpr(topic, "CAST(key AS STRING)", "CAST(value AS STRING)")
操作和槽
SparkDataFrame
上的大多数常见操作都支持流式处理,包括选择、投影和聚合。一旦您定义了最终结果,要开始流式计算,您将调用 write.stream
方法并设置一个槽 (sink) 和 outputMode
。
流式 SparkDataFrame
可以用于调试到控制台,写入临时内存表,或者以容错方式写入不同格式的文件槽 (File Sink) 以进行进一步处理。
noAggDF <- select(where(deviceDataStreamingDf, "signal > 10"), "device")
# Print new data to console
write.stream(noAggDF, "console")
# Write new data to Parquet files
write.stream(noAggDF,
"parquet",
path = "path/to/destination/dir",
checkpointLocation = "path/to/checkpoint/dir")
# Aggregate
aggDF <- count(groupBy(noAggDF, "device"))
# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")
# Have all the aggregates in an in memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")
head(sql("select * from aggregates"))
高级主题
SparkR 对象类
您可能在 SparkR 中使用的主要对象类有三种。
-
SparkDataFrame
:SparkR 的核心组件。它是一个 S4 类,表示按命名列组织的数据的分布式集合,概念上等同于关系数据库中的表或 R 中的数据框。它有两个槽sdf
和env
。-
sdf
存储对 Spark JVM 后端中相应 Spark Dataset 的引用。 -
env
保存对象的元信息,例如isCached
。
它可以通过数据导入方法创建,也可以通过转换现有
SparkDataFrame
来创建。我们可以通过众多数据处理函数操纵SparkDataFrame
,并将其输入到机器学习算法中。 -
-
Column
:一个 S4 类,表示SparkDataFrame
的一列。槽jc
存储对 Spark JVM 后端中相应Column
对象的引用。它可以从
SparkDataFrame
中通过$
运算符获取,例如df$col
。更常见的是,它与其他函数一起使用,例如与select
一起选择特定列,与filter
和构造的条件一起选择行,与聚合函数一起计算每个组的聚合统计信息。 -
GroupedData
:一个 S4 类,表示由groupBy
创建或通过转换其他GroupedData
而来的分组数据。其sgd
槽存储对后端中RelationalGroupedDataset
对象的引用。这通常是一个包含分组信息的中间对象,随后进行聚合操作。
架构
完整的架构描述可以在参考文献中找到,特别是论文 SparkR: Scaling R Programs with Spark。
SparkR 的底层是 Spark SQL 引擎。这避免了运行解释型 R 代码的开销,并且 Spark 中优化的 SQL 执行引擎利用数据和计算流的结构信息执行一系列优化以加速计算。
实际计算的主要方法调用发生在驱动程序的 Spark JVM 中。我们有一个基于套接字的 SparkR API,它允许我们从 R 调用 JVM 上的函数。我们使用一个基于 Netty 的套接字服务器监听的 SparkR JVM 后端。
SparkR JVM 后端支持两种 RPC:方法调用和创建新对象。方法调用可以通过两种方式完成。
sparkR.callJMethod
接受对现有 Java 对象的引用以及要传递给方法的一组参数。sparkR.callJStatic
接受静态方法的类名以及要传递给方法的一组参数。
参数使用我们自定义的线格式进行序列化,然后在 JVM 侧进行反序列化。然后我们使用 Java 反射来调用适当的方法。
要创建对象,使用 sparkR.newJObject
,然后类似地使用提供的参数调用相应的构造函数。
最后,我们使用一个新的 R 类 jobj
,它引用后端中存在的 Java 对象。这些引用在 Java 侧被跟踪,当它们在 R 侧超出作用域时,会自动进行垃圾回收。
参考文献
SparkR: 用 Spark 扩展 R 程序,Shivaram Venkataraman, Zongheng Yang, Davies Liu, Eric Liang, Hossein Falaki, Xiangrui Meng, Reynold Xin, Ali Ghodsi, Michael Franklin, Ion Stoica, and Matei Zaharia。SIGMOD 2016。2016 年 6 月。