SparkR (R on Spark)

概述

SparkR 是一个 R 包,它提供了一个轻量级的前端,用于从 R 中使用 Apache Spark。在 Spark 3.5.5 中,SparkR 提供了一个分布式数据帧实现,它支持诸如选择、过滤、聚合等操作(类似于 R 数据帧、dplyr),但适用于大型数据集。SparkR 还支持使用 MLlib 进行分布式机器学习。

SparkDataFrame

SparkDataFrame 是一个组织成命名列的分布式数据集合。它在概念上等同于关系数据库中的表或 R 中的数据帧,但底层具有更丰富的优化。SparkDataFrame 可以从各种来源构建,例如:结构化数据文件、Hive 中的表、外部数据库或现有的本地 R 数据帧。

此页面上的所有示例都使用 R 或 Spark 发行版中包含的示例数据,并且可以使用 ./bin/sparkR shell 运行。

启动: SparkSession

进入 SparkR 的入口点是 SparkSession,它将您的 R 程序连接到 Spark 集群。您可以使用 sparkR.session 创建 SparkSession,并传入诸如应用程序名称、任何依赖的 Spark 包等选项。此外,您还可以通过 SparkSession 使用 SparkDataFrame。如果您正在使用 sparkR shell,则应该已经为您创建了 SparkSession,您无需调用 sparkR.session

sparkR.session()

从 RStudio 启动

您也可以从 RStudio 启动 SparkR。您可以从 RStudio、R shell、Rscript 或其他 R IDE 将您的 R 程序连接到 Spark 集群。要启动,请确保在环境中设置了 SPARK_HOME(您可以检查 Sys.getenv),加载 SparkR 包,并按如下方式调用 sparkR.session。它将检查 Spark 安装,如果未找到,它将被自动下载和缓存。或者,您也可以手动运行 install.spark

除了调用 sparkR.session 之外,您还可以指定某些 Spark 驱动程序属性。通常这些 应用程序属性运行时环境 无法以编程方式设置,因为驱动程序 JVM 进程已经启动,在这种情况下,SparkR 会为您处理此问题。要设置它们,请像在 sparkR.session()sparkConfig 参数中设置其他配置属性一样传递它们。

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))

以下 Spark 驱动程序属性可以在 RStudio 中使用 sparkR.sessionsparkConfig 中设置

属性名称属性组spark-submit 等效项
spark.master 应用程序属性 --master
spark.kerberos.keytab 应用程序属性 --keytab
spark.kerberos.principal 应用程序属性 --principal
spark.driver.memory 应用程序属性 --driver-memory
spark.driver.extraClassPath 运行时环境 --driver-class-path
spark.driver.extraJavaOptions 运行时环境 --driver-java-options
spark.driver.extraLibraryPath 运行时环境 --driver-library-path

创建 SparkDataFrames

使用 SparkSession,应用程序可以从本地 R 数据帧、Hive 表 或其他 数据源 创建 SparkDataFrame

从本地数据帧

创建数据帧的最简单方法是将本地 R 数据帧转换为 SparkDataFrame。具体来说,我们可以使用 as.DataFramecreateDataFrame 并传入本地 R 数据帧以创建 SparkDataFrame。例如,以下基于 R 中的 faithful 数据集创建一个 SparkDataFrame

df <- as.DataFrame(faithful)

# Displays the first part of the SparkDataFrame
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74

从数据源

SparkR 支持通过 SparkDataFrame 接口对各种数据源进行操作。本节介绍使用数据源加载和保存数据的通用方法。您可以查看 Spark SQL 编程指南以获取更多可用于内置数据源的 特定选项

从数据源创建 SparkDataFrame 的通用方法是 read.df。此方法接收要加载的文件的路径和数据源的类型,并且将自动使用当前活动的 SparkSession。SparkR 原生支持读取 JSON、CSV 和 Parquet 文件,并且通过来自诸如 第三方项目 等来源的可用软件包,您可以找到流行的文件格式(如 Avro)的数据源连接器。这些软件包可以通过使用 spark-submitsparkR 命令指定 --packages 来添加,或者如果在交互式 R shell 中或从 RStudio 初始化 SparkSession 时使用 sparkPackages 参数来添加。

sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.12:3.5.5")

我们可以通过一个 JSON 输入文件示例了解如何使用数据源。请注意,此处使用的文件不是典型的 JSON 文件。文件中的每一行必须包含一个单独的、自包含的有效 JSON 对象。有关更多信息,请参阅 JSON Lines 文本格式,也称为换行符分隔的 JSON。因此,常规的多行 JSON 文件通常会失败。

people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))

数据源 API 原生支持 CSV 格式的输入文件。有关更多信息,请参阅 SparkR read.df API 文档。

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

数据源 API 也可用于将 SparkDataFrame 保存为多种文件格式。例如,我们可以使用 write.df 将上一个示例中的 SparkDataFrame 保存到 Parquet 文件中。

write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")

从 Hive 表

您也可以从 Hive 表创建 SparkDataFrame。为此,我们需要创建一个具有 Hive 支持的 SparkSession,它可以访问 Hive MetaStore 中的表。请注意,Spark 应该已使用 Hive 支持 构建,更多详细信息可以在 SQL 编程指南 中找到。在 SparkR 中,默认情况下它将尝试创建一个启用 Hive 支持的 SparkSession (enableHiveSupport = TRUE)。

sparkR.session()

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")

# results is now a SparkDataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

SparkDataFrame 操作

SparkDataFrame 支持许多用于进行结构化数据处理的函数。这里我们包含一些基本示例,完整的列表可以在 API 文档中找到

选择行、列

# Create the SparkDataFrame
df <- as.DataFrame(faithful)

# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]

# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333

# You can also pass in column name as strings
head(select(df, "eruptions"))

# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48

分组、聚合

SparkR 数据帧支持许多常用的函数来在分组后聚合数据。例如,我们可以计算 faithful 数据集中 waiting 时间的直方图,如下所示

# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      70     4
##2      67     1
##3      69     2

# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
##   waiting count
##1      78    15
##2      83    14
##3      81    13

除了标准聚合之外,SparkR 还支持 OLAP 多维数据集操作符 cube

head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1  NA 140.8    4     22.8
##2   4  75.7    4     30.4
##3   8 400.0    3     19.2
##4   8 318.0    3     15.5
##5  NA 351.0   NA     15.8
##6  NA 275.8   NA     16.3

rollup

head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1   4  75.7    4     30.4
##2   8 400.0    3     19.2
##3   8 318.0    3     15.5
##4   4  78.7   NA     32.4
##5   8 304.0    3     15.2
##6   4  79.0   NA     27.3

对列进行操作

SparkR 还提供了许多可以直接应用于列的数据处理和聚合函数。下面的例子展示了基本算术函数的使用。

# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

应用用户自定义函数

在 SparkR 中,我们支持几种用户定义的函数

使用 dapplydapplyCollect 在大型数据集上运行给定的函数

dapply

将函数应用于 SparkDataFrame 的每个分区。要应用于 SparkDataFrame 的每个分区的函数应该只有一个参数,传递给该参数的应该是一个与每个分区对应的 data.frame。函数的输出应该是一个 data.frame。Schema 指定了结果 SparkDataFrame 的行格式。它必须与返回值的 数据类型相匹配。

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300
dapplyCollect

类似于 dapply,将函数应用于 SparkDataFrame 的每个分区并将结果收集回来。函数的输出应该是一个 data.frame。但是,不需要传递 Schema。请注意,如果 UDF 在所有分区上运行的输出无法拉取到驱动程序并放入驱动程序内存中,则 dapplyCollect 可能会失败。

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

通过输入列分组,并使用 gapplygapplyCollect 在大型数据集上运行给定的函数

gapply

