跳到内容

从 Apache Spark 4.0.0 开始,SparkR 已被弃用,并将在未来版本中移除。

概述

SparkR 是一个 R 包,提供了一个轻量级前端,用于在 R 中使用 Apache Spark。从 Spark 4.0.0 开始,SparkR 提供了一个分布式数据框实现,支持数据处理操作,如选择、过滤、聚合等,以及使用 MLlib 进行分布式机器学习。

开始使用

我们首先从一个在本地机器上运行的示例开始,并概述 SparkR 的使用:数据摄取、数据处理和机器学习。

首先,让我们加载并附加包。

SparkSession 是 SparkR 的入口点,它将您的 R 程序连接到 Spark 集群。您可以使用 sparkR.session 创建一个 SparkSession 并传入选项,例如应用程序名称、任何依赖的 Spark 包等。

我们使用默认设置,它以本地模式运行。如果没有找到之前的安装,它会在后台自动下载 Spark 包。有关设置的更多详细信息,请参阅 Spark 会话

## Java ref type org.apache.spark.sql.classic.SparkSession id 1

SparkR 中的操作围绕一个名为 SparkDataFrame 的 R 类展开。它是一个分布式数据集合,数据按命名列组织,概念上等同于关系数据库中的表或 R 中的数据框,但在底层具有更丰富的优化。

SparkDataFrame 可以从多种来源构建,例如:结构化数据文件、Hive 中的表、外部数据库或现有的本地 R 数据框。例如,我们从一个本地 R 数据框创建一个 SparkDataFrame

cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

我们可以通过 headshowDF 函数查看 SparkDataFrame 的前几行。

head(carsDF)
##               model  mpg cyl disp  hp drat    wt  qsec vs am gear carb
## 1         Mazda RX4 21.0   6  160 110 3.90 2.620 16.46  0  1    4    4
## 2     Mazda RX4 Wag 21.0   6  160 110 3.90 2.875 17.02  0  1    4    4
## 3        Datsun 710 22.8   4  108  93 3.85 2.320 18.61  1  1    4    1
## 4    Hornet 4 Drive 21.4   6  258 110 3.08 3.215 19.44  1  0    3    1
## 5 Hornet Sportabout 18.7   8  360 175 3.15 3.440 17.02  0  0    3    2
## 6           Valiant 18.1   6  225 105 2.76 3.460 20.22  1  0    3    1

SparkDataFrame 支持常见的数据处理操作,例如 filterselect

carsSubDF <- select(carsDF, "model", "mpg", "hp")
carsSubDF <- filter(carsSubDF, carsSubDF$hp >= 200)
head(carsSubDF)
##                 model  mpg  hp
## 1          Duster 360 14.3 245
## 2  Cadillac Fleetwood 10.4 205
## 3 Lincoln Continental 10.4 215
## 4   Chrysler Imperial 14.7 230
## 5          Camaro Z28 13.3 245
## 6      Ford Pantera L 15.8 264

SparkR 可以在分组后使用许多常见的聚合函数。

carsGPDF <- summarize(groupBy(carsDF, carsDF$gear), count = n(carsDF$gear))
head(carsGPDF)
##   gear count
## 1    4    12
## 2    3    15
## 3    5     5

结果 carsDFcarsSubDF 都是 SparkDataFrame 对象。要转换回 R data.frame,我们可以使用 collect注意:这可能会导致您的交互式环境内存不足,因为 collect() 会将整个分布式 DataFrame 获取到您的客户端,该客户端充当 Spark 驱动程序。

carsGP <- collect(carsGPDF)
class(carsGP)
## [1] "data.frame"

SparkR 支持多种常用的机器学习算法。在底层,SparkR 使用 MLlib 训练模型。用户可以调用 summary 打印拟合模型的摘要,使用 predict 对新数据进行预测,以及使用 write.ml/read.ml 保存/加载拟合模型。

SparkR 支持一部分 R 公式运算符用于模型拟合,包括 ‘~’、‘.’、‘:’、‘+’ 和 ‘-’。我们以线性回归为例。

model <- spark.glm(carsDF, mpg ~ wt + cyl)

该结果与应用于 carsDF 对应的 data.frame mtcars 的 R glm 函数返回的结果相匹配。事实上,对于广义线性模型 (Generalized Linear Model),我们也专门为 SparkDataFrame 暴露了 glm,因此上述操作等同于 model <- glm(mpg ~ wt + cyl, data = carsDF)

summary(model)
## 
## Deviance Residuals: 
## (Note: These are approximate quantiles with relative error <= 0.01)
##     Min       1Q   Median       3Q      Max  
## -4.2893  -1.7085  -0.4713   1.5729   6.1004  
## 
## Coefficients:
##              Estimate  Std. Error  t value    Pr(>|t|)
## (Intercept)   39.6863     1.71498  23.1409  0.00000000
## wt            -3.1910     0.75691  -4.2158  0.00022202
## cyl           -1.5078     0.41469  -3.6360  0.00106428
## 
## (Dispersion parameter for gaussian family taken to be 6.592137)
## 
##     Null deviance: 1126.05  on 31  degrees of freedom
## Residual deviance:  191.17  on 29  degrees of freedom
## AIC: 156
## 
## Number of Fisher Scoring iterations: 1

模型可以通过 write.ml 保存,并使用 read.ml 加载回来。

write.ml(model, path = "/HOME/tmp/mlModel/glmModel")

最后,我们可以通过运行以下命令停止 Spark 会话

设置

安装

与许多其他 R 包不同,要使用 SparkR,您需要额外安装 Apache Spark。Spark 安装将用于运行后端进程,该进程将编译和执行 SparkR 程序。

安装 SparkR 包后,您可以按照上一节的说明调用 sparkR.session 来启动,它将检查 Spark 安装。如果您从交互式 shell(例如 R、RStudio)中使用 SparkR,那么如果未找到 Spark,它将自动下载并缓存。另外,我们提供了一个易于使用的函数 install.spark 用于手动运行此操作。如果您计算机上没有安装 Spark,可以从 Apache Spark 网站下载。

如果您已经安装了 Spark,则无需再次安装,可以将 sparkHome 参数传递给 sparkR.session,让 SparkR 知道现有 Spark 安装的位置。

sparkR.session(sparkHome = "/HOME/spark")

Spark 会话

除了 sparkHomesparkR.session 中还可以指定许多其他选项。有关完整列表,请参阅 启动:SparkSessionSparkR API 文档

特别是,以下 Spark 驱动程序属性可以在 sparkConfig 中设置。

属性名称 属性组 spark-submit 等效项
spark.driver.memory 应用程序属性 --driver-memory
spark.driver.extraClassPath 运行时环境 --driver-class-path
spark.driver.extraJavaOptions 运行时环境 --driver-java-options
spark.driver.extraLibraryPath 运行时环境 --driver-library-path
spark.kerberos.keytab 应用程序属性 --keytab
spark.kerberos.principal 应用程序属性 --principal

对于 Windows 用户:由于不同操作系统之间文件前缀不同,为避免潜在的前缀错误问题,当前的解决方法是在启动 SparkSession 时指定 spark.sql.warehouse.dir

spark_warehouse_path <- file.path(path.expand('~'), "spark-warehouse")
sparkR.session(spark.sql.warehouse.dir = spark_warehouse_path)

集群模式

SparkR 可以连接到远程 Spark 集群。集群模式概述 是对不同 Spark 集群模式的良好介绍。

将 SparkR 连接到远程 Spark 集群时,请确保机器上的 Spark 版本和 Hadoop 版本与集群上的对应版本匹配。当前 SparkR 包兼容

## [1] "Spark 4.0.0"

它应该同时用于本地计算机和远程集群。

要连接,请将主节点的 URL 传递给 sparkR.session。完整列表可在 Spark Master URL 中查看。例如,要连接到本地独立 Spark 主节点,我们可以调用

sparkR.session(master = "spark://local:7077")

对于 YARN 集群,SparkR 支持客户端模式,主节点设置为“yarn”。

sparkR.session(master = "yarn")

当前版本不支持 Yarn 集群模式。

数据导入

本地数据框

最简单的方法是将本地 R 数据框转换为 SparkDataFrame。具体来说,我们可以使用 as.DataFramecreateDataFrame 并传入本地 R 数据框来创建一个 SparkDataFrame。例如,以下代码使用 R 中的 faithful 数据集创建一个 SparkDataFrame

df <- as.DataFrame(faithful)
head(df)
##   eruptions waiting
## 1     3.600      79
## 2     1.800      54
## 3     3.333      74
## 4     2.283      62
## 5     4.533      85
## 6     2.883      55

数据源

SparkR 通过 SparkDataFrame 接口支持对各种数据源进行操作。您可以查看 Spark SQL 编程指南,了解内置数据源可用的更多 特定选项

从数据源创建 SparkDataFrame 的通用方法是 read.df。此方法接受要加载的文件路径和数据源类型,并且将自动使用当前活动的 Spark 会话。SparkR 原生支持读取 CSV、JSON 和 Parquet 文件,并且通过 Spark Packages 您可以找到 Avro 等流行文件格式的数据源连接器。在使用 sparkR.session 初始化 SparkSession 时,可以使用 sparkPackages 参数添加这些包。

sparkR.session(sparkPackages = "com.databricks:spark-avro_2.12:3.0.0")

我们可以通过一个 CSV 输入文件示例来了解如何使用数据源。欲了解更多信息,请参阅 SparkR read.df API 文档。

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

数据源 API 原生支持 JSON 格式的输入文件。请注意,此处使用的文件并非典型的 JSON 文件。文件中的每一行都必须包含一个单独的、自包含的有效 JSON 对象。因此,常规的多行 JSON 文件通常会失败。

