SparkR (Spark 上的 R)

SparkR 已从 Apache Spark 4.0.0 开始弃用,并将在未来版本中移除。

概述

SparkR 是一个 R 包,它提供了一个轻量级前端,允许从 R 中使用 Apache Spark。在 Spark 4.0.0 中,SparkR 提供了一个分布式数据框实现,支持选择、过滤、聚合等操作(类似于 R 数据框,dplyr),但适用于大型数据集。SparkR 还支持使用 MLlib 进行分布式机器学习。

SparkDataFrame

SparkDataFrame 是一种由命名列组织起来的分布式数据集合。它在概念上等同于关系数据库中的表或 R 中的数据框,但在底层拥有更丰富的优化。SparkDataFrame 可以从多种来源构建,例如:结构化数据文件、Hive 中的表、外部数据库或现有的本地 R 数据框。

本页上的所有示例都使用 R 或 Spark 发行版中包含的示例数据,并且可以使用 ./bin/sparkR shell 运行。

启动:SparkSession

SparkR 的入口点是 SparkSession,它将您的 R 程序连接到 Spark 集群。您可以使用 sparkR.session 创建 SparkSession 并传入应用程序名称、任何依赖的 Spark 包等选项。此外,您还可以通过 SparkSession 使用 SparkDataFrame。如果您从 sparkR shell 工作,则 SparkSession 应该已经为您创建,您无需调用 sparkR.session

sparkR.session()

从 RStudio 启动

您也可以从 RStudio 启动 SparkR。您可以将您的 R 程序连接到 Spark 集群,无论是从 RStudio、R shell、Rscript 还是其他 R IDE。要启动,请确保在环境中设置了 SPARK_HOME(您可以通过 Sys.getenv 检查),加载 SparkR 包,然后如下所示调用 sparkR.session。它将检查 Spark 安装,如果未找到,将自动下载和缓存。或者,您也可以手动运行 install.spark

除了调用 sparkR.session 之外,您还可以指定某些 Spark 驱动器属性。通常,这些应用程序属性运行时环境无法通过编程方式设置,因为驱动器 JVM 进程已经启动。在这种情况下,SparkR 会为您处理。要设置它们,请像设置其他配置属性一样,将它们作为 sparkR.session()sparkConfig 参数传入。

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))

以下 Spark 驱动器属性可以在 RStudio 中通过 sparkR.sessionsparkConfig 参数进行设置

属性名称属性组spark-submit 等效参数
spark.master 应用程序属性 --master
spark.kerberos.keytab 应用程序属性 --keytab
spark.kerberos.principal 应用程序属性 --principal
spark.driver.memory 应用程序属性 --driver-memory
spark.driver.extraClassPath 运行时环境 --driver-class-path
spark.driver.extraJavaOptions 运行时环境 --driver-java-options
spark.driver.extraLibraryPath 运行时环境 --driver-library-path

创建 SparkDataFrame

通过 SparkSession,应用程序可以从本地 R 数据框、Hive 表或其他数据源创建 SparkDataFrame

从本地数据框创建

创建数据框最简单的方法是将本地 R 数据框转换为 SparkDataFrame。具体来说,我们可以使用 as.DataFramecreateDataFrame 并传入本地 R 数据框来创建一个 SparkDataFrame。例如,以下代码使用 R 中的 faithful 数据集创建了一个 SparkDataFrame

df <- as.DataFrame(faithful)

# Displays the first part of the SparkDataFrame
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74

从数据源创建

SparkR 通过 SparkDataFrame 接口支持操作各种数据源。本节描述了使用数据源加载和保存数据的通用方法。您可以查看 Spark SQL 编程指南,了解内置数据源可用的更多特定选项

从数据源创建 SparkDataFrame 的通用方法是 read.df。此方法接受要加载的文件路径和数据源类型,并且当前活动的 SparkSession 将自动使用。SparkR 原生支持读取 JSON、CSV 和 Parquet 文件,并且通过第三方项目等来源提供的包,您可以找到适用于 Avro 等流行文件格式的数据源连接器。这些包可以通过在 spark-submitsparkR 命令中指定 --packages 来添加,或者如果在交互式 R shell 或 RStudio 中初始化 SparkSession 时使用 sparkPackages 参数。

sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.13:4.0.0")

我们可以通过一个 JSON 输入文件示例了解如何使用数据源。请注意,此处使用的文件并非典型的 JSON 文件。文件中的每一行都必须包含一个独立、自包含的有效 JSON 对象。有关更多信息,请参阅JSON Lines 文本格式(也称为换行符分隔的 JSON)。因此,常规的多行 JSON 文件通常会失败。

people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))

数据源 API 原生支持 CSV 格式的输入文件。有关更多信息,请参阅 SparkR read.df API 文档。

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

数据源 API 也可以用于将 SparkDataFrame 保存为多种文件格式。例如,我们可以使用 write.df 将前面示例中的 SparkDataFrame 保存到 Parquet 文件。

write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")

从 Hive 表创建

您还可以从 Hive 表创建 SparkDataFrame。为此,我们需要创建一个支持 Hive 的 SparkSession,它可以访问 Hive MetaStore 中的表。请注意,Spark 应该已经构建了Hive 支持,更多详细信息可以在SQL 编程指南中找到。在 SparkR 中,默认情况下会尝试创建启用 Hive 支持的 SparkSession (enableHiveSupport = TRUE)。

sparkR.session()

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")

# results is now a SparkDataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

SparkDataFrame 操作

SparkDataFrame 支持多种函数来执行结构化数据处理。此处包含一些基本示例,完整列表可在 API 文档中找到。

选择行、列

# Create the SparkDataFrame
df <- as.DataFrame(faithful)

# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]

# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333

# You can also pass in column name as strings
head(select(df, "eruptions"))

# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48

分组、聚合

SparkR 数据框支持多种常用函数,用于分组后聚合数据。例如,我们可以计算 faithful 数据集中 waiting 时间的直方图,如下所示

# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      70     4
##2      67     1
##3      69     2

# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
##   waiting count
##1      78    15
##2      83    14
##3      81    13

除了标准聚合之外,SparkR 还支持 OLAP 立方体运算符 cube

head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1  NA 140.8    4     22.8
##2   4  75.7    4     30.4
##3   8 400.0    3     19.2
##4   8 318.0    3     15.5
##5  NA 351.0   NA     15.8
##6  NA 275.8   NA     16.3

rollup

head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1   4  75.7    4     30.4
##2   8 400.0    3     19.2
##3   8 318.0    3     15.5
##4   4  78.7   NA     32.4
##5   8 304.0    3     15.2
##6   4  79.0   NA     27.3

列操作

SparkR 还提供了许多可以直接应用于列的函数,用于数据处理和聚合。以下示例展示了基本算术函数的使用。

# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

应用用户定义函数

在 SparkR 中,我们支持多种用户定义函数(UDF)

使用 dapplydapplyCollect 在大型数据集上运行给定函数

dapply

将一个函数应用于 SparkDataFrame 的每个分区。应用于 SparkDataFrame 每个分区的函数应只有一个参数,该参数将传入对应每个分区的 data.frame。函数的输出应为一个 data.frame。Schema 指定了结果 SparkDataFrame 的行格式。它必须与返回值的数据类型匹配。

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300
dapplyCollect

dapply 类似,将一个函数应用于 SparkDataFrame 的每个分区并收集结果。函数的输出应为一个 data.frame。但不需要传入 Schema。请注意,如果 UDF 在所有分区上运行的输出无法拉取到驱动器并适应驱动器内存,则 dapplyCollect 可能会失败。

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

通过输入列分组,使用 gapplygapplyCollect 在大型数据集上运行给定函数

gapply

