Processing math: 100%

聚类 - 基于 RDD 的 API

聚类是一种无监督学习问题,我们的目标是根据某种相似性的概念将实体的子集相互分组。聚类通常用于探索性分析和/或作为分层监督学习管道的组件(其中为每个集群训练不同的分类器或回归模型)。

spark.mllib 包支持以下模型

K-means

K-means是最常用的聚类算法之一,它将数据点聚类成预定义数量的簇。spark.mllib 实现包括k-means++方法的一个并行变体,称为kmeans||spark.mllib 中的实现具有以下参数

示例

以下示例可以在 PySpark shell 中测试。

在以下示例中,在加载和解析数据后,我们使用 KMeans 对象将数据聚类为两个簇。所需簇的数量传递给算法。然后,我们计算集合内平方误差和 (WSSSE)。您可以通过增加k来减少此误差度量。实际上,最佳k通常是 WSSSE 图中存在“肘部”的地方。

有关 API 的更多详细信息,请参阅KMeans Python 文档KMeansModel Python 文档

from numpy import array
from math import sqrt

from pyspark.mllib.clustering import KMeans, KMeansModel

# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

# Save and load model
clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
在 Spark repo 中的 "examples/src/main/python/mllib/k_means_example.py" 查找完整的示例代码。

以下代码片段可以在spark-shell中执行。

在以下示例中,在加载和解析数据后,我们使用 KMeans 对象将数据聚类为两个簇。所需簇的数量传递给算法。然后,我们计算集合内平方误差和 (WSSSE)。您可以通过增加k来减少此误差度量。事实上,最佳k通常是 WSSSE 图中存在“肘部”的地方。

有关 API 的详细信息,请参阅 KMeans Scala 文档KMeansModel Scala 文档

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)

// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println(s"Within Set Sum of Squared Errors = $WSSSE")

// Save and load model
clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
在 Spark repo 中的 "examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala" 查找完整的示例代码。

所有 MLlib 的方法都使用 Java 友好的类型,因此您可以像在 Scala 中一样导入并在那里调用它们。唯一的注意事项是,这些方法采用 Scala RDD 对象,而 Spark Java API 使用单独的 JavaRDD 类。您可以通过在您的 JavaRDD 对象上调用 .rdd() 将 Java RDD 转换为 Scala RDD。下面给出了一个自包含的应用程序示例,它等效于 Scala 中提供的示例

有关 API 的详细信息,请参阅 KMeans Java 文档KMeansModel Java 文档

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Load and parse data
String path = "data/mllib/kmeans_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
  String[] sarray = s.split(" ");
  double[] values = new double[sarray.length];
  for (int i = 0; i < sarray.length; i++) {
    values[i] = Double.parseDouble(sarray[i]);
  }
  return Vectors.dense(values);
});
parsedData.cache();

// Cluster the data into two classes using KMeans
int numClusters = 2;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);

System.out.println("Cluster centers:");
for (Vector center: clusters.clusterCenters()) {
  System.out.println(" " + center);
}
double cost = clusters.computeCost(parsedData.rdd());
System.out.println("Cost: " + cost);

// Evaluate clustering by computing Within Set Sum of Squared Errors
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("Within Set Sum of Squared Errors = " + WSSSE);

// Save and load model
clusters.save(jsc.sc(), "target/org/apache/spark/JavaKMeansExample/KMeansModel");
KMeansModel sameModel = KMeansModel.load(jsc.sc(),
  "target/org/apache/spark/JavaKMeansExample/KMeansModel");
在 Spark repo 中的 "examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java" 查找完整的示例代码。

高斯混合模型

高斯混合模型表示一种复合分布,其中点是从k个高斯子分布之一中抽取的,每个子分布都有自己的概率。spark.mllib 实现使用期望最大化算法来推导出给定一组样本的最大似然模型。该实现具有以下参数

示例

在以下示例中,在加载和解析数据后,我们使用 GaussianMixture 对象将数据聚类为两个簇。所需簇的数量传递给算法。然后,我们输出混合模型的参数。