让我们看一下此处使用的原始 JSON 文件的前两行。

filePath <- paste0(sparkR.conf("spark.home"),
                         "/examples/src/main/resources/people.json")
readLines(filePath, n = 2L)
## [1] "{\"name\":\"Michael\"}"          "{\"name\":\"Andy\", \"age\":30}"

我们使用 read.df 将其读入 SparkDataFrame

people <- read.df(filePath, "json")
count(people)
## [1] 3
head(people)
##   age    name
## 1  NA Michael
## 2  30    Andy
## 3  19  Justin

SparkR 自动从 JSON 文件推断模式。

printSchema(people)
## root
##  |-- age: long (nullable = true)
##  |-- name: string (nullable = true)

如果我们要读取多个 JSON 文件,可以使用 read.json

people <- read.json(paste0(Sys.getenv("SPARK_HOME"),
                           c("/examples/src/main/resources/people.json",
                             "/examples/src/main/resources/people.json")))
count(people)
## [1] 6

数据源 API 还可以用于将 SparkDataFrames 保存为多种文件格式。例如,我们可以使用 write.df 将前面示例中的 SparkDataFrame 保存到 Parquet 文件中。

write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")

Hive 表

您还可以从 Hive 表创建 SparkDataFrames。为此,我们需要创建一个支持 Hive 的 SparkSession,它可以访问 Hive MetaStore 中的表。请注意,Spark 应该已经构建了 Hive 支持,更多详细信息可以在 SQL 编程指南中找到。在 SparkR 中,默认情况下它会尝试创建一个启用 Hive 支持的 SparkSession (enableHiveSupport = TRUE)。

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

txtPath <- paste0(sparkR.conf("spark.home"), "/examples/src/main/resources/kv1.txt")
sqlCMD <- sprintf("LOAD DATA LOCAL INPATH '%s' INTO TABLE src", txtPath)
sql(sqlCMD)

results <- sql("FROM src SELECT key, value")

# results is now a SparkDataFrame
head(results)

数据处理

致 dplyr 用户:SparkR 在数据处理方面与 dplyr 具有相似的接口。然而,首先值得一提的是一些显著差异。我们在这里使用 df 代表 SparkDataFrame,使用 col 代表列名。

  1. 指示列。SparkR 使用列名的字符串或使用 $ 构造的 Column 对象来指示列。例如,要在 df 中选择 col,我们可以写 select(df, "col")select(df, df$col)

  2. 描述条件。在 SparkR 中,Column 对象表示可以直接插入到条件中,或者我们可以使用字符串来描述条件,而无需引用所使用的 SparkDataFrame。例如,要选择值 > 1 的行,我们可以写 filter(df, df$col > 1)filter(df, "col > 1")

这里有更具体的例子。

dplyr SparkR
select(mtcars, mpg, hp) select(carsDF, "mpg", "hp")
filter(mtcars, mpg > 20, hp > 100) filter(carsDF, carsDF$mpg > 20, carsDF$hp > 100)

其他差异将在具体方法中提及。

我们使用上面创建的 SparkDataFrame carsDF。我们可以获取关于 SparkDataFrame 的基本信息。

carsDF
## SparkDataFrame[model:string, mpg:double, cyl:double, disp:double, hp:double, drat:double, wt:double, qsec:double, vs:double, am:double, gear:double, carb:double]

以树状格式打印模式。

printSchema(carsDF)
## root
##  |-- model: string (nullable = true)
##  |-- mpg: double (nullable = true)
##  |-- cyl: double (nullable = true)
##  |-- disp: double (nullable = true)
##  |-- hp: double (nullable = true)
##  |-- drat: double (nullable = true)
##  |-- wt: double (nullable = true)
##  |-- qsec: double (nullable = true)
##  |-- vs: double (nullable = true)
##  |-- am: double (nullable = true)
##  |-- gear: double (nullable = true)
##  |-- carb: double (nullable = true)

SparkDataFrame 操作

选择行、列

SparkDataFrames 支持许多用于结构化数据处理的函数。这里我们包含一些基本示例,完整列表可在 API 文档中找到

您也可以将列名作为字符串传入。

head(select(carsDF, "mpg"))
##    mpg
## 1 21.0
## 2 21.0
## 3 22.8
## 4 21.4
## 5 18.7
## 6 18.1

过滤 SparkDataFrame,只保留每加仑英里数 (mpg) 小于 20 英里/加仑的行。

head(filter(carsDF, carsDF$mpg < 20))
##               model  mpg cyl  disp  hp drat   wt  qsec vs am gear carb
## 1 Hornet Sportabout 18.7   8 360.0 175 3.15 3.44 17.02  0  0    3    2
## 2           Valiant 18.1   6 225.0 105 2.76 3.46 20.22  1  0    3    1
## 3        Duster 360 14.3   8 360.0 245 3.21 3.57 15.84  0  0    3    4
## 4          Merc 280 19.2   6 167.6 123 3.92 3.44 18.30  1  0    4    4
## 5         Merc 280C 17.8   6 167.6 123 3.92 3.44 18.90  1  0    4    4
## 6        Merc 450SE 16.4   8 275.8 180 3.07 4.07 17.40  0  0    3    3

分组、聚合

分组和聚合的常见流程是

  1. 使用 groupBygroup_by 并结合一些分组变量来创建一个 GroupedData 对象

  2. GroupedData 对象传递给 aggsummarize 函数,并使用一些提供的聚合函数计算每个组内的数值。

分组后支持许多广泛使用的聚合函数,包括 avgcount_distinctcountfirstkurtosislastmaxmeanminsdskewnessstddev_popstddev_sampsum_distinctsumvar_popvar_sampvar。请参阅此处链接的聚合函数 API 文档

例如,我们可以计算 mtcars 数据集中汽缸数量的直方图,如下所示。

numCyl <- summarize(groupBy(carsDF, carsDF$cyl), count = n(carsDF$cyl))
head(numCyl)
##   cyl count
## 1   8    14
## 2   4    11
## 3   6     7

使用 cuberollup 计算多维度的小计。

mean(cube(carsDF, "cyl", "gear", "am"), "mpg")
## SparkDataFrame[cyl:double, gear:double, am:double, avg(mpg):double]

生成 {(cyl, gear, am), (cyl, gear), (cyl), ()} 的分组,而

mean(rollup(carsDF, "cyl", "gear", "am"), "mpg")
## SparkDataFrame[cyl:double, gear:double, am:double, avg(mpg):double]

生成分组列的所有可能组合的分组。

列操作

SparkR 还提供了许多可以直接应用于列的函数,用于数据处理和聚合。以下示例展示了基本算术函数的使用。

carsDF_km <- carsDF
carsDF_km$kmpg <- carsDF_km$mpg * 1.61
head(select(carsDF_km, "model", "mpg", "kmpg"))
##               model  mpg   kmpg
## 1         Mazda RX4 21.0 33.810
## 2     Mazda RX4 Wag 21.0 33.810
## 3        Datsun 710 22.8 36.708
## 4    Hornet 4 Drive 21.4 34.454
## 5 Hornet Sportabout 18.7 30.107
## 6           Valiant 18.1 29.141

窗口函数

窗口函数是聚合函数的一种变体。简单来说,

  • 聚合函数:n1 的映射 - 为一组条目返回单个值。示例包括 sumcountmax

  • 窗口函数:nn 的映射 - 为组中的每个条目返回一个值,但该值可能取决于中的所有条目。示例包括 rankleadlag

形式上,上面提到的被称为。每个输入行都可以有一个与之关联的唯一帧,并且该行上的窗口函数的输出基于该帧中包含的行。

窗口函数通常与以下函数结合使用:windowPartitionBywindowOrderBypartitionByorderByover。为了说明这一点,我们接下来看一个示例。

我们仍然使用 mtcars 数据集。对应的 SparkDataFramecarsDF。假设对于每个汽缸数量,我们想计算组内每辆车在 mpg 中的排名。

carsSubDF <- select(carsDF, "model", "mpg", "cyl")
ws <- orderBy(windowPartitionBy("cyl"), "mpg")
carsRank <- withColumn(carsSubDF, "rank", over(rank(), ws))
head(carsRank, n = 20L)
##                  model  mpg cyl rank
## 1           Volvo 142E 21.4   4    1
## 2        Toyota Corona 21.5   4    2
## 3           Datsun 710 22.8   4    3
## 4             Merc 230 22.8   4    3
## 5            Merc 240D 24.4   4    5
## 6        Porsche 914-2 26.0   4    6
## 7            Fiat X1-9 27.3   4    7
## 8          Honda Civic 30.4   4    8
## 9         Lotus Europa 30.4   4    8
## 10            Fiat 128 32.4   4   10
## 11      Toyota Corolla 33.9   4   11
## 12           Merc 280C 17.8   6    1
## 13             Valiant 18.1   6    2
## 14            Merc 280 19.2   6    3
## 15        Ferrari Dino 19.7   6    4
## 16           Mazda RX4 21.0   6    5
## 17       Mazda RX4 Wag 21.0   6    5
## 18      Hornet 4 Drive 21.4   6    7
## 19  Cadillac Fleetwood 10.4   8    1
## 20 Lincoln Continental 10.4   8    1

我们详细解释上述步骤。

  • windowPartitionBy 创建一个定义分区的窗口规范对象 WindowSpec。它控制哪些行将与给定行在同一分区中。在这种情况下,cyl 值相同的行将被放入同一分区中。orderBy 进一步定义了排序 - 给定行在分区中的位置。结果 WindowSpec 将作为 ws 返回。

