协同过滤

协同过滤

协同过滤 通常用于推荐系统。这些技术旨在填充用户-项目关联矩阵中的缺失条目。 spark.ml 目前支持基于模型的协同过滤,其中用户和产品由一组小的潜在因素描述,这些因素可用于预测缺失条目。 spark.ml 使用 交替最小二乘 (ALS) 算法来学习这些潜在因素。 spark.ml 中的实现具有以下参数

注意:ALS 的基于 DataFrame 的 API 目前仅支持用户和项目 ID 的整数。其他数字类型支持用户和项目 ID 列,但 ID 必须在整数值范围内。

显式反馈与隐式反馈

基于矩阵分解的协同过滤的标准方法将用户-项目矩阵中的条目视为用户对项目的显式偏好,例如,用户对电影进行评分。

在许多现实世界的用例中,通常只能访问隐式反馈(例如,查看、点击、购买、点赞、分享等)。 spark.ml 中用于处理此类数据的方法取自 隐式反馈数据集的协同过滤。本质上,这种方法不是直接尝试对评分矩阵进行建模,而是将数据视为代表用户行为观察强度的数字(例如,点击次数或某人观看电影的累计时长)。然后,这些数字与对观察到的用户偏好的置信度级别相关联,而不是对项目的显式评分。然后,模型尝试找到可以用来预测用户对项目的预期偏好的潜在因素。

正则化参数的缩放

我们在解决每个最小二乘问题时,通过用户在更新用户因素时生成的评分数量或产品在更新产品因素时收到的评分数量来缩放正则化参数 regParam。这种方法被称为“ALS-WR”,并在论文“用于 Netflix 奖的超大规模并行协同过滤”中讨论。它使 regParam 较少依赖于数据集的规模,因此我们可以将从采样子集中学习到的最佳参数应用于完整数据集,并期望获得类似的性能。

冷启动策略

使用 ALSModel 进行预测时,通常会在测试数据集中遇到在训练模型期间不存在的用户和/或项目。这通常发生在两种情况下

  1. 在生产中,对于没有评分历史记录且模型尚未对其进行训练的新用户或项目(这是“冷启动问题”)。
  2. 在交叉验证期间,数据在训练集和评估集之间进行拆分。当使用 Spark 的 CrossValidatorTrainValidationSplit 中的简单随机拆分时,实际上非常常见的是在评估集中遇到不在训练集中的用户和/或项目

默认情况下,Spark 在 ALSModel.transform 期间分配 NaN 预测,当模型中不存在用户和/或项目因素时。这在生产系统中可能很有用,因为它表明一个新用户或项目,因此系统可以决定使用一些回退作为预测。

但是,这在交叉验证期间是不可取的,因为任何 NaN 预测值将导致评估指标的 NaN 结果(例如,当使用 RegressionEvaluator 时)。这使得模型选择变得不可能。

Spark 允许用户将 coldStartStrategy 参数设置为“drop”,以便删除预测 DataFrame 中包含 NaN 值的任何行。然后,评估指标将在非 NaN 数据上计算,并将是有效的。以下示例说明了此参数的使用。

注意:目前支持的冷启动策略是“nan”(上面提到的默认行为)和“drop”。将来可能会支持更多策略。

示例

在以下示例中,我们从 MovieLens 数据集 加载评分数据,每行包含一个用户、一部电影、一个评分和一个时间戳。然后,我们训练一个 ALS 模型,该模型默认情况下假设评分是显式的(implicitPrefsFalse)。我们通过衡量评分预测的均方根误差来评估推荐模型。

有关 API 的更多详细信息,请参阅 ALS Python 文档

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
在 Spark 存储库的“examples/src/main/python/ml/als_example.py”中找到完整的示例代码。

如果评分矩阵来自其他信息源(即从其他信号推断出来),则可以将 implicitPrefs 设置为 True 以获得更好的结果

als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,
          userCol="userId", itemCol="movieId", ratingCol="rating")