将一个函数应用于 SparkDataFrame 的每个分组。应用于 SparkDataFrame 每个分组的函数应只有两个参数:分组键和对应此键的 R data.frame。分组从 SparkDataFrame 的列中选择。函数的输出应为一个 data.frame。Schema 指定了结果 SparkDataFrame 的行格式。它必须基于 Spark 数据类型表示 R 函数的输出 Schema。返回的 data.frame 的列名由用户设置。

# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900
gapplyCollect

gapply 类似,将一个函数应用于 SparkDataFrame 的每个分区,并将结果收集回 R data.frame。函数的输出应为一个 data.frame。但不需要传入 Schema。请注意,如果 UDF 在所有分区上运行的输出无法拉取到驱动器并适应驱动器内存,则 gapplyCollect 可能会失败。

# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900

使用 spark.lapply 分布式运行本地 R 函数

spark.lapply

类似于原生 R 中的 lapplyspark.lapply 对元素列表运行一个函数,并使用 Spark 分布式计算。以类似于 doParallellapply 的方式将函数应用于列表元素。所有计算的结果应能适应单台机器。如果不是这种情况,他们可以执行类似 df <- createDataFrame(list) 的操作,然后使用 dapply

# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# Print the summary of each model
print(model.summaries)

即时执行

如果启用了即时执行,当 SparkDataFrame 创建时,数据将立即返回到 R 客户端。默认情况下,即时执行未启用,但可以在启动 SparkSession 时将配置属性 spark.sql.repl.eagerEval.enabled 设置为 true 来启用。

要显示的最大行数和每列最大字符数可以分别通过 spark.sql.repl.eagerEval.maxNumRowsspark.sql.repl.eagerEval.truncate 配置属性控制。这些属性仅在启用即时执行时有效。如果未显式设置这些属性,默认情况下将显示最多 20 行数据和每列最多 20 个字符。

# Start up spark session with eager execution enabled
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
                                  spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))

# Create a grouped and sorted SparkDataFrame
df <- createDataFrame(faithful)
df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")

# Similar to R data.frame, displays the data returned, instead of SparkDataFrame class string
df2

##+-------+-----+
##|waiting|count|
##+-------+-----+
##|   43.0|    1|
##|   45.0|    3|
##|   46.0|    5|
##|   47.0|    4|
##|   48.0|    3|
##|   49.0|    5|
##|   50.0|    5|
##|   51.0|    6|
##|   52.0|    5|
##|   53.0|    7|
##+-------+-----+
##only showing top 10 rows

请注意,要在 sparkR shell 中启用即时执行,请将 spark.sql.repl.eagerEval.enabled=true 配置属性添加到 --conf 选项。

从 SparkR 运行 SQL 查询

SparkDataFrame 也可以在 Spark SQL 中注册为临时视图,这允许您对其数据运行 SQL 查询。sql 函数使应用程序能够以编程方式运行 SQL 查询,并以 SparkDataFrame 的形式返回结果。

# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")

# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")

# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##    name
##1 Justin

机器学习

算法

SparkR 目前支持以下机器学习算法

分类

回归

聚类

协同过滤

频繁模式挖掘

统计

在底层,SparkR 使用 MLlib 训练模型。有关示例代码,请参阅 MLlib 用户指南的相应部分。用户可以调用 summary 打印拟合模型的摘要,调用 predict 对新数据进行预测,以及调用 write.ml/read.ml 保存/加载拟合模型。SparkR 支持一部分可用的 R 公式运算符用于模型拟合,包括 ‘~’、‘.’、‘:’、‘+’ 和 ‘-‘。

模型持久化

以下示例展示了如何通过 SparkR 保存/加载 MLlib 模型。

training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")

# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)

# Check model summary
summary(gaussianGLM2)

# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)

unlink(modelPath)
完整的示例代码可在 Spark 仓库的 "examples/src/main/r/ml/ml.R" 中找到。

R 与 Spark 之间的数据类型映射

RSpark
字节 字节
整型 整型
浮点型 浮点型
双精度浮点型 双精度浮点型
数值型 双精度浮点型
字符型 字符串
字符串 字符串
二进制 二进制
原始字节 二进制
逻辑型 布尔型
POSIXct 时间戳
POSIXlt 时间戳
日期 日期
数组 数组
列表 数组
环境 映射