有关 API 的更多详细信息,请参阅 GaussianMixture Python 文档GaussianMixtureModel Python 文档

from numpy import array

from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel

# Load and parse the data
data = sc.textFile("data/mllib/gmm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))

# Build the model (cluster the data)
gmm = GaussianMixture.train(parsedData, 2)

# Save and load model
gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
sameModel = GaussianMixtureModel\
    .load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")

# output parameters of model
for i in range(2):
    print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
          "sigma = ", gmm.gaussians[i].sigma.toArray())
在 Spark repo 中的 "examples/src/main/python/mllib/gaussian_mixture_example.py" 查找完整的示例代码。

在以下示例中,在加载和解析数据后,我们使用 GaussianMixture 对象将数据聚类为两个簇。所需簇的数量传递给算法。然后,我们输出混合模型的参数。

有关 API 的详细信息,请参阅 GaussianMixture Scala 文档GaussianMixtureModel Scala 文档

import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()

// Cluster the data into two classes using GaussianMixture
val gmm = new GaussianMixture().setK(2).run(parsedData)

// Save and load model
gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
val sameModel = GaussianMixtureModel.load(sc,
  "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")

// output parameters of max-likelihood model
for (i <- 0 until gmm.k) {
  println("weight=%f\nmu=%s\nsigma=\n%s\n" format
    (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}
在 Spark repo 中的 "examples/src/main/scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala" 查找完整的示例代码。

所有 MLlib 的方法都使用 Java 友好的类型,因此您可以像在 Scala 中一样导入并在那里调用它们。唯一的注意事项是,这些方法采用 Scala RDD 对象,而 Spark Java API 使用单独的 JavaRDD 类。您可以通过在您的 JavaRDD 对象上调用 .rdd() 将 Java RDD 转换为 Scala RDD。下面给出了一个自包含的应用程序示例,它等效于 Scala 中提供的示例

有关 API 的详细信息,请参阅 GaussianMixture Java 文档GaussianMixtureModel Java 文档

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Load and parse data
String path = "data/mllib/gmm_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
  String[] sarray = s.trim().split(" ");
  double[] values = new double[sarray.length];
  for (int i = 0; i < sarray.length; i++) {
    values[i] = Double.parseDouble(sarray[i]);
  }
  return Vectors.dense(values);
});
parsedData.cache();

// Cluster the data into two classes using GaussianMixture
GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());

// Save and load GaussianMixtureModel
gmm.save(jsc.sc(), "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
  "target/org.apache.spark.JavaGaussianMixtureExample/GaussianMixtureModel");

// Output the parameters of the mixture model
for (int j = 0; j < gmm.k(); j++) {
  System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
    gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
}
在 Spark repo 中的 "examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java" 查找完整的示例代码。

幂迭代聚类 (PIC)

幂迭代聚类 (PIC) 是一种可扩展且高效的算法,用于对图的顶点进行聚类,给定成对相似度作为边缘属性,如 Lin 和 Cohen, Power Iteration Clustering 中所述。它通过幂迭代计算图的归一化亲和矩阵的伪特征向量,并使用它来聚类顶点。spark.mllib 包含一个使用 GraphX 作为后端的 PIC 实现。它采用 (srcId, dstId, similarity) 元组的 RDD 并输出一个包含聚类分配的模型。相似度必须是非负的。PIC 假设相似度度量是对称的。无论排序如何,一对 (srcId, dstId) 在输入数据中最多出现一次。如果输入中缺少一对,则它们的相似度将被视为零。spark.mllib 的 PIC 实现采用以下(超)参数

示例

在下面,我们展示了代码片段,演示如何在 spark.mllib 中使用 PIC。

PowerIterationClustering 实现了 PIC 算法。它采用一个 (srcId: Long, dstId: Long, similarity: Double) 元组的 RDD,表示亲和矩阵。调用 PowerIterationClustering.run 返回一个 PowerIterationClusteringModel,其中包含计算的聚类分配。

