聚类 - 基于 RDD 的 API
聚类是一种无监督学习问题,我们的目标是根据某种相似性概念将实体子集相互分组。聚类通常用于探索性分析和/或作为分层有监督学习管道的一个组成部分(其中针对每个聚类训练不同的分类器或回归模型)。
spark.mllib 包支持以下模型
K-means
K-means是最常用的聚类算法之一,它将数据点聚类到预定义数量的聚类中。spark.mllib的实现包含k-means++方法的一个并行化变体,称为kmeans||。spark.mllib中的实现具有以下参数
- k 是所需的聚类数。请注意,返回的聚类数可能少于 k,例如,如果可供聚类的不同点少于 k 个。
- maxIterations 是运行的最大迭代次数。
- initializationMode 指定随机初始化或通过 k-means|| 进行初始化。
- runs 此参数自 Spark 2.0.0 起无效。
- initializationSteps 决定 k-means|| 算法中的步数。
- epsilon 决定了我们认为 k-means 已收敛的距离阈值。
- initialModel 是一个可选的聚类中心集合,用于初始化。如果提供此参数,则只执行一次运行。
示例
以下示例可在 PySpark shell 中进行测试。
在以下示例中,加载并解析数据后,我们使用 KMeans 对象将数据聚类成两个聚类。所需聚类数会传递给算法。然后我们计算簇内平方误差和 (WSSSE)。您可以通过增加 k 来减少此误差度量。事实上,最佳 k 通常是 WSSSE 图中出现“肘部”的位置。
有关 API 的更多详细信息,请参阅 KMeans Python 文档和KMeansModel Python 文档。
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel
# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
# Save and load model
clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")以下代码片段可在 spark-shell 中执行。
在以下示例中,加载并解析数据后,我们使用 KMeans 对象将数据聚类成两个聚类。所需聚类数会传递给算法。然后我们计算簇内平方误差和 (WSSSE)。您可以通过增加 k 来减少此误差度量。事实上,最佳 k 通常是 WSSSE 图中出现“肘部”的位置。
有关 API 的详细信息,请参阅 KMeans Scala 文档和KMeansModel Scala 文档。
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println(s"Within Set Sum of Squared Errors = $WSSSE")
// Save and load model
clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")MLlib 的所有方法都使用 Java 友好类型,因此您可以像在 Scala 中一样导入和调用它们。唯一的注意事项是,这些方法接受 Scala RDD 对象,而 Spark Java API 使用单独的 JavaRDD 类。您可以通过在您的 JavaRDD 对象上调用 .rdd() 将 Java RDD 转换为 Scala RDD。下面是与 Scala 中提供的示例等效的独立应用程序示例
有关 API 的详细信息,请参阅 KMeans Java 文档和KMeansModel Java 文档。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse data
String path = "data/mllib/kmeans_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
parsedData.cache();
// Cluster the data into two classes using KMeans
int numClusters = 2;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
System.out.println("Cluster centers:");
for (Vector center: clusters.clusterCenters()) {
System.out.println(" " + center);
}
double cost = clusters.computeCost(parsedData.rdd());
System.out.println("Cost: " + cost);
// Evaluate clustering by computing Within Set Sum of Squared Errors
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
// Save and load model
clusters.save(jsc.sc(), "target/org/apache/spark/JavaKMeansExample/KMeansModel");
KMeansModel sameModel = KMeansModel.load(jsc.sc(),
"target/org/apache/spark/JavaKMeansExample/KMeansModel");高斯混合
一个高斯混合模型表示一个复合分布,其中点是从 k 个高斯子分布中的一个抽取的,每个子分布都有自己的概率。spark.mllib 的实现使用期望最大化算法,根据给定的一组样本推导出最大似然模型。该实现具有以下参数
- k 是所需聚类数。
- convergenceTol 是对数似然的最大变化,达到此变化时我们认为已实现收敛。
- maxIterations 是在未达到收敛时执行的最大迭代次数。
- initialModel 是一个可选的起始点,用于启动 EM 算法。如果省略此参数,则将从数据中构建一个随机起始点。
示例
在以下示例中,加载并解析数据后,我们使用 GaussianMixture 对象将数据聚类成两个聚类。所需聚类数会传递给算法。然后我们输出混合模型的参数。
有关 API 的更多详细信息,请参阅 GaussianMixture Python 文档和GaussianMixtureModel Python 文档。
from numpy import array
from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel
# Load and parse the data
data = sc.textFile("data/mllib/gmm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))
# Build the model (cluster the data)
gmm = GaussianMixture.train(parsedData, 2)
# Save and load model
gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
sameModel = GaussianMixtureModel\
.load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
# output parameters of model
for i in range(2):
print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
"sigma = ", gmm.gaussians[i].sigma.toArray())在以下示例中,加载并解析数据后,我们使用 GaussianMixture 对象将数据聚类成两个聚类。所需聚类数会传递给算法。然后我们输出混合模型的参数。
有关 API 的详细信息,请参阅 GaussianMixture Scala 文档和GaussianMixtureModel Scala 文档。
import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using GaussianMixture
val gmm = new GaussianMixture().setK(2).run(parsedData)
// Save and load model
gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
val sameModel = GaussianMixtureModel.load(sc,
"target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
// output parameters of max-likelihood model
for (i <- 0 until gmm.k) {
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
(gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}MLlib 的所有方法都使用 Java 友好类型,因此您可以像在 Scala 中一样导入和调用它们。唯一的注意事项是,这些方法接受 Scala RDD 对象,而 Spark Java API 使用单独的 JavaRDD 类。您可以通过在您的 JavaRDD 对象上调用 .rdd() 将 Java RDD 转换为 Scala RDD。下面是与 Scala 中提供的示例等效的独立应用程序示例
有关 API 的详细信息,请参阅 GaussianMixture Java 文档和GaussianMixtureModel Java 文档。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse data
String path = "data/mllib/gmm_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
parsedData.cache();
// Cluster the data into two classes using GaussianMixture
GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());
// Save and load GaussianMixtureModel
gmm.save(jsc.sc(), "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
"target/org.apache.spark.JavaGaussianMixtureExample/GaussianMixtureModel");
// Output the parameters of the mixture model
for (int j = 0; j < gmm.k(); j++) {
System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
}幂迭代聚类 (PIC)
幂迭代聚类 (PIC) 是一种可扩展且高效的算法,用于根据成对相似性作为边属性来聚类图的顶点,该算法在 Lin 和 Cohen 的《幂迭代聚类》中有所描述。它通过幂迭代计算图的归一化亲和矩阵的伪特征向量,并用其来聚类顶点。spark.mllib 包含一个使用 GraphX 作为其后端实现的 PIC。它接受一个由 (srcId, dstId, similarity) 元组组成的 RDD,并输出一个包含聚类分配的模型。相似性必须是非负的。PIC 假定相似性度量是对称的。无论顺序如何,一对 (srcId, dstId) 在输入数据中最多只能出现一次。如果输入中缺少一对,则它们的相似性被视为零。spark.mllib 的 PIC 实现接受以下(超)参数
k:聚类数maxIterations:最大幂迭代次数initializationMode:初始化模型。可以是默认的“random”,即使用随机向量作为顶点属性,或者“degree”,即使用归一化和相似性。
示例
下面,我们展示代码片段,演示如何在 spark.mllib 中使用 PIC。
PowerIterationClustering 实现了 PIC 算法。它接受一个由 (srcId: Long, dstId: Long, similarity: Double) 元组组成的 RDD,代表亲和矩阵。调用 PowerIterationClustering.run 会返回一个 PowerIterationClusteringModel,其中包含计算出的聚类分配。
有关 API 的更多详细信息,请参阅 PowerIterationClustering Python 文档和PowerIterationClusteringModel Python 文档。
from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel
# Load and parse the data
data = sc.textFile("data/mllib/pic_data.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))
# Cluster the data into two classes using PowerIterationClustering
model = PowerIterationClustering.train(similarities, 2, 10)
model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))
# Save and load model
model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
sameModel = PowerIterationClusteringModel\
.load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")PowerIterationClustering 实现了 PIC 算法。它接受一个由 (srcId: Long, dstId: Long, similarity: Double) 元组组成的 RDD,代表亲和矩阵。调用 PowerIterationClustering.run 会返回一个 PowerIterationClusteringModel,其中包含计算出的聚类分配。
有关 API 的详细信息,请参阅 PowerIterationClustering Scala 文档和PowerIterationClusteringModel Scala 文档。
import org.apache.spark.mllib.clustering.PowerIterationClustering
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setInitializationMode("degree")
.run(circlesRdd)
val clusters = model.assignments.collect().groupBy(_.cluster).transform((_, v) => v.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
.map { case (k, v) =>
s"$k -> ${v.sorted.mkString("[", ",", "]")}"
}.mkString(", ")
val sizesStr = assignments.map {
_._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")PowerIterationClustering 实现了 PIC 算法。它接受一个由 (srcId: Long, dstId: Long, similarity: Double) 元组组成的 JavaRDD,代表亲和矩阵。调用 PowerIterationClustering.run 会返回一个 PowerIterationClusteringModel,其中包含计算出的聚类分配。
有关 API 的详细信息,请参阅 PowerIterationClustering Java 文档和PowerIterationClusteringModel Java 文档。
import org.apache.spark.mllib.clustering.PowerIterationClustering;
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;
JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Arrays.asList(
new Tuple3<>(0L, 1L, 0.9),
new Tuple3<>(1L, 2L, 0.9),
new Tuple3<>(2L, 3L, 0.9),
new Tuple3<>(3L, 4L, 0.1),
new Tuple3<>(4L, 5L, 0.9)));
PowerIterationClustering pic = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10);
PowerIterationClusteringModel model = pic.run(similarities);
for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
System.out.println(a.id() + " -> " + a.cluster());
}潜在狄利克雷分配 (LDA)
潜在狄利克雷分配 (LDA) 是一种主题模型,它从文本文档集合中推断主题。LDA 可以被视为一种聚类算法,具体如下
- 主题对应于聚类中心,文档对应于数据集中的示例(行)。
- 主题和文档都存在于特征空间中,其中特征向量是词频向量(词袋)。
- LDA 不使用传统距离来估计聚类,而是使用基于文本文档生成方式的统计模型函数。
LDA 通过 setOptimizer 函数支持不同的推断算法。EMLDAOptimizer 使用在似然函数上进行的期望最大化来学习聚类,并产生全面的结果,而 OnlineLDAOptimizer 则使用迭代小批量采样进行在线变分推断,并且通常内存友好。
LDA 接收一个作为词频向量的文档集合,以及以下参数(使用构建器模式设置)
k:主题数(即聚类中心)optimizer:用于学习 LDA 模型的优化器,可以是EMLDAOptimizer或OnlineLDAOptimizerdocConcentration:文档在主题上的分布先验的狄利克雷参数。值越大,鼓励推断分布越平滑。topicConcentration:主题在词项(词)上的分布先验的狄利克雷参数。值越大,鼓励推断分布越平滑。maxIterations:迭代次数限制。checkpointInterval:如果使用检查点(在 Spark 配置中设置),此参数指定创建检查点的频率。如果maxIterations很大,使用检查点可以帮助减少磁盘上的混洗文件大小,并有助于故障恢复。
所有 spark.mllib 的 LDA 模型都支持
describeTopics:以最重要的词项及其权重的数组形式返回主题topicsMatrix:返回一个vocabSize乘k的矩阵,其中每列都是一个主题
注意:LDA 仍是正在积极开发中的实验性功能。因此,某些功能仅在由优化器生成的两种优化器/模型中的一种中可用。目前,分布式模型可以转换为本地模型,但反之则不行。
以下讨论将分别描述每对优化器/模型。
期望最大化
在 EMLDAOptimizer 和 DistributedLDAModel 中实现。
对于提供给 LDA 的参数
docConcentration:只支持对称先验,因此提供的k维向量中的所有值必须相同。所有值也必须 $> 1.0$。提供Vector(-1)会导致默认行为(值为 $(50 / k) + 1$ 的均匀k维向量)。topicConcentration:只支持对称先验。值必须 $> 1.0$。提供-1会导致默认值为 $0.1 + 1$。maxIterations:EM 迭代的最大次数。
注意:进行足够的迭代很重要。在早期迭代中,EM 通常会有无用主题,但在更多迭代后这些主题会显著改善。根据您的数据集,使用至少 20 次,甚至可能 50-100 次迭代通常是合理的。
EMLDAOptimizer 产生一个 DistributedLDAModel,它不仅存储推断出的主题,还存储完整的训练语料库以及训练语料库中每个文档的主题分布。DistributedLDAModel 支持
topTopicsPerDocument:训练语料库中每个文档的顶部主题及其权重topDocumentsPerTopic:每个主题的顶部文档以及该主题在文档中的相应权重。logPrior:给定超参数docConcentration和topicConcentration,估计主题和文档-主题分布的对数概率logLikelihood:给定推断出的主题和文档-主题分布,训练语料库的对数似然
在线变分贝叶斯
在 OnlineLDAOptimizer 和 LocalLDAModel 中实现。
对于提供给 LDA 的参数
docConcentration:可以通过传入一个向量来使用非对称先验,该向量中每个k维度的值等于狄利克雷参数。值应 $>= 0$。提供Vector(-1)会导致默认行为(值为 $(1.0 / k)$ 的均匀k维向量)。topicConcentration:只支持对称先验。值必须 $>= 0$。提供-1会导致默认值为 $(1.0 / k)$。maxIterations:要提交的小批量(minibatches)的最大数量。
此外,OnlineLDAOptimizer 接受以下参数
miniBatchFraction:每次迭代中采样并使用的语料库分数optimizeDocConcentration:如果设置为 true,则在每个小批量后对超参数docConcentration(又称alpha)执行最大似然估计,并在返回的LocalLDAModel中设置优化的docConcentrationtau0和kappa:用于学习率衰减,学习率衰减由 $(\tau_0 + iter)^{-\kappa}$ 计算,其中 $iter$ 是当前迭代次数。
OnlineLDAOptimizer 产生一个 LocalLDAModel,它只存储推断出的主题。LocalLDAModel 支持
logLikelihood(documents):计算给定推断主题的所提供documents的下界。logPerplexity(documents):计算给定推断主题的所提供documents困惑度的上界。
示例
在以下示例中,我们加载代表文档语料库的词频向量。然后我们使用 LDA 从文档中推断出三个主题。所需聚类数会传递给算法。然后我们输出主题,主题表示为词上的概率分布。
有关 API 的更多详细信息,请参阅 LDA Python 文档和LDAModel Python 文档。
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
+ " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
print("Topic " + str(topic) + ":")
for word in range(0, ldaModel.vocabSize()):
print(" " + str(topics[word][topic]))
# Save and load model
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
.load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")有关 API 的详细信息,请参阅 LDA Scala 文档和DistributedLDAModel Scala 文档。
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex().map(_.swap).cache()
// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)
// Output topics. Each is a distribution over words (matching word count vectors)
println(s"Learned topics (as distributions over vocab of ${ldaModel.vocabSize} words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
print(s"Topic $topic :")
for (word <- Range(0, ldaModel.vocabSize)) {
print(s"${topics(word, topic)}")
}
println()
}
// Save and load model.
ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
val sameModel = DistributedLDAModel.load(sc,
"target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")有关 API 的详细信息,请参阅 LDA Java 文档和DistributedLDAModel Java 文档。
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.clustering.LDAModel;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse the data
String path = "data/mllib/sample_lda_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
// Index documents with unique IDs
JavaPairRDD<Long, Vector> corpus =
JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap));
corpus.cache();
// Cluster the documents into three topics using LDA
LDAModel ldaModel = new LDA().setK(3).run(corpus);
// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
+ " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
System.out.print("Topic " + topic + ":");
for (int word = 0; word < ldaModel.vocabSize(); word++) {
System.out.print(" " + topics.apply(word, topic));
}
System.out.println();
}
ldaModel.save(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
DistributedLDAModel sameModel = DistributedLDAModel.load(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");二分 k-means
二分 K-means 通常比常规 K-means 快得多,但它通常会产生不同的聚类结果。
二分 k-means 是一种分层聚类。分层聚类是聚类分析最常用的方法之一,旨在构建聚类层次结构。分层聚类的策略通常分为两种类型
- 凝聚式 (Agglomerative):这是一种“自下而上”的方法:每个观察值都从自己的聚类开始,随着层次结构向上移动,成对的聚类被合并。
- 分裂式 (Divisive):这是一种“自上而下”的方法:所有观察值都从一个聚类开始,随着层次结构向下移动,递归地执行分裂。
二分 k-means 算法是一种分裂式算法。MLlib 中的实现具有以下参数
- k: 所需的叶聚类数(默认值:4)。如果没有可分裂的叶聚类,实际数量可能会更小。
- maxIterations: 分裂聚类的最大 k-means 迭代次数(默认值:20)
- minDivisibleClusterSize: 可分裂聚类的最小点数(如果 >= 1.0)或最小点比例(如果 < 1.0)(默认值:1)
- seed: 随机种子(默认值:类名的哈希值)
示例
有关 API 的更多详细信息,请参阅 BisectingKMeans Python 文档和BisectingKMeansModel Python 文档。
from numpy import array
from pyspark.mllib.clustering import BisectingKMeans
# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
model = BisectingKMeans.train(parsedData, 2, maxIterations=5)
# Evaluate clustering
cost = model.computeCost(parsedData)
print("Bisecting K-means Cost = " + str(cost))有关 API 的详细信息,请参阅 BisectingKMeans Scala 文档和BisectingKMeansModel Scala 文档。
import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Loads and parses data
def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()
// Clustering the data into 6 clusters by BisectingKMeans.
val bkm = new BisectingKMeans().setK(6)
val model = bkm.run(data)
// Show the compute cost and the cluster centers
println(s"Compute Cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
println(s"Cluster Center ${idx}: ${center}")
}有关 API 的详细信息,请参阅 BisectingKMeans Java 文档和BisectingKMeansModel Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.BisectingKMeans;
import org.apache.spark.mllib.clustering.BisectingKMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
List<Vector> localData = Arrays.asList(
Vectors.dense(0.1, 0.1), Vectors.dense(0.3, 0.3),
Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3),
Vectors.dense(30.1, 30.1), Vectors.dense(30.3, 30.3)
);
JavaRDD<Vector> data = sc.parallelize(localData, 2);
BisectingKMeans bkm = new BisectingKMeans()
.setK(4);
BisectingKMeansModel model = bkm.run(data);
System.out.println("Compute Cost: " + model.computeCost(data));
Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
Vector clusterCenter = clusterCenters[i];
System.out.println("Cluster Center " + i + ": " + clusterCenter);
}流式 k-means
当数据以流式方式到达时,我们可能希望动态估计聚类,并随着新数据到达而更新它们。spark.mllib 提供对流式 k-means 聚类的支持,带有控制估计值衰减(或“遗忘”)的参数。该算法使用迷你批量 k-means 更新规则的推广形式。对于每个数据批次,我们将所有点分配给其最近的聚类,计算新的聚类中心,然后使用以下公式更新每个聚类
\begin{equation} c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t} \end{equation} \begin{equation} n_{t+1} = n_t + m_t \end{equation}
其中 $c_t$ 是该聚类的前一个中心,$n_t$ 是迄今为止分配给该聚类的点数,$x_t$ 是当前批次中的新聚类中心,$m_t$ 是当前批次中添加到该聚类的点数。衰减因子 $\alpha$ 可用于忽略过去:当 $\alpha$=1 时,所有数据都将从头开始使用;当 $\alpha$=0 时,只使用最新数据。这类似于指数加权移动平均。
衰减可以使用 halfLife 参数指定,该参数确定正确的衰减因子 a,使得对于在时间 t 获取的数据,其在时间 t + halfLife 的贡献将下降到 0.5。时间单位可以指定为 batches(批次)或 points(点),更新规则将相应调整。
示例
此示例演示如何在流式数据上估计聚类。
有关 API 的更多详细信息,请参阅 StreamingKMeans Python 文档。有关 StreamingContext 的详细信息,请参阅Spark Streaming 编程指南。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
# we make an input stream of vectors for training,
# as well as a stream of vectors for testing
def parse(lp):
label = float(lp[lp.find('(') + 1: lp.find(')')])
vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
return LabeledPoint(label, vec)
trainingData = sc.textFile("data/mllib/kmeans_data.txt")\
.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse)
trainingQueue = [trainingData]
testingQueue = [testingData]
trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)
# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.
model.trainOn(trainingStream)
result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()
ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)有关 API 的详细信息,请参阅 StreamingKMeans Scala 文档。有关 StreamingContext 的详细信息,请参阅Spark Streaming 编程指南。
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
val model = new StreamingKMeans()
.setK(args(3).toInt)
.setDecayFactor(1.0)
.setRandomCenters(args(4).toInt, 0.0)
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()当您添加带有数据的新文本文件时,聚类中心将更新。每个训练点应格式化为 [x1, x2, x3],每个测试数据点应格式化为 (y, [x1, x2, x3]),其中 y 是某个有用的标签或标识符(例如真实类别分配)。任何时候将文本文件放置在 /training/data/dir 中,模型都会更新。任何时候将文本文件放置在 /testing/data/dir 中,您都将看到预测。随着新数据的到来,聚类中心将发生变化!