协同过滤

协同过滤

协同过滤 常用于推荐系统。这些技术旨在填充用户-物品关联矩阵中的缺失项。spark.ml 目前支持基于模型的协同过滤,其中用户和产品通过一小组潜在因子来描述,这些因子可用于预测缺失项。spark.ml 使用 交替最小二乘 (ALS) 算法来学习这些潜在因子。spark.ml 中的实现具有以下参数

注意: ALS 的基于 DataFrame 的 API 目前仅支持整数作为用户和物品 ID。用户和物品 ID 列支持其他数值类型,但 ID 必须在整数值范围内。

显式反馈与隐式反馈

基于矩阵分解的协同过滤的标准方法将用户-物品矩阵中的条目视为用户对物品给出的 显式 偏好,例如,用户给电影评分。

在许多实际用例中,通常只能访问 隐式反馈(例如,视图、点击、购买、点赞、分享等)。spark.ml 中用于处理此类数据的方法源自 隐式反馈数据集的协同过滤。本质上,这种方法不是直接尝试建模评分矩阵,而是将数据视为代表用户行为观察(例如点击次数,或某人观看电影的累计时长)的 强度 的数字。然后,这些数字与观察到的用户偏好的置信度相关联,而不是与给物品的显式评分相关联。模型随后尝试找到可用于预测用户对物品预期偏好的潜在因子。

正则化参数的缩放

在解决每个最小二乘问题时,我们根据用户在更新用户因子时产生的评分数量,或产品在更新产品因子时收到的评分数量,来缩放正则化参数 regParam。这种方法被称为“ALS-WR”,并在论文“针对 Netflix 竞赛的大规模并行协同过滤”中进行了讨论。它使 regParam 不太依赖于数据集的规模,因此我们可以将从采样子集中学习到的最佳参数应用于完整数据集,并预期获得相似的性能。

冷启动策略

当使用 ALSModel 进行预测时,通常会在测试数据集中遇到在模型训练期间不存在的用户和/或物品。这通常发生在两种情况下:

  1. 在生产环境中,对于没有评分历史且模型尚未对其进行训练的新用户或物品(这就是“冷启动问题”)。
  2. 在交叉验证期间,数据在训练集和评估集之间进行拆分。当使用 Spark 的 CrossValidatorTrainValidationSplit 中的简单随机拆分时,在评估集中遇到不在训练集中的用户和/或物品实际上非常常见。

默认情况下,当用户和/或物品因子在模型中不存在时,Spark 在 ALSModel.transform 期间会分配 NaN 预测值。这在生产系统中可能很有用,因为它表示新的用户或物品,因此系统可以决定使用某种备用方案作为预测。

然而,这在交叉验证期间是不希望的,因为任何 NaN 预测值都将导致评估指标的 NaN 结果(例如,当使用 RegressionEvaluator 时)。这使得模型选择变得不可能。

Spark 允许用户将 coldStartStrategy 参数设置为“drop”,以便丢弃预测 DataFrame 中包含 NaN 值的任何行。然后,评估指标将基于非 NaN 数据进行计算,并且是有效的。此参数的用法在下面的示例中进行了说明。

注意: 目前支持的冷启动策略是“nan”(上面提到的默认行为)和“drop”。未来可能会支持更多策略。

示例

在以下示例中,我们从 MovieLens 数据集 加载评分数据,每行包含一个用户、一部电影、一个评分和一个时间戳。然后,我们训练一个 ALS 模型,该模型默认假定评分为显式(implicitPrefsFalse)。我们通过测量评分预测的均方根误差来评估推荐模型。

请参阅 ALS Python 文档 了解更多 API 详情。

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
完整示例代码位于 Spark 仓库中的 "examples/src/main/python/ml/als_example.py"。

如果评分矩阵来源于其他信息源(即,它是从其他信号推断出来的),您可以将 implicitPrefs 设置为 True 以获得更好的结果

