SparkR - 实用指南
sparkr-vignettes.Rmd
概述
SparkR 是一个 R 包,它提供了一个轻量级的前端,用于从 R 使用 Apache Spark。在 Spark 3.5.5 中,SparkR 提供了一个分布式数据帧实现,该实现支持数据处理操作,如选择、过滤、聚合等,以及使用 MLlib 的分布式机器学习。
入门
我们从一个在本地机器上运行的例子开始,并概述 SparkR 的使用:数据摄取、数据处理和机器学习。
首先,让我们加载并附加该包。
SparkSession
是 SparkR 的入口点,它将您的 R 程序连接到 Spark 集群。您可以使用 sparkR.session
创建 SparkSession
,并传入应用程序名称、依赖的任何 Spark 包等选项。
我们使用默认设置,它在本地模式下运行。如果在后台未找到以前的安装,它会自动下载 Spark 包。有关设置的更多详细信息,请参见 Spark 会话。
## Java ref type org.apache.spark.sql.SparkSession id 1
SparkR 中的操作围绕着一个名为 SparkDataFrame
的 R 类。它是一个组织成命名列的分布式数据集合,在概念上等同于关系数据库中的表或 R 中的数据帧,但在底层具有更丰富的优化。
SparkDataFrame
可以从各种各样的来源构建,例如:结构化数据文件、Hive 中的表、外部数据库或现有的本地 R 数据帧。例如,我们从一个本地 R 数据帧创建一个 SparkDataFrame
,
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
我们可以通过 head
或 showDF
函数查看 SparkDataFrame
的前几行。
head(carsDF)
## model mpg cyl disp hp drat wt qsec vs am gear carb
## 1 Mazda RX4 21.0 6 160 110 3.90 2.620 16.46 0 1 4 4
## 2 Mazda RX4 Wag 21.0 6 160 110 3.90 2.875 17.02 0 1 4 4
## 3 Datsun 710 22.8 4 108 93 3.85 2.320 18.61 1 1 4 1
## 4 Hornet 4 Drive 21.4 6 258 110 3.08 3.215 19.44 1 0 3 1
## 5 Hornet Sportabout 18.7 8 360 175 3.15 3.440 17.02 0 0 3 2
## 6 Valiant 18.1 6 225 105 2.76 3.460 20.22 1 0 3 1
SparkDataFrame
支持常见的数据处理操作,例如 filter
和 select
。
carsSubDF <- select(carsDF, "model", "mpg", "hp")
carsSubDF <- filter(carsSubDF, carsSubDF$hp >= 200)
head(carsSubDF)
## model mpg hp
## 1 Duster 360 14.3 245
## 2 Cadillac Fleetwood 10.4 205
## 3 Lincoln Continental 10.4 215
## 4 Chrysler Imperial 14.7 230
## 5 Camaro Z28 13.3 245
## 6 Ford Pantera L 15.8 264
SparkR 可以在分组后使用许多常见的聚合函数。
## gear count
## 1 4 12
## 2 3 15
## 3 5 5
结果 carsDF
和 carsSubDF
是 SparkDataFrame
对象。要转换回 R data.frame
,我们可以使用 collect
。警告:这可能会导致您的交互式环境耗尽内存,因为 collect()
将整个分布式 DataFrame
提取到您的客户端,该客户端充当 Spark 驱动程序。
## [1] "data.frame"
SparkR 支持许多常用的机器学习算法。在底层,SparkR 使用 MLlib 来训练模型。用户可以调用 summary
来打印已拟合模型的摘要,调用 predict
对新数据进行预测,以及调用 write.ml
/read.ml
来保存/加载已拟合的模型。
SparkR 支持用于模型拟合的 R 公式运算符的子集,包括 '~'、'.'、':'、'+' 和 '-'。我们使用线性回归作为一个例子。
model <- spark.glm(carsDF, mpg ~ wt + cyl)
该结果与应用于 carsDF
的相应 data.frame
mtcars
的 R glm
函数返回的结果相匹配。事实上,对于广义线性模型,我们也专门为 SparkDataFrame
暴露了 glm
,因此上述内容等同于 model <- glm(mpg ~ wt + cyl, data = carsDF)
。
summary(model)
##
## Deviance Residuals:
## (Note: These are approximate quantiles with relative error <= 0.01)
## Min 1Q Median 3Q Max
## -4.2893 -1.7085 -0.4713 1.5729 6.1004
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) 39.6863 1.71498 23.1409 0.00000000
## wt -3.1910 0.75691 -4.2158 0.00022202
## cyl -1.5078 0.41469 -3.6360 0.00106428
##
## (Dispersion parameter for gaussian family taken to be 6.592137)
##
## Null deviance: 1126.05 on 31 degrees of freedom
## Residual deviance: 191.17 on 29 degrees of freedom
## AIC: 156
##
## Number of Fisher Scoring iterations: 1
该模型可以通过 write.ml
保存,并通过 read.ml
加载回来。
write.ml(model, path = "/HOME/tmp/mlModel/glmModel")
最后,我们可以通过运行以下命令停止 Spark 会话
设置
安装
与许多其他 R 包不同,要使用 SparkR,您需要额外安装 Apache Spark。 Spark 安装将用于运行一个后端进程,该进程将编译和执行 SparkR 程序。
安装 SparkR 包后,您可以如上一节所述调用 sparkR.session
来启动,它将检查 Spark 安装。如果您从交互式 shell(例如 R、RStudio)使用 SparkR,则如果未找到 Spark,则会自动下载并缓存 Spark。或者,我们提供一个易于使用的函数 install.spark
来手动运行此操作。如果您没有在计算机上安装 Spark,您可以从 Apache Spark 网站下载。
如果您已经安装了 Spark,则无需再次安装,可以将 sparkHome
参数传递给 sparkR.session
,以便让 SparkR 知道现有 Spark 安装的位置。
sparkR.session(sparkHome = "/HOME/spark")
Spark 会话
除了 sparkHome
之外,还可以在 sparkR.session
中指定许多其他选项。有关完整列表,请参见 启动:SparkSession 和 SparkR API 文档。
特别是,以下 Spark 驱动程序属性可以在 sparkConfig
中设置。
属性名称 | 属性组 | spark-submit 等效项 |
---|---|---|
spark.driver.memory |
应用程序属性 | --driver-memory |
spark.driver.extraClassPath |
运行时环境 | --driver-class-path |
spark.driver.extraJavaOptions |
运行时环境 | --driver-java-options |
spark.driver.extraLibraryPath |
运行时环境 | --driver-library-path |
spark.kerberos.keytab |
应用程序属性 | --keytab |
spark.kerberos.principal |
应用程序属性 | --principal |
对于 Windows 用户:由于不同操作系统上的文件前缀不同,为了避免潜在的错误前缀问题,当前的解决方法是在启动 SparkSession
时指定 spark.sql.warehouse.dir
。
spark_warehouse_path <- file.path(path.expand('~'), "spark-warehouse")
sparkR.session(spark.sql.warehouse.dir = spark_warehouse_path)
集群模式
SparkR 可以连接到远程 Spark 集群。集群模式概述 是对不同 Spark 集群模式的良好介绍。
将 SparkR 连接到远程 Spark 集群时,请确保机器上的 Spark 版本和 Hadoop 版本与集群上的相应版本匹配。当前的 SparkR 包兼容
## [1] "Spark 3.5.5"
它应该在本地计算机和远程集群上使用。
要连接,请将主节点的 URL 传递给 sparkR.session
。有关完整列表,请参见 Spark Master URLs。例如,要连接到本地独立 Spark 主节点,我们可以调用
sparkR.session(master = "spark://local:7077")
对于 YARN 集群,SparkR 支持客户端模式,其中 master 设置为 “yarn”。
sparkR.session(master = "yarn")
当前版本不支持 Yarn 集群模式。
数据导入
本地数据帧
最简单的方法是将本地 R 数据帧转换为 SparkDataFrame
。具体来说,我们可以使用 as.DataFrame
或 createDataFrame
并传入本地 R 数据帧来创建 SparkDataFrame
。例如,以下代码基于 R 中的 faithful
数据集创建一个 SparkDataFrame
。
df <- as.DataFrame(faithful)
head(df)
## eruptions waiting
## 1 3.600 79
## 2 1.800 54
## 3 3.333 74
## 4 2.283 62
## 5 4.533 85
## 6 2.883 55
数据源
SparkR 支持通过 SparkDataFrame
接口对各种数据源进行操作。您可以查看 Spark SQL 编程指南,了解更多可用于内置数据源的 特定选项。
从数据源创建 SparkDataFrame
的通用方法是 read.df
。此方法接受要加载的文件的路径和数据源的类型,并且将自动使用当前活动的 Spark 会话。 SparkR 本机支持读取 CSV、JSON 和 Parquet 文件,并且通过 Spark 包,您可以找到流行文件格式(如 Avro)的数据源连接器。使用 sparkR.session
初始化 SparkSession 时,可以使用 sparkPackages
参数添加这些包。
sparkR.session(sparkPackages = "com.databricks:spark-avro_2.12:3.0.0")
我们可以看到如何使用示例 CSV 输入文件使用数据源。有关更多信息,请参阅 SparkR read.df API 文档。
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")
数据源 API 本机支持 JSON 格式的输入文件。请注意,此处使用的文件不是典型的 JSON 文件。文件中的每一行都必须包含一个单独的、自包含的有效 JSON 对象。因此,常规的多行 JSON 文件通常会失败。
让我们看一下这里使用的原始 JSON 文件的前两行。
filePath <- paste0(sparkR.conf("spark.home"),
"/examples/src/main/resources/people.json")
readLines(filePath, n = 2L)
## [1] "{\"name\":\"Michael\"}" "{\"name\":\"Andy\", \"age\":30}"
我们使用 read.df
将其读取到 SparkDataFrame
中。
## [1] 3
head(people)
## age name
## 1 NA Michael
## 2 30 Andy
## 3 19 Justin
SparkR 会自动从 JSON 文件中推断模式。
printSchema(people)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)
如果我们想读取多个 JSON 文件,可以使用 read.json
。
people <- read.json(paste0(Sys.getenv("SPARK_HOME"),
c("/examples/src/main/resources/people.json",
"/examples/src/main/resources/people.json")))
count(people)
## [1] 6
数据源 API 也可用于将 SparkDataFrames
保存到多种文件格式中。例如,我们可以使用 write.df
将前一个示例中的 SparkDataFrame
保存到 Parquet 文件中。
write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")
Hive 表
您还可以从 Hive 表创建 SparkDataFrames。为此,我们需要创建一个支持 Hive 的 SparkSession,它可以访问 Hive MetaStore 中的表。请注意,Spark 应该已经构建了 Hive 支持,更多详细信息可以在 SQL 编程指南 中找到。在 SparkR 中,默认情况下,它将尝试创建一个启用 Hive 支持的 SparkSession(enableHiveSupport = TRUE
)。
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
txtPath <- paste0(sparkR.conf("spark.home"), "/examples/src/main/resources/kv1.txt")
sqlCMD <- sprintf("LOAD DATA LOCAL INPATH '%s' INTO TABLE src", txtPath)
sql(sqlCMD)
results <- sql("FROM src SELECT key, value")
# results is now a SparkDataFrame
head(results)
数据处理
对于 dplyr 用户:SparkR 在数据处理方面具有与 dplyr 相似的接口。但是,首先值得提及一些明显的差异。我们使用 df
来表示 SparkDataFrame
,使用 col
来表示此处的列名。
指示列。 SparkR 使用列名的字符字符串或使用
$
构造的 Column 对象来指示列。例如,要在df
中选择col
,我们可以编写select(df, "col")
或select(df, df$col)
。描述条件。在 SparkR 中,Column 对象表示可以直接插入到条件中,或者我们可以使用字符字符串来描述条件,而无需引用所使用的
SparkDataFrame
。例如,要选择值 > 1 的行,我们可以编写filter(df, df$col > 1)
或filter(df, "col > 1")
。
以下是更具体的例子。
dplyr | SparkR |
---|---|
select(mtcars, mpg, hp) |
select(carsDF, "mpg", "hp") |
filter(mtcars, mpg > 20, hp > 100) |
filter(carsDF, carsDF$mpg > 20, carsDF$hp > 100) |
其他差异将在特定方法中提及。
我们使用上面创建的 SparkDataFrame
carsDF
。我们可以获取有关 SparkDataFrame
的基本信息。
carsDF
## SparkDataFrame[model:string, mpg:double, cyl:double, disp:double, hp:double, drat:double, wt:double, qsec:double, vs:double, am:double, gear:double, carb:double]
以树格式打印出模式。
printSchema(carsDF)
## root
## |-- model: string (nullable = true)
## |-- mpg: double (nullable = true)
## |-- cyl: double (nullable = true)
## |-- disp: double (nullable = true)
## |-- hp: double (nullable = true)
## |-- drat: double (nullable = true)
## |-- wt: double (nullable = true)
## |-- qsec: double (nullable = true)
## |-- vs: double (nullable = true)
## |-- am: double (nullable = true)
## |-- gear: double (nullable = true)
## |-- carb: double (nullable = true)
SparkDataFrame 操作
选择行、列
SparkDataFrames 支持许多函数来执行结构化数据处理。这里我们包括一些基本示例,完整的列表可以在 API 文档中找到
您也可以将列名作为字符串传入。
## mpg
## 1 21.0
## 2 21.0
## 3 22.8
## 4 21.4
## 5 18.7
## 6 18.1
过滤 SparkDataFrame 以仅保留每加仑汽油里程数 (mpg) 小于 20 英里的行。
## model mpg cyl disp hp drat wt qsec vs am gear carb
## 1 Hornet Sportabout 18.7 8 360.0 175 3.15 3.44 17.02 0 0 3 2
## 2 Valiant 18.1 6 225.0 105 2.76 3.46 20.22 1 0 3 1
## 3 Duster 360 14.3 8 360.0 245 3.21 3.57 15.84 0 0 3 4
## 4 Merc 280 19.2 6 167.6 123 3.92 3.44 18.30 1 0 4 4
## 5 Merc 280C 17.8 6 167.6 123 3.92 3.44 18.90 1 0 4 4
## 6 Merc 450SE 16.4 8 275.8 180 3.07 4.07 17.40 0 0 3 3
分组、聚合
分组和聚合的常见流程是
相对于某些分组变量使用
groupBy
或group_by
来创建GroupedData
对象将
GroupedData
对象馈送到agg
或summarize
函数,并提供一些聚合函数来计算每个组中的一个数字。
支持许多广泛使用的函数,用于在分组后聚合数据,包括 avg
、count_distinct
、count
、first
、kurtosis
、last
、max
、mean
、min
、sd
、skewness
、stddev_pop
、stddev_samp
、sum_distinct
、sum
、var_pop
、var_samp
、var
。 请参阅链接的聚合函数 API 文档。
例如,我们可以计算 mtcars
数据集中气缸数量的直方图,如下所示。
## cyl count
## 1 8 14
## 2 4 11
## 3 6 7
使用 cube
或 rollup
计算跨多个维度的子总计。
## SparkDataFrame[cyl:double, gear:double, am:double, avg(mpg):double]
为 {(cyl
, gear
, am
), (cyl
, gear
), (cyl
), ()} 生成分组,而
## SparkDataFrame[cyl:double, gear:double, am:double, avg(mpg):double]
为分组列的所有可能组合生成分组。
对列进行操作
SparkR 还提供了许多可以直接应用于列的函数,用于数据处理和聚合期间。 下面的示例显示了基本算术函数的使用。
carsDF_km <- carsDF
carsDF_km$kmpg <- carsDF_km$mpg * 1.61
head(select(carsDF_km, "model", "mpg", "kmpg"))
## model mpg kmpg
## 1 Mazda RX4 21.0 33.810
## 2 Mazda RX4 Wag 21.0 33.810
## 3 Datsun 710 22.8 36.708
## 4 Hornet 4 Drive 21.4 34.454
## 5 Hornet Sportabout 18.7 30.107
## 6 Valiant 18.1 29.141
窗口函数
窗口函数是聚合函数的一种变体。 简而言之,
聚合函数:
n
到1
的映射 - 为一组条目返回单个值。 示例包括sum
、count
、max
。窗口函数:
n
到n
的映射 - 为组中的每个条目返回一个值,但该值可能取决于组的所有条目。 示例包括rank
、lead
、lag
。
形式上,上面提到的组称为框架。 每个输入行都可以有一个与之关联的唯一框架,并且该行上窗口函数的输出基于该框架中包含的行。
窗口函数通常与以下函数结合使用:windowPartitionBy
、windowOrderBy
、partitionBy
、orderBy
、over
。 为了说明这一点,接下来我们看一个例子。
我们仍然使用 mtcars
数据集。 对应的 SparkDataFrame
是 carsDF
。 假设对于每个气缸数,我们想要计算 mpg
中每辆车的排名。
carsSubDF <- select(carsDF, "model", "mpg", "cyl")
ws <- orderBy(windowPartitionBy("cyl"), "mpg")
carsRank <- withColumn(carsSubDF, "rank", over(rank(), ws))
head(carsRank, n = 20L)
## model mpg cyl rank
## 1 Volvo 142E 21.4 4 1
## 2 Toyota Corona 21.5 4 2
## 3 Datsun 710 22.8 4 3
## 4 Merc 230 22.8 4 3
## 5 Merc 240D 24.4 4 5
## 6 Porsche 914-2 26.0 4 6
## 7 Fiat X1-9 27.3 4 7
## 8 Honda Civic 30.4 4 8
## 9 Lotus Europa 30.4 4 8
## 10 Fiat 128 32.4 4 10
## 11 Toyota Corolla 33.9 4 11
## 12 Merc 280C 17.8 6 1
## 13 Valiant 18.1 6 2
## 14 Merc 280 19.2 6 3
## 15 Ferrari Dino 19.7 6 4
## 16 Mazda RX4 21.0 6 5
## 17 Mazda RX4 Wag 21.0 6 5
## 18 Hornet 4 Drive 21.4 6 7
## 19 Cadillac Fleetwood 10.4 8 1
## 20 Lincoln Continental 10.4 8 1
我们详细解释上述步骤。
-
windowPartitionBy
创建一个窗口规范对象WindowSpec
,用于定义分区。 它控制哪些行将与给定行位于同一分区中。 在这种情况下,cyl
中具有相同值的行将被放入同一分区中。orderBy
进一步定义排序 - 给定行在分区中的位置。 生成的WindowSpec
作为ws
返回。
更多窗口规范方法包括 rangeBetween
,它可以按值定义框架的边界,以及 rowsBetween
,它可以按行索引定义边界。
-
withColumn
将一个名为rank
的列附加到SparkDataFrame
。over
返回一个窗口列。 第一个参数通常是由窗口函数(例如rank()
、lead(carsDF$wt)
)返回的列。 这会根据分区和排序的表计算相应的值。
用户自定义函数
在 SparkR 中,我们支持几种用户自定义函数 (UDF)。
按分区应用
dapply
可以将函数应用于 SparkDataFrame
的每个分区。 要应用于 SparkDataFrame
的每个分区的函数应该只有一个参数,即对应于分区的 data.frame
,并且输出也应该是 data.frame
。 Schema 指定结果 SparkDataFrame
的行格式。 它必须与返回值的数据类型匹配。 有关 R 和 Spark 之间的映射,请参见此处。
我们将 mpg
转换为 kmpg
(公里/加仑)。 carsSubDF
是一个包含 carsDF
列子集的 SparkDataFrame
。
carsSubDF <- select(carsDF, "model", "mpg")
schema <- "model STRING, mpg DOUBLE, kmpg DOUBLE"
out <- dapply(carsSubDF, function(x) { x <- cbind(x, x$mpg * 1.61) }, schema)
head(collect(out))
## model mpg kmpg
## 1 Mazda RX4 21.0 33.810
## 2 Mazda RX4 Wag 21.0 33.810
## 3 Datsun 710 22.8 36.708
## 4 Hornet 4 Drive 21.4 34.454
## 5 Hornet Sportabout 18.7 30.107
## 6 Valiant 18.1 29.141
与 dapply
类似,dapplyCollect
可以将函数应用于 SparkDataFrame
的每个分区并将结果收集回来。 该函数的输出应为 data.frame
,但在这种情况下不需要模式。 请注意,如果无法将 UDF 在所有分区上的输出拉入驱动程序的内存中,则 dapplyCollect
可能会失败。
out <- dapplyCollect(
carsSubDF,
function(x) {
x <- cbind(x, "kmpg" = x$mpg * 1.61)
})
head(out, 3)
## model mpg kmpg
## 1 Mazda RX4 21.0 33.810
## 2 Mazda RX4 Wag 21.0 33.810
## 3 Datsun 710 22.8 36.708
按组应用
gapply
可以将函数应用于 SparkDataFrame
的每个组。 该函数要应用于 SparkDataFrame
的每个组,并且应仅具有两个参数:分组键和对应于该键的 R data.frame
。 这些组是从 SparkDataFrames
列中选择的。 该函数的输出应为 data.frame
。 Schema 指定结果 SparkDataFrame
的行格式。 它必须基于 Spark 数据类型表示 R 函数的输出模式。 返回的 data.frame
的列名由用户设置。 有关 R 和 Spark 之间的映射,请参见此处。
schema <- structType(structField("cyl", "double"), structField("max_mpg", "double"))
result <- gapply(
carsDF,
"cyl",
function(key, x) {
y <- data.frame(key, max(x$mpg))
},
schema)
head(arrange(result, "max_mpg", decreasing = TRUE))
## cyl max_mpg
## 1 4 33.9
## 2 6 21.4
## 3 8 19.2
与 gapply
类似,gapplyCollect
可以将函数应用于 SparkDataFrame
的每个分区,并将结果收集回 R data.frame
。 该函数的输出应为 data.frame
,但在这种情况下不需要模式。 请注意,如果无法将 UDF 在所有分区上的输出拉入驱动程序的内存中,则 gapplyCollect
可能会失败。
result <- gapplyCollect(
carsDF,
"cyl",
function(key, x) {
y <- data.frame(key, max(x$mpg))
colnames(y) <- c("cyl", "max_mpg")
y
})
head(result[order(result$max_mpg, decreasing = TRUE), ])
## cyl max_mpg
## 1 4 33.9
## 2 6 21.4
## 3 8 19.2
分发本地函数
与本地 R 中的 lapply
类似,spark.lapply
在元素列表上运行函数,并使用 Spark 分发计算。 spark.lapply
的工作方式类似于 doParallel
或 lapply
对列表的元素进行操作。 所有计算的结果都应适合单台机器。 如果不是这种情况,您可以执行类似 df <- createDataFrame(list)
的操作,然后使用 dapply
。
我们以 e1071
包中的 svm
为例。 我们使用所有默认设置,除了约束违规的成本不同。 spark.lapply
可以并行训练这些不同的模型。
costs <- exp(seq(from = log(1), to = log(1000), length.out = 5))
train <- function(cost) {
stopifnot(requireNamespace("e1071", quietly = TRUE))
model <- e1071::svm(Species ~ ., data = iris, cost = cost)
summary(model)
}
返回模型摘要的列表。
model.summaries <- spark.lapply(costs, train)
class(model.summaries)
## [1] "list"
为了避免冗长的显示,我们仅显示第二个拟合模型的部分结果。 您可以随意检查其他模型。
print(model.summaries[[2]])
## $call
## svm(formula = Species ~ ., data = iris, cost = cost)
##
## $type
## [1] 0
##
## $kernel
## [1] 2
##
## $cost
## [1] 5.623413
##
## $degree
## [1] 3
##
## $gamma
## [1] 0.25
##
## $coef0
## [1] 0
##
## $nu
## [1] 0.5
##
## $epsilon
## [1] 0.1
##
## $sparse
## [1] FALSE
##
## $scaled
## [1] TRUE TRUE TRUE TRUE
##
## $x.scale
## $x.scale$`scaled:center`
## Sepal.Length Sepal.Width Petal.Length Petal.Width
## 5.843333 3.057333 3.758000 1.199333
##
## $x.scale$`scaled:scale`
## Sepal.Length Sepal.Width Petal.Length Petal.Width
## 0.8280661 0.4358663 1.7652982 0.7622377
##
##
## $y.scale
## NULL
##
## $nclasses
## [1] 3
##
## $levels
## [1] "setosa" "versicolor" "virginica"
##
## $tot.nSV
## [1] 35
##
## $nSV
## [1] 6 15 14
##
## $labels
## [1] 1 2 3
##
## $SV
## Sepal.Length Sepal.Width Petal.Length Petal.Width
## 14 -1.86378030 -0.13153881 -1.5056946 -1.4422448
## 16 -0.17309407 3.08045544 -1.2791040 -1.0486668
## 21 -0.53538397 0.78617383 -1.1658087 -1.3110521
## 23 -1.50149039 1.24503015 -1.5623422 -1.3110521
## 24 -0.89767388 0.55674567 -1.1658087 -0.9174741
## 42 -1.62225369 -1.73753594 -1.3923993 -1.1798595
## 51 1.39682886 0.32731751 0.5336209 0.2632600
## 53 1.27606556 0.09788935 0.6469162 0.3944526
## 54 -0.41462067 -1.73753594 0.1370873 0.1320673
## 55 0.79301235 -0.59039513 0.4769732 0.3944526
## [ reached getOption("max.print") -- omitted 25 rows ]
##
## $index
## [1] 14 16 21 23 24 42 51 53 54 55 58 61 69 71 73 78 79 84 85
## [20] 86 99 107 111 119 120 124 127 128 130 132 134 135 139 149 150
##
## $rho
## [1] -0.10346530 0.12160294 -0.09540346
##
## $compprob
## [1] FALSE
##
## $probA
## NULL
##
## $probB
## NULL
##
## $sigma
## NULL
##
## $coefs
## [,1] [,2]
## [1,] 0.00000000 0.06561739
## [2,] 0.76813720 0.93378721
## [3,] 0.00000000 0.12123270
## [4,] 0.00000000 0.31170741
## [5,] 1.11614066 0.46397392
## [6,] 1.88141600 1.10392128
## [7,] -0.55872622 0.00000000
## [8,] 0.00000000 5.62341325
## [9,] 0.00000000 0.27711792
## [10,] 0.00000000 5.28440007
## [11,] -1.06596713 0.00000000
## [12,] -0.57076709 1.09019756
## [13,] -0.03365904 5.62341325
## [14,] 0.00000000 5.62341325
## [15,] 0.00000000 5.62341325
## [16,] 0.00000000 5.62341325
## [17,] 0.00000000 4.70398738
## [18,] 0.00000000 5.62341325
## [19,] 0.00000000 4.97981371
## [20,] -0.77497987 0.00000000
## [ reached getOption("max.print") -- omitted 15 rows ]
##
## $na.action
## NULL
##
## $xlevels
## named list()
##
## $fitted
## 1 2 3 4 5 6 7 8 9 10 11
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa
## 12 13 14 15 16 17 18 19 20 21 22
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa
## 23 24 25 26 27 28 29 30 31 32 33
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa
## 34 35 36 37 38 39 40
## setosa setosa setosa setosa setosa setosa setosa
## [ reached getOption("max.print") -- omitted 110 entries ]
## Levels: setosa versicolor virginica
##
## $decision.values
## setosa/versicolor setosa/virginica versicolor/virginica
## 1 1.1911739 1.0908424 1.1275805
## 2 1.1336557 1.0619543 1.3260964
## 3 1.2085065 1.0698101 1.0511345
## 4 1.1646153 1.0505915 1.0806874
## 5 1.1880814 1.0950348 0.9542815
## 6 1.0990761 1.0984626 0.9326361
## 7 1.1573474 1.0343287 0.9726843
## 8 1.1851598 1.0815750 1.2206802
## 9 1.1673499 1.0406734 0.8837945
## 10 1.1629911 1.0560925 1.2430067
## 11 1.1339282 1.0803946 1.0338357
## 12 1.1724182 1.0641469 1.1190423
## 13 1.1827355 1.0667956 1.1414844
## [ reached getOption("max.print") -- omitted 137 rows ]
##
## $terms
## Species ~ Sepal.Length + Sepal.Width + Petal.Length + Petal.Width
## attr(,"variables")
## list(Species, Sepal.Length, Sepal.Width, Petal.Length, Petal.Width)
## attr(,"factors")
## Sepal.Length Sepal.Width Petal.Length Petal.Width
## Species 0 0 0 0
## Sepal.Length 1 0 0 0
## Sepal.Width 0 1 0 0
## Petal.Length 0 0 1 0
## Petal.Width 0 0 0 1
## attr(,"term.labels")
## [1] "Sepal.Length" "Sepal.Width" "Petal.Length" "Petal.Width"
## attr(,"order")
## [1] 1 1 1 1
## attr(,"intercept")
## [1] 0
## attr(,"response")
## [1] 1
## attr(,".Environment")
## <environment: 0x5f60eb41d4c8>
## attr(,"predvars")
## list(Species, Sepal.Length, Sepal.Width, Petal.Length, Petal.Width)
## attr(,"dataClasses")
## Species Sepal.Length Sepal.Width Petal.Length Petal.Width
## "factor" "numeric" "numeric" "numeric" "numeric"
##
## attr(,"class")
## [1] "summary.svm"
SQL 查询
SparkDataFrame
也可以注册为 Spark SQL 中的临时视图,以便可以对其数据运行 SQL 查询。 sql 函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 SparkDataFrame
返回。
people <- read.df(paste0(sparkR.conf("spark.home"),
"/examples/src/main/resources/people.json"), "json")
将此 SparkDataFrame
注册为临时视图。
createOrReplaceTempView(people, "people")
可以使用 sql 方法运行 SQL 语句。
## name
## 1 Justin
机器学习
SparkR 支持以下机器学习模型和算法。
训练集和测试集
我们可以使用 randomSplit
函数轻松地将 SparkDataFrame
拆分为随机训练集和测试集。 它返回一个带有提供的 weights
的拆分 SparkDataFrames
列表。 我们以 carsDF
为例,并且想要大约训练数据和测试数据。
splitDF_list <- randomSplit(carsDF, c(0.7, 0.3), seed = 0)
carsDF_train <- splitDF_list[[1]]
carsDF_test <- splitDF_list[[2]]
count(carsDF_train)
## [1] 24
head(carsDF_train)
## model mpg cyl disp hp drat wt qsec vs am gear carb
## 1 Cadillac Fleetwood 10.4 8 472 205 2.93 5.250 17.98 0 0 3 4
## 2 Camaro Z28 13.3 8 350 245 3.73 3.840 15.41 0 0 3 4
## 3 Chrysler Imperial 14.7 8 440 230 3.23 5.345 17.42 0 0 3 4
## 4 Dodge Challenger 15.5 8 318 150 2.76 3.520 16.87 0 0 3 2
## 5 Duster 360 14.3 8 360 245 3.21 3.570 15.84 0 0 3 4
## 6 Ferrari Dino 19.7 6 145 175 3.62 2.770 15.50 0 1 5 6
count(carsDF_test)
## [1] 8
head(carsDF_test)
## model mpg cyl disp hp drat wt qsec vs am gear carb
## 1 AMC Javelin 15.2 8 304.0 150 3.15 3.435 17.30 0 0 3 2
## 2 Datsun 710 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1
## 3 Fiat 128 32.4 4 78.7 66 4.08 2.200 19.47 1 1 4 1
## 4 Merc 240D 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2
## 5 Merc 280 19.2 6 167.6 123 3.92 3.440 18.30 1 0 4 4
## 6 Toyota Corolla 33.9 4 71.1 65 4.22 1.835 19.90 1 1 4 1
模型和算法
线性支持向量机 (SVM) 分类器
线性支持向量机 (SVM) 分类器是具有线性内核的 SVM 分类器。 这是一个二元分类器。 我们使用一个简单的例子来展示如何使用 spark.svmLinear
进行二元分类。
# load training data and create a DataFrame
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# fit a Linear SVM classifier model
model <- spark.svmLinear(training, Survived ~ ., regParam = 0.01, maxIter = 10)
summary(model)
## $coefficients
## Estimate
## (Intercept) 0.993131388
## Class_1st -0.386500359
## Class_2nd -0.622627816
## Class_3rd -0.204446602
## Sex_Female -0.589950309
## Age_Adult 0.741676902
## Freq -0.006582887
##
## $numClasses
## [1] 2
##
## $numFeatures
## [1] 6
预测训练数据的值
prediction <- predict(model, training)
head(select(prediction, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
## Class Sex Age Freq Survived prediction
## 1 1st Male Child 0 No Yes
## 2 2nd Male Child 0 No Yes
## 3 3rd Male Child 35 No Yes
## 4 Crew Male Child 0 No Yes
## 5 1st Female Child 0 No Yes
## 6 2nd Female Child 0 No No
逻辑回归
逻辑回归是响应为类别时广泛使用的模型。 它可以看作是广义线性预测模型的一个特例。 我们在 spark.glm
之上提供 spark.logit
,以支持具有高级超参数的逻辑回归。 它支持具有弹性网络正则化和特征标准化的二元和多类分类,类似于 glmnet
。
我们使用一个简单的示例来演示 spark.logit
的用法。 通常,使用 spark.logit
有三个步骤:1). 从适当的数据源创建一个数据帧;2). 使用 spark.logit
和适当的参数设置拟合逻辑回归模型;3). 使用 summary
获取拟合模型的系数矩阵,并使用该模型进行预测与 predict
。
二项式逻辑回归
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
model <- spark.logit(training, Survived ~ ., regParam = 0.04741301)
summary(model)
## $coefficients
## Estimate
## (Intercept) 0.2255014282
## Class_1st -0.1338856652
## Class_2nd -0.1479826947
## Class_3rd 0.0005674937
## Sex_Female -0.2011183871
## Age_Adult 0.3263186885
## Freq -0.0033111157
预测训练数据的值
fitted <- predict(model, training)
head(select(fitted, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
## Class Sex Age Freq Survived prediction
## 1 1st Male Child 0 No Yes
## 2 2nd Male Child 0 No Yes
## 3 3rd Male Child 35 No Yes
## 4 Crew Male Child 0 No Yes
## 5 1st Female Child 0 No No
## 6 2nd Female Child 0 No No
针对三个类的多项式逻辑回归
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# Note in this case, Spark infers it is multinomial logistic regression, so family = "multinomial" is optional.
model <- spark.logit(training, Class ~ ., regParam = 0.07815179)
summary(model)
## $coefficients
## 1st 2nd 3rd Crew
## (Intercept) 0.051662845 0.062998145 -0.039083689 -0.075577300
## Sex_Female -0.088030587 -0.102528148 0.059233106 0.131325629
## Age_Adult 0.141935316 0.169492058 -0.102562719 -0.208864654
## Survived_No 0.052721020 0.057980057 -0.029408423 -0.081292653
## Freq -0.001555912 -0.001970377 0.001303836 0.002222453
多层感知器
多层感知器分类器 (MLPC) 是一种基于前馈人工神经网络的分类器。 MLPC 由多个节点层组成。 每层都完全连接到网络中的下一层。 输入层中的节点表示输入数据。 所有其他节点通过将输入与节点的权重进行线性组合来将输入映射到输出和偏置并应用激活函数。 这可以用矩阵形式为 MLPC 编写,其中层如下
中间层中的节点使用 sigmoid(逻辑)函数
输出层中的节点使用 softmax 函数
输出层中节点的数量对应于类的数量。
MLPC 采用反向传播来学习模型。 我们使用逻辑损失函数进行优化,并使用 L-BFGS 作为优化例程。
spark.mlp
在 data
中至少需要两列:一列名为 "label"
,另一列名为 "features"
。 "features"
列应为 libSVM 格式。
我们使用 Titanic 数据集来展示如何在分类中使用 spark.mlp
。
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# fit a Multilayer Perceptron Classification Model
model <- spark.mlp(training, Survived ~ Age + Sex, blockSize = 128, layers = c(2, 2), solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1, initialWeights = c( 0, 0, 5, 5, 9, 9))
为了避免冗长的显示,我们仅显示模型摘要的部分结果。 您可以从您的 sparkR shell 中查看完整结果。
# check the summary of the fitted model
summary(model)
## $numOfInputs
## [1] 2
##
## $numOfOutputs
## [1] 2
##
## $layers
## [1] 2 2
##
## $weights
## $weights[[1]]
## [1] 0
##
## $weights[[2]]
## [1] 0
##
## $weights[[3]]
## [1] 5
##
## $weights[[4]]
## [1] 5
##
## $weights[[5]]
## [1] 9
##
## $weights[[6]]
## [1] 9
# make predictions use the fitted model
predictions <- predict(model, training)
head(select(predictions, predictions$prediction))
## prediction
## 1 No
## 2 No
## 3 No
## 4 No
## 5 No
## 6 No
朴素贝叶斯
朴素贝叶斯模型假设特征之间相互独立。spark.naiveBayes
针对 SparkDataFrame 拟合 伯努利朴素贝叶斯模型。数据应全部为类别型。这些模型通常用于文档分类。
titanic <- as.data.frame(Titanic)
titanicDF <- createDataFrame(titanic[titanic$Freq > 0, -5])
naiveBayesModel <- spark.naiveBayes(titanicDF, Survived ~ Class + Sex + Age)
summary(naiveBayesModel)
## $apriori
## Yes No
## [1,] 0.5769231 0.4230769
##
## $tables
## Class_3rd Class_1st Class_2nd Sex_Female Age_Adult
## Yes 0.3125 0.3125 0.3125 0.5 0.5625
## No 0.4166667 0.25 0.25 0.5 0.75
naiveBayesPrediction <- predict(naiveBayesModel, titanicDF)
head(select(naiveBayesPrediction, "Class", "Sex", "Age", "Survived", "prediction"))
## Class Sex Age Survived prediction
## 1 3rd Male Child No Yes
## 2 3rd Female Child No Yes
## 3 1st Male Adult No Yes
## 4 2nd Male Adult No Yes
## 5 3rd Male Adult No No
## 6 Crew Male Adult No Yes
因子分解机分类器
用于分类问题的因子分解机。
有关因子分解机的背景和实现的详细信息,请参阅因子分解机部分。
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
model <- spark.fmClassifier(training, Survived ~ Age + Sex)
summary(model)
## $coefficients
## Estimate
## (Intercept) 0.0064275991
## Age_Adult 0.0001294448
## Sex_Female 0.0001294448
##
## $factors
## [,1] [,2] [,3] [,4] [,5] [,6]
## [1,] -0.3256224 0.11912568 0.1460235 0.1620567 0.13153516 0.06403695
## [2,] -0.1382155 -0.03658261 0.1717808 -0.1602241 -0.08446129 -0.19287098
## [,7] [,8]
## [1,] -0.03292446 -0.05166818
## [2,] 0.19252571 0.06237194
##
## $numClasses
## [1] 2
##
## $numFeatures
## [1] 2
##
## $factorSize
## [1] 8
## prediction
## 1 Yes
## 2 Yes
## 3 Yes
## 4 Yes
## 5 Yes
## 6 Yes
加速失效时间生存模型
生存分析研究事件发生前的预期持续时间,以及与受试者承担的风险因素或治疗方法的关系。与标准回归分析相比,生存建模必须处理数据中的特殊特征,包括非负生存时间和删失。
加速失效时间 (AFT) 模型是一种用于删失数据的参数生存模型,它假设协变量的影响是按某个常数加速或减速事件的生命历程。有关更多信息,请参阅 Wikipedia 页面 AFT 模型及其中的参考文献。与为相同目的设计的比例风险模型不同,AFT 模型更容易并行化,因为每个实例都独立地对目标函数做出贡献。
library(survival)
ovarianDF <- createDataFrame(ovarian)
aftModel <- spark.survreg(ovarianDF, Surv(futime, fustat) ~ ecog_ps + rx)
summary(aftModel)
## $coefficients
## Value
## (Intercept) 6.8966910
## ecog_ps -0.3850414
## rx 0.5286455
## Log(scale) -0.1234429
## futime fustat age resid_ds rx ecog_ps label prediction
## 1 59 1 72.3315 2 1 1 59 1141.724
## 2 115 1 74.4932 2 1 1 115 1141.724
## 3 156 1 66.4658 2 1 2 156 776.855
## 4 421 0 53.3644 2 2 1 421 1937.087
## 5 431 1 50.3397 2 1 1 431 1141.724
## 6 448 0 56.4301 1 1 2 448 776.855
广义线性模型
主要函数是 spark.glm
。支持以下族和链接函数。默认值为高斯分布。
族 | 链接函数 |
---|---|
高斯分布 | 恒等, 对数, 倒数 |
二项分布 | logit, probit, cloglog (互补对数-对数) |
泊松分布 | 对数, 恒等, 平方根 |
伽马分布 | 倒数, 恒等, 对数 |
tweedie | 幂链接函数 |
有三种方法可以指定 family
参数。
族名称作为字符串,例如
family = "gaussian"
。族函数,例如
family = binomial
。族函数返回的结果,例如
family = poisson(link = log)
。- 请注意,有两种方法可以指定 tweedie 族
- 设置
family = "tweedie"
并指定var.power
和link.power
- 加载包
statmod
后,使用其中的族定义(即tweedie()
)指定 tweedie 族。
- 设置
有关族及其链接函数的更多信息,请参阅 Wikipedia 页面 广义线性模型。
我们使用 mtcars
数据集作为示例。相应的 SparkDataFrame
是 carsDF
。拟合模型后,我们打印出摘要,并通过对原始数据集进行预测来查看拟合值。我们还可以传递具有相同模式的新 SparkDataFrame
以对新数据进行预测。
##
## Deviance Residuals:
## (Note: These are approximate quantiles with relative error <= 0.01)
## Min 1Q Median 3Q Max
## -3.9410 -1.6499 -0.3267 1.0373 5.8538
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) 37.227270 1.5987875 23.2847 0.0000e+00
## wt -3.877831 0.6327335 -6.1287 1.1196e-06
## hp -0.031773 0.0090297 -3.5187 1.4512e-03
##
## (Dispersion parameter for gaussian family taken to be 6.725785)
##
## Null deviance: 1126.05 on 31 degrees of freedom
## Residual deviance: 195.05 on 29 degrees of freedom
## AIC: 156.7
##
## Number of Fisher Scoring iterations: 1
进行预测时,将附加一个名为 prediction
的新列。让我们在这里只查看列的一个子集。
gaussianFitted <- predict(gaussianGLM, carsDF)
head(select(gaussianFitted, "model", "prediction", "mpg", "wt", "hp"))
## model prediction mpg wt hp
## 1 Mazda RX4 23.57233 21.0 2.620 110
## 2 Mazda RX4 Wag 22.58348 21.0 2.875 110
## 3 Datsun 710 25.27582 22.8 2.320 93
## 4 Hornet 4 Drive 21.26502 21.4 3.215 110
## 5 Hornet Sportabout 18.32727 18.7 3.440 175
## 6 Valiant 20.47382 18.1 3.460 105
以下是使用 tweedie 族进行的相同拟合
tweedieGLM1 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie", var.power = 0.0)
summary(tweedieGLM1)
##
## Deviance Residuals:
## (Note: These are approximate quantiles with relative error <= 0.01)
## Min 1Q Median 3Q Max
## -3.9410 -1.6499 -0.3267 1.0373 5.8538
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) 37.227270 1.5987875 23.2847 0.0000e+00
## wt -3.877831 0.6327335 -6.1287 1.1196e-06
## hp -0.031773 0.0090297 -3.5187 1.4512e-03
##
## (Dispersion parameter for tweedie family taken to be 6.725785)
##
## Null deviance: 1126.05 on 31 degrees of freedom
## Residual deviance: 195.05 on 29 degrees of freedom
## AIC: 156.7
##
## Number of Fisher Scoring iterations: 1
我们可以尝试 tweedie 族中的其他分布,例如具有对数链接的复合泊松分布
tweedieGLM2 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie",
var.power = 1.2, link.power = 0.0)
summary(tweedieGLM2)
##
## Deviance Residuals:
## (Note: These are approximate quantiles with relative error <= 0.01)
## Min 1Q Median 3Q Max
## -0.58074 -0.25335 -0.09892 0.18608 0.82717
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) 3.8500849 0.06698272 57.4788 0.0000e+00
## wt -0.2018426 0.02897283 -6.9666 1.1691e-07
## hp -0.0016248 0.00041603 -3.9054 5.1697e-04
##
## (Dispersion parameter for tweedie family taken to be 0.1340111)
##
## Null deviance: 29.8820 on 31 degrees of freedom
## Residual deviance: 3.7739 on 29 degrees of freedom
## AIC: NA
##
## Number of Fisher Scoring iterations: 4
等渗回归
spark.isoreg
针对 SparkDataFrame
拟合 等渗回归 模型。 它解决了完全有序约束下的加权单变量回归问题。 具体来说,给定一组真实的观测响应,对应的真实特征,以及可选的正权重,我们想要找到一个单调(分段线性)函数来最小化
还有一些可能有用的参数。
weightCol
:指定权重列的字符串。isotonic
:逻辑值,指示输出序列是否应为等渗/递增 (TRUE
) 或反渗/递减 (FALSE
)。featureIndex
:如果它是向量列,则公式右侧的特征索引(默认值:0),否则无效。
我们使用一个人工示例来演示用法。
y <- c(3.0, 6.0, 8.0, 5.0, 7.0)
x <- c(1.0, 2.0, 3.5, 3.0, 4.0)
w <- rep(1.0, 5)
data <- data.frame(y = y, x = x, w = w)
df <- createDataFrame(data)
isoregModel <- spark.isoreg(df, y ~ x, weightCol = "w")
isoregFitted <- predict(isoregModel, df)
head(select(isoregFitted, "x", "y", "prediction"))
## x y prediction
## 1 1.0 3 3.0
## 2 2.0 6 5.5
## 3 3.5 8 7.5
## 4 3.0 5 5.5
## 5 4.0 7 7.5
在预测阶段,基于拟合的单调分段函数,规则是
如果预测输入与训练特征完全匹配,则返回关联的预测。 如果同一特征有多个预测,则返回其中一个。 返回哪个是未定义的。
如果预测输入低于或高于所有训练特征,则分别返回具有最低或最高特征的预测。 如果同一特征有多个预测,则分别返回最低或最高的预测。
如果预测输入落在两个训练特征之间,则将预测视为分段线性函数,并从两个最接近特征的预测中计算插值。 如果同一特征有多个值,则使用与前一点相同的规则。
例如,当输入为,两个最接近的特征值是和,则预测值将是在和.
newDF <- createDataFrame(data.frame(x = c(1.5, 3.2)))
head(predict(isoregModel, newDF))
## x prediction
## 1 1.5 4.25
## 2 3.2 6.30
处的预测值之间的线性插值。
## $coefficients
## Estimate
## (Intercept) 37.22727012
## wt -3.87783074
## hp -0.03177295
##
## $numFeatures
## [1] 2
## prediction
## 1 23.57233
## 2 22.58348
## 3 25.27582
## 4 21.26502
## 5 18.32727
## 6 20.47382
线性回归模型。
有关因子分解机的背景和实现的详细信息,请参阅因子分解机部分。
model <- spark.fmRegressor(carsDF, mpg ~ wt + hp)
summary(model)
## $coefficients
## Estimate
## (Intercept) 0.1518559
## wt 3.6472555
## hp 2.8026828
##
## $factors
## [,1] [,2] [,3] [,4] [,5] [,6]
## [1,] 0.1424420 -0.1178110 -0.3970272 -0.4696695 0.400288 0.3690930
## [2,] -0.1626185 0.1512138 0.3690435 0.4076975 -0.625752 -0.3715109
## [,7] [,8]
## [1,] 0.03472468 -0.1703219
## [2,] -0.02109148 -0.2006249
##
## $numFeatures
## [1] 2
##
## $factorSize
## [1] 8
## prediction
## 1 106.70996
## 2 87.07526
## 3 111.07931
## 4 60.89565
## 5 61.81374
## 6 40.70095
用于回归问题的因子分解机。
spark.decisionTree
在 SparkDataFrame
上拟合 决策树 分类或回归模型。 用户可以调用 summary
来获取拟合模型的摘要,调用 predict
进行预测,以及调用 write.ml
/read.ml
来保存/加载拟合模型。
t <- as.data.frame(Titanic)
df <- createDataFrame(t)
dtModel <- spark.decisionTree(df, Survived ~ ., type = "classification", maxDepth = 2)
summary(dtModel)
## Formula: Survived ~ .
## Number of features: 6
## Features: Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances: (6,[5],[1.0])
## Max Depth: 2
## DecisionTreeClassificationModel: uid=dtc_a1e6304af37c, depth=2, numNodes=5, numClasses=2, numFeatures=6
## If (feature 5 <= 4.5)
## Predict: 0.0
## Else (feature 5 > 4.5)
## If (feature 5 <= 84.5)
## Predict: 1.0
## Else (feature 5 > 84.5)
## Predict: 0.0
##
predictions <- predict(dtModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
## Class Sex Age Freq Survived prediction
## 1 1st Male Child 0 No No
## 2 2nd Male Child 0 No No
## 3 3rd Male Child 35 No Yes
## 4 Crew Male Child 0 No No
## 5 1st Female Child 0 No No
## 6 2nd Female Child 0 No No
我们使用 Titanic
数据集来训练决策树并进行预测
spark.gbt
在 SparkDataFrame
上拟合 梯度提升树 分类或回归模型。 用户可以调用 summary
来获取拟合模型的摘要,调用 predict
进行预测,以及调用 write.ml
/read.ml
来保存/加载拟合模型。
t <- as.data.frame(Titanic)
df <- createDataFrame(t)
gbtModel <- spark.gbt(df, Survived ~ ., type = "classification", maxDepth = 2, maxIter = 2)
summary(gbtModel)
## Formula: Survived ~ .
## Number of features: 6
## Features: Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances: (6,[1,2,5],[0.03336902858878361,0.16099525743106016,0.8056357139801562])
## Max Depth: 2
## Number of trees: 2
## Tree weights: 1 0.1
## GBTClassificationModel: uid = gbtc_298ba1eabf17, numTrees=2, numClasses=2, numFeatures=6
## Tree 0 (weight 1.0):
## If (feature 5 <= 4.5)
## If (feature 1 in {1.0})
## Predict: -1.0
## Else (feature 1 not in {1.0})
## Predict: -0.3333333333333333
## Else (feature 5 > 4.5)
## If (feature 5 <= 84.5)
## Predict: 0.5714285714285714
## Else (feature 5 > 84.5)
## Predict: -0.42857142857142855
## Tree 1 (weight 0.1):
## If (feature 2 in {1.0})
## If (feature 5 <= 15.5)
## Predict: 0.9671846896296403
## Else (feature 5 > 15.5)
## Predict: -1.0857923804083338
## Else (feature 2 not in {1.0})
## If (feature 5 <= 13.5)
## Predict: -0.08651035613926407
## Else (feature 5 > 13.5)
## Predict: 0.6566673506774614
##
predictions <- predict(gbtModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
## Class Sex Age Freq Survived prediction
## 1 1st Male Child 0 No No
## 2 2nd Male Child 0 No No
## 3 3rd Male Child 35 No Yes
## 4 Crew Male Child 0 No No
## 5 1st Female Child 0 No No
## 6 2nd Female Child 0 No No
我们使用 Titanic
数据集来训练梯度提升树并进行预测
spark.randomForest
在 SparkDataFrame
上拟合 随机森林 分类或回归模型。 用户可以调用 summary
来获取拟合模型的摘要,调用 predict
进行预测,以及调用 write.ml
/read.ml
来保存/加载拟合模型。
t <- as.data.frame(Titanic)
df <- createDataFrame(t)
rfModel <- spark.randomForest(df, Survived ~ ., type = "classification", maxDepth = 2, numTrees = 2)
summary(rfModel)
## Formula: Survived ~ .
## Number of features: 6
## Features: Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances: (6,[3,4,5],[0.17058779274099098,0.09676977311565654,0.7326424341433525])
## Max Depth: 2
## Number of trees: 2
## Tree weights: 1 1
## RandomForestClassificationModel: uid=rfc_f5de657337f1, numTrees=2, numClasses=2, numFeatures=6
## Tree 0 (weight 1.0):
## If (feature 4 in {0.0})
## If (feature 3 in {0.0})
## Predict: 0.0
## Else (feature 3 not in {0.0})
## Predict: 1.0
## Else (feature 4 not in {0.0})
## If (feature 5 <= 13.5)
## Predict: 0.0
## Else (feature 5 > 13.5)
## Predict: 1.0
## Tree 1 (weight 1.0):
## If (feature 5 <= 84.5)
## If (feature 5 <= 4.5)
## Predict: 0.0
## Else (feature 5 > 4.5)
## Predict: 1.0
## Else (feature 5 > 84.5)
## Predict: 0.0
##
predictions <- predict(rfModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
## Class Sex Age Freq Survived prediction
## 1 1st Male Child 0 No No
## 2 2nd Male Child 0 No No
## 3 3rd Male Child 35 No Yes
## 4 Crew Male Child 0 No No
## 5 1st Female Child 0 No No
## 6 2nd Female Child 0 No No
在以下示例中,我们使用 Titanic
数据集来训练随机森林并进行预测
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
model <- spark.bisectingKmeans(training, Class ~ Survived, k = 4)
summary(model)
## $k
## [1] 4
##
## $coefficients
## Survived_No
## 1 0
## 2 1
## 3 0
## 4 1
##
## $size
## $size[[1]]
## [1] 16
##
## $size[[2]]
## [1] 16
##
## $size[[3]]
## [1] 0
##
## $size[[4]]
## [1] 0
##
##
## $cluster
## SparkDataFrame[prediction:int]
##
## $is.loaded
## [1] FALSE
## Class prediction
## 1 1st 1
## 2 2nd 1
## 3 3rd 1
## 4 Crew 1
## 5 1st 1
## 6 2nd 1
spark.bisectingKmeans
是一种使用分裂(或“自上而下”)方法的层次聚类:所有观测值都从一个簇开始,并且随着层次结构的向下移动,递归地执行分裂。
spark.gaussianMixture
针对 SparkDataFrame
拟合多元高斯混合模型 (GMM)。 期望最大化 (EM) 用于近似模型的最大似然估计量 (MLE)。
X1 <- data.frame(V1 = rnorm(4), V2 = rnorm(4))
X2 <- data.frame(V1 = rnorm(6, 3), V2 = rnorm(6, 4))
data <- rbind(X1, X2)
df <- createDataFrame(data)
gmmModel <- spark.gaussianMixture(df, ~ V1 + V2, k = 2)
summary(gmmModel)
## $lambda
## [1] 0.408879 0.591121
##
## $mu
## $mu[[1]]
## [1] -0.8254152973 0.0009888204
##
## $mu[[2]]
## [1] 3.006119 3.620325
##
##
## $sigma
## $sigma[[1]]
## [,1] [,2]
## [1,] 1.377944 1.092401
## [2,] 1.092401 1.477489
##
## $sigma[[2]]
## [,1] [,2]
## [1,] 1.317545 0.5716919
## [2,] 0.5716919 0.7335671
##
##
## $loglik
## [1] -32.8689
##
## $posterior
## SparkDataFrame[posterior:array<double>]
##
## $is.loaded
## [1] FALSE
## V1 V2 prediction
## 1 -1.400043517 0.6215527 0
## 2 0.255317055 1.1484116 0
## 3 -2.437263611 -1.8218177 0
## 4 -0.005571287 -0.2473253 0
## 5 2.755800393 4.5124269 1
## 6 2.717294551 2.1369885 1
我们使用一个模拟示例来演示用法。
k-Means 聚类-means 聚类模型。 作为一种无监督学习方法,我们不需要响应变量。 因此,R 公式的左侧应留空。 聚类仅基于右侧的变量。
kmeansModel <- spark.kmeans(carsDF, ~ mpg + hp + wt, k = 3)
summary(kmeansModel)
## $k
## [1] 3
##
## $coefficients
## mpg hp wt
## 1 24.22353 93.52941 2.599588
## 2 15.80000 178.50000 3.926400
## 3 14.62000 263.80000 3.899000
##
## $size
## $size[[1]]
## [1] 17
##
## $size[[2]]
## [1] 10
##
## $size[[3]]
## [1] 5
##
##
## $cluster
## SparkDataFrame[prediction:int]
##
## $is.loaded
## [1] FALSE
##
## $clusterSize
## [1] 3
kmeansPredictions <- predict(kmeansModel, carsDF)
head(select(kmeansPredictions, "model", "mpg", "hp", "wt", "prediction"), n = 20L)
## model mpg hp wt prediction
## 1 Mazda RX4 21.0 110 2.620 0
## 2 Mazda RX4 Wag 21.0 110 2.875 0
## 3 Datsun 710 22.8 93 2.320 0
## 4 Hornet 4 Drive 21.4 110 3.215 0
## 5 Hornet Sportabout 18.7 175 3.440 1
## 6 Valiant 18.1 105 3.460 0
## 7 Duster 360 14.3 245 3.570 2
## 8 Merc 240D 24.4 62 3.190 0
## 9 Merc 230 22.8 95 3.150 0
## 10 Merc 280 19.2 123 3.440 0
## 11 Merc 280C 17.8 123 3.440 0
## 12 Merc 450SE 16.4 180 4.070 1
## 13 Merc 450SL 17.3 180 3.730 1
## 14 Merc 450SLC 15.2 180 3.780 1
## 15 Cadillac Fleetwood 10.4 205 5.250 1
## 16 Lincoln Continental 10.4 215 5.424 1
## 17 Chrysler Imperial 14.7 230 5.345 2
## 18 Fiat 128 32.4 66 2.200 0
## 19 Honda Civic 30.4 52 1.615 0
## 20 Toyota Corolla 33.9 65 1.835 0
潜在狄利克雷分配
spark.lda
在 SparkDataFrame
上拟合 潜在狄利克雷分配 模型。 它通常用于主题建模,其中从文本文档的集合中推断出主题。 LDA 可以被认为是如下所示的聚类算法
主题对应于聚类中心,而文档对应于数据集中的示例(行)。
主题和文档都存在于特征空间中,其中特征向量是词计数向量(词袋)。
LDA 不是使用传统的距离进行聚类,而是使用基于文本文件生成方式的统计模型的函数。
要使用 LDA,我们需要在 data
中指定一个 features
列,其中每个条目代表一个文档。 该列有两个选项
字符串:这可以是整个文档的字符串。 它将被自动解析。 可以在
customizedStopWords
中添加其他停用词。libSVM:每个条目都是单词的集合,并将被直接处理。
为拟合模型提供了两个附加函数。
spark.posterior
返回一个SparkDataFrame
,其中包含一个名为“topicDistribution”的后验概率向量列。spark.perplexity
返回给定SparkDataFrame
的对数困惑度,如果缺少参数data
,则返回训练数据的对数困惑度。
有关更多信息,请参见帮助文档 ?spark.lda
。
让我们看一个人工示例。
corpus <- data.frame(features = c(
"1 2 6 0 2 3 1 1 0 0 3",
"1 3 0 1 3 0 0 2 0 0 1",
"1 4 1 0 0 4 9 0 1 2 0",
"2 1 0 3 0 0 5 0 2 3 9",
"3 1 1 9 3 0 2 0 0 1 3",
"4 2 0 3 4 5 1 1 1 4 0",
"2 1 0 3 0 0 5 0 2 2 9",
"1 1 1 9 2 1 2 0 0 1 3",
"4 4 0 3 4 2 1 3 0 0 0",
"2 8 2 0 3 0 2 0 2 7 2",
"1 1 1 9 0 2 2 0 0 3 3",
"4 1 0 0 4 5 1 3 0 1 0"))
corpusDF <- createDataFrame(corpus)
model <- spark.lda(data = corpusDF, k = 5, optimizer = "em")
summary(model)
## $docConcentration
## [1] 11 11 11 11 11
##
## $topicConcentration
## [1] 1.1
##
## $logLikelihood
## [1] -353.2948
##
## $logPerplexity
## [1] 2.676476
##
## $isDistributed
## [1] TRUE
##
## $vocabSize
## [1] 10
##
## $topics
## SparkDataFrame[topic:int, term:array<string>, termWeights:array<double>]
##
## $vocabulary
## [1] "0" "1" "2" "3" "4" "9" "5" "8" "7" "6"
##
## $trainingLogLikelihood
## [1] -239.5629
##
## $logPrior
## [1] -980.2974
posterior <- spark.posterior(model, corpusDF)
head(posterior)
## features topicDistribution
## 1 1 2 6 0 2 3 1 1 0 0 3 0.1972172, 0.1986640, 0.2021996, 0.2006554, 0.2012638
## 2 1 3 0 1 3 0 0 2 0 0 1 0.1989977, 0.1988735, 0.2015952, 0.2006357, 0.1998979
## 3 1 4 1 0 0 4 9 0 1 2 0 0.2020598, 0.2026073, 0.1968832, 0.1987276, 0.1997220
## 4 2 1 0 3 0 0 5 0 2 3 9 0.2004069, 0.1981933, 0.2013000, 0.2006292, 0.1994706
## 5 3 1 1 9 3 0 2 0 0 1 3 0.1971478, 0.1983952, 0.2023559, 0.2011561, 0.2009450
## 6 4 2 0 3 4 5 1 1 1 4 0 0.2020231, 0.2041808, 0.1955391, 0.1997213, 0.1985357
perplexity <- spark.perplexity(model, corpusDF)
perplexity
## [1] 2.676476
交替最小二乘法
spark.als
通过 交替最小二乘法 学习 协同过滤 中的潜在因子。
可以在 spark.als
中配置多个选项,包括 rank
、reg
和 nonnegative
。 有关完整的列表,请参阅帮助文件。
ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(ratings, c("user", "item", "rating"))
model <- spark.als(df, "rating", "user", "item", rank = 10, reg = 0.1, nonnegative = TRUE)
提取潜在因子。
stats <- summary(model)
userFactors <- stats$userFactors
itemFactors <- stats$itemFactors
head(userFactors)
head(itemFactors)
进行预测。
幂迭代聚类
幂迭代聚类 (PIC) 是一种可扩展的图聚类算法。 spark.assignClusters
方法运行 PIC 算法,并为每个输入顶点返回一个聚类分配。
df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
list(1L, 2L, 1.0), list(3L, 4L, 1.0),
list(4L, 0L, 0.1)),
schema = c("src", "dst", "weight"))
head(spark.assignClusters(df, initMode = "degree", weightCol = "weight"))
## id cluster
## 1 4 1
## 2 0 0
## 3 1 0
## 4 3 1
## 5 2 0
FP-growth
spark.fpGrowth
执行 FP-growth 算法以挖掘 SparkDataFrame
上的频繁项集。 itemsCol
应该是一个值数组。
df <- selectExpr(createDataFrame(data.frame(rawItems = c(
"T,R,U", "T,S", "V,R", "R,U,T,V", "R,S", "V,S,U", "U,R", "S,T", "V,R", "V,U,S",
"T,V,U", "R,V", "T,S", "T,S", "S,T", "S,U", "T,R", "V,R", "S,V", "T,S,U"
))), "split(rawItems, ',') AS items")
fpm <- spark.fpGrowth(df, minSupport = 0.2, minConfidence = 0.5)
spark.freqItemsets
方法可用于检索具有频繁项集的 SparkDataFrame
。
head(spark.freqItemsets(fpm))
## items freq
## 1 R 9
## 2 U 8
## 3 U, T 4
## 4 U, V 4
## 5 U, S 4
## 6 T 10
spark.associationRules
返回具有关联规则的 SparkDataFrame
。
head(spark.associationRules(fpm))
## antecedent consequent confidence lift support
## 1 V R 0.5555556 1.234568 0.25
## 2 S T 0.5454545 1.090909 0.30
## 3 T S 0.6000000 1.090909 0.30
## 4 R V 0.5555556 1.234568 0.25
## 5 U T 0.5000000 1.000000 0.20
## 6 U V 0.5000000 1.111111 0.20
我们可以基于 antecedent
进行预测。
## items prediction
## 1 T, R, U S, V
## 2 T, S NULL
## 3 V, R NULL
## 4 R, U, T, V S
## 5 R, S T, V
## 6 V, S, U R, T
PrefixSpan
spark.findFrequentSequentialPatterns
方法可用于查找项集输入序列中完整的频繁序列模式集。
df <- createDataFrame(list(list(list(list(1L, 2L), list(3L))),
list(list(list(1L), list(3L, 2L), list(1L, 2L))),
list(list(list(1L, 2L), list(5L))),
list(list(list(6L)))),
schema = c("sequence"))
head(spark.findFrequentSequentialPatterns(df, minSupport = 0.5, maxPatternLength = 5L))
## sequence freq
## 1 1 3
## 2 3 2
## 3 2 3
## 4 1, 2 3
## 5 1, 3 2
Kolmogorov-Smirnov 检验
spark.kstest
运行一个双侧、单样本的 Kolmogorov-Smirnov (KS) 检验。 给定一个 SparkDataFrame
,该测试会将给定列 testCol
中的连续数据与参数 nullHypothesis
指定的理论分布进行比较。 用户可以调用 summary
来获取测试结果的摘要。
在以下示例中,我们测试 Titanic
数据集的 Freq
列是否服从正态分布。 我们使用样本的均值和标准差来设置正态分布的参数。
t <- as.data.frame(Titanic)
df <- createDataFrame(t)
freqStats <- head(select(df, mean(df$Freq), sd(df$Freq)))
freqMean <- freqStats[1]
freqStd <- freqStats[2]
test <- spark.kstest(df, "Freq", "norm", c(freqMean, freqStd))
testSummary <- summary(test)
testSummary
## Kolmogorov-Smirnov test summary:
## degrees of freedom = 0
## statistic = 0.3065126710255011
## pValue = 0.0036336792155329256
## Very strong presumption against null hypothesis: Sample follows theoretical distribution.
模型持久化
以下示例展示了如何在 SparkR 中保存/加载 ML 模型。
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
gaussianGLM <- spark.glm(training, Freq ~ Sex + Age, family = "gaussian")
# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)
# Check model summary
summary(gaussianGLM2)
##
## Saved-loaded model does not support output 'Deviance Residuals'.
##
## Coefficients:
## Estimate Std. Error t value Pr(>|t|)
## (Intercept) 46.219 35.994 1.2841 0.2092846
## Sex_Female -78.812 41.562 -1.8962 0.0679311
## Age_Adult 123.938 41.562 2.9820 0.0057522
##
## (Dispersion parameter for gaussian family taken to be 13819.52)
##
## Null deviance: 573341 on 31 degrees of freedom
## Residual deviance: 400766 on 29 degrees of freedom
## AIC: 400.7
##
## Number of Fisher Scoring iterations: 1
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, training)
head(gaussianPredictions)
## Class Sex Age Survived Freq label prediction
## 1 1st Male Child No 0 0 46.21875
## 2 2nd Male Child No 0 0 46.21875
## 3 3rd Male Child No 35 35 46.21875
## 4 Crew Male Child No 0 0 46.21875
## 5 1st Female Child No 0 0 -32.59375
## 6 2nd Female Child No 0 0 -32.59375
unlink(modelPath)
结构化流处理
SparkR 支持结构化流处理 API。
您可以查看结构化流处理编程指南,了解其编程模型和基本概念的 简介。
简单源和接收器
Spark 有一些内置的输入源。 例如,要使用套接字源将文本读入单词并显示计算的字数
# Create DataFrame representing the stream of input lines from connection
lines <- read.stream("socket", host = hostname, port = port)
# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
# Generate running word count
wordCounts <- count(groupBy(words, "word"))
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")
Kafka 源
从 Kafka 读取数据很简单。 有关更多信息,请参阅结构化流处理支持的输入源。
topic <- read.stream("kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
subscribe = "topic1")
keyvalue <- selectExpr(topic, "CAST(key AS STRING)", "CAST(value AS STRING)")
操作和接收器
SparkDataFrame
上最常见的操作都支持流处理,包括选择、投影和聚合。 定义最终结果后,要启动流式计算,您将调用 write.stream
方法来设置接收器和 outputMode
。
可以将流式 SparkDataFrame
写入控制台进行调试,写入临时内存表,或者以容错方式写入不同格式的文件接收器以进行进一步处理。
noAggDF <- select(where(deviceDataStreamingDf, "signal > 10"), "device")
# Print new data to console
write.stream(noAggDF, "console")
# Write new data to Parquet files
write.stream(noAggDF,
"parquet",
path = "path/to/destination/dir",
checkpointLocation = "path/to/checkpoint/dir")
# Aggregate
aggDF <- count(groupBy(noAggDF, "device"))
# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")
# Have all the aggregates in an in memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")
head(sql("select * from aggregates"))
高级主题
SparkR 对象类
SparkR 中您可能会使用的三个主要对象类。
-
SparkDataFrame
:SparkR 的核心组件。 它是一个 S4 类,表示组织成命名列的分布式数据集合,在概念上等同于关系数据库中的表或 R 中的数据框。 它有两个插槽sdf
和env
。-
sdf
存储对 Spark JVM 后端中相应 Spark Dataset 的引用。 -
env
保存对象的元信息,例如isCached
。
它可以由数据导入方法创建,也可以通过转换现有的
SparkDataFrame
创建。 我们可以通过大量数据处理函数来操作SparkDataFrame
,并将其输入到机器学习算法中。 -
-
Column
:一个 S4 类,表示SparkDataFrame
的一列。 插槽jc
保存对 Spark JVM 后端中相应Column
对象的引用。可以通过
$
运算符从SparkDataFrame
中获得它,例如,df$col
。 更常见的是,它与其他函数一起使用,例如,与select
一起选择特定列,与filter
和构造的条件一起选择行,与聚合函数一起计算每个组的聚合统计信息。 -
GroupedData
:一个 S4 类,表示由groupBy
创建或通过转换其他GroupedData
创建的分组数据。 它的sgd
插槽保存对后端中RelationalGroupedDataset
对象的引用。这通常是一个带有组信息的中间对象,后面是聚合操作。
架构
可以在参考文献中看到架构的完整描述,特别是论文 *SparkR:使用 Spark 扩展 R 程序*。
SparkR 的底层是 Spark SQL 引擎。 这避免了运行解释 R 代码的开销,并且 Spark 中优化的 SQL 执行引擎使用有关数据和计算流的结构信息来执行一系列优化以加速计算。
实际计算的主要方法调用发生在驱动程序的 Spark JVM 中。 我们有一个基于套接字的 SparkR API,允许我们从 R 调用 JVM 上的函数。 我们使用在基于 Netty 的套接字服务器上侦听的 SparkR JVM 后端。
SparkR JVM 后端支持两种 RPC:方法调用和创建新对象。 方法调用可以通过两种方式完成。
sparkR.callJMethod
接受对现有 Java 对象的引用以及要传递给方法的参数列表。sparkR.callJStatic
接受静态方法的类名以及要传递给方法的参数列表。
参数使用我们的自定义线路格式序列化,然后在 JVM 端反序列化。 然后,我们使用 Java 反射来调用适当的方法。
要创建对象,使用 sparkR.newJObject
,然后类似地使用提供的参数调用适当的构造函数。
最后,我们使用一个新的 R 类 jobj
,它引用后端中存在的 Java 对象。 这些引用在 Java 端被跟踪,并在它们超出 R 端的范围时自动进行垃圾回收。
参考
SparkR:使用 Spark 扩展 R 程序, Shivaram Venkataraman, Zongheng Yang, Davies Liu, Eric Liang, Hossein Falaki, Xiangrui Meng, Reynold Xin, Ali Ghodsi, Michael Franklin, Ion Stoica, 和 Matei Zaharia。 SIGMOD 2016. 2016 年 6 月。