请参阅 PowerIterationClustering Python 文档PowerIterationClusteringModel Python 文档,获取有关 API 的更多详细信息。

from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel

# Load and parse the data
data = sc.textFile("data/mllib/pic_data.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))

# Cluster the data into two classes using PowerIterationClustering
model = PowerIterationClustering.train(similarities, 2, 10)

model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))

# Save and load model
model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
sameModel = PowerIterationClusteringModel\
    .load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
在 Spark 仓库中的 "examples/src/main/python/mllib/power_iteration_clustering_example.py" 查找完整的示例代码。

PowerIterationClustering 实现了 PIC 算法。 它接受一个 RDD(srcId: Long, dstId: Long, similarity: Double) 元组,表示亲和力矩阵。 调用 PowerIterationClustering.run 会返回一个 PowerIterationClusteringModel,其中包含计算出的聚类分配。

请参阅 PowerIterationClustering Scala 文档PowerIterationClusteringModel Scala 文档,获取有关 API 的详细信息。

import org.apache.spark.mllib.clustering.PowerIterationClustering

val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
  .setK(params.k)
  .setMaxIterations(params.maxIterations)
  .setInitializationMode("degree")
  .run(circlesRdd)

val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
  .map { case (k, v) =>
    s"$k -> ${v.sorted.mkString("[", ",", "]")}"
  }.mkString(", ")
val sizesStr = assignments.map {
  _._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala" 查找完整的示例代码。

PowerIterationClustering 实现了 PIC 算法。 它接受一个 JavaRDD(srcId: Long, dstId: Long, similarity: Double) 元组,表示亲和力矩阵。 调用 PowerIterationClustering.run 会返回一个 PowerIterationClusteringModel,其中包含计算出的聚类分配。

请参阅 PowerIterationClustering Java 文档PowerIterationClusteringModel Java 文档,获取有关 API 的详细信息。

import org.apache.spark.mllib.clustering.PowerIterationClustering;
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;

JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Arrays.asList(
  new Tuple3<>(0L, 1L, 0.9),
  new Tuple3<>(1L, 2L, 0.9),
  new Tuple3<>(2L, 3L, 0.9),
  new Tuple3<>(3L, 4L, 0.1),
  new Tuple3<>(4L, 5L, 0.9)));

PowerIterationClustering pic = new PowerIterationClustering()
  .setK(2)
  .setMaxIterations(10);
PowerIterationClusteringModel model = pic.run(similarities);

for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
  System.out.println(a.id() + " -> " + a.cluster());
}
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java" 查找完整的示例代码。

潜在狄利克雷分配 (LDA)

隐狄利克雷分布(LDA)是一种主题模型,它可以从文本文件集合中推断主题。 LDA 可以被认为是一种聚类算法,如下所示:

LDA 通过 setOptimizer 函数支持不同的推理算法。 EMLDAOptimizer 使用 期望最大化 在似然函数上学习聚类,并产生全面的结果,而 OnlineLDAOptimizer 使用迭代的 mini-batch 采样进行 在线变分推理,并且通常对内存友好。

LDA 接受文档集合作为单词计数的向量和以下参数(使用构建器模式设置)

所有 spark.mllib 的 LDA 模型都支持:

注意: LDA 仍然是一个处于积极开发中的实验性功能。 因此,某些功能仅在优化器/优化器生成的模型中的一个中可用。 目前,分布式模型可以转换为本地模型,但反之则不然。

以下讨论将分别描述每个优化器/模型对。

期望最大化

EMLDAOptimizerDistributedLDAModel 中实现。

对于提供给 LDA 的参数:

注意: 进行足够的迭代非常重要。 在早期迭代中,EM 通常具有无用的主题,但是这些主题在更多迭代后会显着改善。 根据您的数据集,使用至少 20 次甚至 50-100 次迭代通常是合理的。

