聚类 - 基于 RDD 的 API
聚类是一种无监督学习问题,我们的目标是根据某种相似性概念将实体子集相互分组。聚类通常用于探索性分析和/或作为分层有监督学习管道的一个组成部分(其中针对每个聚类训练不同的分类器或回归模型)。
spark.mllib
包支持以下模型
K-means
K-means是最常用的聚类算法之一,它将数据点聚类到预定义数量的聚类中。spark.mllib
的实现包含k-means++方法的一个并行化变体,称为kmeans||。spark.mllib
中的实现具有以下参数
- k 是所需的聚类数。请注意,返回的聚类数可能少于 k,例如,如果可供聚类的不同点少于 k 个。
- maxIterations 是运行的最大迭代次数。
- initializationMode 指定随机初始化或通过 k-means|| 进行初始化。
- runs 此参数自 Spark 2.0.0 起无效。
- initializationSteps 决定 k-means|| 算法中的步数。
- epsilon 决定了我们认为 k-means 已收敛的距离阈值。
- initialModel 是一个可选的聚类中心集合,用于初始化。如果提供此参数,则只执行一次运行。
示例
以下示例可在 PySpark shell 中进行测试。
在以下示例中,加载并解析数据后,我们使用 KMeans 对象将数据聚类成两个聚类。所需聚类数会传递给算法。然后我们计算簇内平方误差和 (WSSSE)。您可以通过增加 k 来减少此误差度量。事实上,最佳 k 通常是 WSSSE 图中出现“肘部”的位置。
有关 API 的更多详细信息,请参阅 KMeans
Python 文档和KMeansModel
Python 文档。
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel
# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
# Save and load model
clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
以下代码片段可在 spark-shell
中执行。
在以下示例中,加载并解析数据后,我们使用 KMeans
对象将数据聚类成两个聚类。所需聚类数会传递给算法。然后我们计算簇内平方误差和 (WSSSE)。您可以通过增加 k 来减少此误差度量。事实上,最佳 k 通常是 WSSSE 图中出现“肘部”的位置。
有关 API 的详细信息,请参阅 KMeans
Scala 文档和KMeansModel
Scala 文档。
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println(s"Within Set Sum of Squared Errors = $WSSSE")
// Save and load model
clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
MLlib 的所有方法都使用 Java 友好类型,因此您可以像在 Scala 中一样导入和调用它们。唯一的注意事项是,这些方法接受 Scala RDD 对象,而 Spark Java API 使用单独的 JavaRDD
类。您可以通过在您的 JavaRDD
对象上调用 .rdd()
将 Java RDD 转换为 Scala RDD。下面是与 Scala 中提供的示例等效的独立应用程序示例
有关 API 的详细信息,请参阅 KMeans
Java 文档和KMeansModel
Java 文档。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse data
String path = "data/mllib/kmeans_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
parsedData.cache();
// Cluster the data into two classes using KMeans
int numClusters = 2;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
System.out.println("Cluster centers:");
for (Vector center: clusters.clusterCenters()) {
System.out.println(" " + center);
}
double cost = clusters.computeCost(parsedData.rdd());
System.out.println("Cost: " + cost);
// Evaluate clustering by computing Within Set Sum of Squared Errors
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
// Save and load model
clusters.save(jsc.sc(), "target/org/apache/spark/JavaKMeansExample/KMeansModel");
KMeansModel sameModel = KMeansModel.load(jsc.sc(),
"target/org/apache/spark/JavaKMeansExample/KMeansModel");
高斯混合
一个高斯混合模型表示一个复合分布,其中点是从 k 个高斯子分布中的一个抽取的,每个子分布都有自己的概率。spark.mllib
的实现使用期望最大化算法,根据给定的一组样本推导出最大似然模型。该实现具有以下参数
- k 是所需聚类数。
- convergenceTol 是对数似然的最大变化,达到此变化时我们认为已实现收敛。
- maxIterations 是在未达到收敛时执行的最大迭代次数。
- initialModel 是一个可选的起始点,用于启动 EM 算法。如果省略此参数,则将从数据中构建一个随机起始点。
示例
在以下示例中,加载并解析数据后,我们使用 GaussianMixture 对象将数据聚类成两个聚类。所需聚类数会传递给算法。然后我们输出混合模型的参数。
有关 API 的更多详细信息,请参阅 GaussianMixture
Python 文档和GaussianMixtureModel
Python 文档。
from numpy import array
from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel
# Load and parse the data
data = sc.textFile("data/mllib/gmm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))
# Build the model (cluster the data)
gmm = GaussianMixture.train(parsedData, 2)
# Save and load model
gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
sameModel = GaussianMixtureModel\
.load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
# output parameters of model
for i in range(2):
print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
"sigma = ", gmm.gaussians[i].sigma.toArray())
在以下示例中,加载并解析数据后,我们使用 GaussianMixture 对象将数据聚类成两个聚类。所需聚类数会传递给算法。然后我们输出混合模型的参数。
有关 API 的详细信息,请参阅 GaussianMixture
Scala 文档和GaussianMixtureModel
Scala 文档。
import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using GaussianMixture
val gmm = new GaussianMixture().setK(2).run(parsedData)
// Save and load model
gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
val sameModel = GaussianMixtureModel.load(sc,
"target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
// output parameters of max-likelihood model
for (i <- 0 until gmm.k) {
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
(gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}
MLlib 的所有方法都使用 Java 友好类型,因此您可以像在 Scala 中一样导入和调用它们。唯一的注意事项是,这些方法接受 Scala RDD 对象,而 Spark Java API 使用单独的 JavaRDD
类。您可以通过在您的 JavaRDD
对象上调用 .rdd()
将 Java RDD 转换为 Scala RDD。下面是与 Scala 中提供的示例等效的独立应用程序示例
有关 API 的详细信息,请参阅 GaussianMixture
Java 文档和GaussianMixtureModel
Java 文档。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse data
String path = "data/mllib/gmm_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
parsedData.cache();
// Cluster the data into two classes using GaussianMixture
GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());
// Save and load GaussianMixtureModel
gmm.save(jsc.sc(), "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
"target/org.apache.spark.JavaGaussianMixtureExample/GaussianMixtureModel");
// Output the parameters of the mixture model
for (int j = 0; j < gmm.k(); j++) {
System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
}
幂迭代聚类 (PIC)
幂迭代聚类 (PIC) 是一种可扩展且高效的算法,用于根据成对相似性作为边属性来聚类图的顶点,该算法在 Lin 和 Cohen 的《幂迭代聚类》中有所描述。它通过幂迭代计算图的归一化亲和矩阵的伪特征向量,并用其来聚类顶点。spark.mllib
包含一个使用 GraphX 作为其后端实现的 PIC。它接受一个由 (srcId, dstId, similarity)
元组组成的 RDD
,并输出一个包含聚类分配的模型。相似性必须是非负的。PIC 假定相似性度量是对称的。无论顺序如何,一对 (srcId, dstId)
在输入数据中最多只能出现一次。如果输入中缺少一对,则它们的相似性被视为零。spark.mllib
的 PIC 实现接受以下(超)参数
k
:聚类数maxIterations
:最大幂迭代次数initializationMode
:初始化模型。可以是默认的“random”,即使用随机向量作为顶点属性,或者“degree”,即使用归一化和相似性。
示例
下面,我们展示代码片段,演示如何在 spark.mllib
中使用 PIC。
PowerIterationClustering
实现了 PIC 算法。它接受一个由 (srcId: Long, dstId: Long, similarity: Double)
元组组成的 RDD
,代表亲和矩阵。调用 PowerIterationClustering.run
会返回一个 PowerIterationClusteringModel
,其中包含计算出的聚类分配。
有关 API 的更多详细信息,请参阅 PowerIterationClustering
Python 文档和PowerIterationClusteringModel
Python 文档。
from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel
# Load and parse the data
data = sc.textFile("data/mllib/pic_data.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))
# Cluster the data into two classes using PowerIterationClustering
model = PowerIterationClustering.train(similarities, 2, 10)
model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))
# Save and load model
model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
sameModel = PowerIterationClusteringModel\
.load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
PowerIterationClustering
实现了 PIC 算法。它接受一个由 (srcId: Long, dstId: Long, similarity: Double)
元组组成的 RDD
,代表亲和矩阵。调用 PowerIterationClustering.run
会返回一个 PowerIterationClusteringModel
,其中包含计算出的聚类分配。
有关 API 的详细信息,请参阅 PowerIterationClustering
Scala 文档和PowerIterationClusteringModel
Scala 文档。
import org.apache.spark.mllib.clustering.PowerIterationClustering
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setInitializationMode("degree")
.run(circlesRdd)
val clusters = model.assignments.collect().groupBy(_.cluster).transform((_, v) => v.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
.map { case (k, v) =>
s"$k -> ${v.sorted.mkString("[", ",", "]")}"
}.mkString(", ")
val sizesStr = assignments.map {
_._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
PowerIterationClustering
实现了 PIC 算法。它接受一个由 (srcId: Long, dstId: Long, similarity: Double)
元组组成的 JavaRDD
,代表亲和矩阵。调用 PowerIterationClustering.run
会返回一个 PowerIterationClusteringModel
,其中包含计算出的聚类分配。
有关 API 的详细信息,请参阅 PowerIterationClustering
Java 文档和PowerIterationClusteringModel
Java 文档。
import org.apache.spark.mllib.clustering.PowerIterationClustering;
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;
JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Arrays.asList(
new Tuple3<>(0L, 1L, 0.9),
new Tuple3<>(1L, 2L, 0.9),
new Tuple3<>(2L, 3L, 0.9),
new Tuple3<>(3L, 4L, 0.1),
new Tuple3<>(4L, 5L, 0.9)));
PowerIterationClustering pic = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10);
PowerIterationClusteringModel model = pic.run(similarities);
for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
System.out.println(a.id() + " -> " + a.cluster());
}
潜在狄利克雷分配 (LDA)
潜在狄利克雷分配 (LDA) 是一种主题模型,它从文本文档集合中推断主题。LDA 可以被视为一种聚类算法,具体如下
- 主题对应于聚类中心,文档对应于数据集中的示例(行)。
- 主题和文档都存在于特征空间中,其中特征向量是词频向量(词袋)。
- LDA 不使用传统距离来估计聚类,而是使用基于文本文档生成方式的统计模型函数。
LDA 通过 setOptimizer
函数支持不同的推断算法。EMLDAOptimizer
使用在似然函数上进行的期望最大化来学习聚类,并产生全面的结果,而 OnlineLDAOptimizer
则使用迭代小批量采样进行在线变分推断,并且通常内存友好。
LDA 接收一个作为词频向量的文档集合,以及以下参数(使用构建器模式设置)
k
:主题数(即聚类中心)optimizer
:用于学习 LDA 模型的优化器,可以是EMLDAOptimizer
或OnlineLDAOptimizer
docConcentration
:文档在主题上的分布先验的狄利克雷参数。值越大,鼓励推断分布越平滑。topicConcentration
:主题在词项(词)上的分布先验的狄利克雷参数。值越大,鼓励推断分布越平滑。maxIterations
:迭代次数限制。checkpointInterval
:如果使用检查点(在 Spark 配置中设置),此参数指定创建检查点的频率。如果maxIterations
很大,使用检查点可以帮助减少磁盘上的混洗文件大小,并有助于故障恢复。
所有 spark.mllib
的 LDA 模型都支持
describeTopics
:以最重要的词项及其权重的数组形式返回主题topicsMatrix
:返回一个vocabSize
乘k
的矩阵,其中每列都是一个主题
注意:LDA 仍是正在积极开发中的实验性功能。因此,某些功能仅在由优化器生成的两种优化器/模型中的一种中可用。目前,分布式模型可以转换为本地模型,但反之则不行。
以下讨论将分别描述每对优化器/模型。
期望最大化
在 EMLDAOptimizer
和 DistributedLDAModel
中实现。
对于提供给 LDA
的参数
docConcentration
:只支持对称先验,因此提供的k
维向量中的所有值必须相同。所有值也必须 $> 1.0$。提供Vector(-1)
会导致默认行为(值为 $(50 / k) + 1$ 的均匀k
维向量)。topicConcentration
:只支持对称先验。值必须 $> 1.0$。提供-1
会导致默认值为 $0.1 + 1$。maxIterations
:EM 迭代的最大次数。
注意:进行足够的迭代很重要。在早期迭代中,EM 通常会有无用主题,但在更多迭代后这些主题会显著改善。根据您的数据集,使用至少 20 次,甚至可能 50-100 次迭代通常是合理的。
EMLDAOptimizer
产生一个 DistributedLDAModel
,它不仅存储推断出的主题,还存储完整的训练语料库以及训练语料库中每个文档的主题分布。DistributedLDAModel
支持
topTopicsPerDocument
:训练语料库中每个文档的顶部主题及其权重topDocumentsPerTopic
:每个主题的顶部文档以及该主题在文档中的相应权重。logPrior
:给定超参数docConcentration
和topicConcentration
,估计主题和文档-主题分布的对数概率logLikelihood
:给定推断出的主题和文档-主题分布,训练语料库的对数似然
在线变分贝叶斯
在 OnlineLDAOptimizer
和 LocalLDAModel
中实现。
对于提供给 LDA
的参数
docConcentration
:可以通过传入一个向量来使用非对称先验,该向量中每个k
维度的值等于狄利克雷参数。值应 $>= 0$。提供Vector(-1)
会导致默认行为(值为 $(1.0 / k)$ 的均匀k
维向量)。topicConcentration
:只支持对称先验。值必须 $>= 0$。提供-1
会导致默认值为 $(1.0 / k)$。maxIterations
:要提交的小批量(minibatches)的最大数量。
此外,OnlineLDAOptimizer
接受以下参数
miniBatchFraction
:每次迭代中采样并使用的语料库分数optimizeDocConcentration
:如果设置为 true,则在每个小批量后对超参数docConcentration
(又称alpha
)执行最大似然估计,并在返回的LocalLDAModel
中设置优化的docConcentration
tau0
和kappa
:用于学习率衰减,学习率衰减由 $(\tau_0 + iter)^{-\kappa}$ 计算,其中 $iter$ 是当前迭代次数。
OnlineLDAOptimizer
产生一个 LocalLDAModel
,它只存储推断出的主题。LocalLDAModel
支持
logLikelihood(documents)
:计算给定推断主题的所提供documents
的下界。logPerplexity(documents)
:计算给定推断主题的所提供documents
困惑度的上界。
示例
在以下示例中,我们加载代表文档语料库的词频向量。然后我们使用 LDA 从文档中推断出三个主题。所需聚类数会传递给算法。然后我们输出主题,主题表示为词上的概率分布。
有关 API 的更多详细信息,请参阅 LDA
Python 文档和LDAModel
Python 文档。
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
+ " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
print("Topic " + str(topic) + ":")
for word in range(0, ldaModel.vocabSize()):
print(" " + str(topics[word][topic]))
# Save and load model
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
.load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
有关 API 的详细信息,请参阅 LDA
Scala 文档和DistributedLDAModel
Scala 文档。
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex().map(_.swap).cache()
// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)
// Output topics. Each is a distribution over words (matching word count vectors)
println(s"Learned topics (as distributions over vocab of ${ldaModel.vocabSize} words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
print(s"Topic $topic :")
for (word <- Range(0, ldaModel.vocabSize)) {
print(s"${topics(word, topic)}")
}
println()
}
// Save and load model.
ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
val sameModel = DistributedLDAModel.load(sc,
"target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
有关 API 的详细信息,请参阅 LDA
Java 文档和DistributedLDAModel
Java 文档。
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.clustering.LDAModel;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse the data
String path = "data/mllib/sample_lda_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
// Index documents with unique IDs
JavaPairRDD<Long, Vector> corpus =
JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap));
corpus.cache();
// Cluster the documents into three topics using LDA
LDAModel ldaModel = new LDA().setK(3).run(corpus);
// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
+ " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
System.out.print("Topic " + topic + ":");
for (int word = 0; word < ldaModel.vocabSize(); word++) {
System.out.print(" " + topics.apply(word, topic));
}
System.out.println();
}
ldaModel.save(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
DistributedLDAModel sameModel = DistributedLDAModel.load(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
二分 k-means
二分 K-means 通常比常规 K-means 快得多,但它通常会产生不同的聚类结果。
二分 k-means 是一种分层聚类。分层聚类是聚类分析最常用的方法之一,旨在构建聚类层次结构。分层聚类的策略通常分为两种类型
- 凝聚式 (Agglomerative):这是一种“自下而上”的方法:每个观察值都从自己的聚类开始,随着层次结构向上移动,成对的聚类被合并。
- 分裂式 (Divisive):这是一种“自上而下”的方法:所有观察值都从一个聚类开始,随着层次结构向下移动,递归地执行分裂。
二分 k-means 算法是一种分裂式算法。MLlib 中的实现具有以下参数
- k: 所需的叶聚类数(默认值:4)。如果没有可分裂的叶聚类,实际数量可能会更小。
- maxIterations: 分裂聚类的最大 k-means 迭代次数(默认值:20)
- minDivisibleClusterSize: 可分裂聚类的最小点数(如果 >= 1.0)或最小点比例(如果 < 1.0)(默认值:1)
- seed: 随机种子(默认值:类名的哈希值)
示例
有关 API 的更多详细信息,请参阅 BisectingKMeans
Python 文档和BisectingKMeansModel
Python 文档。
from numpy import array
from pyspark.mllib.clustering import BisectingKMeans
# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
model = BisectingKMeans.train(parsedData, 2, maxIterations=5)
# Evaluate clustering
cost = model.computeCost(parsedData)
print("Bisecting K-means Cost = " + str(cost))
有关 API 的详细信息,请参阅 BisectingKMeans
Scala 文档和BisectingKMeansModel
Scala 文档。
import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Loads and parses data
def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()
// Clustering the data into 6 clusters by BisectingKMeans.
val bkm = new BisectingKMeans().setK(6)
val model = bkm.run(data)
// Show the compute cost and the cluster centers
println(s"Compute Cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
println(s"Cluster Center ${idx}: ${center}")
}
有关 API 的详细信息,请参阅 BisectingKMeans
Java 文档和BisectingKMeansModel
Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.BisectingKMeans;
import org.apache.spark.mllib.clustering.BisectingKMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
List<Vector> localData = Arrays.asList(
Vectors.dense(0.1, 0.1), Vectors.dense(0.3, 0.3),
Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3),
Vectors.dense(30.1, 30.1), Vectors.dense(30.3, 30.3)
);
JavaRDD<Vector> data = sc.parallelize(localData, 2);
BisectingKMeans bkm = new BisectingKMeans()
.setK(4);
BisectingKMeansModel model = bkm.run(data);
System.out.println("Compute Cost: " + model.computeCost(data));
Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
Vector clusterCenter = clusterCenters[i];
System.out.println("Cluster Center " + i + ": " + clusterCenter);
}
流式 k-means
当数据以流式方式到达时,我们可能希望动态估计聚类,并随着新数据到达而更新它们。spark.mllib
提供对流式 k-means 聚类的支持,带有控制估计值衰减(或“遗忘”)的参数。该算法使用迷你批量 k-means 更新规则的推广形式。对于每个数据批次,我们将所有点分配给其最近的聚类,计算新的聚类中心,然后使用以下公式更新每个聚类
\begin{equation} c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t} \end{equation}
\begin{equation} n_{t+1} = n_t + m_t \end{equation}
其中 $c_t$
是该聚类的前一个中心,$n_t$
是迄今为止分配给该聚类的点数,$x_t$
是当前批次中的新聚类中心,$m_t$
是当前批次中添加到该聚类的点数。衰减因子 $\alpha$
可用于忽略过去:当 $\alpha$=1
时,所有数据都将从头开始使用;当 $\alpha$=0
时,只使用最新数据。这类似于指数加权移动平均。
衰减可以使用 halfLife
参数指定,该参数确定正确的衰减因子 a
,使得对于在时间 t
获取的数据,其在时间 t + halfLife
的贡献将下降到 0.5。时间单位可以指定为 batches
(批次)或 points
(点),更新规则将相应调整。
示例
此示例演示如何在流式数据上估计聚类。
有关 API 的更多详细信息,请参阅 StreamingKMeans
Python 文档。有关 StreamingContext 的详细信息,请参阅Spark Streaming 编程指南。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
# we make an input stream of vectors for training,
# as well as a stream of vectors for testing
def parse(lp):
label = float(lp[lp.find('(') + 1: lp.find(')')])
vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
return LabeledPoint(label, vec)
trainingData = sc.textFile("data/mllib/kmeans_data.txt")\
.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse)
trainingQueue = [trainingData]
testingQueue = [testingData]
trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)
# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.
model.trainOn(trainingStream)
result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()
ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)
有关 API 的详细信息,请参阅 StreamingKMeans
Scala 文档。有关 StreamingContext 的详细信息,请参阅Spark Streaming 编程指南。
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
val model = new StreamingKMeans()
.setK(args(3).toInt)
.setDecayFactor(1.0)
.setRandomCenters(args(4).toInt, 0.0)
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()
当您添加带有数据的新文本文件时,聚类中心将更新。每个训练点应格式化为 [x1, x2, x3]
,每个测试数据点应格式化为 (y, [x1, x2, x3])
,其中 y
是某个有用的标签或标识符(例如真实类别分配)。任何时候将文本文件放置在 /training/data/dir
中,模型都会更新。任何时候将文本文件放置在 /testing/data/dir
中,您都将看到预测。随着新数据的到来,聚类中心将发生变化!