SparkR (R on Spark)

概述

SparkR 是一个 R 包,它提供了一个轻量级的前端,用于从 R 使用 Apache Spark。在 Spark 3.5.1 中,SparkR 提供了一个分布式数据框实现,支持选择、过滤、聚合等操作(类似于 R 数据框,dplyr),但适用于大型数据集。SparkR 还支持使用 MLlib 进行分布式机器学习。

SparkDataFrame

SparkDataFrame 是一个分布式数据集合,组织成命名列。从概念上讲,它等同于关系数据库中的表或 R 中的数据框,但在幕后具有更丰富的优化。SparkDataFrame 可以从各种来源构建,例如:结构化数据文件、Hive 中的表、外部数据库或现有的本地 R 数据框。

本页上的所有示例都使用 R 或 Spark 分发版中包含的示例数据,可以使用 ./bin/sparkR shell 运行。

启动:SparkSession

SparkR 的入口点是 SparkSession,它将您的 R 程序连接到 Spark 集群。您可以使用 sparkR.session 创建 SparkSession,并传入选项,例如应用程序名称、任何依赖的 Spark 包等。此外,您还可以通过 SparkSession 使用 SparkDataFrame。如果您在 sparkR shell 中工作,则 SparkSession 应该已经为您创建,您不需要调用 sparkR.session

sparkR.session()

从 RStudio 启动

您也可以从 RStudio 启动 SparkR。您可以从 RStudio、R shell、Rscript 或其他 R IDE 将您的 R 程序连接到 Spark 集群。要开始,请确保 SPARK_HOME 在环境中设置(您可以检查 Sys.getenv),加载 SparkR 包,并按如下所示调用 sparkR.session。它将检查 Spark 安装,如果未找到,它将自动下载并缓存。或者,您也可以手动运行 install.spark

除了调用 sparkR.session 之外,您还可以指定某些 Spark 驱动程序属性。通常,这些 应用程序属性运行时环境 无法以编程方式设置,因为驱动程序 JVM 进程已经启动,在这种情况下,SparkR 会为您处理。要设置它们,请将它们作为其他配置属性传递给 sparkR.session() 中的 sparkConfig 参数。

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))

以下 Spark 驱动程序属性可以在 sparkConfig 中使用 sparkR.session 从 RStudio 设置

属性名称属性组spark-submit 等效项
spark.master 应用程序属性 --master
spark.kerberos.keytab 应用程序属性 --keytab
spark.kerberos.principal 应用程序属性 --principal
spark.driver.memory 应用程序属性 --driver-memory
spark.driver.extraClassPath 运行时环境 --driver-class-path
spark.driver.extraJavaOptions 运行时环境 --driver-java-options
spark.driver.extraLibraryPath 运行时环境 --driver-library-path

创建 SparkDataFrame

使用 SparkSession,应用程序可以从本地 R 数据框、从 Hive 表 或从其他 数据源 创建 SparkDataFrame

从本地数据框

创建数据框最简单的方法是将本地 R 数据框转换为 SparkDataFrame。具体来说,我们可以使用 as.DataFramecreateDataFrame 并传入本地 R 数据框以创建 SparkDataFrame。例如,以下代码使用 R 中的 faithful 数据集创建了一个 SparkDataFrame

df <- as.DataFrame(faithful)

# Displays the first part of the SparkDataFrame
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74

从数据源

SparkR 支持通过 SparkDataFrame 接口对各种数据源进行操作。本节介绍使用数据源加载和保存数据的通用方法。您可以查看 Spark SQL 编程指南以了解有关 特定选项 的更多信息,这些选项可用于内置数据源。

从数据源创建 SparkDataFrame 的通用方法是 read.df。此方法接受要加载的文件的路径和数据源类型,并且当前活动的 SparkSession 将自动使用。SparkR 本机支持读取 JSON、CSV 和 Parquet 文件,并且通过来自 第三方项目 等来源的可用包,您可以找到针对流行文件格式(如 Avro)的数据源连接器。这些包可以通过使用 spark-submitsparkR 命令指定 --packages 来添加,或者如果在交互式 R shell 或从 RStudio 中使用 sparkPackages 参数初始化 SparkSession。

sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.12:3.5.1")

我们可以看到如何使用示例 JSON 输入文件使用数据源。请注意,此处使用的文件 *不是* 典型的 JSON 文件。文件中的每一行都必须包含一个单独的、自包含的有效 JSON 对象。有关更多信息,请参阅 JSON Lines 文本格式,也称为换行符分隔的 JSON。因此,常规的多行 JSON 文件通常会失败。

people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))

数据源 API 本机支持 CSV 格式的输入文件。有关更多信息,请参阅 SparkR read.df API 文档。

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

数据源 API 也可以用于将 SparkDataFrame 保存到多种文件格式。例如,我们可以使用 write.df 将上一个示例中的 SparkDataFrame 保存到 Parquet 文件。

write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")

从 Hive 表

您还可以从 Hive 表创建 SparkDataFrame。为此,我们需要创建一个具有 Hive 支持的 SparkSession,它可以访问 Hive MetaStore 中的表。请注意,Spark 应该使用 Hive 支持 构建,有关更多详细信息,请参阅 SQL 编程指南。在 SparkR 中,默认情况下,它将尝试创建一个启用了 Hive 支持的 SparkSession (enableHiveSupport = TRUE)。

sparkR.session()

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")

# results is now a SparkDataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

SparkDataFrame 操作

SparkDataFrame 支持许多函数来执行结构化数据处理。这里我们包含一些基本示例,完整的列表可以在 API 文档中找到

选择行、列

# Create the SparkDataFrame
df <- as.DataFrame(faithful)

# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]

# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333

# You can also pass in column name as strings
head(select(df, "eruptions"))

# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48

分组、聚合

SparkR 数据框支持许多常用的函数,用于在分组后聚合数据。例如,我们可以计算 faithful 数据集中 waiting 时间的直方图,如下所示

# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      70     4
##2      67     1
##3      69     2

# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
##   waiting count
##1      78    15
##2      83    14
##3      81    13

除了标准聚合之外,SparkR 还支持 OLAP 立方体 运算符 cube

head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1  NA 140.8    4     22.8
##2   4  75.7    4     30.4
##3   8 400.0    3     19.2
##4   8 318.0    3     15.5
##5  NA 351.0   NA     15.8
##6  NA 275.8   NA     16.3

rollup

head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1   4  75.7    4     30.4
##2   8 400.0    3     19.2
##3   8 318.0    3     15.5
##4   4  78.7   NA     32.4
##5   8 304.0    3     15.2
##6   4  79.0   NA     27.3

对列进行操作

SparkR 还提供了一些函数,可以直接应用于列,用于数据处理和聚合期间。以下示例展示了基本算术函数的使用。

# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

应用用户定义函数

在 SparkR 中,我们支持几种用户定义函数

使用 dapplydapplyCollect 在大型数据集上运行给定函数

dapply

将函数应用于 SparkDataFrame 的每个分区。应用于 SparkDataFrame 每个分区的函数应该只有一个参数,该参数将传递一个对应于每个分区的 data.frame。函数的输出应该是一个 data.frame。模式指定了结果 SparkDataFrame 的行格式。它必须与返回值的 数据类型 匹配。

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300
dapplyCollect

类似于 dapply,将函数应用于 SparkDataFrame 的每个分区,并将结果收集回来。函数的输出应该是一个 data.frame。但是,不需要传递模式。请注意,如果在所有分区上运行的 UDF 的输出无法被拉到驱动程序并适合驱动程序内存,则 dapplyCollect 可能会失败。

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

使用 gapplygapplyCollect 在大型数据集上运行给定函数,并按输入列分组

gapply

