协同过滤 - 基于 RDD 的 API

协同过滤

协同过滤常用于推荐系统。这些技术旨在填充用户-物品关联矩阵中的缺失条目。spark.mllib 目前支持基于模型的协同过滤,其中用户和产品由少量潜在因子描述,可用于预测缺失条目。spark.mllib 使用交替最小二乘法 (ALS) 算法来学习这些潜在因子。spark.mllib 中的实现具有以下参数:

显式反馈与隐式反馈

基于矩阵分解的协同过滤的标准方法将用户-物品矩阵中的条目视为用户对物品给出的显式偏好,例如,用户对电影的评分。

在许多实际用例中,通常只能获取隐式反馈(例如,浏览量、点击量、购买、点赞、分享等)。spark.mllib 中处理此类数据的方法来源于《Implicit Feedback Datasets 的协同过滤》。本质上,这种方法不是直接尝试建模评分矩阵,而是将数据视为表示用户行为观察中强度的数字(例如点击次数,或某人观看电影的累计时长)。这些数字与观察到的用户偏好的置信度水平相关,而不是物品的显式评分。模型随后尝试寻找可用于预测用户对物品的预期偏好的潜在因子。

正则化参数的缩放

从 v1.1 版本开始,我们在解决每个最小二乘问题时,根据更新用户因子时用户产生的评分数量,或更新产品因子时产品收到的评分数量,对正则化参数lambda进行缩放。这种方法被称为“ALS-WR”,并在论文《针对 Netflix 大奖的大规模并行协同过滤》中进行了讨论。它使得lambda对数据集的规模依赖性降低,因此我们可以将从抽样子集中学到的最佳参数应用于完整数据集,并期望获得相似的性能。

示例

在以下示例中,我们加载评分数据。每行包含一个用户、一个产品和一个评分。我们使用默认的 ALS.train() 方法,该方法假定评分为显式。我们通过测量评分预测的均方误差来评估推荐。

有关 API 的更多详细信息,请参阅ALS Python 文档

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Load and parse the data
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(','))\
    .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)

# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

# Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
完整的示例代码可在 Spark 仓库中的 "examples/src/main/python/mllib/recommendation_example.py" 找到。

如果评分矩阵来源于其他信息源(即,它是从其他信号推断出来的),您可以使用 `trainImplicit` 方法获得更好的结果。

# Build the recommendation model using Alternating Least Squares based on implicit ratings
model = ALS.trainImplicit(ratings, rank, numIterations, alpha=0.01)

在以下示例中,我们加载评分数据。每行包含一个用户、一个产品和一个评分。我们使用默认的ALS.train()方法,该方法假定评分为显式。我们通过测量评分预测的均方误差来评估推荐模型。

有关 API 的更多详细信息,请参阅ALS Scala 文档

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
  Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS
val rank = 10
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)

// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) =>
  (user, product)
}
val predictions =
  model.predict(usersProducts).map { case Rating(user, product, rate) =>
    ((user, product), rate)
  }
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
  ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean()
println(s"Mean Squared Error = $MSE")

// Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
完整的示例代码可在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/mllib/RecommendationExample.scala" 找到。

如果评分矩阵来源于另一个信息源(即,它是从其他信号推断出来的),您可以使用trainImplicit方法获得更好的结果。

val alpha = 0.01
val lambda = 0.01
val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)

MLlib 的所有方法都使用 Java 友好型类型,因此您可以在 Java 中像在 Scala 中一样导入和调用它们。唯一的注意事项是,这些方法接受 Scala RDD 对象,而 Spark Java API 使用单独的JavaRDD类。您可以通过在JavaRDD对象上调用.rdd()将 Java RDD 转换为 Scala RDD。下面提供了一个与 Scala 中提供的示例等效的独立应用程序示例。

有关 API 的更多详细信息,请参阅ALS Java 文档

import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf().setAppName("Java Collaborative Filtering Example");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Load and parse the data
String path = "data/mllib/als/test.data";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Rating> ratings = data.map(s -> {
  String[] sarray = s.split(",");
  return new Rating(Integer.parseInt(sarray[0]),
    Integer.parseInt(sarray[1]),
    Double.parseDouble(sarray[2]));
});

// Build the recommendation model using ALS
int rank = 10;
int numIterations = 10;
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);

// Evaluate the model on rating data
JavaRDD<Tuple2<Object, Object>> userProducts =
  ratings.map(r -> new Tuple2<>(r.user(), r.product()));
JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
  model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD()
      .map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))
);
JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(
    ratings.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())))
  .join(predictions).values();
double MSE = ratesAndPreds.mapToDouble(pair -> {
  double err = pair._1() - pair._2();
  return err * err;
}).mean();
System.out.println("Mean Squared Error = " + MSE);

// Save and load model
model.save(jsc.sc(), "target/tmp/myCollaborativeFilter");
MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(),
  "target/tmp/myCollaborativeFilter");
完整的示例代码可在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java" 找到。

为了运行上述应用程序,请遵循 Spark 快速入门指南中独立应用程序部分提供的说明。请务必将 spark-mllib 作为依赖项包含到您的构建文件中。

教程

Spark 峰会 2014 的培训练习包括一个使用spark.mllib进行个性化电影推荐的实践教程。