协同过滤
协同过滤
协同过滤 通常用于推荐系统。这些技术旨在填充用户-项目关联矩阵中的缺失条目。 spark.ml
目前支持基于模型的协同过滤,其中用户和产品由一组小的潜在因素描述,这些因素可用于预测缺失条目。 spark.ml
使用 交替最小二乘 (ALS) 算法来学习这些潜在因素。 spark.ml
中的实现具有以下参数
- numBlocks 是用户和项目将被划分为的块数,以便并行化计算(默认为 10)。
- rank 是模型中潜在因素的数量(默认为 10)。
- maxIter 是要运行的最大迭代次数(默认为 10)。
- regParam 指定 ALS 中的正则化参数(默认为 1.0)。
- implicitPrefs 指定是使用显式反馈 ALS 变体还是一个适用于隐式反馈数据的变体(默认为
false
,这意味着使用显式反馈)。 - alpha 是适用于 ALS 的隐式反馈变体的参数,它控制对偏好观察的基线置信度(默认为 1.0)。
- nonnegative 指定是否对最小二乘使用非负约束(默认为
false
)。
注意:ALS 的基于 DataFrame 的 API 目前仅支持用户和项目 ID 的整数。其他数字类型支持用户和项目 ID 列,但 ID 必须在整数值范围内。
显式反馈与隐式反馈
基于矩阵分解的协同过滤的标准方法将用户-项目矩阵中的条目视为用户对项目的显式偏好,例如,用户对电影进行评分。
在许多现实世界的用例中,通常只能访问隐式反馈(例如,查看、点击、购买、点赞、分享等)。 spark.ml
中用于处理此类数据的方法取自 隐式反馈数据集的协同过滤。本质上,这种方法不是直接尝试对评分矩阵进行建模,而是将数据视为代表用户行为观察强度的数字(例如,点击次数或某人观看电影的累计时长)。然后,这些数字与对观察到的用户偏好的置信度级别相关联,而不是对项目的显式评分。然后,模型尝试找到可以用来预测用户对项目的预期偏好的潜在因素。
正则化参数的缩放
我们在解决每个最小二乘问题时,通过用户在更新用户因素时生成的评分数量或产品在更新产品因素时收到的评分数量来缩放正则化参数 regParam
。这种方法被称为“ALS-WR”,并在论文“用于 Netflix 奖的超大规模并行协同过滤”中讨论。它使 regParam
较少依赖于数据集的规模,因此我们可以将从采样子集中学习到的最佳参数应用于完整数据集,并期望获得类似的性能。
冷启动策略
使用 ALSModel
进行预测时,通常会在测试数据集中遇到在训练模型期间不存在的用户和/或项目。这通常发生在两种情况下
- 在生产中,对于没有评分历史记录且模型尚未对其进行训练的新用户或项目(这是“冷启动问题”)。
- 在交叉验证期间,数据在训练集和评估集之间进行拆分。当使用 Spark 的
CrossValidator
或TrainValidationSplit
中的简单随机拆分时,实际上非常常见的是在评估集中遇到不在训练集中的用户和/或项目
默认情况下,Spark 在 ALSModel.transform
期间分配 NaN
预测,当模型中不存在用户和/或项目因素时。这在生产系统中可能很有用,因为它表明一个新用户或项目,因此系统可以决定使用一些回退作为预测。
但是,这在交叉验证期间是不可取的,因为任何 NaN
预测值将导致评估指标的 NaN
结果(例如,当使用 RegressionEvaluator
时)。这使得模型选择变得不可能。
Spark 允许用户将 coldStartStrategy
参数设置为“drop”,以便删除预测 DataFrame
中包含 NaN
值的任何行。然后,评估指标将在非 NaN
数据上计算,并将是有效的。以下示例说明了此参数的使用。
注意:目前支持的冷启动策略是“nan”(上面提到的默认行为)和“drop”。将来可能会支持更多策略。
示例
在以下示例中,我们从 MovieLens 数据集 加载评分数据,每行包含一个用户、一部电影、一个评分和一个时间戳。然后,我们训练一个 ALS 模型,该模型默认情况下假设评分是显式的(implicitPrefs
为 False
)。我们通过衡量评分预测的均方根误差来评估推荐模型。
有关 API 的更多详细信息,请参阅 ALS
Python 文档。
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
coldStartStrategy="drop")
model = als.fit(training)
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
如果评分矩阵来自其他信息源(即从其他信号推断出来),则可以将 implicitPrefs
设置为 True
以获得更好的结果
在以下示例中,我们从 MovieLens 数据集 加载评分数据,每行包含一个用户、一部电影、一个评分和一个时间戳。然后,我们训练一个 ALS 模型,该模型默认情况下假设评分是显式的(implicitPrefs
为 false
)。我们通过衡量评分预测的均方根误差来评估推荐模型。
有关 API 的更多详细信息,请参阅 ALS
Scala 文档。
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
val fields = str.split("::")
assert(fields.size == 4)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}
val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
.map(parseRating)
.toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
// Build the recommendation model using ALS on the training data
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")
val model = als.fit(training)
// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)
// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
如果评分矩阵来自其他信息源(即从其他信号推断出来),则可以将 implicitPrefs
设置为 true
以获得更好的结果
在以下示例中,我们从 MovieLens 数据集 加载评分数据,每行包含一个用户、一部电影、一个评分和一个时间戳。然后,我们训练一个 ALS 模型,该模型默认情况下假设评分是显式的(implicitPrefs
为 false
)。我们通过衡量评分预测的均方根误差来评估推荐模型。
有关 API 的更多详细信息,请参阅 ALS
Java 文档。
import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;
public static class Rating implements Serializable {
private int userId;
private int movieId;
private float rating;
private long timestamp;
public Rating() {}
public Rating(int userId, int movieId, float rating, long timestamp) {
this.userId = userId;
this.movieId = movieId;
this.rating = rating;
this.timestamp = timestamp;
}
public int getUserId() {
return userId;
}
public int getMovieId() {
return movieId;
}
public float getRating() {
return rating;
}
public long getTimestamp() {
return timestamp;
}
public static Rating parseRating(String str) {
String[] fields = str.split("::");
if (fields.length != 4) {
throw new IllegalArgumentException("Each line must contain 4 fields");
}
int userId = Integer.parseInt(fields[0]);
int movieId = Integer.parseInt(fields[1]);
float rating = Float.parseFloat(fields[2]);
long timestamp = Long.parseLong(fields[3]);
return new Rating(userId, movieId, rating, timestamp);
}
}
JavaRDD<Rating> ratingsRDD = spark
.read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
.map(Rating::parseRating);
Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class);
Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> training = splits[0];
Dataset<Row> test = splits[1];
// Build the recommendation model using ALS on the training data
ALS als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating");
ALSModel model = als.fit(training);
// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop");
Dataset<Row> predictions = model.transform(test);
RegressionEvaluator evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root-mean-square error = " + rmse);
// Generate top 10 movie recommendations for each user
Dataset<Row> userRecs = model.recommendForAllUsers(10);
// Generate top 10 user recommendations for each movie
Dataset<Row> movieRecs = model.recommendForAllItems(10);
// Generate top 10 movie recommendations for a specified set of users
Dataset<Row> users = ratings.select(als.getUserCol()).distinct().limit(3);
Dataset<Row> userSubsetRecs = model.recommendForUserSubset(users, 10);
// Generate top 10 user recommendations for a specified set of movies
Dataset<Row> movies = ratings.select(als.getItemCol()).distinct().limit(3);
Dataset<Row> movieSubSetRecs = model.recommendForItemSubset(movies, 10);
如果评分矩阵来自其他信息源(即从其他信号推断出来),则可以将 implicitPrefs
设置为 true
以获得更好的结果
有关更多详细信息,请参阅 R API 文档。
# Load training data
data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0),
list(1, 2, 4.0), list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(data, c("userId", "movieId", "rating"))
training <- df
test <- df
# Fit a recommendation model using ALS with spark.als
model <- spark.als(training, maxIter = 5, regParam = 0.01, userCol = "userId",
itemCol = "movieId", ratingCol = "rating")
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
head(predictions)