更多的窗口规范方法包括 rangeBetween(按值定义帧边界)和 rowsBetween(按行索引定义边界)。

  • withColumn 将一个名为 rank 的 Column 附加到 SparkDataFrameover 返回一个窗口列。第一个参数通常是窗口函数(例如 rank()lead(carsDF$wt))返回的 Column。它根据分区和排序后的表计算相应的值。

用户定义函数

在 SparkR 中,我们支持多种用户定义函数 (UDF)。

按分区应用

dapply 可以将函数应用于 SparkDataFrame 的每个分区。应用于 SparkDataFrame 每个分区的函数应该只有一个参数,即对应于一个分区的 data.frame,并且输出也应该是一个 data.frame。模式(Schema)指定了结果 SparkDataFrame 的行格式。它必须与返回值的 DataTypes 匹配。有关 R 和 Spark 之间映射的信息,请参阅此处

我们将 mpg 转换为 kmpg(每加仑公里数)。carsSubDF 是一个包含 carsDF 列子集的 SparkDataFrame

carsSubDF <- select(carsDF, "model", "mpg")
schema <- "model STRING, mpg DOUBLE, kmpg DOUBLE"
out <- dapply(carsSubDF, function(x) { x <- cbind(x, x$mpg * 1.61) }, schema)
head(collect(out))
##               model  mpg   kmpg
## 1         Mazda RX4 21.0 33.810
## 2     Mazda RX4 Wag 21.0 33.810
## 3        Datsun 710 22.8 36.708
## 4    Hornet 4 Drive 21.4 34.454
## 5 Hornet Sportabout 18.7 30.107
## 6           Valiant 18.1 29.141

dapply 类似,dapplyCollect 可以将函数应用于 SparkDataFrame 的每个分区并收集结果。函数的输出应该是一个 data.frame,但在此情况下不需要模式。请注意,如果 UDF 在所有分区上的输出无法拉取到驱动程序内存中,则 dapplyCollect 可能会失败。

out <- dapplyCollect(
         carsSubDF,
         function(x) {
           x <- cbind(x, "kmpg" = x$mpg * 1.61)
         })
head(out, 3)
##           model  mpg   kmpg
## 1     Mazda RX4 21.0 33.810
## 2 Mazda RX4 Wag 21.0 33.810
## 3    Datsun 710 22.8 36.708

按组应用

gapply 可以将函数应用于 SparkDataFrame 的每个组。要应用于 SparkDataFrame 每个组的函数应该只有两个参数:分组键和与该键对应的 R data.frame。组从 SparkDataFrames 的列中选择。函数的输出应该是一个 data.frame。模式(Schema)指定了结果 SparkDataFrame 的行格式。它必须根据 Spark 数据类型表示 R 函数的输出模式。返回的 data.frame 的列名由用户设置。有关 R 和 Spark 之间映射的信息,请参阅此处

schema <- structType(structField("cyl", "double"), structField("max_mpg", "double"))
result <- gapply(
    carsDF,
    "cyl",
    function(key, x) {
        y <- data.frame(key, max(x$mpg))
    },
    schema)
head(arrange(result, "max_mpg", decreasing = TRUE))
##   cyl max_mpg
## 1   4    33.9
## 2   6    21.4
## 3   8    19.2

gapply 类似,gapplyCollect 可以将函数应用于 SparkDataFrame 的每个分区并将结果收集回 R data.frame。函数的输出应该是一个 data.frame,但在此情况下不需要模式。请注意,如果 UDF 在所有分区上的输出无法拉取到驱动程序内存中,则 gapplyCollect 可能会失败。

result <- gapplyCollect(
    carsDF,
    "cyl",
    function(key, x) {
         y <- data.frame(key, max(x$mpg))
        colnames(y) <- c("cyl", "max_mpg")
        y
    })
head(result[order(result$max_mpg, decreasing = TRUE), ])
##   cyl max_mpg
## 1   4    33.9
## 2   6    21.4
## 3   8    19.2

分发本地函数

与原生 R 中的 lapply 类似,spark.lapply 对元素列表运行函数,并使用 Spark 分布计算。spark.lapply 的工作方式类似于 doParallellapply 对列表元素的操作。所有计算结果应该适合单台机器。如果不是这种情况,您可以执行类似 df <- createDataFrame(list) 的操作,然后使用 dapply

我们以 e1071 包中的 svm 为例。我们使用所有默认设置,除了改变约束违反的成本。spark.lapply 可以并行训练这些不同的模型。

costs <- exp(seq(from = log(1), to = log(1000), length.out = 5))
train <- function(cost) {
  stopifnot(requireNamespace("e1071", quietly = TRUE))
  model <- e1071::svm(Species ~ ., data = iris, cost = cost)
  summary(model)
}

返回模型摘要的列表。

model.summaries <- spark.lapply(costs, train)
class(model.summaries)
## [1] "list"

为避免显示过长,我们只展示第二个拟合模型的部分结果。您也可以随意检查其他模型。

print(model.summaries[[2]])
## $call
## svm(formula = Species ~ ., data = iris, cost = cost)
## 
## $type
## [1] 0
## 
## $kernel
## [1] 2
## 
## $cost
## [1] 5.623413
## 
## $degree
## [1] 3
## 
## $gamma
## [1] 0.25
## 
## $coef0
## [1] 0
## 
## $nu
## [1] 0.5
## 
## $epsilon
## [1] 0.1
## 
## $sparse
## [1] FALSE
## 
## $scaled
## [1] TRUE TRUE TRUE TRUE
## 
## $x.scale
## $x.scale$`scaled:center`
## Sepal.Length  Sepal.Width Petal.Length  Petal.Width 
##     5.843333     3.057333     3.758000     1.199333 
## 
## $x.scale$`scaled:scale`
## Sepal.Length  Sepal.Width Petal.Length  Petal.Width 
##    0.8280661    0.4358663    1.7652982    0.7622377 
## 
## 
## $y.scale
## NULL
## 
## $nclasses
## [1] 3
## 
## $levels
## [1] "setosa"     "versicolor" "virginica" 
## 
## $tot.nSV
## [1] 35
## 
## $nSV
## [1]  6 15 14
## 
## $labels
## [1] 1 2 3
## 
## $SV
##     Sepal.Length Sepal.Width Petal.Length Petal.Width
## 14   -1.86378030 -0.13153881   -1.5056946  -1.4422448
## 16   -0.17309407  3.08045544   -1.2791040  -1.0486668
## 21   -0.53538397  0.78617383   -1.1658087  -1.3110521
## 23   -1.50149039  1.24503015   -1.5623422  -1.3110521
## 24   -0.89767388  0.55674567   -1.1658087  -0.9174741
## 42   -1.62225369 -1.73753594   -1.3923993  -1.1798595
## 51    1.39682886  0.32731751    0.5336209   0.2632600
## 53    1.27606556  0.09788935    0.6469162   0.3944526
## 54   -0.41462067 -1.73753594    0.1370873   0.1320673
## 55    0.79301235 -0.59039513    0.4769732   0.3944526
##  [ reached getOption("max.print") -- omitted 25 rows ]
## 
## $index
##  [1]  14  16  21  23  24  42  51  53  54  55  58  61  69  71  73  78  79  84  85
## [20]  86  99 107 111 119 120 124 127 128 130 132 134 135 139 149 150
## 
## $rho
## [1] -0.10346530  0.12160294 -0.09540346
## 
## $compprob
## [1] FALSE
## 
## $probA
## NULL
## 
## $probB
## NULL
## 
## $sigma
## NULL
## 
## $coefs
##              [,1]        [,2]
##  [1,]  0.00000000  0.06561739
##  [2,]  0.76813720  0.93378721
##  [3,]  0.00000000  0.12123270
##  [4,]  0.00000000  0.31170741
##  [5,]  1.11614066  0.46397392
##  [6,]  1.88141600  1.10392128
##  [7,] -0.55872622  0.00000000
##  [8,]  0.00000000  5.62341325
##  [9,]  0.00000000  0.27711792
## [10,]  0.00000000  5.28440007
## [11,] -1.06596713  0.00000000
## [12,] -0.57076709  1.09019756
## [13,] -0.03365904  5.62341325
## [14,]  0.00000000  5.62341325
## [15,]  0.00000000  5.62341325
## [16,]  0.00000000  5.62341325
## [17,]  0.00000000  4.70398738
## [18,]  0.00000000  5.62341325
## [19,]  0.00000000  4.97981371
## [20,] -0.77497987  0.00000000
##  [ reached getOption("max.print") -- omitted 15 rows ]
## 
## $na.action
## NULL
## 
## $xlevels
## named list()
## 
## $fitted
##      1      2      3      4      5      6      7      8      9     10     11 
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa 
##     12     13     14     15     16     17     18     19     20     21     22 
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa 
##     23     24     25     26     27     28     29     30     31     32     33 
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa 
##     34     35     36     37     38     39     40 
## setosa setosa setosa setosa setosa setosa setosa 
##  [ reached getOption("max.print") -- omitted 110 entries ]
## Levels: setosa versicolor virginica
## 
## $decision.values
##     setosa/versicolor setosa/virginica versicolor/virginica
## 1           1.1911739        1.0908424            1.1275805
## 2           1.1336557        1.0619543            1.3260964
## 3           1.2085065        1.0698101            1.0511345
## 4           1.1646153        1.0505915            1.0806874
## 5           1.1880814        1.0950348            0.9542815
## 6           1.0990761        1.0984626            0.9326361
## 7           1.1573474        1.0343287            0.9726843
## 8           1.1851598        1.0815750            1.2206802
## 9           1.1673499        1.0406734            0.8837945
## 10          1.1629911        1.0560925            1.2430067
## 11          1.1339282        1.0803946            1.0338357
## 12          1.1724182        1.0641469            1.1190423
## 13          1.1827355        1.0667956            1.1414844
##  [ reached getOption("max.print") -- omitted 137 rows ]
## 
## $terms
## Species ~ Sepal.Length + Sepal.Width + Petal.Length + Petal.Width
## attr(,"variables")
## list(Species, Sepal.Length, Sepal.Width, Petal.Length, Petal.Width)
## attr(,"factors")
##              Sepal.Length Sepal.Width Petal.Length Petal.Width
## Species                 0           0            0           0
## Sepal.Length            1           0            0           0
## Sepal.Width             0           1            0           0
## Petal.Length            0           0            1           0
## Petal.Width             0           0            0           1
## attr(,"term.labels")
## [1] "Sepal.Length" "Sepal.Width"  "Petal.Length" "Petal.Width" 
## attr(,"order")
## [1] 1 1 1 1
## attr(,"intercept")
## [1] 0
## attr(,"response")
## [1] 1
## attr(,".Environment")
## <environment: 0x5580e90fd1d8>
## attr(,"predvars")
## list(Species, Sepal.Length, Sepal.Width, Petal.Length, Petal.Width)
## attr(,"dataClasses")
##      Species Sepal.Length  Sepal.Width Petal.Length  Petal.Width 
##     "factor"    "numeric"    "numeric"    "numeric"    "numeric" 
## 
## attr(,"class")
## [1] "summary.svm"

