跳至内容

概述

SparkR 是一个 R 包,它提供了一个轻量级的前端,用于从 R 使用 Apache Spark。在 Spark 3.5.5 中,SparkR 提供了一个分布式数据帧实现,该实现支持数据处理操作,如选择、过滤、聚合等,以及使用 MLlib 的分布式机器学习。

入门

我们从一个在本地机器上运行的例子开始,并概述 SparkR 的使用:数据摄取、数据处理和机器学习。

首先,让我们加载并附加该包。

SparkSession 是 SparkR 的入口点,它将您的 R 程序连接到 Spark 集群。您可以使用 sparkR.session 创建 SparkSession,并传入应用程序名称、依赖的任何 Spark 包等选项。

我们使用默认设置,它在本地模式下运行。如果在后台未找到以前的安装,它会自动下载 Spark 包。有关设置的更多详细信息,请参见 Spark 会话

## Java ref type org.apache.spark.sql.SparkSession id 1

SparkR 中的操作围绕着一个名为 SparkDataFrame 的 R 类。它是一个组织成命名列的分布式数据集合,在概念上等同于关系数据库中的表或 R 中的数据帧,但在底层具有更丰富的优化。

SparkDataFrame 可以从各种各样的来源构建,例如:结构化数据文件、Hive 中的表、外部数据库或现有的本地 R 数据帧。例如,我们从一个本地 R 数据帧创建一个 SparkDataFrame

cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

我们可以通过 headshowDF 函数查看 SparkDataFrame 的前几行。

head(carsDF)
##               model  mpg cyl disp  hp drat    wt  qsec vs am gear carb
## 1         Mazda RX4 21.0   6  160 110 3.90 2.620 16.46  0  1    4    4
## 2     Mazda RX4 Wag 21.0   6  160 110 3.90 2.875 17.02  0  1    4    4
## 3        Datsun 710 22.8   4  108  93 3.85 2.320 18.61  1  1    4    1
## 4    Hornet 4 Drive 21.4   6  258 110 3.08 3.215 19.44  1  0    3    1
## 5 Hornet Sportabout 18.7   8  360 175 3.15 3.440 17.02  0  0    3    2
## 6           Valiant 18.1   6  225 105 2.76 3.460 20.22  1  0    3    1

SparkDataFrame 支持常见的数据处理操作,例如 filterselect

carsSubDF <- select(carsDF, "model", "mpg", "hp")
carsSubDF <- filter(carsSubDF, carsSubDF$hp >= 200)
head(carsSubDF)
##                 model  mpg  hp
## 1          Duster 360 14.3 245
## 2  Cadillac Fleetwood 10.4 205
## 3 Lincoln Continental 10.4 215
## 4   Chrysler Imperial 14.7 230
## 5          Camaro Z28 13.3 245
## 6      Ford Pantera L 15.8 264

SparkR 可以在分组后使用许多常见的聚合函数。

carsGPDF <- summarize(groupBy(carsDF, carsDF$gear), count = n(carsDF$gear))
head(carsGPDF)
##   gear count
## 1    4    12
## 2    3    15
## 3    5     5

结果 carsDFcarsSubDFSparkDataFrame 对象。要转换回 R data.frame,我们可以使用 collect警告:这可能会导致您的交互式环境耗尽内存,因为 collect() 将整个分布式 DataFrame 提取到您的客户端,该客户端充当 Spark 驱动程序。

carsGP <- collect(carsGPDF)
class(carsGP)
## [1] "data.frame"

SparkR 支持许多常用的机器学习算法。在底层,SparkR 使用 MLlib 来训练模型。用户可以调用 summary 来打印已拟合模型的摘要,调用 predict 对新数据进行预测,以及调用 write.ml/read.ml 来保存/加载已拟合的模型。

SparkR 支持用于模型拟合的 R 公式运算符的子集,包括 '~'、'.'、':'、'+' 和 '-'。我们使用线性回归作为一个例子。

model <- spark.glm(carsDF, mpg ~ wt + cyl)

该结果与应用于 carsDF 的相应 data.frame mtcars 的 R glm 函数返回的结果相匹配。事实上,对于广义线性模型,我们也专门为 SparkDataFrame 暴露了 glm,因此上述内容等同于 model <- glm(mpg ~ wt + cyl, data = carsDF)

summary(model)
## 
## Deviance Residuals: 
## (Note: These are approximate quantiles with relative error <= 0.01)
##     Min       1Q   Median       3Q      Max  
## -4.2893  -1.7085  -0.4713   1.5729   6.1004  
## 
## Coefficients:
##              Estimate  Std. Error  t value    Pr(>|t|)
## (Intercept)   39.6863     1.71498  23.1409  0.00000000
## wt            -3.1910     0.75691  -4.2158  0.00022202
## cyl           -1.5078     0.41469  -3.6360  0.00106428
## 
## (Dispersion parameter for gaussian family taken to be 6.592137)
## 
##     Null deviance: 1126.05  on 31  degrees of freedom
## Residual deviance:  191.17  on 29  degrees of freedom
## AIC: 156
## 
## Number of Fisher Scoring iterations: 1

该模型可以通过 write.ml 保存,并通过 read.ml 加载回来。

write.ml(model, path = "/HOME/tmp/mlModel/glmModel")

最后,我们可以通过运行以下命令停止 Spark 会话

设置

安装

与许多其他 R 包不同,要使用 SparkR,您需要额外安装 Apache Spark。 Spark 安装将用于运行一个后端进程,该进程将编译和执行 SparkR 程序。

安装 SparkR 包后,您可以如上一节所述调用 sparkR.session 来启动,它将检查 Spark 安装。如果您从交互式 shell(例如 R、RStudio)使用 SparkR,则如果未找到 Spark,则会自动下载并缓存 Spark。或者,我们提供一个易于使用的函数 install.spark 来手动运行此操作。如果您没有在计算机上安装 Spark,您可以从 Apache Spark 网站下载。

如果您已经安装了 Spark,则无需再次安装,可以将 sparkHome 参数传递给 sparkR.session,以便让 SparkR 知道现有 Spark 安装的位置。

sparkR.session(sparkHome = "/HOME/spark")

Spark 会话

除了 sparkHome 之外,还可以在 sparkR.session 中指定许多其他选项。有关完整列表,请参见 启动:SparkSessionSparkR API 文档

特别是,以下 Spark 驱动程序属性可以在 sparkConfig 中设置。

属性名称 属性组 spark-submit 等效项
spark.driver.memory 应用程序属性 --driver-memory
spark.driver.extraClassPath 运行时环境 --driver-class-path
spark.driver.extraJavaOptions 运行时环境 --driver-java-options
spark.driver.extraLibraryPath 运行时环境 --driver-library-path
spark.kerberos.keytab 应用程序属性 --keytab
spark.kerberos.principal 应用程序属性 --principal

对于 Windows 用户:由于不同操作系统上的文件前缀不同,为了避免潜在的错误前缀问题,当前的解决方法是在启动 SparkSession 时指定 spark.sql.warehouse.dir

spark_warehouse_path <- file.path(path.expand('~'), "spark-warehouse")
sparkR.session(spark.sql.warehouse.dir = spark_warehouse_path)

集群模式

SparkR 可以连接到远程 Spark 集群。集群模式概述 是对不同 Spark 集群模式的良好介绍。

将 SparkR 连接到远程 Spark 集群时,请确保机器上的 Spark 版本和 Hadoop 版本与集群上的相应版本匹配。当前的 SparkR 包兼容

## [1] "Spark 3.5.5"

它应该在本地计算机和远程集群上使用。

要连接,请将主节点的 URL 传递给 sparkR.session。有关完整列表,请参见 Spark Master URLs。例如,要连接到本地独立 Spark 主节点,我们可以调用

sparkR.session(master = "spark://local:7077")

对于 YARN 集群,SparkR 支持客户端模式,其中 master 设置为 “yarn”。

sparkR.session(master = "yarn")

当前版本不支持 Yarn 集群模式。

数据导入

本地数据帧

最简单的方法是将本地 R 数据帧转换为 SparkDataFrame。具体来说,我们可以使用 as.DataFramecreateDataFrame 并传入本地 R 数据帧来创建 SparkDataFrame。例如,以下代码基于 R 中的 faithful 数据集创建一个 SparkDataFrame

df <- as.DataFrame(faithful)
head(df)
##   eruptions waiting
## 1     3.600      79
## 2     1.800      54
## 3     3.333      74
## 4     2.283      62
## 5     4.533      85
## 6     2.883      55

数据源

SparkR 支持通过 SparkDataFrame 接口对各种数据源进行操作。您可以查看 Spark SQL 编程指南,了解更多可用于内置数据源的 特定选项

从数据源创建 SparkDataFrame 的通用方法是 read.df。此方法接受要加载的文件的路径和数据源的类型,并且将自动使用当前活动的 Spark 会话。 SparkR 本机支持读取 CSV、JSON 和 Parquet 文件,并且通过 Spark 包,您可以找到流行文件格式(如 Avro)的数据源连接器。使用 sparkR.session 初始化 SparkSession 时,可以使用 sparkPackages 参数添加这些包。

sparkR.session(sparkPackages = "com.databricks:spark-avro_2.12:3.0.0")

我们可以看到如何使用示例 CSV 输入文件使用数据源。有关更多信息,请参阅 SparkR read.df API 文档。

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

数据源 API 本机支持 JSON 格式的输入文件。请注意,此处使用的文件不是典型的 JSON 文件。文件中的每一行都必须包含一个单独的、自包含的有效 JSON 对象。因此,常规的多行 JSON 文件通常会失败。

让我们看一下这里使用的原始 JSON 文件的前两行。