结构化流

SparkR 支持结构化流 API。结构化流是一个可伸缩且容错的流处理引擎,构建于 Spark SQL 引擎之上。有关更多信息,请参阅结构化流编程指南中的 R API。

SparkR 中的 Apache Arrow

Apache Arrow 是一种内存列式数据格式,在 Spark 中用于在 JVM 和 R 进程之间高效传输数据。另请参阅 PySpark 优化,PySpark 使用 Apache Arrow 配合 Pandas 的指南。本指南旨在解释如何在 SparkR 中使用 Arrow 优化,并提供一些关键点。

确保 Arrow 已安装

Arrow R 库在 CRAN 上可用,可以如下安装。

Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'

有关更多详细信息,请参阅Apache Arrow 的官方文档

请注意,您必须确保 Arrow R 包已安装并在所有集群节点上可用。当前支持的最低版本是 1.0.0;但是,由于 SparkR 中的 Arrow 优化仍处于实验阶段,此版本可能会在次要版本之间发生变化。

启用 R DataFrame 之间的转换,以及 dapplygapply

Arrow 优化在以下情况下可用:使用 collect(spark_df) 将 Spark DataFrame 转换为 R DataFrame 时,使用 createDataFrame(r_df) 从 R DataFrame 创建 Spark DataFrame 时,通过 dapply(...) 将 R 原生函数应用于每个分区时,以及通过 gapply(...) 将 R 原生函数应用于分组数据时。要在执行这些操作时使用 Arrow,用户首先需要将 Spark 配置 'spark.sql.execution.arrow.sparkr.enabled' 设置为 'true'。默认情况下此功能是禁用的。

无论是否启用优化,SparkR 都会产生相同的结果。此外,当在实际计算之前因任何原因导致优化失败时,Spark DataFrame 和 R DataFrame 之间的转换会自动回退到非 Arrow 优化实现。

# Start up spark session with Arrow optimization enabled
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))

# Converts Spark DataFrame from an R DataFrame
spark_df <- createDataFrame(mtcars)

# Converts Spark DataFrame to an R DataFrame
collect(spark_df)

# Apply an R native function to each partition.
collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))

# Apply an R native function to grouped data.
collect(gapply(spark_df,
               "gear",
               function(key, group) {
                 data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
               },
               structType("gear double, disp boolean")))

请注意,即使使用 Arrow,collect(spark_df) 也会导致将 DataFrame 中的所有记录收集到驱动程序中,因此应在数据的少量子集上执行此操作。此外,gapply(...)dapply(...) 中指定的输出 Schema 应与给定函数返回的 R DataFrame 相匹配。

支持的 SQL 类型

目前,除了 FloatTypeBinaryTypeArrayTypeStructTypeMapType 之外,所有 Spark SQL 数据类型都支持基于 Arrow 的转换。

R 函数名称冲突

在 R 中加载和附加新包时,可能会出现名称冲突,即一个函数掩盖了另一个函数。

以下函数被 SparkR 包掩盖

被掩盖的函数如何访问
package:stats 中的 cov
stats::cov(x, y = NULL, use = "everything",
           method = c("pearson", "kendall", "spearman"))
package:stats 中的 filter
stats::filter(x, filter, method = c("convolution", "recursive"),
              sides = 2, circular = FALSE, init)
package:base 中的 sample base::sample(x, size, replace = FALSE, prob = NULL)

由于 SparkR 的部分功能是基于 dplyr 包建模的,因此 SparkR 中的某些函数与 dplyr 中的函数同名。根据两个包的加载顺序,先加载的包中的某些函数可能会被后加载的包中的同名函数掩盖。在这种情况下,请使用包名作为前缀来调用这些函数,例如 SparkR::cume_dist(x)dplyr::cume_dist(x)

您可以使用 search() 检查 R 中的搜索路径。

迁移指南

迁移指南现已存档在此页面上