als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,
          userCol="userId", itemCol="movieId", ratingCol="rating")

在以下示例中,我们从 MovieLens 数据集 加载评分数据,每行包含一个用户、一部电影、一个评分和一个时间戳。然后,我们训练一个 ALS 模型,该模型默认假定评分为显式(implicitPrefsfalse)。我们通过测量评分预测的均方根误差来评估推荐模型。

请参阅 ALS Scala 文档 了解更多 API 详情。

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
  .map(parseRating)
  .toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Build the recommendation model using ALS on the training data
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
val model = als.fit(training)

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)

// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
完整示例代码位于 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala"。

如果评分矩阵来源于其他信息源(即,它是从其他信号推断出来的),您可以将 implicitPrefs 设置为 true 以获得更好的结果

val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")

在以下示例中,我们从 MovieLens 数据集 加载评分数据,每行包含一个用户、一部电影、一个评分和一个时间戳。然后,我们训练一个 ALS 模型,该模型默认假定评分为显式(implicitPrefsfalse)。我们通过测量评分预测的均方根误差来评估推荐模型。

请参阅 ALS Java 文档 了解更多 API 详情。

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;

public static class Rating implements Serializable {
  private int userId;
  private int movieId;
  private float rating;
  private long timestamp;

  public Rating() {}

  public Rating(int userId, int movieId, float rating, long timestamp) {
    this.userId = userId;
    this.movieId = movieId;
    this.rating = rating;
    this.timestamp = timestamp;
  }

  public int getUserId() {
    return userId;
  }

  public int getMovieId() {
    return movieId;
  }

  public float getRating() {
    return rating;
  }

  public long getTimestamp() {
    return timestamp;
  }

  public static Rating parseRating(String str) {
    String[] fields = str.split("::");
    if (fields.length != 4) {
      throw new IllegalArgumentException("Each line must contain 4 fields");
    }
    int userId = Integer.parseInt(fields[0]);
    int movieId = Integer.parseInt(fields[1]);
    float rating = Float.parseFloat(fields[2]);
    long timestamp = Long.parseLong(fields[3]);
    return new Rating(userId, movieId, rating, timestamp);
  }
}

JavaRDD<Rating> ratingsRDD = spark
  .read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
  .map(Rating::parseRating);
Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class);
Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> training = splits[0];
Dataset<Row> test = splits[1];

// Build the recommendation model using ALS on the training data
ALS als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating");
ALSModel model = als.fit(training);

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop");
Dataset<Row> predictions = model.transform(test);

RegressionEvaluator evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root-mean-square error = " + rmse);

// Generate top 10 movie recommendations for each user
Dataset<Row> userRecs = model.recommendForAllUsers(10);
// Generate top 10 user recommendations for each movie
Dataset<Row> movieRecs = model.recommendForAllItems(10);

// Generate top 10 movie recommendations for a specified set of users
Dataset<Row> users = ratings.select(als.getUserCol()).distinct().limit(3);
Dataset<Row> userSubsetRecs = model.recommendForUserSubset(users, 10);
// Generate top 10 user recommendations for a specified set of movies
Dataset<Row> movies = ratings.select(als.getItemCol()).distinct().limit(3);
Dataset<Row> movieSubSetRecs = model.recommendForItemSubset(movies, 10);
完整示例代码位于 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java"。

如果评分矩阵来源于其他信息源(即,它是从其他信号推断出来的),您可以将 implicitPrefs 设置为 true 以获得更好的结果

ALS als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating");

请参阅 R API 文档 了解更多详情。

# Load training data
data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0),
             list(1, 2, 4.0), list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(data, c("userId", "movieId", "rating"))
training <- df
test <- df

# Fit a recommendation model using ALS with spark.als
model <- spark.als(training, maxIter = 5, regParam = 0.01, userCol = "userId",
                   itemCol = "movieId", ratingCol = "rating")

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完整示例代码位于 Spark 仓库中的 "examples/src/main/r/ml/als.R"。