filePath <- paste0(sparkR.conf("spark.home"),
                         "/examples/src/main/resources/people.json")
readLines(filePath, n = 2L)
## [1] "{\"name\":\"Michael\"}"          "{\"name\":\"Andy\", \"age\":30}"

我们使用 read.df 将其读取到 SparkDataFrame 中。

people <- read.df(filePath, "json")
count(people)
## [1] 3
head(people)
##   age    name
## 1  NA Michael
## 2  30    Andy
## 3  19  Justin

SparkR 会自动从 JSON 文件中推断模式。

printSchema(people)
## root
##  |-- age: long (nullable = true)
##  |-- name: string (nullable = true)

如果我们想读取多个 JSON 文件,可以使用 read.json

people <- read.json(paste0(Sys.getenv("SPARK_HOME"),
                           c("/examples/src/main/resources/people.json",
                             "/examples/src/main/resources/people.json")))
count(people)
## [1] 6

数据源 API 也可用于将 SparkDataFrames 保存到多种文件格式中。例如,我们可以使用 write.df 将前一个示例中的 SparkDataFrame 保存到 Parquet 文件中。

write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")

Hive 表

您还可以从 Hive 表创建 SparkDataFrames。为此,我们需要创建一个支持 Hive 的 SparkSession,它可以访问 Hive MetaStore 中的表。请注意,Spark 应该已经构建了 Hive 支持,更多详细信息可以在 SQL 编程指南 中找到。在 SparkR 中,默认情况下,它将尝试创建一个启用 Hive 支持的 SparkSession(enableHiveSupport = TRUE)。

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

txtPath <- paste0(sparkR.conf("spark.home"), "/examples/src/main/resources/kv1.txt")
sqlCMD <- sprintf("LOAD DATA LOCAL INPATH '%s' INTO TABLE src", txtPath)
sql(sqlCMD)

results <- sql("FROM src SELECT key, value")

# results is now a SparkDataFrame
head(results)

数据处理

对于 dplyr 用户:SparkR 在数据处理方面具有与 dplyr 相似的接口。但是,首先值得提及一些明显的差异。我们使用 df 来表示 SparkDataFrame,使用 col 来表示此处的列名。

  1. 指示列。 SparkR 使用列名的字符字符串或使用 $ 构造的 Column 对象来指示列。例如,要在 df 中选择 col,我们可以编写 select(df, "col")select(df, df$col)

  2. 描述条件。在 SparkR 中,Column 对象表示可以直接插入到条件中,或者我们可以使用字符字符串来描述条件,而无需引用所使用的 SparkDataFrame。例如,要选择值 > 1 的行,我们可以编写 filter(df, df$col > 1)filter(df, "col > 1")

以下是更具体的例子。

dplyr SparkR
select(mtcars, mpg, hp) select(carsDF, "mpg", "hp")
filter(mtcars, mpg > 20, hp > 100) filter(carsDF, carsDF$mpg > 20, carsDF$hp > 100)

其他差异将在特定方法中提及。

我们使用上面创建的 SparkDataFrame carsDF。我们可以获取有关 SparkDataFrame 的基本信息。

carsDF
## SparkDataFrame[model:string, mpg:double, cyl:double, disp:double, hp:double, drat:double, wt:double, qsec:double, vs:double, am:double, gear:double, carb:double]

以树格式打印出模式。

printSchema(carsDF)
## root
##  |-- model: string (nullable = true)
##  |-- mpg: double (nullable = true)
##  |-- cyl: double (nullable = true)
##  |-- disp: double (nullable = true)
##  |-- hp: double (nullable = true)
##  |-- drat: double (nullable = true)
##  |-- wt: double (nullable = true)
##  |-- qsec: double (nullable = true)
##  |-- vs: double (nullable = true)
##  |-- am: double (nullable = true)
##  |-- gear: double (nullable = true)
##  |-- carb: double (nullable = true)

SparkDataFrame 操作

选择行、列

SparkDataFrames 支持许多函数来执行结构化数据处理。这里我们包括一些基本示例,完整的列表可以在 API 文档中找到

您也可以将列名作为字符串传入。

head(select(carsDF, "mpg"))
##    mpg
## 1 21.0
## 2 21.0
## 3 22.8
## 4 21.4
## 5 18.7
## 6 18.1

过滤 SparkDataFrame 以仅保留每加仑汽油里程数 (mpg) 小于 20 英里的行。

head(filter(carsDF, carsDF$mpg < 20))
##               model  mpg cyl  disp  hp drat   wt  qsec vs am gear carb
## 1 Hornet Sportabout 18.7   8 360.0 175 3.15 3.44 17.02  0  0    3    2
## 2           Valiant 18.1   6 225.0 105 2.76 3.46 20.22  1  0    3    1
## 3        Duster 360 14.3   8 360.0 245 3.21 3.57 15.84  0  0    3    4
## 4          Merc 280 19.2   6 167.6 123 3.92 3.44 18.30  1  0    4    4
## 5         Merc 280C 17.8   6 167.6 123 3.92 3.44 18.90  1  0    4    4
## 6        Merc 450SE 16.4   8 275.8 180 3.07 4.07 17.40  0  0    3    3

分组、聚合

分组和聚合的常见流程是

  1. 相对于某些分组变量使用 groupBygroup_by 来创建 GroupedData 对象

  2. GroupedData 对象馈送到 aggsummarize 函数,并提供一些聚合函数来计算每个组中的一个数字。

支持许多广泛使用的函数,用于在分组后聚合数据,包括 avgcount_distinctcountfirstkurtosislastmaxmeanminsdskewnessstddev_popstddev_sampsum_distinctsumvar_popvar_sampvar。 请参阅链接的聚合函数 API 文档

例如,我们可以计算 mtcars 数据集中气缸数量的直方图,如下所示。

numCyl <- summarize(groupBy(carsDF, carsDF$cyl), count = n(carsDF$cyl))
head(numCyl)
##   cyl count
## 1   8    14
## 2   4    11
## 3   6     7

使用 cuberollup 计算跨多个维度的子总计。

mean(cube(carsDF, "cyl", "gear", "am"), "mpg")
## SparkDataFrame[cyl:double, gear:double, am:double, avg(mpg):double]

为 {(cyl, gear, am), (cyl, gear), (cyl), ()} 生成分组,而

mean(rollup(carsDF, "cyl", "gear", "am"), "mpg")
## SparkDataFrame[cyl:double, gear:double, am:double, avg(mpg):double]

为分组列的所有可能组合生成分组。

对列进行操作

SparkR 还提供了许多可以直接应用于列的函数,用于数据处理和聚合期间。 下面的示例显示了基本算术函数的使用。

carsDF_km <- carsDF
carsDF_km$kmpg <- carsDF_km$mpg * 1.61
head(select(carsDF_km, "model", "mpg", "kmpg"))
##               model  mpg   kmpg
## 1         Mazda RX4 21.0 33.810
## 2     Mazda RX4 Wag 21.0 33.810
## 3        Datsun 710 22.8 36.708
## 4    Hornet 4 Drive 21.4 34.454
## 5 Hornet Sportabout 18.7 30.107
## 6           Valiant 18.1 29.141

窗口函数

窗口函数是聚合函数的一种变体。 简而言之,

  • 聚合函数:n1 的映射 - 为一组条目返回单个值。 示例包括 sumcountmax

  • 窗口函数:nn 的映射 - 为组中的每个条目返回一个值,但该值可能取决于的所有条目。 示例包括 rankleadlag

形式上,上面提到的称为框架。 每个输入行都可以有一个与之关联的唯一框架,并且该行上窗口函数的输出基于该框架中包含的行。

窗口函数通常与以下函数结合使用:windowPartitionBywindowOrderBypartitionByorderByover。 为了说明这一点,接下来我们看一个例子。

我们仍然使用 mtcars 数据集。 对应的 SparkDataFramecarsDF。 假设对于每个气缸数,我们想要计算 mpg 中每辆车的排名。

carsSubDF <- select(carsDF, "model", "mpg", "cyl")
ws <- orderBy(windowPartitionBy("cyl"), "mpg")
carsRank <- withColumn(carsSubDF, "rank", over(rank(), ws))
head(carsRank, n = 20L)
##                  model  mpg cyl rank
## 1           Volvo 142E 21.4   4    1
## 2        Toyota Corona 21.5   4    2
## 3           Datsun 710 22.8   4    3
## 4             Merc 230 22.8   4    3
## 5            Merc 240D 24.4   4    5
## 6        Porsche 914-2 26.0   4    6
## 7            Fiat X1-9 27.3   4    7
## 8          Honda Civic 30.4   4    8
## 9         Lotus Europa 30.4   4    8
## 10            Fiat 128 32.4   4   10
## 11      Toyota Corolla 33.9   4   11
## 12           Merc 280C 17.8   6    1
## 13             Valiant 18.1   6    2
## 14            Merc 280 19.2   6    3
## 15        Ferrari Dino 19.7   6    4
## 16           Mazda RX4 21.0   6    5
## 17       Mazda RX4 Wag 21.0   6    5
## 18      Hornet 4 Drive 21.4   6    7
## 19  Cadillac Fleetwood 10.4   8    1
## 20 Lincoln Continental 10.4   8    1

我们详细解释上述步骤。

  • windowPartitionBy 创建一个窗口规范对象 WindowSpec,用于定义分区。 它控制哪些行将与给定行位于同一分区中。 在这种情况下,cyl 中具有相同值的行将被放入同一分区中。 orderBy 进一步定义排序 - 给定行在分区中的位置。 生成的 WindowSpec 作为 ws 返回。