将函数应用于 SparkDataFrame 的每个组。该函数将应用于 SparkDataFrame 的每个组,并且应该只有两个参数:分组键和与该键对应的 R data.frame。这些组是从 SparkDataFrame 的列中选择的。函数的输出应该是一个 data.frame。Schema 指定了结果 SparkDataFrame 的行格式。它必须根据 Spark 数据类型表示 R 函数的输出模式。返回的 data.frame 的列名由用户设置。

# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900
gapplyCollect

类似于 gapply,将函数应用于 SparkDataFrame 的每个分区,并将结果收集回 R data.frame。函数的输出应该是一个 data.frame。但是,不需要传递 schema。请注意,如果 UDF 在所有分区上运行的输出无法拉取到驱动程序并放入驱动程序内存中,则 gapplyCollect 可能会失败。

# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900

使用 spark.lapply 分布式运行本地 R 函数

spark.lapply

类似于本地 R 中的 lapplyspark.lapply 对元素列表运行函数,并使用 Spark 分布式计算。以类似于 doParallellapply 的方式将函数应用于列表的元素。所有计算的结果都应该能放入一台机器中。如果不是这种情况,他们可以执行类似 df <- createDataFrame(list) 的操作,然后使用 dapply

# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# Print the summary of each model
print(model.summaries)

急切执行

如果启用了 eager execution,则当创建 SparkDataFrame 时,数据将立即返回到 R 客户端。默认情况下,未启用 eager execution,可以通过在启动 SparkSession 时将配置属性 spark.sql.repl.eagerEval.enabled 设置为 true 来启用。

要显示的最大行数和每列数据的最大字符数可以分别通过 spark.sql.repl.eagerEval.maxNumRowsspark.sql.repl.eagerEval.truncate 配置属性来控制。这些属性仅在启用 eager execution 时有效。如果未显式设置这些属性,则默认情况下,将显示最多 20 行数据和每列最多 20 个字符。

# Start up spark session with eager execution enabled
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
                                  spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))

# Create a grouped and sorted SparkDataFrame
df <- createDataFrame(faithful)
df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")

# Similar to R data.frame, displays the data returned, instead of SparkDataFrame class string
df2

##+-------+-----+
##|waiting|count|
##+-------+-----+
##|   43.0|    1|
##|   45.0|    3|
##|   46.0|    5|
##|   47.0|    4|
##|   48.0|    3|
##|   49.0|    5|
##|   50.0|    5|
##|   51.0|    6|
##|   52.0|    5|
##|   53.0|    7|
##+-------+-----+
##only showing top 10 rows

请注意,要在 sparkR shell 中启用 eager execution,请将 spark.sql.repl.eagerEval.enabled=true 配置属性添加到 --conf 选项中。

从 SparkR 运行 SQL 查询

SparkDataFrame 也可以注册为 Spark SQL 中的临时视图,这允许您对其数据运行 SQL 查询。sql 函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 SparkDataFrame 返回。

# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")

# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")

# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##    name
##1 Justin

机器学习

算法

SparkR 目前支持以下机器学习算法

分类

回归

聚类

协同过滤

频繁模式挖掘

统计

在底层,SparkR 使用 MLlib 来训练模型。有关示例代码,请参阅 MLlib 用户指南的相应部分。用户可以调用 summary 来打印拟合模型的摘要,predict 来对新数据进行预测,以及 write.ml/read.ml 来保存/加载拟合模型。SparkR 支持可用于模型拟合的可用 R 公式运算符的子集,包括 ‘~’、‘.’、‘:’、‘+’ 和 ‘-‘。

模型持久化

以下示例展示了如何通过 SparkR 保存/加载 MLlib 模型。

training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")

# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)

# Check model summary
summary(gaussianGLM2)

# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)

unlink(modelPath)
在 Spark 仓库的 "examples/src/main/r/ml/ml.R" 中查找完整的示例代码。

R 和 Spark 之间的数据类型映射

RSpark
byte byte
integer integer
float float
double double
numeric double
character string
string string
binary binary
raw binary
logical boolean
POSIXct timestamp
POSIXlt timestamp
Date date
array array
list array
env map