EMLDAOptimizer 生成一个 DistributedLDAModel,它不仅存储推断的主题,还存储完整的训练语料库和训练语料库中每个文档的主题分布。 一个 DistributedLDAModel 支持:

在线变分贝叶斯

OnlineLDAOptimizerLocalLDAModel 中实现。

对于提供给 LDA 的参数:

此外,OnlineLDAOptimizer 接受以下参数:

OnlineLDAOptimizer 生成一个 LocalLDAModel,它仅存储推断的主题。 一个 LocalLDAModel 支持:

示例

在以下示例中,我们加载表示文档语料库的单词计数向量。 然后,我们使用 LDA 从文档中推断出三个主题。 所需的聚类数将传递给算法。 然后,我们输出主题,表示为单词上的概率分布。

请参阅 LDA Python 文档LDAModel Python 文档,获取有关 API 的更多详细信息。

from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()

# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)

# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
      + " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
    print("Topic " + str(topic) + ":")
    for word in range(0, ldaModel.vocabSize()):
        print(" " + str(topics[word][topic]))

# Save and load model
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
    .load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
在 Spark 仓库的 "examples/src/main/python/mllib/latent_dirichlet_allocation_example.py" 中查找完整的示例代码。

有关 API 的详细信息,请参阅 LDA Scala 文档DistributedLDAModel Scala 文档

import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()

// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)

// Output topics. Each is a distribution over words (matching word count vectors)
println(s"Learned topics (as distributions over vocab of ${ldaModel.vocabSize} words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
  print(s"Topic $topic :")
  for (word <- Range(0, ldaModel.vocabSize)) {
    print(s"${topics(word, topic)}")
  }
  println()
}

// Save and load model.
ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
val sameModel = DistributedLDAModel.load(sc,
  "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala" 中查找完整的示例代码。

有关 API 的详细信息,请参阅 LDA Java 文档DistributedLDAModel Java 文档

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.clustering.LDAModel;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Load and parse the data
String path = "data/mllib/sample_lda_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
  String[] sarray = s.trim().split(" ");
  double[] values = new double[sarray.length];
  for (int i = 0; i < sarray.length; i++) {
    values[i] = Double.parseDouble(sarray[i]);
  }
  return Vectors.dense(values);
});
// Index documents with unique IDs
JavaPairRDD<Long, Vector> corpus =
  JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap));
corpus.cache();

// Cluster the documents into three topics using LDA
LDAModel ldaModel = new LDA().setK(3).run(corpus);

// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
  + " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
  System.out.print("Topic " + topic + ":");
  for (int word = 0; word < ldaModel.vocabSize(); word++) {
    System.out.print(" " + topics.apply(word, topic));
  }
  System.out.println();
}

ldaModel.save(jsc.sc(),
  "target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
DistributedLDAModel sameModel = DistributedLDAModel.load(jsc.sc(),
  "target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java" 中查找完整的示例代码。

二分 k-means

二分 K-means 通常比常规 K-means 快得多,但通常会产生不同的聚类结果。

二分 k-means 是一种层次聚类。层次聚类是聚类分析中最常用的方法之一,旨在构建聚类层次结构。层次聚类的策略通常分为两种类型

二分 k-means 算法是一种分裂型算法。 MLlib 中的实现具有以下参数

示例

有关 API 的更多详细信息,请参阅 BisectingKMeans Python 文档BisectingKMeansModel Python 文档

from numpy import array

from pyspark.mllib.clustering import BisectingKMeans

# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
model = BisectingKMeans.train(parsedData, 2, maxIterations=5)

# Evaluate clustering
cost = model.computeCost(parsedData)
print("Bisecting K-means Cost = " + str(cost))
在 Spark 仓库的 "examples/src/main/python/mllib/bisecting_k_means_example.py" 中查找完整的示例代码。

有关 API 的详细信息,请参阅 BisectingKMeans Scala 文档BisectingKMeansModel Scala 文档

import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Loads and parses data
def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()

// Clustering the data into 6 clusters by BisectingKMeans.
val bkm = new BisectingKMeans().setK(6)
val model = bkm.run(data)

// Show the compute cost and the cluster centers
println(s"Compute Cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
  println(s"Cluster Center ${idx}: ${center}")
}
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/mllib/BisectingKMeansExample.scala" 中查找完整的示例代码。

有关 API 的详细信息,请参阅 BisectingKMeans Java 文档BisectingKMeansModel Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.BisectingKMeans;
import org.apache.spark.mllib.clustering.BisectingKMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

List<Vector> localData = Arrays.asList(
  Vectors.dense(0.1, 0.1),   Vectors.dense(0.3, 0.3),
  Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
  Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3),
  Vectors.dense(30.1, 30.1), Vectors.dense(30.3, 30.3)
);
JavaRDD<Vector> data = sc.parallelize(localData, 2);

BisectingKMeans bkm = new BisectingKMeans()
  .setK(4);
BisectingKMeansModel model = bkm.run(data);

System.out.println("Compute Cost: " + model.computeCost(data));

Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
  Vector clusterCenter = clusterCenters[i];
  System.out.println("Cluster Center " + i + ": " + clusterCenter);
}
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java" 中查找完整的示例代码。