更多窗口规范方法包括 rangeBetween,它可以按值定义框架的边界,以及 rowsBetween,它可以按行索引定义边界。

  • withColumn 将一个名为 rank 的列附加到 SparkDataFrameover 返回一个窗口列。 第一个参数通常是由窗口函数(例如 rank()lead(carsDF$wt))返回的列。 这会根据分区和排序的表计算相应的值。

用户自定义函数

在 SparkR 中,我们支持几种用户自定义函数 (UDF)。

按分区应用

dapply 可以将函数应用于 SparkDataFrame 的每个分区。 要应用于 SparkDataFrame 的每个分区的函数应该只有一个参数,即对应于分区的 data.frame,并且输出也应该是 data.frame。 Schema 指定结果 SparkDataFrame 的行格式。 它必须与返回值的数据类型匹配。 有关 R 和 Spark 之间的映射,请参见此处

我们将 mpg 转换为 kmpg(公里/加仑)。 carsSubDF 是一个包含 carsDF 列子集的 SparkDataFrame

carsSubDF <- select(carsDF, "model", "mpg")
schema <- "model STRING, mpg DOUBLE, kmpg DOUBLE"
out <- dapply(carsSubDF, function(x) { x <- cbind(x, x$mpg * 1.61) }, schema)
head(collect(out))
##               model  mpg   kmpg
## 1         Mazda RX4 21.0 33.810
## 2     Mazda RX4 Wag 21.0 33.810
## 3        Datsun 710 22.8 36.708
## 4    Hornet 4 Drive 21.4 34.454
## 5 Hornet Sportabout 18.7 30.107
## 6           Valiant 18.1 29.141

dapply 类似,dapplyCollect 可以将函数应用于 SparkDataFrame 的每个分区并将结果收集回来。 该函数的输出应为 data.frame,但在这种情况下不需要模式。 请注意,如果无法将 UDF 在所有分区上的输出拉入驱动程序的内存中,则 dapplyCollect 可能会失败。

out <- dapplyCollect(
         carsSubDF,
         function(x) {
           x <- cbind(x, "kmpg" = x$mpg * 1.61)
         })
head(out, 3)
##           model  mpg   kmpg
## 1     Mazda RX4 21.0 33.810
## 2 Mazda RX4 Wag 21.0 33.810
## 3    Datsun 710 22.8 36.708

按组应用

gapply 可以将函数应用于 SparkDataFrame 的每个组。 该函数要应用于 SparkDataFrame 的每个组,并且应仅具有两个参数:分组键和对应于该键的 R data.frame。 这些组是从 SparkDataFrames 列中选择的。 该函数的输出应为 data.frame。 Schema 指定结果 SparkDataFrame 的行格式。 它必须基于 Spark 数据类型表示 R 函数的输出模式。 返回的 data.frame 的列名由用户设置。 有关 R 和 Spark 之间的映射,请参见此处

schema <- structType(structField("cyl", "double"), structField("max_mpg", "double"))
result <- gapply(
    carsDF,
    "cyl",
    function(key, x) {
        y <- data.frame(key, max(x$mpg))
    },
    schema)
head(arrange(result, "max_mpg", decreasing = TRUE))
##   cyl max_mpg
## 1   4    33.9
## 2   6    21.4
## 3   8    19.2

gapply 类似,gapplyCollect 可以将函数应用于 SparkDataFrame 的每个分区,并将结果收集回 R data.frame。 该函数的输出应为 data.frame,但在这种情况下不需要模式。 请注意,如果无法将 UDF 在所有分区上的输出拉入驱动程序的内存中,则 gapplyCollect 可能会失败。

result <- gapplyCollect(
    carsDF,
    "cyl",
    function(key, x) {
         y <- data.frame(key, max(x$mpg))
        colnames(y) <- c("cyl", "max_mpg")
        y
    })
head(result[order(result$max_mpg, decreasing = TRUE), ])
##   cyl max_mpg
## 1   4    33.9
## 2   6    21.4
## 3   8    19.2

分发本地函数

与本地 R 中的 lapply 类似,spark.lapply 在元素列表上运行函数,并使用 Spark 分发计算。 spark.lapply 的工作方式类似于 doParallellapply 对列表的元素进行操作。 所有计算的结果都应适合单台机器。 如果不是这种情况,您可以执行类似 df <- createDataFrame(list) 的操作,然后使用 dapply

我们以 e1071 包中的 svm 为例。 我们使用所有默认设置,除了约束违规的成本不同。 spark.lapply 可以并行训练这些不同的模型。

costs <- exp(seq(from = log(1), to = log(1000), length.out = 5))
train <- function(cost) {
  stopifnot(requireNamespace("e1071", quietly = TRUE))
  model <- e1071::svm(Species ~ ., data = iris, cost = cost)
  summary(model)
}

返回模型摘要的列表。

model.summaries <- spark.lapply(costs, train)
class(model.summaries)
## [1] "list"

为了避免冗长的显示,我们仅显示第二个拟合模型的部分结果。 您可以随意检查其他模型。

print(model.summaries[[2]])
## $call
## svm(formula = Species ~ ., data = iris, cost = cost)
## 
## $type
## [1] 0
## 
## $kernel
## [1] 2
## 
## $cost
## [1] 5.623413
## 
## $degree
## [1] 3
## 
## $gamma
## [1] 0.25
## 
## $coef0
## [1] 0
## 
## $nu
## [1] 0.5
## 
## $epsilon
## [1] 0.1
## 
## $sparse
## [1] FALSE
## 
## $scaled
## [1] TRUE TRUE TRUE TRUE
## 
## $x.scale
## $x.scale$`scaled:center`
## Sepal.Length  Sepal.Width Petal.Length  Petal.Width 
##     5.843333     3.057333     3.758000     1.199333 
## 
## $x.scale$`scaled:scale`
## Sepal.Length  Sepal.Width Petal.Length  Petal.Width 
##    0.8280661    0.4358663    1.7652982    0.7622377 
## 
## 
## $y.scale
## NULL
## 
## $nclasses
## [1] 3
## 
## $levels
## [1] "setosa"     "versicolor" "virginica" 
## 
## $tot.nSV
## [1] 35
## 
## $nSV
## [1]  6 15 14
## 
## $labels
## [1] 1 2 3
## 
## $SV
##     Sepal.Length Sepal.Width Petal.Length Petal.Width
## 14   -1.86378030 -0.13153881   -1.5056946  -1.4422448
## 16   -0.17309407  3.08045544   -1.2791040  -1.0486668
## 21   -0.53538397  0.78617383   -1.1658087  -1.3110521
## 23   -1.50149039  1.24503015   -1.5623422  -1.3110521
## 24   -0.89767388  0.55674567   -1.1658087  -0.9174741
## 42   -1.62225369 -1.73753594   -1.3923993  -1.1798595
## 51    1.39682886  0.32731751    0.5336209   0.2632600
## 53    1.27606556  0.09788935    0.6469162   0.3944526
## 54   -0.41462067 -1.73753594    0.1370873   0.1320673
## 55    0.79301235 -0.59039513    0.4769732   0.3944526
##  [ reached getOption("max.print") -- omitted 25 rows ]
## 
## $index
##  [1]  14  16  21  23  24  42  51  53  54  55  58  61  69  71  73  78  79  84  85
## [20]  86  99 107 111 119 120 124 127 128 130 132 134 135 139 149 150
## 
## $rho
## [1] -0.10346530  0.12160294 -0.09540346
## 
## $compprob
## [1] FALSE
## 
## $probA
## NULL
## 
## $probB
## NULL
## 
## $sigma
## NULL
## 
## $coefs
##              [,1]        [,2]
##  [1,]  0.00000000  0.06561739
##  [2,]  0.76813720  0.93378721
##  [3,]  0.00000000  0.12123270
##  [4,]  0.00000000  0.31170741
##  [5,]  1.11614066  0.46397392
##  [6,]  1.88141600  1.10392128
##  [7,] -0.55872622  0.00000000
##  [8,]  0.00000000  5.62341325
##  [9,]  0.00000000  0.27711792
## [10,]  0.00000000  5.28440007
## [11,] -1.06596713  0.00000000
## [12,] -0.57076709  1.09019756
## [13,] -0.03365904  5.62341325
## [14,]  0.00000000  5.62341325
## [15,]  0.00000000  5.62341325
## [16,]  0.00000000  5.62341325
## [17,]  0.00000000  4.70398738
## [18,]  0.00000000  5.62341325
## [19,]  0.00000000  4.97981371
## [20,] -0.77497987  0.00000000
##  [ reached getOption("max.print") -- omitted 15 rows ]
## 
## $na.action
## NULL
## 
## $xlevels
## named list()
## 
## $fitted
##      1      2      3      4      5      6      7      8      9     10     11 
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa 
##     12     13     14     15     16     17     18     19     20     21     22 
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa 
##     23     24     25     26     27     28     29     30     31     32     33 
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa 
##     34     35     36     37     38     39     40 
## setosa setosa setosa setosa setosa setosa setosa 
##  [ reached getOption("max.print") -- omitted 110 entries ]
## Levels: setosa versicolor virginica
## 
## $decision.values
##     setosa/versicolor setosa/virginica versicolor/virginica
## 1           1.1911739        1.0908424            1.1275805
## 2           1.1336557        1.0619543            1.3260964
## 3           1.2085065        1.0698101            1.0511345
## 4           1.1646153        1.0505915            1.0806874
## 5           1.1880814        1.0950348            0.9542815
## 6           1.0990761        1.0984626            0.9326361
## 7           1.1573474        1.0343287            0.9726843
## 8           1.1851598        1.0815750            1.2206802
## 9           1.1673499        1.0406734            0.8837945
## 10          1.1629911        1.0560925            1.2430067
## 11          1.1339282        1.0803946            1.0338357
## 12          1.1724182        1.0641469            1.1190423
## 13          1.1827355        1.0667956            1.1414844
##  [ reached getOption("max.print") -- omitted 137 rows ]
## 
## $terms
## Species ~ Sepal.Length + Sepal.Width + Petal.Length + Petal.Width
## attr(,"variables")
## list(Species, Sepal.Length, Sepal.Width, Petal.Length, Petal.Width)
## attr(,"factors")
##              Sepal.Length Sepal.Width Petal.Length Petal.Width
## Species                 0           0            0           0
## Sepal.Length            1           0            0           0
## Sepal.Width             0           1            0           0
## Petal.Length            0           0            1           0
## Petal.Width             0           0            0           1
## attr(,"term.labels")
## [1] "Sepal.Length" "Sepal.Width"  "Petal.Length" "Petal.Width" 
## attr(,"order")
## [1] 1 1 1 1
## attr(,"intercept")
## [1] 0
## attr(,"response")
## [1] 1
## attr(,".Environment")
## <environment: 0x5f60eb41d4c8>
## attr(,"predvars")
## list(Species, Sepal.Length, Sepal.Width, Petal.Length, Petal.Width)
## attr(,"dataClasses")
##      Species Sepal.Length  Sepal.Width Petal.Length  Petal.Width 
##     "factor"    "numeric"    "numeric"    "numeric"    "numeric" 
## 
## attr(,"class")
## [1] "summary.svm"