结构化流处理

SparkR 支持结构化流式 API。结构化流式处理是一个可扩展且容错的流式处理引擎,构建在 Spark SQL 引擎之上。有关更多信息,请参阅 结构化流式处理编程指南上的 R API

SparkR 中的 Apache Arrow

Apache Arrow 是一种内存中的列式数据格式,Spark 中使用它来有效地在 JVM 和 R 进程之间传输数据。另请参阅 PySpark 优化完成,使用 Apache Arrow 的 Pandas 的 PySpark 用法指南。本指南旨在解释如何在 SparkR 中使用 Arrow 优化,并提供一些关键点。

确保已安装 Arrow

Arrow R 库可在 CRAN 上获得,可以如下安装。

Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'

有关更多详细信息,请参阅 Apache Arrow 的官方文档

请注意,您必须确保 Arrow R 包已安装并在所有集群节点上可用。当前支持的最低版本是 1.0.0;但是,由于 SparkR 中的 Arrow 优化是实验性的,因此这可能会在次要版本之间发生变化。

启用与 R DataFrame 之间的转换、dapplygapply

当使用 collect(spark_df) 将 Spark DataFrame 转换为 R DataFrame,使用 createDataFrame(r_df) 从 R DataFrame 创建 Spark DataFrame,通过 dapply(...) 将 R 原生函数应用于每个分区,以及通过 gapply(...) 将 R 原生函数应用于分组数据时,可以使用 Arrow 优化。要在使用这些函数时使用 Arrow,用户需要首先将 Spark 配置 ‘spark.sql.execution.arrow.sparkr.enabled’ 设置为 ‘true’。 默认情况下,此功能处于禁用状态。

无论是否启用优化,SparkR 都会产生相同的结果。此外,当在实际计算之前由于任何原因导致优化失败时,Spark DataFrame 和 R DataFrame 之间的转换会自动回退到非 Arrow 优化实现。

# Start up spark session with Arrow optimization enabled
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))

# Converts Spark DataFrame from an R DataFrame
spark_df <- createDataFrame(mtcars)

# Converts Spark DataFrame to an R DataFrame
collect(spark_df)

# Apply an R native function to each partition.
collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))

# Apply an R native function to grouped data.
collect(gapply(spark_df,
               "gear",
               function(key, group) {
                 data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
               },
               structType("gear double, disp boolean")))

请注意,即使使用 Arrow,collect(spark_df) 也会导致将 DataFrame 中的所有记录收集到驱动程序程序中,因此应仅在数据的较小子集上执行。此外,gapply(...)dapply(...) 中指定的输出 schema 应与给定函数返回的 R DataFrame 的 schema 相匹配。

支持的 SQL 类型

目前,基于 Arrow 的转换支持所有 Spark SQL 数据类型,除了 FloatTypeBinaryTypeArrayTypeStructTypeMapType

R 函数名称冲突

在 R 中加载和附加新包时,可能会出现名称冲突,即一个函数屏蔽了另一个函数。

以下函数被 SparkR 包屏蔽

被屏蔽的函数如何访问
package:stats 中的 cov
stats::cov(x, y = NULL, use = "everything",
           method = c("pearson", "kendall", "spearman"))
package:stats 中的 filter
stats::filter(x, filter, method = c("convolution", "recursive"),
              sides = 2, circular = FALSE, init)
package:base 中的 sample base::sample(x, size, replace = FALSE, prob = NULL)

由于 SparkR 的一部分是基于 dplyr 包建模的,因此 SparkR 中的某些函数与 dplyr 中的函数共享相同的名称。根据两个包的加载顺序,首先加载的包中的某些函数将被稍后加载的包中的函数屏蔽。在这种情况下,请使用包名称作为此类调用的前缀,例如,SparkR::cume_dist(x)dplyr::cume_dist(x)

您可以使用 search() 检查 R 中的搜索路径

迁移指南

迁移指南现已在本页存档。