将函数应用于 SparkDataFrame 的每个组。该函数将应用于 SparkDataFrame 的每个组,并且应该只有两个参数:分组键和对应于该键的 R data.frame。组是从 SparkDataFrame 的列中选择的。函数的输出应该是一个 data.frame。模式指定了结果 SparkDataFrame 的行格式。它必须根据 Spark 数据类型 表示 R 函数的输出模式。返回的 data.frame 的列名由用户设置。

# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900
gapplyCollect

类似于 gapply,将函数应用于 SparkDataFrame 的每个分区,并将结果收集回 R data.frame。函数的输出应该是一个 data.frame。但是,不需要传递模式。请注意,如果在所有分区上运行的 UDF 的输出无法被拉到驱动程序并适合驱动程序内存,则 gapplyCollect 可能会失败。

# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900

使用 spark.lapply 运行分布式本地 R 函数

spark.lapply

类似于原生 R 中的 lapplyspark.lapply 在元素列表上运行函数,并使用 Spark 分发计算。以类似于 doParallellapply 的方式将函数应用于列表的元素。所有计算的结果应该适合一台机器。如果不是这种情况,他们可以做类似于 df <- createDataFrame(list) 的事情,然后使用 dapply

# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# Print the summary of each model
print(model.summaries)

急切执行

如果启用了急切执行,则在创建 SparkDataFrame 时,数据将立即返回到 R 客户端。默认情况下,急切执行未启用,可以通过在启动 SparkSession 时将配置属性 spark.sql.repl.eagerEval.enabled 设置为 true 来启用它。

要显示的数据的最大行数和每列的最大字符数可以通过 spark.sql.repl.eagerEval.maxNumRowsspark.sql.repl.eagerEval.truncate 配置属性分别控制。这些属性仅在启用急切执行时有效。如果这些属性没有显式设置,默认情况下,将显示最多 20 行和每列最多 20 个字符的数据。

# Start up spark session with eager execution enabled
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
                                  spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))

# Create a grouped and sorted SparkDataFrame
df <- createDataFrame(faithful)
df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")

# Similar to R data.frame, displays the data returned, instead of SparkDataFrame class string
df2

##+-------+-----+
##|waiting|count|
##+-------+-----+
##|   43.0|    1|
##|   45.0|    3|
##|   46.0|    5|
##|   47.0|    4|
##|   48.0|    3|
##|   49.0|    5|
##|   50.0|    5|
##|   51.0|    6|
##|   52.0|    5|
##|   53.0|    7|
##+-------+-----+
##only showing top 10 rows

请注意,要在 sparkR shell 中启用急切执行,请将 spark.sql.repl.eagerEval.enabled=true 配置属性添加到 --conf 选项。

从 SparkR 运行 SQL 查询

SparkDataFrame 也可以在 Spark SQL 中注册为临时视图,这允许您在其数据上运行 SQL 查询。 sql 函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 SparkDataFrame 返回。

# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")

# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")

# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##    name
##1 Justin

机器学习

算法

SparkR 目前支持以下机器学习算法

分类

回归

聚类

协同过滤

频繁模式挖掘

统计

在幕后,SparkR 使用 MLlib 训练模型。有关示例代码,请参阅 MLlib 用户指南的相应部分。用户可以调用 summary 打印拟合模型的摘要,predict 对新数据进行预测,以及 write.ml/read.ml 保存/加载拟合模型。SparkR 支持模型拟合中可用的 R 公式运算符的子集,包括“~”、“.”、“:”、“+”和“-”。

模型持久化

以下示例展示了如何通过 SparkR 保存/加载 MLlib 模型。

training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")

# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)

# Check model summary
summary(gaussianGLM2)

# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)

unlink(modelPath)
在 Spark 存储库的“examples/src/main/r/ml/ml.R”中找到完整的示例代码。

R 和 Spark 之间的数据类型映射

RSpark
byte byte
integer integer
float float
double double
numeric double
character string
string string
binary binary
raw binary
logical boolean
POSIXct timestamp
POSIXlt timestamp
Date date
array array
list array
env map

