协同过滤 - 基于 RDD 的 API
协同过滤
协同过滤 通常用于推荐系统。 这些技术旨在填充用户-项目关联矩阵中缺失的条目。 spark.mllib
当前支持基于模型的协同过滤,其中用户和产品由一小组潜在因子描述,这些因子可用于预测缺失的条目。 spark.mllib
使用 交替最小二乘法 (ALS) 算法来学习这些潜在因子。 spark.mllib
中的实现具有以下参数
- numBlocks 是用于并行计算的块数(设置为 -1 以自动配置)。
- rank 是要使用的特征数(也称为潜在因子数)。
- iterations 是要运行的 ALS 的迭代次数。 ALS 通常在 20 次或更少的迭代中收敛到合理的解决方案。
- lambda 指定 ALS 中的正则化参数。
- implicitPrefs 指定是使用显式反馈 ALS 变体,还是使用适用于隐式反馈数据的变体。
- alpha 是适用于 ALS 隐式反馈变体的参数,它控制偏好观察中的基线置信度。
显式与隐式反馈
基于矩阵分解的协同过滤的标准方法是将用户-项目矩阵中的条目视为用户给予项目的显式偏好,例如,用户给电影评分。
在许多现实世界的用例中,通常只能访问隐式反馈(例如,浏览量、点击次数、购买次数、喜欢、分享等)。 spark.mllib
中处理此类数据的方法取自 隐式反馈数据集的协同过滤。 本质上,这种方法不是试图直接对评分矩阵建模,而是将数据视为表示用户行为观察强度的数字(例如,点击次数或某人观看电影的累计时长)。 然后,这些数字与观察到的用户偏好的置信度相关,而不是给予项目的显式评分。 然后,该模型尝试找到可用于预测用户对项目预期偏好的潜在因子。
正则化参数的缩放
自 v1.1 以来,我们在解决每个最小二乘问题时,通过用户在更新用户因子时生成的评分数量,或产品在更新产品因子时收到的评分数量,来缩放正则化参数 lambda
。 这种方法被称为“ALS-WR”,并在论文“Netflix Prize 的大规模并行协同过滤”中进行了讨论。 它使 lambda
对数据集的规模依赖性降低,因此我们可以将从采样的子集中学习的最佳参数应用于完整数据集,并期望获得类似的性能。
示例
在以下示例中,我们加载评分数据。 每行由用户、产品和评分组成。 我们使用默认的 ALS.train() 方法,该方法假定评分为显式。 我们通过测量评分预测的均方误差来评估推荐。
有关 API 的更多详细信息,请参阅 ALS
Python 文档。
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
# Load and parse the data
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(','))\
.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)
# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))
# Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
如果评分矩阵是从其他信息来源派生的(即,它是从其他信号推断出来的),则可以使用 trainImplicit 方法来获得更好的结果。
# Build the recommendation model using Alternating Least Squares based on implicit ratings
model = ALS.trainImplicit(ratings, rank, numIterations, alpha=0.01)
在以下示例中,我们加载评分数据。 每行由用户、产品和评分组成。 我们使用默认的 ALS.train() 方法,该方法假定评分为显式。 我们通过测量评分预测的均方误差来评估推荐模型。
有关 API 的更多详细信息,请参阅 ALS
Scala 文档。
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating
// Load and parse the data
val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
Rating(user.toInt, item.toInt, rate.toDouble)
})
// Build the recommendation model using ALS
val rank = 10
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)
// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) =>
(user, product)
}
val predictions =
model.predict(usersProducts).map { case Rating(user, product, rate) =>
((user, product), rate)
}
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
val err = (r1 - r2)
err * err
}.mean()
println(s"Mean Squared Error = $MSE")
// Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
如果评分矩阵是从另一个信息来源派生的(即,它是从其他信号推断出来的),则可以使用 trainImplicit
方法来获得更好的结果。
val alpha = 0.01
val lambda = 0.01
val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)
所有 MLlib 的方法都使用 Java 友好的类型,因此您可以像在 Scala 中一样导入并在那里调用它们。 唯一的注意事项是这些方法采用 Scala RDD 对象,而 Spark Java API 使用单独的 JavaRDD
类。 您可以通过在 JavaRDD
对象上调用 .rdd()
将 Java RDD 转换为 Scala RDD。 下面给出了一个独立的应用程序示例,它等效于 Scala 中提供的示例
有关 API 的更多详细信息,请参阅 ALS
Java 文档。
import scala.Tuple2;
import org.apache.spark.api.java.*;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.SparkConf;
SparkConf conf = new SparkConf().setAppName("Java Collaborative Filtering Example");
JavaSparkContext jsc = new JavaSparkContext(conf);
// Load and parse the data
String path = "data/mllib/als/test.data";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Rating> ratings = data.map(s -> {
String[] sarray = s.split(",");
return new Rating(Integer.parseInt(sarray[0]),
Integer.parseInt(sarray[1]),
Double.parseDouble(sarray[2]));
});
// Build the recommendation model using ALS
int rank = 10;
int numIterations = 10;
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
// Evaluate the model on rating data
JavaRDD<Tuple2<Object, Object>> userProducts =
ratings.map(r -> new Tuple2<>(r.user(), r.product()));
JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD()
.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))
);
JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(
ratings.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())))
.join(predictions).values();
double MSE = ratesAndPreds.mapToDouble(pair -> {
double err = pair._1() - pair._2();
return err * err;
}).mean();
System.out.println("Mean Squared Error = " + MSE);
// Save and load model
model.save(jsc.sc(), "target/tmp/myCollaborativeFilter");
MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(),
"target/tmp/myCollaborativeFilter");
为了运行上述应用程序,请按照 Spark 快速入门指南的独立应用程序部分中提供的说明进行操作。 请务必将 *spark-mllib* 也包含在您的构建文件中作为依赖项。
教程
Spark Summit 2014 的培训练习包括一个实践教程,用于使用 spark.mllib
进行个性化电影推荐。