协同过滤
协同过滤
协同过滤 常用于推荐系统。这些技术旨在填充用户-物品关联矩阵中的缺失项。spark.ml
目前支持基于模型的协同过滤,其中用户和产品通过一小组潜在因子来描述,这些因子可用于预测缺失项。spark.ml
使用 交替最小二乘 (ALS) 算法来学习这些潜在因子。spark.ml
中的实现具有以下参数
- numBlocks 是用户和物品将被分区成的块的数量,以便并行计算(默认为 10)。
- rank 是模型中潜在因子的数量(默认为 10)。
- maxIter 是要运行的最大迭代次数(默认为 10)。
- regParam 指定 ALS 中的正则化参数(默认为 1.0)。
- implicitPrefs 指定是使用 显式反馈 ALS 变体还是适用于 隐式反馈 数据的变体(默认为
false
,这意味着使用 显式反馈)。 - alpha 是适用于 ALS 隐式反馈变体的一个参数,用于控制偏好观察值的 基线 置信度(默认为 1.0)。
- nonnegative 指定是否对最小二乘使用非负约束(默认为
false
)。
注意: ALS 的基于 DataFrame 的 API 目前仅支持整数作为用户和物品 ID。用户和物品 ID 列支持其他数值类型,但 ID 必须在整数值范围内。
显式反馈与隐式反馈
基于矩阵分解的协同过滤的标准方法将用户-物品矩阵中的条目视为用户对物品给出的 显式 偏好,例如,用户给电影评分。
在许多实际用例中,通常只能访问 隐式反馈(例如,视图、点击、购买、点赞、分享等)。spark.ml
中用于处理此类数据的方法源自 隐式反馈数据集的协同过滤。本质上,这种方法不是直接尝试建模评分矩阵,而是将数据视为代表用户行为观察(例如点击次数,或某人观看电影的累计时长)的 强度 的数字。然后,这些数字与观察到的用户偏好的置信度相关联,而不是与给物品的显式评分相关联。模型随后尝试找到可用于预测用户对物品预期偏好的潜在因子。
正则化参数的缩放
在解决每个最小二乘问题时,我们根据用户在更新用户因子时产生的评分数量,或产品在更新产品因子时收到的评分数量,来缩放正则化参数 regParam
。这种方法被称为“ALS-WR”,并在论文“针对 Netflix 竞赛的大规模并行协同过滤”中进行了讨论。它使 regParam
不太依赖于数据集的规模,因此我们可以将从采样子集中学习到的最佳参数应用于完整数据集,并预期获得相似的性能。
冷启动策略
当使用 ALSModel
进行预测时,通常会在测试数据集中遇到在模型训练期间不存在的用户和/或物品。这通常发生在两种情况下:
- 在生产环境中,对于没有评分历史且模型尚未对其进行训练的新用户或物品(这就是“冷启动问题”)。
- 在交叉验证期间,数据在训练集和评估集之间进行拆分。当使用 Spark 的
CrossValidator
或TrainValidationSplit
中的简单随机拆分时,在评估集中遇到不在训练集中的用户和/或物品实际上非常常见。
默认情况下,当用户和/或物品因子在模型中不存在时,Spark 在 ALSModel.transform
期间会分配 NaN
预测值。这在生产系统中可能很有用,因为它表示新的用户或物品,因此系统可以决定使用某种备用方案作为预测。
然而,这在交叉验证期间是不希望的,因为任何 NaN
预测值都将导致评估指标的 NaN
结果(例如,当使用 RegressionEvaluator
时)。这使得模型选择变得不可能。
Spark 允许用户将 coldStartStrategy
参数设置为“drop”,以便丢弃预测 DataFrame
中包含 NaN
值的任何行。然后,评估指标将基于非 NaN
数据进行计算,并且是有效的。此参数的用法在下面的示例中进行了说明。
注意: 目前支持的冷启动策略是“nan”(上面提到的默认行为)和“drop”。未来可能会支持更多策略。
示例
在以下示例中,我们从 MovieLens 数据集 加载评分数据,每行包含一个用户、一部电影、一个评分和一个时间戳。然后,我们训练一个 ALS 模型,该模型默认假定评分为显式(implicitPrefs
为 False
)。我们通过测量评分预测的均方根误差来评估推荐模型。
请参阅 ALS
Python 文档 了解更多 API 详情。
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
coldStartStrategy="drop")
model = als.fit(training)
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
如果评分矩阵来源于其他信息源(即,它是从其他信号推断出来的),您可以将 implicitPrefs
设置为 True
以获得更好的结果
als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,
userCol="userId", itemCol="movieId", ratingCol="rating")
在以下示例中,我们从 MovieLens 数据集 加载评分数据,每行包含一个用户、一部电影、一个评分和一个时间戳。然后,我们训练一个 ALS 模型,该模型默认假定评分为显式(implicitPrefs
为 false
)。我们通过测量评分预测的均方根误差来评估推荐模型。
请参阅 ALS
Scala 文档 了解更多 API 详情。
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
val fields = str.split("::")
assert(fields.size == 4)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}
val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
.map(parseRating)
.toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
// Build the recommendation model using ALS on the training data
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")
val model = als.fit(training)
// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)
// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
如果评分矩阵来源于其他信息源(即,它是从其他信号推断出来的),您可以将 implicitPrefs
设置为 true
以获得更好的结果
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setImplicitPrefs(true)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")
在以下示例中,我们从 MovieLens 数据集 加载评分数据,每行包含一个用户、一部电影、一个评分和一个时间戳。然后,我们训练一个 ALS 模型,该模型默认假定评分为显式(implicitPrefs
为 false
)。我们通过测量评分预测的均方根误差来评估推荐模型。
请参阅 ALS
Java 文档 了解更多 API 详情。
import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;
public static class Rating implements Serializable {
private int userId;
private int movieId;
private float rating;
private long timestamp;
public Rating() {}
public Rating(int userId, int movieId, float rating, long timestamp) {
this.userId = userId;
this.movieId = movieId;
this.rating = rating;
this.timestamp = timestamp;
}
public int getUserId() {
return userId;
}
public int getMovieId() {
return movieId;
}
public float getRating() {
return rating;
}
public long getTimestamp() {
return timestamp;
}
public static Rating parseRating(String str) {
String[] fields = str.split("::");
if (fields.length != 4) {
throw new IllegalArgumentException("Each line must contain 4 fields");
}
int userId = Integer.parseInt(fields[0]);
int movieId = Integer.parseInt(fields[1]);
float rating = Float.parseFloat(fields[2]);
long timestamp = Long.parseLong(fields[3]);
return new Rating(userId, movieId, rating, timestamp);
}
}
JavaRDD<Rating> ratingsRDD = spark
.read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
.map(Rating::parseRating);
Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class);
Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> training = splits[0];
Dataset<Row> test = splits[1];
// Build the recommendation model using ALS on the training data
ALS als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating");
ALSModel model = als.fit(training);
// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop");
Dataset<Row> predictions = model.transform(test);
RegressionEvaluator evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root-mean-square error = " + rmse);
// Generate top 10 movie recommendations for each user
Dataset<Row> userRecs = model.recommendForAllUsers(10);
// Generate top 10 user recommendations for each movie
Dataset<Row> movieRecs = model.recommendForAllItems(10);
// Generate top 10 movie recommendations for a specified set of users
Dataset<Row> users = ratings.select(als.getUserCol()).distinct().limit(3);
Dataset<Row> userSubsetRecs = model.recommendForUserSubset(users, 10);
// Generate top 10 user recommendations for a specified set of movies
Dataset<Row> movies = ratings.select(als.getItemCol()).distinct().limit(3);
Dataset<Row> movieSubSetRecs = model.recommendForItemSubset(movies, 10);
如果评分矩阵来源于其他信息源(即,它是从其他信号推断出来的),您可以将 implicitPrefs
设置为 true
以获得更好的结果
ALS als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setImplicitPrefs(true)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating");
请参阅 R API 文档 了解更多详情。
# Load training data
data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0),
list(1, 2, 4.0), list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(data, c("userId", "movieId", "rating"))
training <- df
test <- df
# Fit a recommendation model using ALS with spark.als
model <- spark.als(training, maxIter = 5, regParam = 0.01, userCol = "userId",
itemCol = "movieId", ratingCol = "rating")
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
head(predictions)