结构化流

SparkR 支持结构化流式 API。结构化流式是一种可扩展且容错的流式处理引擎,构建在 Spark SQL 引擎之上。有关更多信息,请参阅 结构化流式编程指南 上的 R API

SparkR 中的 Apache Arrow

Apache Arrow 是一种内存中列式数据格式,用于在 Spark 中在 JVM 和 R 进程之间高效地传输数据。另请参阅 PySpark 完成的优化,PySpark 使用指南,用于使用 Apache Arrow 的 Pandas。本指南旨在解释如何在 SparkR 中使用 Arrow 优化,并提供一些关键要点。

确保安装 Arrow

Arrow R 库在 CRAN 上可用,可以按如下方式安装。

Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'

有关更多详细信息,请参阅 Apache Arrow 的官方文档

请注意,您必须确保 Arrow R 包已安装并在所有集群节点上可用。当前支持的最低版本是 1.0.0;但是,由于 SparkR 中的 Arrow 优化是实验性的,因此这可能会在次要版本之间发生变化。

启用转换为/从 R DataFrame、dapplygapply

当使用 collect(spark_df) 将 Spark DataFrame 转换为 R DataFrame,使用 createDataFrame(r_df) 从 R DataFrame 创建 Spark DataFrame,使用 dapply(...) 将 R 原生函数应用于每个分区,以及使用 gapply(...) 将 R 原生函数应用于分组数据时,可以使用 Arrow 优化。要使用这些功能中的 Arrow,用户需要先将 Spark 配置 ‘spark.sql.execution.arrow.sparkr.enabled’ 设置为 ‘true’。默认情况下,此功能处于禁用状态。

无论优化是否启用,SparkR 都能产生相同的结果。此外,当优化因任何原因在实际计算之前失败时,Spark DataFrame 和 R DataFrame 之间的转换会自动回退到非 Arrow 优化实现。

# Start up spark session with Arrow optimization enabled
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))

# Converts Spark DataFrame from an R DataFrame
spark_df <- createDataFrame(mtcars)

# Converts Spark DataFrame to an R DataFrame
collect(spark_df)

# Apply an R native function to each partition.
collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))

# Apply an R native function to grouped data.
collect(gapply(spark_df,
               "gear",
               function(key, group) {
                 data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
               },
               structType("gear double, disp boolean")))

请注意,即使使用 Arrow,collect(spark_df) 也会导致将 DataFrame 中的所有记录收集到驱动程序程序中,因此应该只对数据的小子集执行此操作。此外,在 gapply(...)dapply(...) 中指定的输出模式应与给定函数返回的 R DataFrame 相匹配。

支持的 SQL 类型

目前,Arrow 基于的转换支持所有 Spark SQL 数据类型,除了 FloatTypeBinaryTypeArrayTypeStructTypeMapType

R 函数名称冲突

在 R 中加载并附加新包时,可能会出现名称 冲突,其中一个函数会屏蔽另一个函数。

以下函数被 SparkR 包屏蔽

被屏蔽的函数如何访问
cov in package:stats
stats::cov(x, y = NULL, use = "everything",
           method = c("pearson", "kendall", "spearman"))
filter in package:stats
stats::filter(x, filter, method = c("convolution", "recursive"),
              sides = 2, circular = FALSE, init)
sample in package:base base::sample(x, size, replace = FALSE, prob = NULL)

由于 SparkR 的一部分是根据 dplyr 包建模的,因此 SparkR 中的某些函数与 dplyr 中的函数具有相同的名称。根据这两个包的加载顺序,第一个加载的包中的某些函数会被第二个加载的包中的函数屏蔽。在这种情况下,请在这些调用之前加上包名称,例如 SparkR::cume_dist(x)dplyr::cume_dist(x)

可以使用 search() 检查 R 中的搜索路径。

迁移指南

迁移指南现已存档 在此页面上