SQL 查询

SparkDataFrame 也可以注册为 Spark SQL 中的临时视图,以便可以对其数据运行 SQL 查询。 sql 函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 SparkDataFrame 返回。

people <- read.df(paste0(sparkR.conf("spark.home"),
                         "/examples/src/main/resources/people.json"), "json")

将此 SparkDataFrame 注册为临时视图。

createOrReplaceTempView(people, "people")

可以使用 sql 方法运行 SQL 语句。

teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##     name
## 1 Justin

机器学习

SparkR 支持以下机器学习模型和算法。

分类

  • 线性支持向量机 (SVM) 分类器

  • 逻辑回归

  • 多层感知器 (MLP)

  • 朴素贝叶斯

  • 分解机 (FM) 分类器

回归

  • 加速失效时间 (AFT) 生存模型

  • 广义线性模型 (GLM)

  • 等渗回归

  • 线性回归

  • 分解机 (FM) 回归器

树 - 分类和回归

  • 决策树

  • 梯度提升树 (GBT)

  • 随机森林

聚类

  • 二分kk-均值

  • 高斯混合模型 (GMM)

  • kkK 均值聚类

  • 潜在狄利克雷分配 (LDA)

  • 幂迭代聚类 (PIC)

协同过滤

  • 交替最小二乘法 (ALS)

频繁模式挖掘

  • FP 增长
  • PrefixSpan

统计

  • 柯尔莫哥洛夫-斯米尔诺夫检验

R 公式

对于上述大多数,SparkR 支持R 公式运算符,包括 ~.:+- 用于模型拟合。 这使其与使用 R 函数的体验相似。

训练集和测试集

我们可以使用 randomSplit 函数轻松地将 SparkDataFrame 拆分为随机训练集和测试集。 它返回一个带有提供的 weights 的拆分 SparkDataFrames 列表。 我们以 carsDF 为例,并且想要大约7070%训练数据和3030%测试数据。

splitDF_list <- randomSplit(carsDF, c(0.7, 0.3), seed = 0)
carsDF_train <- splitDF_list[[1]]
carsDF_test <- splitDF_list[[2]]
count(carsDF_train)
## [1] 24
head(carsDF_train)
##                model  mpg cyl disp  hp drat    wt  qsec vs am gear carb
## 1 Cadillac Fleetwood 10.4   8  472 205 2.93 5.250 17.98  0  0    3    4
## 2         Camaro Z28 13.3   8  350 245 3.73 3.840 15.41  0  0    3    4
## 3  Chrysler Imperial 14.7   8  440 230 3.23 5.345 17.42  0  0    3    4
## 4   Dodge Challenger 15.5   8  318 150 2.76 3.520 16.87  0  0    3    2
## 5         Duster 360 14.3   8  360 245 3.21 3.570 15.84  0  0    3    4
## 6       Ferrari Dino 19.7   6  145 175 3.62 2.770 15.50  0  1    5    6
count(carsDF_test)
## [1] 8
head(carsDF_test)
##            model  mpg cyl  disp  hp drat    wt  qsec vs am gear carb
## 1    AMC Javelin 15.2   8 304.0 150 3.15 3.435 17.30  0  0    3    2
## 2     Datsun 710 22.8   4 108.0  93 3.85 2.320 18.61  1  1    4    1
## 3       Fiat 128 32.4   4  78.7  66 4.08 2.200 19.47  1  1    4    1
## 4      Merc 240D 24.4   4 146.7  62 3.69 3.190 20.00  1  0    4    2
## 5       Merc 280 19.2   6 167.6 123 3.92 3.440 18.30  1  0    4    4
## 6 Toyota Corolla 33.9   4  71.1  65 4.22 1.835 19.90  1  1    4    1

模型和算法

线性支持向量机 (SVM) 分类器

线性支持向量机 (SVM) 分类器是具有线性内核的 SVM 分类器。 这是一个二元分类器。 我们使用一个简单的例子来展示如何使用 spark.svmLinear 进行二元分类。

# load training data and create a DataFrame
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# fit a Linear SVM classifier model
model <- spark.svmLinear(training,  Survived ~ ., regParam = 0.01, maxIter = 10)
summary(model)
## $coefficients
##                 Estimate
## (Intercept)  0.993131388
## Class_1st   -0.386500359
## Class_2nd   -0.622627816
## Class_3rd   -0.204446602
## Sex_Female  -0.589950309
## Age_Adult    0.741676902
## Freq        -0.006582887
## 
## $numClasses
## [1] 2
## 
## $numFeatures
## [1] 6

预测训练数据的值

prediction <- predict(model, training)
head(select(prediction, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No        Yes
## 2   2nd   Male Child    0       No        Yes
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No        Yes
## 5   1st Female Child    0       No        Yes
## 6   2nd Female Child    0       No         No

逻辑回归

逻辑回归是响应为类别时广泛使用的模型。 它可以看作是广义线性预测模型的一个特例。 我们在 spark.glm 之上提供 spark.logit,以支持具有高级超参数的逻辑回归。 它支持具有弹性网络正则化和特征标准化的二元和多类分类,类似于 glmnet

我们使用一个简单的示例来演示 spark.logit 的用法。 通常,使用 spark.logit 有三个步骤:1). 从适当的数据源创建一个数据帧;2). 使用 spark.logit 和适当的参数设置拟合逻辑回归模型;3). 使用 summary 获取拟合模型的系数矩阵,并使用该模型进行预测与 predict

二项式逻辑回归

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
model <- spark.logit(training, Survived ~ ., regParam = 0.04741301)
summary(model)
## $coefficients
##                  Estimate
## (Intercept)  0.2255014282
## Class_1st   -0.1338856652
## Class_2nd   -0.1479826947
## Class_3rd    0.0005674937
## Sex_Female  -0.2011183871
## Age_Adult    0.3263186885
## Freq        -0.0033111157

预测训练数据的值