在以下示例中,我们从 MovieLens 数据集 加载评分数据,每行包含一个用户、一部电影、一个评分和一个时间戳。然后,我们训练一个 ALS 模型,该模型默认情况下假设评分是显式的(implicitPrefsfalse)。我们通过衡量评分预测的均方根误差来评估推荐模型。

有关 API 的更多详细信息,请参阅 ALS Scala 文档

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
  .map(parseRating)
  .toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Build the recommendation model using ALS on the training data
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
val model = als.fit(training)

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)

// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala”中找到完整的示例代码。

如果评分矩阵来自其他信息源(即从其他信号推断出来),则可以将 implicitPrefs 设置为 true 以获得更好的结果

val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")

在以下示例中,我们从 MovieLens 数据集 加载评分数据,每行包含一个用户、一部电影、一个评分和一个时间戳。然后,我们训练一个 ALS 模型,该模型默认情况下假设评分是显式的(implicitPrefsfalse)。我们通过衡量评分预测的均方根误差来评估推荐模型。

有关 API 的更多详细信息,请参阅 ALS Java 文档

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;

public static class Rating implements Serializable {
  private int userId;
  private int movieId;
  private float rating;
  private long timestamp;

  public Rating() {}

  public Rating(int userId, int movieId, float rating, long timestamp) {
    this.userId = userId;
    this.movieId = movieId;
    this.rating = rating;
    this.timestamp = timestamp;
  }

  public int getUserId() {
    return userId;
  }

  public int getMovieId() {
    return movieId;
  }

  public float getRating() {
    return rating;
  }

  public long getTimestamp() {
    return timestamp;
  }

  public static Rating parseRating(String str) {
    String[] fields = str.split("::");
    if (fields.length != 4) {
      throw new IllegalArgumentException("Each line must contain 4 fields");
    }
    int userId = Integer.parseInt(fields[0]);
    int movieId = Integer.parseInt(fields[1]);
    float rating = Float.parseFloat(fields[2]);
    long timestamp = Long.parseLong(fields[3]);
    return new Rating(userId, movieId, rating, timestamp);
  }
}

JavaRDD<Rating> ratingsRDD = spark
  .read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
  .map(Rating::parseRating);
Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class);
Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> training = splits[0];
Dataset<Row> test = splits[1];

// Build the recommendation model using ALS on the training data
ALS als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating");
ALSModel model = als.fit(training);

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop");
Dataset<Row> predictions = model.transform(test);

RegressionEvaluator evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root-mean-square error = " + rmse);

// Generate top 10 movie recommendations for each user
Dataset<Row> userRecs = model.recommendForAllUsers(10);
// Generate top 10 user recommendations for each movie
Dataset<Row> movieRecs = model.recommendForAllItems(10);

// Generate top 10 movie recommendations for a specified set of users
Dataset<Row> users = ratings.select(als.getUserCol()).distinct().limit(3);
Dataset<Row> userSubsetRecs = model.recommendForUserSubset(users, 10);
// Generate top 10 user recommendations for a specified set of movies
Dataset<Row> movies = ratings.select(als.getItemCol()).distinct().limit(3);
Dataset<Row> movieSubSetRecs = model.recommendForItemSubset(movies, 10);
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java”中找到完整的示例代码。

如果评分矩阵来自其他信息源(即从其他信号推断出来),则可以将 implicitPrefs 设置为 true 以获得更好的结果

ALS als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating");

有关更多详细信息,请参阅 R API 文档

# Load training data
data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0),
             list(1, 2, 4.0), list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(data, c("userId", "movieId", "rating"))
training <- df
test <- df

# Fit a recommendation model using ALS with spark.als
model <- spark.als(training, maxIter = 5, regParam = 0.01, userCol = "userId",
                   itemCol = "movieId", ratingCol = "rating")

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 存储库的“examples/src/main/r/ml/als.R”中找到完整的示例代码。