流式 k-means

当数据以流的形式到达时,我们可能希望动态地估计聚类,并在新数据到达时更新它们。 spark.mllib 提供了对流式 k-means 聚类的支持,带有控制估计衰减(或“遗忘性”)的参数。 该算法使用 mini-batch k-means 更新规则的推广。 对于每批数据,我们将所有点分配到其最近的聚类,计算新的聚类中心,然后使用以下公式更新每个聚类

ct+1=ctntα+xtmtntα+mt nt+1=nt+mt

其中 ct 是聚类之前的中心, nt 是到目前为止分配给聚类的点数, xt 是当前批次中的新聚类中心, mt 是当前批次中添加到聚类的点数。 衰减因子 α 可用于忽略过去:使用 α=1 将使用从一开始的所有数据; 使用 α=0 将仅使用最新的数据。 这类似于指数加权移动平均。

可以使用 halfLife 参数指定衰减,该参数确定正确的衰减因子 a,以便对于在时间 t 获取的数据,它在时间 t + halfLife 的贡献将降至 0.5。 时间单位可以指定为 batchespoints,更新规则将相应地进行调整。

示例

此示例显示了如何在流数据上估计聚类。

有关 API 的更多详细信息,请参阅 StreamingKMeans Python 文档。 有关 StreamingContext 的详细信息,请参阅 Spark Streaming 编程指南

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans

# we make an input stream of vectors for training,
# as well as a stream of vectors for testing
def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(')')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))

    return LabeledPoint(label, vec)

trainingData = sc.textFile("data/mllib/kmeans_data.txt")\
    .map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))

testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse)

trainingQueue = [trainingData]
testingQueue = [testingData]

trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)

# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)

# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.
model.trainOn(trainingStream)

result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()

ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)
在 Spark 仓库的 "examples/src/main/python/mllib/streaming_k_means_example.py" 中查找完整的示例代码。

有关 API 的详细信息,请参阅 StreamingKMeans Scala 文档。 有关 StreamingContext 的详细信息,请参阅 Spark Streaming 编程指南

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))

val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

val model = new StreamingKMeans()
  .setK(args(3).toInt)
  .setDecayFactor(1.0)
  .setRandomCenters(args(4).toInt, 0.0)

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala" 中查找完整的示例代码。

当您添加包含数据的新文本文件时,聚类中心将会更新。 每个训练点都应格式化为 [x1, x2, x3],每个测试数据点都应格式化为 (y, [x1, x2, x3]),其中 y 是一些有用的标签或标识符(例如,真实类别分配)。 任何时候将文本文件放置在 /training/data/dir 中,模型都会更新。 任何时候将文本文件放置在 /testing/data/dir 中,您都会看到预测。 随着新数据的到来,聚类中心将会改变!