SQL 查询

SparkDataFrame 也可以在 Spark SQL 中注册为临时视图,以便可以对其数据运行 SQL 查询。sql 函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 SparkDataFrame 返回。

people <- read.df(paste0(sparkR.conf("spark.home"),
                         "/examples/src/main/resources/people.json"), "json")

将此 SparkDataFrame 注册为临时视图。

createOrReplaceTempView(people, "people")

SQL 语句可以使用 sql 方法运行。

teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##     name
## 1 Justin

机器学习

SparkR 支持以下机器学习模型和算法。

分类

  • 线性支持向量机 (SVM) 分类器

  • 逻辑回归

  • 多层感知器 (MLP)

  • 朴素贝叶斯

  • 因子分解机 (FM) 分类器

回归

  • 加速失效时间 (AFT) 生存模型

  • 广义线性模型 (GLM)

  • 等渗回归

  • 线性回归

  • 因子分解机 (FM) 回归器

树模型 - 分类和回归

  • 决策树

  • 梯度提升树 (GBT)

  • 随机森林

聚类

  • 二分kk-均值

  • 高斯混合模型 (GMM)

  • kk-均值聚类

  • 潜在狄利克雷分配 (LDA)

  • 幂迭代聚类 (PIC)

协同过滤

  • 交替最小二乘 (ALS)

频繁模式挖掘

  • FP-growth
  • PrefixSpan

统计

  • 柯尔莫哥洛夫-斯米尔诺夫检验

R 公式

对于上述大多数模型,SparkR 支持 R 公式运算符,包括 ~.:+- 用于模型拟合。这使得使用体验类似于使用 R 函数。

训练集和测试集

我们可以通过 randomSplit 函数轻松地将 SparkDataFrame 分割成随机的训练集和测试集。它返回一个包含给定 weights 的分割 SparkDataFrames 列表。我们以 carsDF 为例,希望大约有7070%训练数据和3030%测试数据。

splitDF_list <- randomSplit(carsDF, c(0.7, 0.3), seed = 0)
carsDF_train <- splitDF_list[[1]]
carsDF_test <- splitDF_list[[2]]
count(carsDF_train)
## [1] 24
head(carsDF_train)
##                model  mpg cyl disp  hp drat    wt  qsec vs am gear carb
## 1 Cadillac Fleetwood 10.4   8  472 205 2.93 5.250 17.98  0  0    3    4
## 2         Camaro Z28 13.3   8  350 245 3.73 3.840 15.41  0  0    3    4
## 3  Chrysler Imperial 14.7   8  440 230 3.23 5.345 17.42  0  0    3    4
## 4   Dodge Challenger 15.5   8  318 150 2.76 3.520 16.87  0  0    3    2
## 5         Duster 360 14.3   8  360 245 3.21 3.570 15.84  0  0    3    4
## 6       Ferrari Dino 19.7   6  145 175 3.62 2.770 15.50  0  1    5    6
count(carsDF_test)
## [1] 8
head(carsDF_test)
##            model  mpg cyl  disp  hp drat    wt  qsec vs am gear carb
## 1    AMC Javelin 15.2   8 304.0 150 3.15 3.435 17.30  0  0    3    2
## 2     Datsun 710 22.8   4 108.0  93 3.85 2.320 18.61  1  1    4    1
## 3       Fiat 128 32.4   4  78.7  66 4.08 2.200 19.47  1  1    4    1
## 4      Merc 240D 24.4   4 146.7  62 3.69 3.190 20.00  1  0    4    2
## 5       Merc 280 19.2   6 167.6 123 3.92 3.440 18.30  1  0    4    4
## 6 Toyota Corolla 33.9   4  71.1  65 4.22 1.835 19.90  1  1    4    1

模型和算法

线性支持向量机 (SVM) 分类器

线性支持向量机 (SVM) 分类器是一种具有线性核的 SVM 分类器。这是一种二元分类器。我们用一个简单的例子来展示如何使用 spark.svmLinear 进行二元分类。

# load training data and create a DataFrame
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# fit a Linear SVM classifier model
model <- spark.svmLinear(training,  Survived ~ ., regParam = 0.01, maxIter = 10)
summary(model)
## $coefficients
##                 Estimate
## (Intercept)  0.993131388
## Class_1st   -0.386500359
## Class_2nd   -0.622627816
## Class_3rd   -0.204446602
## Sex_Female  -0.589950309
## Age_Adult    0.741676902
## Freq        -0.006582887
## 
## $numClasses
## [1] 2
## 
## $numFeatures
## [1] 6

在训练数据上预测值

prediction <- predict(model, training)
head(select(prediction, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No        Yes
## 2   2nd   Male Child    0       No        Yes
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No        Yes
## 5   1st Female Child    0       No        Yes
## 6   2nd Female Child    0       No         No

逻辑回归

逻辑回归 是一种在响应变量为分类变量时广泛使用的模型。它可以被视为 广义线性预测模型 的特例。我们在 spark.glm 之上提供了 spark.logit,以支持具有高级超参数的逻辑回归。它支持带弹性网络正则化和特征标准化的二元和多类分类,类似于 glmnet

我们使用一个简单的例子来演示 spark.logit 的用法。通常,使用 spark.logit 有三个步骤:1). 从适当的数据源创建数据框;2). 使用 spark.logit 和适当的参数设置拟合逻辑回归模型;3). 使用 summary 获取拟合模型的系数矩阵,并使用 predict 进行预测。

二项逻辑回归

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
model <- spark.logit(training, Survived ~ ., regParam = 0.04741301)
summary(model)
## $coefficients
##                  Estimate
## (Intercept)  0.2255014282
## Class_1st   -0.1338856652
## Class_2nd   -0.1479826947
## Class_3rd    0.0005674937
## Sex_Female  -0.2011183871
## Age_Adult    0.3263186885
## Freq        -0.0033111157

在训练数据上预测值