fitted <- predict(model, training)
head(select(fitted, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No        Yes
## 2   2nd   Male Child    0       No        Yes
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No        Yes
## 5   1st Female Child    0       No         No
## 6   2nd Female Child    0       No         No

针对三个类的多项式逻辑回归

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# Note in this case, Spark infers it is multinomial logistic regression, so family = "multinomial" is optional.
model <- spark.logit(training, Class ~ ., regParam = 0.07815179)
summary(model)
## $coefficients
##                      1st          2nd          3rd         Crew
## (Intercept)  0.051662845  0.062998145 -0.039083689 -0.075577300
## Sex_Female  -0.088030587 -0.102528148  0.059233106  0.131325629
## Age_Adult    0.141935316  0.169492058 -0.102562719 -0.208864654
## Survived_No  0.052721020  0.057980057 -0.029408423 -0.081292653
## Freq        -0.001555912 -0.001970377  0.001303836  0.002222453

多层感知器

多层感知器分类器 (MLPC) 是一种基于前馈人工神经网络的分类器。 MLPC 由多个节点层组成。 每层都完全连接到网络中的下一层。 输入层中的节点表示输入数据。 所有其他节点通过将输入与节点的权重进行线性组合来将输入映射到输出ww和偏置bb并应用激活函数。 这可以用矩阵形式为 MLPC 编写,其中K+1K+1层如下y(x)=fK(f2(w2Tf1(w1Tx+b1)+b2)+bK).y(x)=f_K(\ldots f_2(w_2^T f_1(w_1^T x + b_1) + b_2) \ldots + b_K).

中间层中的节点使用 sigmoid(逻辑)函数f(zi)=11+ezi.f(z_i) = \frac{1}{1+e^{-z_i}}.

输出层中的节点使用 softmax 函数f(zi)=ezik=1Nezk.f(z_i) = \frac{e^{z_i}}{\sum_{k=1}^N e^{z_k}}.

输出层中节点的数量NN对应于类的数量。

MLPC 采用反向传播来学习模型。 我们使用逻辑损失函数进行优化,并使用 L-BFGS 作为优化例程。

spark.mlpdata 中至少需要两列:一列名为 "label",另一列名为 "features""features" 列应为 libSVM 格式。

我们使用 Titanic 数据集来展示如何在分类中使用 spark.mlp

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# fit a Multilayer Perceptron Classification Model
model <- spark.mlp(training, Survived ~ Age + Sex, blockSize = 128, layers = c(2, 2), solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1, initialWeights = c( 0, 0, 5, 5, 9, 9))

为了避免冗长的显示,我们仅显示模型摘要的部分结果。 您可以从您的 sparkR shell 中查看完整结果。

# check the summary of the fitted model
summary(model)
## $numOfInputs
## [1] 2
## 
## $numOfOutputs
## [1] 2
## 
## $layers
## [1] 2 2
## 
## $weights
## $weights[[1]]
## [1] 0
## 
## $weights[[2]]
## [1] 0
## 
## $weights[[3]]
## [1] 5
## 
## $weights[[4]]
## [1] 5
## 
## $weights[[5]]
## [1] 9
## 
## $weights[[6]]
## [1] 9
# make predictions use the fitted model
predictions <- predict(model, training)
head(select(predictions, predictions$prediction))
##   prediction
## 1         No
## 2         No
## 3         No
## 4         No
## 5         No
## 6         No

朴素贝叶斯

朴素贝叶斯模型假设特征之间相互独立。spark.naiveBayes 针对 SparkDataFrame 拟合 伯努利朴素贝叶斯模型。数据应全部为类别型。这些模型通常用于文档分类。

titanic <- as.data.frame(Titanic)
titanicDF <- createDataFrame(titanic[titanic$Freq > 0, -5])
naiveBayesModel <- spark.naiveBayes(titanicDF, Survived ~ Class + Sex + Age)
summary(naiveBayesModel)
## $apriori
##            Yes        No
## [1,] 0.5769231 0.4230769
## 
## $tables
##     Class_3rd Class_1st Class_2nd Sex_Female Age_Adult
## Yes 0.3125    0.3125    0.3125    0.5        0.5625   
## No  0.4166667 0.25      0.25      0.5        0.75
naiveBayesPrediction <- predict(naiveBayesModel, titanicDF)
head(select(naiveBayesPrediction, "Class", "Sex", "Age", "Survived", "prediction"))
##   Class    Sex   Age Survived prediction
## 1   3rd   Male Child       No        Yes
## 2   3rd Female Child       No        Yes
## 3   1st   Male Adult       No        Yes
## 4   2nd   Male Adult       No        Yes
## 5   3rd   Male Adult       No         No
## 6  Crew   Male Adult       No        Yes

因子分解机分类器

用于分类问题的因子分解机。

有关因子分解机的背景和实现的详细信息,请参阅因子分解机部分

t <- as.data.frame(Titanic)
training <- createDataFrame(t)

model <- spark.fmClassifier(training, Survived ~ Age + Sex)
summary(model)
## $coefficients
##                 Estimate
## (Intercept) 0.0064275991
## Age_Adult   0.0001294448
## Sex_Female  0.0001294448
## 
## $factors
##            [,1]        [,2]      [,3]       [,4]        [,5]        [,6]
## [1,] -0.3256224  0.11912568 0.1460235  0.1620567  0.13153516  0.06403695
## [2,] -0.1382155 -0.03658261 0.1717808 -0.1602241 -0.08446129 -0.19287098
##             [,7]        [,8]
## [1,] -0.03292446 -0.05166818
## [2,]  0.19252571  0.06237194
## 
## $numClasses
## [1] 2
## 
## $numFeatures
## [1] 2
## 
## $factorSize
## [1] 8
predictions <- predict(model, training)
head(select(predictions, predictions$prediction))
##   prediction
## 1        Yes
## 2        Yes
## 3        Yes
## 4        Yes
## 5        Yes
## 6        Yes

加速失效时间生存模型

生存分析研究事件发生前的预期持续时间,以及与受试者承担的风险因素或治疗方法的关系。与标准回归分析相比,生存建模必须处理数据中的特殊特征,包括非负生存时间和删失。

加速失效时间 (AFT) 模型是一种用于删失数据的参数生存模型,它假设协变量的影响是按某个常数加速或减速事件的生命历程。有关更多信息,请参阅 Wikipedia 页面 AFT 模型及其中的参考文献。与为相同目的设计的比例风险模型不同,AFT 模型更容易并行化,因为每个实例都独立地对目标函数做出贡献。

library(survival)
ovarianDF <- createDataFrame(ovarian)
aftModel <- spark.survreg(ovarianDF, Surv(futime, fustat) ~ ecog_ps + rx)
summary(aftModel)
## $coefficients
##                  Value
## (Intercept)  6.8966910
## ecog_ps     -0.3850414
## rx           0.5286455
## Log(scale)  -0.1234429
aftPredictions <- predict(aftModel, ovarianDF)
head(aftPredictions)
##   futime fustat     age resid_ds rx ecog_ps label prediction
## 1     59      1 72.3315        2  1       1    59   1141.724
## 2    115      1 74.4932        2  1       1   115   1141.724
## 3    156      1 66.4658        2  1       2   156    776.855
## 4    421      0 53.3644        2  2       1   421   1937.087
## 5    431      1 50.3397        2  1       1   431   1141.724
## 6    448      0 56.4301        1  1       2   448    776.855

广义线性模型

主要函数是 spark.glm。支持以下族和链接函数。默认值为高斯分布。

链接函数
高斯分布 恒等, 对数, 倒数
二项分布 logit, probit, cloglog (互补对数-对数)
泊松分布 对数, 恒等, 平方根
伽马分布 倒数, 恒等, 对数
tweedie 幂链接函数

有三种方法可以指定 family 参数。

  • 族名称作为字符串,例如 family = "gaussian"

  • 族函数,例如 family = binomial

  • 族函数返回的结果,例如 family = poisson(link = log)

  • 请注意,有两种方法可以指定 tweedie 族
    1. 设置 family = "tweedie" 并指定 var.powerlink.power
    2. 加载包 statmod 后,使用其中的族定义(即 tweedie())指定 tweedie 族。

有关族及其链接函数的更多信息,请参阅 Wikipedia 页面 广义线性模型

我们使用 mtcars 数据集作为示例。相应的 SparkDataFramecarsDF。拟合模型后,我们打印出摘要,并通过对原始数据集进行预测来查看拟合值。我们还可以传递具有相同模式的新 SparkDataFrame 以对新数据进行预测。

gaussianGLM <- spark.glm(carsDF, mpg ~ wt + hp)
summary(gaussianGLM)
## 
## Deviance Residuals: 
## (Note: These are approximate quantiles with relative error <= 0.01)
##     Min       1Q   Median       3Q      Max  
## -3.9410  -1.6499  -0.3267   1.0373   5.8538  
## 
## Coefficients:
##               Estimate  Std. Error  t value    Pr(>|t|)
## (Intercept)  37.227270   1.5987875  23.2847  0.0000e+00
## wt           -3.877831   0.6327335  -6.1287  1.1196e-06
## hp           -0.031773   0.0090297  -3.5187  1.4512e-03
## 
## (Dispersion parameter for gaussian family taken to be 6.725785)
## 
##     Null deviance: 1126.05  on 31  degrees of freedom
## Residual deviance:  195.05  on 29  degrees of freedom
## AIC: 156.7
## 
## Number of Fisher Scoring iterations: 1

进行预测时,将附加一个名为 prediction 的新列。让我们在这里只查看列的一个子集。

gaussianFitted <- predict(gaussianGLM, carsDF)
head(select(gaussianFitted, "model", "prediction", "mpg", "wt", "hp"))
##               model prediction  mpg    wt  hp
## 1         Mazda RX4   23.57233 21.0 2.620 110
## 2     Mazda RX4 Wag   22.58348 21.0 2.875 110
## 3        Datsun 710   25.27582 22.8 2.320  93
## 4    Hornet 4 Drive   21.26502 21.4 3.215 110
## 5 Hornet Sportabout   18.32727 18.7 3.440 175
## 6           Valiant   20.47382 18.1 3.460 105

以下是使用 tweedie 族进行的相同拟合

tweedieGLM1 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie", var.power = 0.0)
summary(tweedieGLM1)
## 
## Deviance Residuals: 
## (Note: These are approximate quantiles with relative error <= 0.01)
##     Min       1Q   Median       3Q      Max  
## -3.9410  -1.6499  -0.3267   1.0373   5.8538  
## 
## Coefficients:
##               Estimate  Std. Error  t value    Pr(>|t|)
## (Intercept)  37.227270   1.5987875  23.2847  0.0000e+00
## wt           -3.877831   0.6327335  -6.1287  1.1196e-06
## hp           -0.031773   0.0090297  -3.5187  1.4512e-03
## 
## (Dispersion parameter for tweedie family taken to be 6.725785)
## 
##     Null deviance: 1126.05  on 31  degrees of freedom
## Residual deviance:  195.05  on 29  degrees of freedom
## AIC: 156.7
## 
## Number of Fisher Scoring iterations: 1

我们可以尝试 tweedie 族中的其他分布,例如具有对数链接的复合泊松分布

tweedieGLM2 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie",
                         var.power = 1.2, link.power = 0.0)
summary(tweedieGLM2)
## 
## Deviance Residuals: 
## (Note: These are approximate quantiles with relative error <= 0.01)
##      Min        1Q    Median        3Q       Max  
## -0.58074  -0.25335  -0.09892   0.18608   0.82717  
## 
## Coefficients:
##                Estimate  Std. Error  t value    Pr(>|t|)
## (Intercept)   3.8500849  0.06698272  57.4788  0.0000e+00
## wt           -0.2018426  0.02897283  -6.9666  1.1691e-07
## hp           -0.0016248  0.00041603  -3.9054  5.1697e-04
## 
## (Dispersion parameter for tweedie family taken to be 0.1340111)
## 
##     Null deviance: 29.8820  on 31  degrees of freedom
## Residual deviance:  3.7739  on 29  degrees of freedom
## AIC: NA
## 
## Number of Fisher Scoring iterations: 4

等渗回归

spark.isoreg 针对 SparkDataFrame 拟合 等渗回归 模型。 它解决了完全有序约束下的加权单变量回归问题。 具体来说,给定一组真实的观测响应y1,,yny_1, \ldots, y_n,对应的真实特征x1,,xnx_1, \ldots, x_n,以及可选的正权重w1,,wnw_1, \ldots, w_n,我们想要找到一个单调(分段线性)函数ff来最小化(f)=i=1nwi(yif(xi))2.\ell(f) = \sum_{i=1}^n w_i (y_i - f(x_i))^2.

还有一些可能有用的参数。

  • weightCol:指定权重列的字符串。

  • isotonic:逻辑值,指示输出序列是否应为等渗/递增 (TRUE) 或反渗/递减 (FALSE)。

  • featureIndex:如果它是向量列,则公式右侧的特征索引(默认值:0),否则无效。

我们使用一个人工示例来演示用法。

y <- c(3.0, 6.0, 8.0, 5.0, 7.0)
x <- c(1.0, 2.0, 3.5, 3.0, 4.0)
w <- rep(1.0, 5)
data <- data.frame(y = y, x = x, w = w)
df <- createDataFrame(data)
isoregModel <- spark.isoreg(df, y ~ x, weightCol = "w")
isoregFitted <- predict(isoregModel, df)
head(select(isoregFitted, "x", "y", "prediction"))
##     x y prediction
## 1 1.0 3        3.0
## 2 2.0 6        5.5
## 3 3.5 8        7.5
## 4 3.0 5        5.5
## 5 4.0 7        7.5

在预测阶段,基于拟合的单调分段函数,规则是

  • 如果预测输入与训练特征完全匹配,则返回关联的预测。 如果同一特征有多个预测,则返回其中一个。 返回哪个是未定义的。

  • 如果预测输入低于或高于所有训练特征,则分别返回具有最低或最高特征的预测。 如果同一特征有多个预测,则分别返回最低或最高的预测。

  • 如果预测输入落在两个训练特征之间,则将预测视为分段线性函数,并从两个最接近特征的预测中计算插值。 如果同一特征有多个值,则使用与前一点相同的规则。

例如,当输入为3.23.2,两个最接近的特征值是3.03.03.53.5,则预测值将是在3.03.03.53.5.

newDF <- createDataFrame(data.frame(x = c(1.5, 3.2)))
head(predict(isoregModel, newDF))
##     x prediction
## 1 1.5       4.25
## 2 3.2       6.30

处的预测值之间的线性插值。

线性回归

model <- spark.lm(carsDF, mpg ~ wt + hp)

summary(model)
## $coefficients
##                Estimate
## (Intercept) 37.22727012
## wt          -3.87783074
## hp          -0.03177295
## 
## $numFeatures
## [1] 2
predictions <- predict(model, carsDF)
head(select(predictions, predictions$prediction))
##   prediction
## 1   23.57233
## 2   22.58348
## 3   25.27582
## 4   21.26502
## 5   18.32727
## 6   20.47382

线性回归模型。

因子分解机回归器

有关因子分解机的背景和实现的详细信息,请参阅因子分解机部分

model <- spark.fmRegressor(carsDF, mpg ~ wt + hp)
summary(model)
## $coefficients
##              Estimate
## (Intercept) 0.1518559
## wt          3.6472555
## hp          2.8026828
## 
## $factors
##            [,1]       [,2]       [,3]       [,4]      [,5]       [,6]
## [1,]  0.1424420 -0.1178110 -0.3970272 -0.4696695  0.400288  0.3690930
## [2,] -0.1626185  0.1512138  0.3690435  0.4076975 -0.625752 -0.3715109
##             [,7]       [,8]
## [1,]  0.03472468 -0.1703219
## [2,] -0.02109148 -0.2006249
## 
## $numFeatures
## [1] 2
## 
## $factorSize
## [1] 8
predictions <- predict(model, carsDF)
head(select(predictions, predictions$prediction))
##   prediction
## 1  106.70996
## 2   87.07526
## 3  111.07931
## 4   60.89565
## 5   61.81374
## 6   40.70095

用于回归问题的因子分解机。

决策树

spark.decisionTreeSparkDataFrame 上拟合 决策树 分类或回归模型。 用户可以调用 summary 来获取拟合模型的摘要,调用 predict 进行预测,以及调用 write.ml/read.ml 来保存/加载拟合模型。

t <- as.data.frame(Titanic)
df <- createDataFrame(t)
dtModel <- spark.decisionTree(df, Survived ~ ., type = "classification", maxDepth = 2)
summary(dtModel)
## Formula:  Survived ~ .
## Number of features:  6
## Features:  Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances:  (6,[5],[1.0])
## Max Depth:  2
##  DecisionTreeClassificationModel: uid=dtc_a1e6304af37c, depth=2, numNodes=5, numClasses=2, numFeatures=6
##   If (feature 5 <= 4.5)
##    Predict: 0.0
##   Else (feature 5 > 4.5)
##    If (feature 5 <= 84.5)
##     Predict: 1.0
##    Else (feature 5 > 84.5)
##     Predict: 0.0
## 
predictions <- predict(dtModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No         No
## 2   2nd   Male Child    0       No         No
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No         No
## 5   1st Female Child    0       No         No
## 6   2nd Female Child    0       No         No

我们使用 Titanic 数据集来训练决策树并进行预测

梯度提升树

spark.gbtSparkDataFrame 上拟合 梯度提升树 分类或回归模型。 用户可以调用 summary 来获取拟合模型的摘要,调用 predict 进行预测,以及调用 write.ml/read.ml 来保存/加载拟合模型。

t <- as.data.frame(Titanic)
df <- createDataFrame(t)
gbtModel <- spark.gbt(df, Survived ~ ., type = "classification", maxDepth = 2, maxIter = 2)
summary(gbtModel)
## Formula:  Survived ~ .
## Number of features:  6
## Features:  Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances:  (6,[1,2,5],[0.03336902858878361,0.16099525743106016,0.8056357139801562])
## Max Depth:  2
## Number of trees:  2
## Tree weights:  1 0.1
##  GBTClassificationModel: uid = gbtc_298ba1eabf17, numTrees=2, numClasses=2, numFeatures=6
##   Tree 0 (weight 1.0):
##     If (feature 5 <= 4.5)
##      If (feature 1 in {1.0})
##       Predict: -1.0
##      Else (feature 1 not in {1.0})
##       Predict: -0.3333333333333333
##     Else (feature 5 > 4.5)
##      If (feature 5 <= 84.5)
##       Predict: 0.5714285714285714
##      Else (feature 5 > 84.5)
##       Predict: -0.42857142857142855
##   Tree 1 (weight 0.1):
##     If (feature 2 in {1.0})
##      If (feature 5 <= 15.5)
##       Predict: 0.9671846896296403
##      Else (feature 5 > 15.5)
##       Predict: -1.0857923804083338
##     Else (feature 2 not in {1.0})
##      If (feature 5 <= 13.5)
##       Predict: -0.08651035613926407
##      Else (feature 5 > 13.5)
##       Predict: 0.6566673506774614
## 
predictions <- predict(gbtModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No         No
## 2   2nd   Male Child    0       No         No
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No         No
## 5   1st Female Child    0       No         No
## 6   2nd Female Child    0       No         No

我们使用 Titanic 数据集来训练梯度提升树并进行预测

随机森林

spark.randomForestSparkDataFrame 上拟合 随机森林 分类或回归模型。 用户可以调用 summary 来获取拟合模型的摘要,调用 predict 进行预测,以及调用 write.ml/read.ml 来保存/加载拟合模型。

t <- as.data.frame(Titanic)
df <- createDataFrame(t)
rfModel <- spark.randomForest(df, Survived ~ ., type = "classification", maxDepth = 2, numTrees = 2)
summary(rfModel)
## Formula:  Survived ~ .
## Number of features:  6
## Features:  Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances:  (6,[3,4,5],[0.17058779274099098,0.09676977311565654,0.7326424341433525])
## Max Depth:  2
## Number of trees:  2
## Tree weights:  1 1
##  RandomForestClassificationModel: uid=rfc_f5de657337f1, numTrees=2, numClasses=2, numFeatures=6
##   Tree 0 (weight 1.0):
##     If (feature 4 in {0.0})
##      If (feature 3 in {0.0})
##       Predict: 0.0
##      Else (feature 3 not in {0.0})
##       Predict: 1.0
##     Else (feature 4 not in {0.0})
##      If (feature 5 <= 13.5)
##       Predict: 0.0
##      Else (feature 5 > 13.5)
##       Predict: 1.0
##   Tree 1 (weight 1.0):
##     If (feature 5 <= 84.5)
##      If (feature 5 <= 4.5)
##       Predict: 0.0
##      Else (feature 5 > 4.5)
##       Predict: 1.0
##     Else (feature 5 > 84.5)
##      Predict: 0.0
## 
predictions <- predict(rfModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No         No
## 2   2nd   Male Child    0       No         No
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No         No
## 5   1st Female Child    0       No         No
## 6   2nd Female Child    0       No         No

在以下示例中,我们使用 Titanic 数据集来训练随机森林并进行预测

二分 k-Means

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
model <- spark.bisectingKmeans(training, Class ~ Survived, k = 4)
summary(model)
## $k
## [1] 4
## 
## $coefficients
##   Survived_No
## 1 0          
## 2 1          
## 3 0          
## 4 1          
## 
## $size
## $size[[1]]
## [1] 16
## 
## $size[[2]]
## [1] 16
## 
## $size[[3]]
## [1] 0
## 
## $size[[4]]
## [1] 0
## 
## 
## $cluster
## SparkDataFrame[prediction:int]
## 
## $is.loaded
## [1] FALSE
fitted <- predict(model, training)
head(select(fitted, "Class", "prediction"))
##   Class prediction
## 1   1st          1
## 2   2nd          1
## 3   3rd          1
## 4  Crew          1
## 5   1st          1
## 6   2nd          1

spark.bisectingKmeans 是一种使用分裂(或“自上而下”)方法的层次聚类:所有观测值都从一个簇开始,并且随着层次结构的向下移动,递归地执行分裂。

高斯混合模型

spark.gaussianMixture 针对 SparkDataFrame 拟合多元高斯混合模型 (GMM)。 期望最大化 (EM) 用于近似模型的最大似然估计量 (MLE)。

X1 <- data.frame(V1 = rnorm(4), V2 = rnorm(4))
X2 <- data.frame(V1 = rnorm(6, 3), V2 = rnorm(6, 4))
data <- rbind(X1, X2)
df <- createDataFrame(data)
gmmModel <- spark.gaussianMixture(df, ~ V1 + V2, k = 2)
summary(gmmModel)
## $lambda
## [1] 0.408879 0.591121
## 
## $mu
## $mu[[1]]
## [1] -0.8254152973  0.0009888204
## 
## $mu[[2]]
## [1] 3.006119 3.620325
## 
## 
## $sigma
## $sigma[[1]]
##      [,1]     [,2]    
## [1,] 1.377944 1.092401
## [2,] 1.092401 1.477489
## 
## $sigma[[2]]
##      [,1]      [,2]     
## [1,] 1.317545  0.5716919
## [2,] 0.5716919 0.7335671
## 
## 
## $loglik
## [1] -32.8689
## 
## $posterior
## SparkDataFrame[posterior:array<double>]
## 
## $is.loaded
## [1] FALSE
gmmFitted <- predict(gmmModel, df)
head(select(gmmFitted, "V1", "V2", "prediction"))
##             V1         V2 prediction
## 1 -1.400043517  0.6215527          0
## 2  0.255317055  1.1484116          0
## 3 -2.437263611 -1.8218177          0
## 4 -0.005571287 -0.2473253          0
## 5  2.755800393  4.5124269          1
## 6  2.717294551  2.1369885          1

我们使用一个模拟示例来演示用法。

k-Means 聚类kk-means 聚类模型。 作为一种无监督学习方法,我们不需要响应变量。 因此,R 公式的左侧应留空。 聚类仅基于右侧的变量。

kmeansModel <- spark.kmeans(carsDF, ~ mpg + hp + wt, k = 3)
summary(kmeansModel)
## $k
## [1] 3
## 
## $coefficients
##        mpg        hp       wt
## 1 24.22353  93.52941 2.599588
## 2 15.80000 178.50000 3.926400
## 3 14.62000 263.80000 3.899000
## 
## $size
## $size[[1]]
## [1] 17
## 
## $size[[2]]
## [1] 10
## 
## $size[[3]]
## [1] 5
## 
## 
## $cluster
## SparkDataFrame[prediction:int]
## 
## $is.loaded
## [1] FALSE
## 
## $clusterSize
## [1] 3
kmeansPredictions <- predict(kmeansModel, carsDF)
head(select(kmeansPredictions, "model", "mpg", "hp", "wt", "prediction"), n = 20L)
##                  model  mpg  hp    wt prediction
## 1            Mazda RX4 21.0 110 2.620          0
## 2        Mazda RX4 Wag 21.0 110 2.875          0
## 3           Datsun 710 22.8  93 2.320          0
## 4       Hornet 4 Drive 21.4 110 3.215          0
## 5    Hornet Sportabout 18.7 175 3.440          1
## 6              Valiant 18.1 105 3.460          0
## 7           Duster 360 14.3 245 3.570          2
## 8            Merc 240D 24.4  62 3.190          0
## 9             Merc 230 22.8  95 3.150          0
## 10            Merc 280 19.2 123 3.440          0
## 11           Merc 280C 17.8 123 3.440          0
## 12          Merc 450SE 16.4 180 4.070          1
## 13          Merc 450SL 17.3 180 3.730          1
## 14         Merc 450SLC 15.2 180 3.780          1
## 15  Cadillac Fleetwood 10.4 205 5.250          1
## 16 Lincoln Continental 10.4 215 5.424          1
## 17   Chrysler Imperial 14.7 230 5.345          2
## 18            Fiat 128 32.4  66 2.200          0
## 19         Honda Civic 30.4  52 1.615          0
## 20      Toyota Corolla 33.9  65 1.835          0

潜在狄利克雷分配

spark.ldaSparkDataFrame 上拟合 潜在狄利克雷分配 模型。 它通常用于主题建模,其中从文本文档的集合中推断出主题。 LDA 可以被认为是如下所示的聚类算法

  • 主题对应于聚类中心,而文档对应于数据集中的示例(行)。

  • 主题和文档都存在于特征空间中,其中特征向量是词计数向量(词袋)。

  • LDA 不是使用传统的距离进行聚类,而是使用基于文本文件生成方式的统计模型的函数。

要使用 LDA,我们需要在 data 中指定一个 features 列,其中每个条目代表一个文档。 该列有两个选项

  • 字符串:这可以是整个文档的字符串。 它将被自动解析。 可以在 customizedStopWords 中添加其他停用词。

  • libSVM:每个条目都是单词的集合,并将被直接处理。

为拟合模型提供了两个附加函数。

  • spark.posterior 返回一个 SparkDataFrame,其中包含一个名为“topicDistribution”的后验概率向量列。

  • spark.perplexity 返回给定 SparkDataFrame 的对数困惑度,如果缺少参数 data,则返回训练数据的对数困惑度。

有关更多信息,请参见帮助文档 ?spark.lda

让我们看一个人工示例。

corpus <- data.frame(features = c(
  "1 2 6 0 2 3 1 1 0 0 3",
  "1 3 0 1 3 0 0 2 0 0 1",
  "1 4 1 0 0 4 9 0 1 2 0",
  "2 1 0 3 0 0 5 0 2 3 9",
  "3 1 1 9 3 0 2 0 0 1 3",
  "4 2 0 3 4 5 1 1 1 4 0",
  "2 1 0 3 0 0 5 0 2 2 9",
  "1 1 1 9 2 1 2 0 0 1 3",
  "4 4 0 3 4 2 1 3 0 0 0",
  "2 8 2 0 3 0 2 0 2 7 2",
  "1 1 1 9 0 2 2 0 0 3 3",
  "4 1 0 0 4 5 1 3 0 1 0"))
corpusDF <- createDataFrame(corpus)
model <- spark.lda(data = corpusDF, k = 5, optimizer = "em")
summary(model)
## $docConcentration
## [1] 11 11 11 11 11
## 
## $topicConcentration
## [1] 1.1
## 
## $logLikelihood
## [1] -353.2948
## 
## $logPerplexity
## [1] 2.676476
## 
## $isDistributed
## [1] TRUE
## 
## $vocabSize
## [1] 10
## 
## $topics
## SparkDataFrame[topic:int, term:array<string>, termWeights:array<double>]
## 
## $vocabulary
##  [1] "0" "1" "2" "3" "4" "9" "5" "8" "7" "6"
## 
## $trainingLogLikelihood
## [1] -239.5629
## 
## $logPrior
## [1] -980.2974
posterior <- spark.posterior(model, corpusDF)
head(posterior)
##                features                                     topicDistribution
## 1 1 2 6 0 2 3 1 1 0 0 3 0.1972172, 0.1986640, 0.2021996, 0.2006554, 0.2012638
## 2 1 3 0 1 3 0 0 2 0 0 1 0.1989977, 0.1988735, 0.2015952, 0.2006357, 0.1998979
## 3 1 4 1 0 0 4 9 0 1 2 0 0.2020598, 0.2026073, 0.1968832, 0.1987276, 0.1997220
## 4 2 1 0 3 0 0 5 0 2 3 9 0.2004069, 0.1981933, 0.2013000, 0.2006292, 0.1994706
## 5 3 1 1 9 3 0 2 0 0 1 3 0.1971478, 0.1983952, 0.2023559, 0.2011561, 0.2009450
## 6 4 2 0 3 4 5 1 1 1 4 0 0.2020231, 0.2041808, 0.1955391, 0.1997213, 0.1985357
perplexity <- spark.perplexity(model, corpusDF)
perplexity
## [1] 2.676476

交替最小二乘法

spark.als 通过 交替最小二乘法 学习 协同过滤 中的潜在因子。

可以在 spark.als 中配置多个选项,包括 rankregnonnegative。 有关完整的列表,请参阅帮助文件。

ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
                list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(ratings, c("user", "item", "rating"))
model <- spark.als(df, "rating", "user", "item", rank = 10, reg = 0.1, nonnegative = TRUE)

提取潜在因子。

stats <- summary(model)
userFactors <- stats$userFactors
itemFactors <- stats$itemFactors
head(userFactors)
head(itemFactors)

进行预测。

predicted <- predict(model, df)
head(predicted)

幂迭代聚类

幂迭代聚类 (PIC) 是一种可扩展的图聚类算法。 spark.assignClusters 方法运行 PIC 算法,并为每个输入顶点返回一个聚类分配。

df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
                           list(4L, 0L, 0.1)),
                      schema = c("src", "dst", "weight"))
head(spark.assignClusters(df, initMode = "degree", weightCol = "weight"))
##   id cluster
## 1  4       1
## 2  0       0
## 3  1       0
## 4  3       1
## 5  2       0

FP-growth

spark.fpGrowth 执行 FP-growth 算法以挖掘 SparkDataFrame 上的频繁项集。 itemsCol 应该是一个值数组。

df <- selectExpr(createDataFrame(data.frame(rawItems = c(
  "T,R,U", "T,S", "V,R", "R,U,T,V", "R,S", "V,S,U", "U,R", "S,T", "V,R", "V,U,S",
  "T,V,U", "R,V", "T,S", "T,S", "S,T", "S,U", "T,R", "V,R", "S,V", "T,S,U"
))), "split(rawItems, ',') AS items")

fpm <- spark.fpGrowth(df, minSupport = 0.2, minConfidence = 0.5)

spark.freqItemsets 方法可用于检索具有频繁项集的 SparkDataFrame

##   items freq
## 1     R    9
## 2     U    8
## 3  U, T    4
## 4  U, V    4
## 5  U, S    4
## 6     T   10

spark.associationRules 返回具有关联规则的 SparkDataFrame

##   antecedent consequent confidence     lift support
## 1          V          R  0.5555556 1.234568    0.25
## 2          S          T  0.5454545 1.090909    0.30
## 3          T          S  0.6000000 1.090909    0.30
## 4          R          V  0.5555556 1.234568    0.25
## 5          U          T  0.5000000 1.000000    0.20
## 6          U          V  0.5000000 1.111111    0.20

我们可以基于 antecedent 进行预测。

head(predict(fpm, df))
##        items prediction
## 1    T, R, U       S, V
## 2       T, S       NULL
## 3       V, R       NULL
## 4 R, U, T, V          S
## 5       R, S       T, V
## 6    V, S, U       R, T

PrefixSpan

spark.findFrequentSequentialPatterns 方法可用于查找项集输入序列中完整的频繁序列模式集。

df <- createDataFrame(list(list(list(list(1L, 2L), list(3L))),
                           list(list(list(1L), list(3L, 2L), list(1L, 2L))),
                           list(list(list(1L, 2L), list(5L))),
                           list(list(list(6L)))),
                      schema = c("sequence"))
head(spark.findFrequentSequentialPatterns(df, minSupport = 0.5, maxPatternLength = 5L))
##   sequence freq
## 1        1    3
## 2        3    2
## 3        2    3
## 4     1, 2    3
## 5     1, 3    2

Kolmogorov-Smirnov 检验

spark.kstest 运行一个双侧、单样本的 Kolmogorov-Smirnov (KS) 检验。 给定一个 SparkDataFrame,该测试会将给定列 testCol 中的连续数据与参数 nullHypothesis 指定的理论分布进行比较。 用户可以调用 summary 来获取测试结果的摘要。

在以下示例中,我们测试 Titanic 数据集的 Freq 列是否服从正态分布。 我们使用样本的均值和标准差来设置正态分布的参数。

t <- as.data.frame(Titanic)
df <- createDataFrame(t)
freqStats <- head(select(df, mean(df$Freq), sd(df$Freq)))
freqMean <- freqStats[1]
freqStd <- freqStats[2]

test <- spark.kstest(df, "Freq", "norm", c(freqMean, freqStd))
testSummary <- summary(test)
testSummary
## Kolmogorov-Smirnov test summary:
## degrees of freedom = 0 
## statistic = 0.3065126710255011 
## pValue = 0.0036336792155329256 
## Very strong presumption against null hypothesis: Sample follows theoretical distribution.

模型持久化

以下示例展示了如何在 SparkR 中保存/加载 ML 模型。

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
gaussianGLM <- spark.glm(training, Freq ~ Sex + Age, family = "gaussian")

# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)

# Check model summary
summary(gaussianGLM2)
## 
## Saved-loaded model does not support output 'Deviance Residuals'.
## 
## Coefficients:
##              Estimate  Std. Error  t value   Pr(>|t|)
## (Intercept)    46.219      35.994   1.2841  0.2092846
## Sex_Female    -78.812      41.562  -1.8962  0.0679311
## Age_Adult     123.938      41.562   2.9820  0.0057522
## 
## (Dispersion parameter for gaussian family taken to be 13819.52)
## 
##     Null deviance: 573341  on 31  degrees of freedom
## Residual deviance: 400766  on 29  degrees of freedom
## AIC: 400.7
## 
## Number of Fisher Scoring iterations: 1
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, training)
head(gaussianPredictions)
##   Class    Sex   Age Survived Freq label prediction
## 1   1st   Male Child       No    0     0   46.21875
## 2   2nd   Male Child       No    0     0   46.21875
## 3   3rd   Male Child       No   35    35   46.21875
## 4  Crew   Male Child       No    0     0   46.21875
## 5   1st Female Child       No    0     0  -32.59375
## 6   2nd Female Child       No    0     0  -32.59375
unlink(modelPath)

结构化流处理

SparkR 支持结构化流处理 API。

您可以查看结构化流处理编程指南,了解其编程模型和基本概念的 简介

简单源和接收器

Spark 有一些内置的输入源。 例如,要使用套接字源将文本读入单词并显示计算的字数

# Create DataFrame representing the stream of input lines from connection
lines <- read.stream("socket", host = hostname, port = port)

# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")

# Generate running word count
wordCounts <- count(groupBy(words, "word"))

# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")

Kafka 源

从 Kafka 读取数据很简单。 有关更多信息,请参阅结构化流处理支持的输入源

topic <- read.stream("kafka",
                     kafka.bootstrap.servers = "host1:port1,host2:port2",
                     subscribe = "topic1")
keyvalue <- selectExpr(topic, "CAST(key AS STRING)", "CAST(value AS STRING)")

操作和接收器

SparkDataFrame 上最常见的操作都支持流处理,包括选择、投影和聚合。 定义最终结果后,要启动流式计算,您将调用 write.stream 方法来设置接收器和 outputMode

可以将流式 SparkDataFrame 写入控制台进行调试,写入临时内存表,或者以容错方式写入不同格式的文件接收器以进行进一步处理。

noAggDF <- select(where(deviceDataStreamingDf, "signal > 10"), "device")

# Print new data to console
write.stream(noAggDF, "console")

# Write new data to Parquet files
write.stream(noAggDF,
             "parquet",
             path = "path/to/destination/dir",
             checkpointLocation = "path/to/checkpoint/dir")

# Aggregate
aggDF <- count(groupBy(noAggDF, "device"))

# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")

# Have all the aggregates in an in memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")

head(sql("select * from aggregates"))

高级主题

SparkR 对象类

SparkR 中您可能会使用的三个主要对象类。

  • SparkDataFrame:SparkR 的核心组件。 它是一个 S4 类,表示组织成命名列的分布式数据集合,在概念上等同于关系数据库中的表或 R 中的数据框。 它有两个插槽 sdfenv
    • sdf 存储对 Spark JVM 后端中相应 Spark Dataset 的引用。
    • env 保存对象的元信息,例如 isCached

    它可以由数据导入方法创建,也可以通过转换现有的 SparkDataFrame 创建。 我们可以通过大量数据处理函数来操作 SparkDataFrame,并将其输入到机器学习算法中。

  • Column:一个 S4 类,表示 SparkDataFrame 的一列。 插槽 jc 保存对 Spark JVM 后端中相应 Column 对象的引用。

    可以通过 $ 运算符从 SparkDataFrame 中获得它,例如,df$col。 更常见的是,它与其他函数一起使用,例如,与 select 一起选择特定列,与 filter 和构造的条件一起选择行,与聚合函数一起计算每个组的聚合统计信息。

  • GroupedData:一个 S4 类,表示由 groupBy 创建或通过转换其他 GroupedData 创建的分组数据。 它的 sgd 插槽保存对后端中 RelationalGroupedDataset 对象的引用。

    这通常是一个带有组信息的中间对象,后面是聚合操作。

架构

可以在参考文献中看到架构的完整描述,特别是论文 *SparkR:使用 Spark 扩展 R 程序*。

SparkR 的底层是 Spark SQL 引擎。 这避免了运行解释 R 代码的开销,并且 Spark 中优化的 SQL 执行引擎使用有关数据和计算流的结构信息来执行一系列优化以加速计算。

实际计算的主要方法调用发生在驱动程序的 Spark JVM 中。 我们有一个基于套接字的 SparkR API,允许我们从 R 调用 JVM 上的函数。 我们使用在基于 Netty 的套接字服务器上侦听的 SparkR JVM 后端。

SparkR JVM 后端支持两种 RPC:方法调用和创建新对象。 方法调用可以通过两种方式完成。

  • sparkR.callJMethod 接受对现有 Java 对象的引用以及要传递给方法的参数列表。

  • sparkR.callJStatic 接受静态方法的类名以及要传递给方法的参数列表。

参数使用我们的自定义线路格式序列化,然后在 JVM 端反序列化。 然后,我们使用 Java 反射来调用适当的方法。

要创建对象,使用 sparkR.newJObject,然后类似地使用提供的参数调用适当的构造函数。

最后,我们使用一个新的 R 类 jobj,它引用后端中存在的 Java 对象。 这些引用在 Java 端被跟踪,并在它们超出 R 端的范围时自动进行垃圾回收。

附录

R 和 Spark 数据类型

R Spark
byte byte
integer integer
float float
double double
numeric double
character string
string string
binary binary
raw binary
logical boolean
POSIXct timestamp
POSIXlt timestamp
Date date
array array
list array
env map

参考