fitted <- predict(model, training)
head(select(fitted, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No        Yes
## 2   2nd   Male Child    0       No        Yes
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No        Yes
## 5   1st Female Child    0       No         No
## 6   2nd Female Child    0       No         No

针对三个类别的多项逻辑回归

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# Note in this case, Spark infers it is multinomial logistic regression, so family = "multinomial" is optional.
model <- spark.logit(training, Class ~ ., regParam = 0.07815179)
summary(model)
## $coefficients
##                      1st          2nd          3rd         Crew
## (Intercept)  0.051662845  0.062998145 -0.039083689 -0.075577300
## Sex_Female  -0.088030587 -0.102528148  0.059233106  0.131325629
## Age_Adult    0.141935316  0.169492058 -0.102562719 -0.208864654
## Survived_No  0.052721020  0.057980057 -0.029408423 -0.081292653
## Freq        -0.001555912 -0.001970377  0.001303836  0.002222453

多层感知器

多层感知器分类器 (MLPC) 是一种基于前馈人工神经网络的分类器。MLPC 由多层节点组成。网络中的每一层都与下一层完全连接。输入层中的节点代表输入数据。所有其他节点通过输入与节点权重的线性组合将输入映射到输出ww和偏差bb并应用激活函数。这可以以矩阵形式写出,对于 MLPC 具有K+1K+1层,如下所示y(x)=fK(f2(w2Tf1(w1Tx+b1)+b2)+bK).y(x)=f_K(\ldots f_2(w_2^T f_1(w_1^T x + b_1) + b_2) \ldots + b_K).

中间层中的节点使用 Sigmoid (逻辑) 函数f(zi)=11+ezi.f(z_i) = \frac{1}{1+e^{-z_i}}.

输出层中的节点使用 Softmax 函数f(zi)=ezik=1Nezk.f(z_i) = \frac{e^{z_i}}{\sum_{k=1}^N e^{z_k}}.

节点的数量NN在输出层中对应于类别的数量。

MLPC 采用反向传播来学习模型。我们使用逻辑损失函数进行优化,并使用 L-BFGS 作为优化例程。

spark.mlp 需要 data 中至少有两列:一列名为 "label",另一列名为 "features""features" 列应为 libSVM 格式。

我们使用 Titanic 数据集来展示如何在分类中使用 spark.mlp

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# fit a Multilayer Perceptron Classification Model
model <- spark.mlp(training, Survived ~ Age + Sex, blockSize = 128, layers = c(2, 2), solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1, initialWeights = c( 0, 0, 5, 5, 9, 9))

为避免显示过长,我们只展示模型摘要的部分结果。您可以在 SparkR shell 中查看完整结果。

# check the summary of the fitted model
summary(model)
## $numOfInputs
## [1] 2
## 
## $numOfOutputs
## [1] 2
## 
## $layers
## [1] 2 2
## 
## $weights
## $weights[[1]]
## [1] 0
## 
## $weights[[2]]
## [1] 0
## 
## $weights[[3]]
## [1] 5
## 
## $weights[[4]]
## [1] 5
## 
## $weights[[5]]
## [1] 9
## 
## $weights[[6]]
## [1] 9
# make predictions use the fitted model
predictions <- predict(model, training)
head(select(predictions, predictions$prediction))
##   prediction
## 1         No
## 2         No
## 3         No
## 4         No
## 5         No
## 6         No

朴素贝叶斯

朴素贝叶斯模型假设特征之间相互独立。spark.naiveBayes 对 SparkDataFrame 拟合 伯努利朴素贝叶斯模型。数据应全部是分类的。这些模型常用于文档分类。

titanic <- as.data.frame(Titanic)
titanicDF <- createDataFrame(titanic[titanic$Freq > 0, -5])
naiveBayesModel <- spark.naiveBayes(titanicDF, Survived ~ Class + Sex + Age)
summary(naiveBayesModel)
## $apriori
##            Yes        No
## [1,] 0.5769231 0.4230769
## 
## $tables
##     Class_3rd Class_1st Class_2nd Sex_Female Age_Adult
## Yes 0.3125    0.3125    0.3125    0.5        0.5625   
## No  0.4166667 0.25      0.25      0.5        0.75
naiveBayesPrediction <- predict(naiveBayesModel, titanicDF)
head(select(naiveBayesPrediction, "Class", "Sex", "Age", "Survived", "prediction"))
##   Class    Sex   Age Survived prediction
## 1   3rd   Male Child       No        Yes
## 2   3rd Female Child       No        Yes
## 3   1st   Male Adult       No        Yes
## 4   2nd   Male Adult       No        Yes
## 5   3rd   Male Adult       No         No
## 6  Crew   Male Adult       No        Yes

因子分解机分类器

用于分类问题的因子分解机。

有关因子分解机实现背景和详细信息,请参阅因子分解机部分

t <- as.data.frame(Titanic)
training <- createDataFrame(t)

model <- spark.fmClassifier(training, Survived ~ Age + Sex)
summary(model)
## $coefficients
##                 Estimate
## (Intercept) 0.0064275991
## Age_Adult   0.0001294448
## Sex_Female  0.0001294448
## 
## $factors
##            [,1]        [,2]      [,3]       [,4]        [,5]        [,6]
## [1,] -0.3256224  0.11912568 0.1460235  0.1620567  0.13153516  0.06403695
## [2,] -0.1382155 -0.03658261 0.1717808 -0.1602241 -0.08446129 -0.19287098
##             [,7]        [,8]
## [1,] -0.03292446 -0.05166818
## [2,]  0.19252571  0.06237194
## 
## $numClasses
## [1] 2
## 
## $numFeatures
## [1] 2
## 
## $factorSize
## [1] 8
predictions <- predict(model, training)
head(select(predictions, predictions$prediction))
##   prediction
## 1        Yes
## 2        Yes
## 3        Yes
## 4        Yes
## 5        Yes
## 6        Yes

加速失效时间生存模型

生存分析研究事件发生前预期的时间长度,以及通常与风险因素或受试者所接受治疗的关系。与标准回归分析不同,生存建模必须处理数据中的特殊特征,包括非负生存时间和删失。

加速失效时间 (AFT) 模型是一种用于删失数据的参数生存模型,它假设协变量的作用是以某个常数加速或减速事件的生命历程。欲了解更多信息,请参阅维基百科页面 AFT 模型及其中引用的文献。与用于相同目的的 比例风险模型 不同,AFT 模型更容易并行化,因为每个实例独立地贡献于目标函数。

library(survival)
ovarianDF <- createDataFrame(ovarian)
aftModel <- spark.survreg(ovarianDF, Surv(futime, fustat) ~ ecog_ps + rx)
summary(aftModel)
## $coefficients
##                  Value
## (Intercept)  6.8966910
## ecog_ps     -0.3850414
## rx           0.5286455
## Log(scale)  -0.1234429
aftPredictions <- predict(aftModel, ovarianDF)
head(aftPredictions)
##   futime fustat     age resid_ds rx ecog_ps label prediction
## 1     59      1 72.3315        2  1       1    59   1141.724
## 2    115      1 74.4932        2  1       1   115   1141.724
## 3    156      1 66.4658        2  1       2   156    776.855
## 4    421      0 53.3644        2  2       1   421   1937.087
## 5    431      1 50.3397        2  1       1   431   1141.724
## 6    448      0 56.4301        1  1       2   448    776.855

广义线性模型

主要函数是 spark.glm。支持以下族和链接函数。默认是高斯 (gaussian)。

族 (Family) 链接函数 (Link Function)
高斯 (gaussian) 恒等 (identity), 对数 (log), 逆 (inverse)
二项 (binomial) logit, probit, cloglog (互补对数-对数)
泊松 (poisson) 对数 (log), 恒等 (identity), 平方根 (sqrt)
伽马 (gamma) 逆 (inverse), 恒等 (identity), 对数 (log)
tweedie 幂链接函数

有三种方法来指定 family 参数。

  • 族名作为字符串,例如 family = "gaussian"

  • 族函数,例如 family = binomial

  • 族函数返回的结果,例如 family = poisson(link = log)

  • 请注意,有两种方式来指定 tweedie 族

    1. 设置 family = "tweedie" 并指定 var.powerlink.power
    2. 当加载 statmod 包时,tweedie 族使用其中的族定义指定,即 tweedie()

有关族及其链接函数的更多信息,请参阅维基百科页面 广义线性模型

我们使用 mtcars 数据集作为说明。对应的 SparkDataFramecarsDF。拟合模型后,我们打印出摘要并通过对原始数据集进行预测来查看拟合值。我们还可以传入具有相同模式的新 SparkDataFrame 来预测新数据。

gaussianGLM <- spark.glm(carsDF, mpg ~ wt + hp)
summary(gaussianGLM)
## 
## Deviance Residuals: 
## (Note: These are approximate quantiles with relative error <= 0.01)
##     Min       1Q   Median       3Q      Max  
## -3.9410  -1.6499  -0.3267   1.0373   5.8538  
## 
## Coefficients:
##               Estimate  Std. Error  t value    Pr(>|t|)
## (Intercept)  37.227270   1.5987875  23.2847  0.0000e+00
## wt           -3.877831   0.6327335  -6.1287  1.1196e-06
## hp           -0.031773   0.0090297  -3.5187  1.4512e-03
## 
## (Dispersion parameter for gaussian family taken to be 6.725785)
## 
##     Null deviance: 1126.05  on 31  degrees of freedom
## Residual deviance:  195.05  on 29  degrees of freedom
## AIC: 156.7
## 
## Number of Fisher Scoring iterations: 1

进行预测时,将附加一个名为 prediction 的新列。我们这里只看一部分列。

gaussianFitted <- predict(gaussianGLM, carsDF)
head(select(gaussianFitted, "model", "prediction", "mpg", "wt", "hp"))
##               model prediction  mpg    wt  hp
## 1         Mazda RX4   23.57233 21.0 2.620 110
## 2     Mazda RX4 Wag   22.58348 21.0 2.875 110
## 3        Datsun 710   25.27582 22.8 2.320  93
## 4    Hornet 4 Drive   21.26502 21.4 3.215 110
## 5 Hornet Sportabout   18.32727 18.7 3.440 175
## 6           Valiant   20.47382 18.1 3.460 105

以下是使用 tweedie 族进行相同拟合的结果

tweedieGLM1 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie", var.power = 0.0)
summary(tweedieGLM1)
## 
## Deviance Residuals: 
## (Note: These are approximate quantiles with relative error <= 0.01)
##     Min       1Q   Median       3Q      Max  
## -3.9410  -1.6499  -0.3267   1.0373   5.8538  
## 
## Coefficients:
##               Estimate  Std. Error  t value    Pr(>|t|)
## (Intercept)  37.227270   1.5987875  23.2847  0.0000e+00
## wt           -3.877831   0.6327335  -6.1287  1.1196e-06
## hp           -0.031773   0.0090297  -3.5187  1.4512e-03
## 
## (Dispersion parameter for tweedie family taken to be 6.725785)
## 
##     Null deviance: 1126.05  on 31  degrees of freedom
## Residual deviance:  195.05  on 29  degrees of freedom
## AIC: 156.7
## 
## Number of Fisher Scoring iterations: 1

我们可以尝试 tweedie 族中的其他分布,例如,具有对数链接的复合泊松分布

tweedieGLM2 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie",
                         var.power = 1.2, link.power = 0.0)
summary(tweedieGLM2)
## 
## Deviance Residuals: 
## (Note: These are approximate quantiles with relative error <= 0.01)
##      Min        1Q    Median        3Q       Max  
## -0.58074  -0.25335  -0.09892   0.18608   0.82717  
## 
## Coefficients:
##                Estimate  Std. Error  t value    Pr(>|t|)
## (Intercept)   3.8500849  0.06698272  57.4788  0.0000e+00
## wt           -0.2018426  0.02897283  -6.9666  1.1691e-07
## hp           -0.0016248  0.00041603  -3.9054  5.1697e-04
## 
## (Dispersion parameter for tweedie family taken to be 0.1340111)
## 
##     Null deviance: 29.8820  on 31  degrees of freedom
## Residual deviance:  3.7739  on 29  degrees of freedom
## AIC: NA
## 
## Number of Fisher Scoring iterations: 4

等渗回归

spark.isoregSparkDataFrame 拟合 等渗回归 模型。它在完全顺序约束下解决加权单变量回归问题。具体来说,给定一组实际观测响应y1,,yny_1, \ldots, y_n,对应的实际特征x1,,xnx_1, \ldots, x_n,以及可选的正权重w1,,wnw_1, \ldots, w_n,我们希望找到一个单调(分段线性)函数ff以最小化(f)=i=1nwi(yif(xi))2.\ell(f) = \sum_{i=1}^n w_i (y_i - f(x_i))^2.

还有一些可能有用的参数。

  • weightCol:一个字符串,指定权重列。

  • isotonic:逻辑值,指示输出序列应为等渗/递增 (TRUE) 还是反等渗/递减 (FALSE)。

  • featureIndex:如果公式右侧的特征是向量列(默认值:0),则为该特征的索引,否则无影响。

我们使用一个人工示例来展示其用法。

y <- c(3.0, 6.0, 8.0, 5.0, 7.0)
x <- c(1.0, 2.0, 3.5, 3.0, 4.0)
w <- rep(1.0, 5)
data <- data.frame(y = y, x = x, w = w)
df <- createDataFrame(data)
isoregModel <- spark.isoreg(df, y ~ x, weightCol = "w")
isoregFitted <- predict(isoregModel, df)
head(select(isoregFitted, "x", "y", "prediction"))
##     x y prediction
## 1 1.0 3        3.0
## 2 2.0 6        5.5
## 3 3.5 8        7.5
## 4 3.0 5        5.5
## 5 4.0 7        7.5

在预测阶段,根据拟合的单调分段函数,规则如下:

  • 如果预测输入与训练特征完全匹配,则返回相关的预测值。如果具有相同特征的预测值有多个,则返回其中一个。具体是哪一个则未定义。

  • 如果预测输入低于或高于所有训练特征,则分别返回具有最低或最高特征的预测值。如果具有相同特征的预测值有多个,则分别返回最低或最高值。

  • 如果预测输入落在两个训练特征之间,则预测被视为分段线性函数,并根据两个最近特征的预测值计算插值。如果具有相同特征的预测值有多个,则使用与前一点相同的规则。

例如,当输入为3.23.2时,两个最接近的特征值为3.03.03.53.5,则预测值将是预测值之间的线性插值,位于3.03.03.53.5.

newDF <- createDataFrame(data.frame(x = c(1.5, 3.2)))
head(predict(isoregModel, newDF))
##     x prediction
## 1 1.5       4.25
## 2 3.2       6.30

线性回归

线性回归模型。

model <- spark.lm(carsDF, mpg ~ wt + hp)

summary(model)
## $coefficients
##                Estimate
## (Intercept) 37.22727012
## wt          -3.87783074
## hp          -0.03177295
## 
## $numFeatures
## [1] 2
predictions <- predict(model, carsDF)
head(select(predictions, predictions$prediction))
##   prediction
## 1   23.57233
## 2   22.58348
## 3   25.27582
## 4   21.26502
## 5   18.32727
## 6   20.47382

因子分解机回归器

用于回归问题的因子分解机。

有关因子分解机实现背景和详细信息,请参阅因子分解机部分

model <- spark.fmRegressor(carsDF, mpg ~ wt + hp)
summary(model)
## $coefficients
##              Estimate
## (Intercept) 0.1518559
## wt          3.6472555
## hp          2.8026828
## 
## $factors
##            [,1]       [,2]       [,3]       [,4]      [,5]       [,6]
## [1,]  0.1424420 -0.1178110 -0.3970272 -0.4696695  0.400288  0.3690930
## [2,] -0.1626185  0.1512138  0.3690435  0.4076975 -0.625752 -0.3715109
##             [,7]       [,8]
## [1,]  0.03472468 -0.1703219
## [2,] -0.02109148 -0.2006249
## 
## $numFeatures
## [1] 2
## 
## $factorSize
## [1] 8
predictions <- predict(model, carsDF)
head(select(predictions, predictions$prediction))
##   prediction
## 1  106.70996
## 2   87.07526
## 3  111.07931
## 4   60.89565
## 5   61.81374
## 6   40.70095

决策树

spark.decisionTreeSparkDataFrame 拟合 决策树 分类或回归模型。用户可以调用 summary 获取拟合模型的摘要,使用 predict 进行预测,以及使用 write.ml/read.ml 保存/加载拟合模型。

我们使用 Titanic 数据集训练决策树并进行预测

t <- as.data.frame(Titanic)
df <- createDataFrame(t)
dtModel <- spark.decisionTree(df, Survived ~ ., type = "classification", maxDepth = 2)
summary(dtModel)
## Formula:  Survived ~ .
## Number of features:  6
## Features:  Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances:  (6,[5],[1.0])
## Max Depth:  2
##  DecisionTreeClassificationModel: uid=dtc_3b470a52c3a2, depth=2, numNodes=5, numClasses=2, numFeatures=6
##   If (feature 5 <= 4.5)
##    Predict: 0.0
##   Else (feature 5 > 4.5)
##    If (feature 5 <= 84.5)
##     Predict: 1.0
##    Else (feature 5 > 84.5)
##     Predict: 0.0
## 
predictions <- predict(dtModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No         No
## 2   2nd   Male Child    0       No         No
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No         No
## 5   1st Female Child    0       No         No
## 6   2nd Female Child    0       No         No

梯度提升树

spark.gbtSparkDataFrame 拟合 梯度提升树 分类或回归模型。用户可以调用 summary 获取拟合模型的摘要,使用 predict 进行预测,以及使用 write.ml/read.ml 保存/加载拟合模型。

我们使用 Titanic 数据集训练梯度提升树并进行预测

t <- as.data.frame(Titanic)
df <- createDataFrame(t)
gbtModel <- spark.gbt(df, Survived ~ ., type = "classification", maxDepth = 2, maxIter = 2)
summary(gbtModel)
## Formula:  Survived ~ .
## Number of features:  6
## Features:  Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances:  (6,[1,2,5],[0.03336902858878361,0.16099525743106016,0.8056357139801562])
## Max Depth:  2
## Number of trees:  2
## Tree weights:  1 0.1
##  GBTClassificationModel: uid = gbtc_474511659869, numTrees=2, numClasses=2, numFeatures=6
##   Tree 0 (weight 1.0):
##     If (feature 5 <= 4.5)
##      If (feature 1 in {1.0})
##       Predict: -1.0
##      Else (feature 1 not in {1.0})
##       Predict: -0.3333333333333333
##     Else (feature 5 > 4.5)
##      If (feature 5 <= 84.5)
##       Predict: 0.5714285714285714
##      Else (feature 5 > 84.5)
##       Predict: -0.42857142857142855
##   Tree 1 (weight 0.1):
##     If (feature 2 in {1.0})
##      If (feature 5 <= 15.5)
##       Predict: 0.9671846896296403
##      Else (feature 5 > 15.5)
##       Predict: -1.0857923804083338
##     Else (feature 2 not in {1.0})
##      If (feature 5 <= 13.5)
##       Predict: -0.08651035613926407
##      Else (feature 5 > 13.5)
##       Predict: 0.6566673506774614
## 
predictions <- predict(gbtModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No         No
## 2   2nd   Male Child    0       No         No
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No         No
## 5   1st Female Child    0       No         No
## 6   2nd Female Child    0       No         No

随机森林

spark.randomForestSparkDataFrame 拟合 随机森林 分类或回归模型。用户可以调用 summary 获取拟合模型的摘要,使用 predict 进行预测,以及使用 write.ml/read.ml 保存/加载拟合模型。

在以下示例中,我们使用 Titanic 数据集训练随机森林并进行预测

t <- as.data.frame(Titanic)
df <- createDataFrame(t)
rfModel <- spark.randomForest(df, Survived ~ ., type = "classification", maxDepth = 2, numTrees = 2)
summary(rfModel)
## Formula:  Survived ~ .
## Number of features:  6
## Features:  Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances:  (6,[3,4,5],[0.09849498327759101,0.401505016722409,0.5])
## Max Depth:  2
## Number of trees:  2
## Tree weights:  1 1
##  RandomForestClassificationModel: uid=rfc_d8fbf7a89562, numTrees=2, numClasses=2, numFeatures=6
##   Tree 0 (weight 1.0):
##     If (feature 4 in {0.0})
##      Predict: 0.0
##     Else (feature 4 not in {0.0})
##      If (feature 3 in {1.0})
##       Predict: 0.0
##      Else (feature 3 not in {1.0})
##       Predict: 1.0
##   Tree 1 (weight 1.0):
##     If (feature 5 <= 84.5)
##      If (feature 5 <= 4.5)
##       Predict: 0.0
##      Else (feature 5 > 4.5)
##       Predict: 1.0
##     Else (feature 5 > 84.5)
##      Predict: 0.0
## 
predictions <- predict(rfModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No         No
## 2   2nd   Male Child    0       No         No
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No         No
## 5   1st Female Child    0       No         No
## 6   2nd Female Child    0       No         No

二分 k-均值

spark.bisectingKmeans 是一种使用分裂(或“自顶向下”)方法的层次聚类:所有观测值从一个簇开始,然后随着向下移动层级,递归地执行分裂。

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
model <- spark.bisectingKmeans(training, Class ~ Survived, k = 4)
summary(model)
## $k
## [1] 4
## 
## $coefficients
##   Survived_No
## 1 0          
## 2 1          
## 3 0          
## 4 1          
## 
## $size
## $size[[1]]
## [1] 16
## 
## $size[[2]]
## [1] 16
## 
## $size[[3]]
## [1] 0
## 
## $size[[4]]
## [1] 0
## 
## 
## $cluster
## SparkDataFrame[prediction:int]
## 
## $is.loaded
## [1] FALSE
fitted <- predict(model, training)
head(select(fitted, "Class", "prediction"))
##   Class prediction
## 1   1st          1
## 2   2nd          1
## 3   3rd          1
## 4  Crew          1
## 5   1st          1
## 6   2nd          1

高斯混合模型

spark.gaussianMixtureSparkDataFrame 拟合多元 高斯混合模型 (GMM)。期望最大化 (EM) 用于近似模型的最大似然估计器 (MLE)。

我们使用一个模拟示例来演示用法。

X1 <- data.frame(V1 = rnorm(4), V2 = rnorm(4))
X2 <- data.frame(V1 = rnorm(6, 3), V2 = rnorm(6, 4))
data <- rbind(X1, X2)
df <- createDataFrame(data)
gmmModel <- spark.gaussianMixture(df, ~ V1 + V2, k = 2)
summary(gmmModel)
## $lambda
## [1] 0.408879 0.591121
## 
## $mu
## $mu[[1]]
## [1] -0.8254152973  0.0009888204
## 
## $mu[[2]]
## [1] 3.006119 3.620325
## 
## 
## $sigma
## $sigma[[1]]
##      [,1]     [,2]    
## [1,] 1.377944 1.092401
## [2,] 1.092401 1.477489
## 
## $sigma[[2]]
##      [,1]      [,2]     
## [1,] 1.317545  0.5716919
## [2,] 0.5716919 0.7335671
## 
## 
## $loglik
## [1] -32.8689
## 
## $posterior
## SparkDataFrame[posterior:array<double>]
## 
## $is.loaded
## [1] FALSE
gmmFitted <- predict(gmmModel, df)
head(select(gmmFitted, "V1", "V2", "prediction"))
##             V1         V2 prediction
## 1 -1.400043517  0.6215527          0
## 2  0.255317055  1.1484116          0
## 3 -2.437263611 -1.8218177          0
## 4 -0.005571287 -0.2473253          0
## 5  2.755800393  4.5124269          1
## 6  2.717294551  2.1369885          1

k-均值聚类

spark.kmeanskk-均值聚类模型对 SparkDataFrame 进行聚类。作为一种无监督学习方法,我们不需要响应变量。因此,R 公式中的左侧应留空。聚类仅基于右侧的变量。

kmeansModel <- spark.kmeans(carsDF, ~ mpg + hp + wt, k = 3)
summary(kmeansModel)
## $k
## [1] 3
## 
## $coefficients
##        mpg        hp       wt
## 1 24.22353  93.52941 2.599588
## 2 15.80000 178.50000 3.926400
## 3 14.62000 263.80000 3.899000
## 
## $size
## $size[[1]]
## [1] 17
## 
## $size[[2]]
## [1] 10
## 
## $size[[3]]
## [1] 5
## 
## 
## $cluster
## SparkDataFrame[prediction:int]
## 
## $is.loaded
## [1] FALSE
## 
## $clusterSize
## [1] 3
kmeansPredictions <- predict(kmeansModel, carsDF)
head(select(kmeansPredictions, "model", "mpg", "hp", "wt", "prediction"), n = 20L)
##                  model  mpg  hp    wt prediction
## 1            Mazda RX4 21.0 110 2.620          0
## 2        Mazda RX4 Wag 21.0 110 2.875          0
## 3           Datsun 710 22.8  93 2.320          0
## 4       Hornet 4 Drive 21.4 110 3.215          0
## 5    Hornet Sportabout 18.7 175 3.440          1
## 6              Valiant 18.1 105 3.460          0
## 7           Duster 360 14.3 245 3.570          2
## 8            Merc 240D 24.4  62 3.190          0
## 9             Merc 230 22.8  95 3.150          0
## 10            Merc 280 19.2 123 3.440          0
## 11           Merc 280C 17.8 123 3.440          0
## 12          Merc 450SE 16.4 180 4.070          1
## 13          Merc 450SL 17.3 180 3.730          1
## 14         Merc 450SLC 15.2 180 3.780          1
## 15  Cadillac Fleetwood 10.4 205 5.250          1
## 16 Lincoln Continental 10.4 215 5.424          1
## 17   Chrysler Imperial 14.7 230 5.345          2
## 18            Fiat 128 32.4  66 2.200          0
## 19         Honda Civic 30.4  52 1.615          0
## 20      Toyota Corolla 33.9  65 1.835          0

潜在狄利克雷分配

spark.ldaSparkDataFrame 拟合 潜在狄利克雷分配 模型。它常用于主题建模,其中主题从文本文档集合中推断出来。LDA 可以被视为一种聚类算法,如下所示:

  • 主题对应于簇中心,文档对应于数据集中的示例(行)。

  • 主题和文档都存在于特征空间中,其中特征向量是词频向量(词袋模型)。

  • LDA 不使用传统的距离进行聚类,而是使用基于文本文档生成方式的统计模型函数。

要使用 LDA,我们需要在 data 中指定一个 features 列,其中每个条目代表一个文档。该列有两种选项:

  • 字符串:这可以是整个文档的字符串。它将自动解析。额外的停用词可以在 customizedStopWords 中添加。

  • libSVM:每个条目是一个词的集合,将直接处理。

拟合模型还提供了两个函数。

  • spark.posterior 返回一个 SparkDataFrame,其中包含一个名为“topicDistribution”的后验概率向量列。

  • spark.perplexity 返回给定 SparkDataFrame 的对数困惑度,如果缺少参数 data,则返回训练数据的对数困惑度。

欲了解更多信息,请参阅帮助文档 ?spark.lda

让我们看一个人工示例。

corpus <- data.frame(features = c(
  "1 2 6 0 2 3 1 1 0 0 3",
  "1 3 0 1 3 0 0 2 0 0 1",
  "1 4 1 0 0 4 9 0 1 2 0",
  "2 1 0 3 0 0 5 0 2 3 9",
  "3 1 1 9 3 0 2 0 0 1 3",
  "4 2 0 3 4 5 1 1 1 4 0",
  "2 1 0 3 0 0 5 0 2 2 9",
  "1 1 1 9 2 1 2 0 0 1 3",
  "4 4 0 3 4 2 1 3 0 0 0",
  "2 8 2 0 3 0 2 0 2 7 2",
  "1 1 1 9 0 2 2 0 0 3 3",
  "4 1 0 0 4 5 1 3 0 1 0"))
corpusDF <- createDataFrame(corpus)
model <- spark.lda(data = corpusDF, k = 5, optimizer = "em")
summary(model)
## $docConcentration
## [1] 11 11 11 11 11
## 
## $topicConcentration
## [1] 1.1
## 
## $logLikelihood
## [1] -353.2948
## 
## $logPerplexity
## [1] 2.676476
## 
## $isDistributed
## [1] TRUE
## 
## $vocabSize
## [1] 10
## 
## $topics
## SparkDataFrame[topic:int, term:array<string>, termWeights:array<double>]
## 
## $vocabulary
##  [1] "0" "1" "2" "3" "4" "9" "5" "8" "7" "6"
## 
## $trainingLogLikelihood
## [1] -239.5629
## 
## $logPrior
## [1] -980.2974
posterior <- spark.posterior(model, corpusDF)
head(posterior)
##                features                                     topicDistribution
## 1 1 2 6 0 2 3 1 1 0 0 3 0.1972183, 0.1986639, 0.2022022, 0.2006584, 0.2012571
## 2 1 3 0 1 3 0 0 2 0 0 1 0.1989991, 0.1988732, 0.2015973, 0.2006385, 0.1998919
## 3 1 4 1 0 0 4 9 0 1 2 0 0.2020625, 0.2026072, 0.1968842, 0.1987298, 0.1997163
## 4 2 1 0 3 0 0 5 0 2 3 9 0.2004088, 0.1981928, 0.2013018, 0.2006318, 0.1994647
## 5 3 1 1 9 3 0 2 0 0 1 3 0.1971488, 0.1983950, 0.2023585, 0.2011592, 0.2009385
## 6 4 2 0 3 4 5 1 1 1 4 0 0.2020255, 0.2041807, 0.1955397, 0.1997236, 0.1985306
perplexity <- spark.perplexity(model, corpusDF)
perplexity
## [1] 2.676476

交替最小二乘

spark.als 通过 交替最小二乘协同过滤 中学习潜在因子。

spark.als 中可以配置多个选项,包括 rankregnonnegative。有关完整列表,请参阅帮助文件。

ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
                list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(ratings, c("user", "item", "rating"))
model <- spark.als(df, "rating", "user", "item", rank = 10, reg = 0.1, nonnegative = TRUE)

提取潜在因子。

stats <- summary(model)
userFactors <- stats$userFactors
itemFactors <- stats$itemFactors
head(userFactors)
head(itemFactors)

进行预测。

predicted <- predict(model, df)
head(predicted)

幂迭代聚类

幂迭代聚类 (PIC) 是一种可扩展的图聚类算法。spark.assignClusters 方法运行 PIC 算法并返回每个输入顶点的簇分配。

df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
                           list(4L, 0L, 0.1)),
                      schema = c("src", "dst", "weight"))
head(spark.assignClusters(df, initMode = "degree", weightCol = "weight"))
##   id cluster
## 1  4       1
## 2  0       0
## 3  1       0
## 4  3       1
## 5  2       0

FP-growth

spark.fpGrowth 执行 FP-growth 算法以在 SparkDataFrame 上挖掘频繁项集。itemsCol 应该是一个值数组。

df <- selectExpr(createDataFrame(data.frame(rawItems = c(
  "T,R,U", "T,S", "V,R", "R,U,T,V", "R,S", "V,S,U", "U,R", "S,T", "V,R", "V,U,S",
  "T,V,U", "R,V", "T,S", "T,S", "S,T", "S,U", "T,R", "V,R", "S,V", "T,S,U"
))), "split(rawItems, ',') AS items")

fpm <- spark.fpGrowth(df, minSupport = 0.2, minConfidence = 0.5)

spark.freqItemsets 方法可用于检索包含频繁项集的 SparkDataFrame

##   items freq
## 1     S   11
## 2     T   10
## 3  T, S    6
## 4     R    9
## 5     V    9
## 6  V, R    5

spark.associationRules 返回一个包含关联规则的 SparkDataFrame

##   antecedent consequent confidence      lift support
## 1          V          R  0.5555556 1.2345679    0.25
## 2          S          T  0.5454545 1.0909091    0.30
## 3          T          S  0.6000000 1.0909091    0.30
## 4          R          V  0.5555556 1.2345679    0.25
## 5          U          S  0.5000000 0.9090909    0.20
## 6          U          T  0.5000000 1.0000000    0.20

我们可以根据 antecedent 进行预测。

head(predict(fpm, df))
##        items prediction
## 1    T, R, U       S, V
## 2       T, S       NULL
## 3       V, R       NULL
## 4 R, U, T, V          S
## 5       R, S       T, V
## 6    V, S, U       R, T

PrefixSpan

spark.findFrequentSequentialPatterns 方法可用于在项集输入序列中查找完整的频繁序列模式集。

df <- createDataFrame(list(list(list(list(1L, 2L), list(3L))),
                           list(list(list(1L), list(3L, 2L), list(1L, 2L))),
                           list(list(list(1L, 2L), list(5L))),
                           list(list(list(6L)))),
                      schema = c("sequence"))
head(spark.findFrequentSequentialPatterns(df, minSupport = 0.5, maxPatternLength = 5L))
##   sequence freq
## 1        1    3
## 2        3    2
## 3        2    3
## 4     1, 2    3
## 5     1, 3    2

柯尔莫哥洛夫-斯米尔诺夫检验

spark.kstest 运行双侧、单样本 柯尔莫哥洛夫-斯米尔诺夫 (KS) 检验。给定一个 SparkDataFrame,该检验将给定列 testCol 中的连续数据与参数 nullHypothesis 指定的理论分布进行比较。用户可以调用 summary 获取检验结果的摘要。

在以下示例中,我们测试 Titanic 数据集的 Freq 列是否服从正态分布。我们使用样本的均值和标准差设置正态分布的参数。

t <- as.data.frame(Titanic)
df <- createDataFrame(t)
freqStats <- head(select(df, mean(df$Freq), sd(df$Freq)))
freqMean <- freqStats[1]
freqStd <- freqStats[2]

test <- spark.kstest(df, "Freq", "norm", c(freqMean, freqStd))
testSummary <- summary(test)
testSummary
## Kolmogorov-Smirnov test summary:
## degrees of freedom = 0 
## statistic = 0.3065126710255011 
## pValue = 0.0036336792155329256 
## Very strong presumption against null hypothesis: Sample follows theoretical distribution.

模型持久化

以下示例展示了如何在 SparkR 中保存/加载 ML 模型。

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
gaussianGLM <- spark.glm(training, Freq ~ Sex + Age, family = "gaussian")

# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)

# Check model summary
summary(gaussianGLM2)
## 
## Saved-loaded model does not support output 'Deviance Residuals'.
## 
## Coefficients:
##              Estimate  Std. Error  t value   Pr(>|t|)
## (Intercept)    46.219      35.994   1.2841  0.2092846
## Sex_Female    -78.812      41.562  -1.8962  0.0679311
## Age_Adult     123.938      41.562   2.9820  0.0057522
## 
## (Dispersion parameter for gaussian family taken to be 13819.52)
## 
##     Null deviance: 573341  on 31  degrees of freedom
## Residual deviance: 400766  on 29  degrees of freedom
## AIC: 400.7
## 
## Number of Fisher Scoring iterations: 1
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, training)
head(gaussianPredictions)
##   Class    Sex   Age Survived Freq label prediction
## 1   1st   Male Child       No    0     0   46.21875
## 2   2nd   Male Child       No    0     0   46.21875
## 3   3rd   Male Child       No   35    35   46.21875
## 4  Crew   Male Child       No    0     0   46.21875
## 5   1st Female Child       No    0     0  -32.59375
## 6   2nd Female Child       No    0     0  -32.59375
unlink(modelPath)

结构化流

SparkR 支持结构化流 API。

您可以查阅结构化流编程指南,了解其编程模型和基本概念的介绍

简单源和槽

Spark 有一些内置的输入源。例如,为了测试一个套接字源,该源将文本读取为单词并显示计算出的单词计数

# Create DataFrame representing the stream of input lines from connection
lines <- read.stream("socket", host = hostname, port = port)

# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")

# Generate running word count
wordCounts <- count(groupBy(words, "word"))

# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")

Kafka 源

从 Kafka 读取数据很简单。欲了解更多信息,请参阅结构化流支持的输入源

topic <- read.stream("kafka",
                     kafka.bootstrap.servers = "host1:port1,host2:port2",
                     subscribe = "topic1")
keyvalue <- selectExpr(topic, "CAST(key AS STRING)", "CAST(value AS STRING)")

操作和槽

SparkDataFrame 上的大多数常见操作都支持流式处理,包括选择、投影和聚合。一旦您定义了最终结果,要开始流式计算,您将调用 write.stream 方法并设置一个槽 (sink) 和 outputMode

流式 SparkDataFrame 可以用于调试到控制台,写入临时内存表,或者以容错方式写入不同格式的文件槽 (File Sink) 以进行进一步处理。

noAggDF <- select(where(deviceDataStreamingDf, "signal > 10"), "device")

# Print new data to console
write.stream(noAggDF, "console")

# Write new data to Parquet files
write.stream(noAggDF,
             "parquet",
             path = "path/to/destination/dir",
             checkpointLocation = "path/to/checkpoint/dir")

# Aggregate
aggDF <- count(groupBy(noAggDF, "device"))

# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")

# Have all the aggregates in an in memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")

head(sql("select * from aggregates"))

高级主题

SparkR 对象类

您可能在 SparkR 中使用的主要对象类有三种。

  • SparkDataFrame:SparkR 的核心组件。它是一个 S4 类,表示按命名列组织的数据的分布式集合,概念上等同于关系数据库中的表或 R 中的数据框。它有两个槽 sdfenv

    • sdf 存储对 Spark JVM 后端中相应 Spark Dataset 的引用。
    • env 保存对象的元信息,例如 isCached

    它可以通过数据导入方法创建,也可以通过转换现有 SparkDataFrame 来创建。我们可以通过众多数据处理函数操纵 SparkDataFrame,并将其输入到机器学习算法中。

  • Column:一个 S4 类,表示 SparkDataFrame 的一列。槽 jc 存储对 Spark JVM 后端中相应 Column 对象的引用。

    它可以从 SparkDataFrame 中通过 $ 运算符获取,例如 df$col。更常见的是,它与其他函数一起使用,例如与 select 一起选择特定列,与 filter 和构造的条件一起选择行,与聚合函数一起计算每个组的聚合统计信息。

  • GroupedData:一个 S4 类,表示由 groupBy 创建或通过转换其他 GroupedData 而来的分组数据。其 sgd 槽存储对后端中 RelationalGroupedDataset 对象的引用。

    这通常是一个包含分组信息的中间对象,随后进行聚合操作。

架构

完整的架构描述可以在参考文献中找到,特别是论文 SparkR: Scaling R Programs with Spark

SparkR 的底层是 Spark SQL 引擎。这避免了运行解释型 R 代码的开销,并且 Spark 中优化的 SQL 执行引擎利用数据和计算流的结构信息执行一系列优化以加速计算。

实际计算的主要方法调用发生在驱动程序的 Spark JVM 中。我们有一个基于套接字的 SparkR API,它允许我们从 R 调用 JVM 上的函数。我们使用一个基于 Netty 的套接字服务器监听的 SparkR JVM 后端。

SparkR JVM 后端支持两种 RPC:方法调用和创建新对象。方法调用可以通过两种方式完成。

  • sparkR.callJMethod 接受对现有 Java 对象的引用以及要传递给方法的一组参数。

  • sparkR.callJStatic 接受静态方法的类名以及要传递给方法的一组参数。

参数使用我们自定义的线格式进行序列化,然后在 JVM 侧进行反序列化。然后我们使用 Java 反射来调用适当的方法。

要创建对象,使用 sparkR.newJObject,然后类似地使用提供的参数调用相应的构造函数。

最后,我们使用一个新的 R 类 jobj,它引用后端中存在的 Java 对象。这些引用在 Java 侧被跟踪,当它们在 R 侧超出作用域时,会自动进行垃圾回收。

附录

R 和 Spark 数据类型

R Spark
byte byte
integer integer
float float
double double
numeric double
character string
string string
binary binary
raw binary
logical boolean
POSIXct timestamp
POSIXlt timestamp
Date date
array array
list array
env map

参考文献