聚类

本页介绍 MLlib 中的聚类算法。 基于 RDD 的 API 中的聚类指南也包含有关这些算法的相关信息。

目录

K-means

k-means 是一种最常用的聚类算法,它将数据点聚类成预定数量的簇。 MLlib 实现包括 k-means++ 方法的一个并行化变体,称为 kmeans||

KMeans 实现为一个 Estimator,并生成一个 KMeansModel 作为基本模型。

输入列

参数名称 类型 默认值 描述
featuresCol Vector "features" 特征向量

输出列

参数名称 类型 默认值 描述
predictionCol Int "prediction" 预测的簇中心

例子

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
在 Spark 仓库中的 "examples/src/main/python/ml/kmeans_example.py" 中找到完整的示例代码。

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains a k-means model.
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)

// Make predictions
val predictions = model.transform(dataset)

// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Shows the result.
println("Cluster Centers: ")
model.clusterCenters.foreach(println)
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/KMeansExample.scala" 中找到完整的示例代码。

有关更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a k-means model.
KMeans kmeans = new KMeans().setK(2).setSeed(1L);
KMeansModel model = kmeans.fit(dataset);

// Make predictions
Dataset<Row> predictions = model.transform(dataset);

// Evaluate clustering by computing Silhouette score
ClusteringEvaluator evaluator = new ClusteringEvaluator();

double silhouette = evaluator.evaluate(predictions);
System.out.println("Silhouette with squared euclidean distance = " + silhouette);

// Shows the result.
Vector[] centers = model.clusterCenters();
System.out.println("Cluster Centers: ");
for (Vector center: centers) {
  System.out.println(center);
}
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java" 中找到完整的示例代码。

有关更多详细信息,请参阅 R API 文档

# Fit a k-means model with spark.kmeans
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
df_list <- randomSplit(training, c(7,3), 2)
kmeansDF <- df_list[[1]]
kmeansTestDF <- df_list[[2]]
kmeansModel <- spark.kmeans(kmeansDF, ~ Class + Sex + Age + Freq,
                            k = 3)

# Model summary
summary(kmeansModel)

# Get fitted result from the k-means model
head(fitted(kmeansModel))

# Prediction
kmeansPredictions <- predict(kmeansModel, kmeansTestDF)
head(kmeansPredictions)
在 Spark 仓库中的 "examples/src/main/r/ml/kmeans.R" 中找到完整的示例代码。

潜在狄利克雷分配 (LDA)

LDA 实现为一个 Estimator,它支持 EMLDAOptimizerOnlineLDAOptimizer,并生成一个 LDAModel 作为基本模型。 如果需要,专家用户可以将 EMLDAOptimizer 生成的 LDAModel 转换为 DistributedLDAModel

例子

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.clustering import LDA

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")

# Trains a LDA model.
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)

ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(dataset)
transformed.show(truncate=False)
在 Spark 仓库中的 "examples/src/main/python/ml/lda_example.py" 中找到完整的示例代码。

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.clustering.LDA

// Loads data.
val dataset = spark.read.format("libsvm")
  .load("data/mllib/sample_lda_libsvm_data.txt")

// Trains a LDA model.
val lda = new LDA().setK(10).setMaxIter(10)
val model = lda.fit(dataset)

val ll = model.logLikelihood(dataset)
val lp = model.logPerplexity(dataset)
println(s"The lower bound on the log likelihood of the entire corpus: $ll")
println(s"The upper bound on perplexity: $lp")

// Describe topics.
val topics = model.describeTopics(3)
println("The topics described by their top-weighted terms:")
topics.show(false)

// Shows the result.
val transformed = model.transform(dataset)
transformed.show(false)
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala" 中找到完整的示例代码。

有关更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.clustering.LDA;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm")
  .load("data/mllib/sample_lda_libsvm_data.txt");

// Trains a LDA model.
LDA lda = new LDA().setK(10).setMaxIter(10);
LDAModel model = lda.fit(dataset);

double ll = model.logLikelihood(dataset);
double lp = model.logPerplexity(dataset);
System.out.println("The lower bound on the log likelihood of the entire corpus: " + ll);
System.out.println("The upper bound on perplexity: " + lp);

// Describe topics.
Dataset<Row> topics = model.describeTopics(3);
System.out.println("The topics described by their top-weighted terms:");
topics.show(false);

// Shows the result.
Dataset<Row> transformed = model.transform(dataset);
transformed.show(false);
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java" 中找到完整的示例代码。

有关更多详细信息,请参阅 R API 文档

# Load training data
df <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a latent dirichlet allocation model with spark.lda
model <- spark.lda(training, k = 10, maxIter = 10)

# Model summary
summary(model)

# Posterior probabilities
posterior <- spark.posterior(model, test)
head(posterior)

# The log perplexity of the LDA model
logPerplexity <- spark.perplexity(model, test)
print(paste0("The upper bound bound on perplexity: ", logPerplexity))
在 Spark 仓库中的 "examples/src/main/r/ml/lda.R" 中找到完整的示例代码。

二分 k-means

二分 k-means 是一种 层次聚类,它使用一种分裂的(或“自顶向下”)方法:所有观察结果都从一个簇开始,并且当向下移动层级时,递归地执行分裂。

二分 K-means 通常比常规 K-means 快得多,但它通常会产生不同的聚类。

BisectingKMeans 实现为一个 Estimator,并生成一个 BisectingKMeansModel 作为基本模型。

例子

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a bisecting k-means model.
bkm = BisectingKMeans().setK(2).setSeed(1)
model = bkm.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
    print(center)
在 Spark 仓库中的 "examples/src/main/python/ml/bisecting_k_means_example.py" 中找到完整的示例代码。

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.clustering.BisectingKMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains a bisecting k-means model.
val bkm = new BisectingKMeans().setK(2).setSeed(1)
val model = bkm.fit(dataset)

// Make predictions
val predictions = model.transform(dataset)

// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Shows the result.
println("Cluster Centers: ")
val centers = model.clusterCenters
centers.foreach(println)
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/BisectingKMeansExample.scala" 中找到完整的示例代码。

有关更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.clustering.BisectingKMeans;
import org.apache.spark.ml.clustering.BisectingKMeansModel;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a bisecting k-means model.
BisectingKMeans bkm = new BisectingKMeans().setK(2).setSeed(1);
BisectingKMeansModel model = bkm.fit(dataset);

// Make predictions
Dataset<Row> predictions = model.transform(dataset);

// Evaluate clustering by computing Silhouette score
ClusteringEvaluator evaluator = new ClusteringEvaluator();

double silhouette = evaluator.evaluate(predictions);
System.out.println("Silhouette with squared euclidean distance = " + silhouette);

// Shows the result.
System.out.println("Cluster Centers: ");
Vector[] centers = model.clusterCenters();
for (Vector center : centers) {
  System.out.println(center);
}
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java" 中找到完整的示例代码。

有关更多详细信息,请参阅 R API 文档

t <- as.data.frame(Titanic)
training <- createDataFrame(t)

# Fit bisecting k-means model with four centers
model <- spark.bisectingKmeans(training, Class ~ Survived, k = 4)

# get fitted result from a bisecting k-means model
fitted.model <- fitted(model, "centers")

# Model summary
head(summary(fitted.model))

# fitted values on training data
fitted <- predict(model, training)
head(select(fitted, "Class", "prediction"))
在 Spark 仓库中的 "examples/src/main/r/ml/bisectingKmeans.R" 中找到完整的示例代码。

高斯混合模型 (GMM)

高斯混合模型表示一种组合分布,其中点是从 k 个高斯子分布之一中抽取的,每个子分布都有自己的概率。 spark.ml 实现使用 期望最大化算法来诱导给定一组样本的最大似然模型。

GaussianMixture 实现为一个 Estimator,并生成一个 GaussianMixtureModel 作为基本模型。

输入列

参数名称 类型 默认值 描述
featuresCol Vector "features" 特征向量

输出列

参数名称 类型 默认值 描述
predictionCol Int "prediction" 预测的簇中心
probabilityCol Vector "probability" 每个簇的概率

例子

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.clustering import GaussianMixture

# loads data
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

gmm = GaussianMixture().setK(2).setSeed(538009335)
model = gmm.fit(dataset)

print("Gaussians shown as a DataFrame: ")
model.gaussiansDF.show(truncate=False)
在 Spark 仓库中的 "examples/src/main/python/ml/gaussian_mixture_example.py" 中找到完整的示例代码。

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.clustering.GaussianMixture

// Loads data
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains Gaussian Mixture Model
val gmm = new GaussianMixture()
  .setK(2)
val model = gmm.fit(dataset)

// output parameters of mixture model model
for (i <- 0 until model.getK) {
  println(s"Gaussian $i:\nweight=${model.weights(i)}\n" +
      s"mu=${model.gaussians(i).mean}\nsigma=\n${model.gaussians(i).cov}\n")
}
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala" 中找到完整的示例代码。

有关更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.clustering.GaussianMixture;
import org.apache.spark.ml.clustering.GaussianMixtureModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a GaussianMixture model
GaussianMixture gmm = new GaussianMixture()
  .setK(2);
GaussianMixtureModel model = gmm.fit(dataset);

// Output the parameters of the mixture model
for (int i = 0; i < model.getK(); i++) {
  System.out.printf("Gaussian %d:\nweight=%f\nmu=%s\nsigma=\n%s\n\n",
          i, model.weights()[i], model.gaussians()[i].mean(), model.gaussians()[i].cov());
}
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java" 中找到完整的示例代码。

有关更多详细信息,请参阅 R API 文档

# Load training data
df <- read.df("data/mllib/sample_kmeans_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a gaussian mixture clustering model with spark.gaussianMixture
model <- spark.gaussianMixture(training, ~ features, k = 2)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 仓库中的 "examples/src/main/r/ml/gaussianMixture.R" 中找到完整的示例代码。

幂迭代聚类 (PIC)

幂迭代聚类 (PIC) 是由 Lin 和 Cohen 开发的一种可扩展的图聚类算法。 从摘要中:PIC 使用对数据的归一化成对相似度矩阵的截断幂迭代来找到数据集的非常低维的嵌入。

spark.ml 的 PowerIterationClustering 实现采用以下参数

例子

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.clustering import PowerIterationClustering

df = spark.createDataFrame([
    (0, 1, 1.0),
    (0, 2, 1.0),
    (1, 2, 1.0),
    (3, 4, 1.0),
    (4, 0, 0.1)
], ["src", "dst", "weight"])

pic = PowerIterationClustering(k=2, maxIter=20, initMode="degree", weightCol="weight")

# Shows the cluster assignment
pic.assignClusters(df).show()
在 Spark 仓库中的 "examples/src/main/python/ml/power_iteration_clustering_example.py" 中找到完整的示例代码。

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.clustering.PowerIterationClustering

val dataset = spark.createDataFrame(Seq(
  (0L, 1L, 1.0),
  (0L, 2L, 1.0),
  (1L, 2L, 1.0),
  (3L, 4L, 1.0),
  (4L, 0L, 0.1)
)).toDF("src", "dst", "weight")

val model = new PowerIterationClustering().
  setK(2).
  setMaxIter(20).
  setInitMode("degree").
  setWeightCol("weight")

val prediction = model.assignClusters(dataset).select("id", "cluster")

//  Shows the cluster assignment
prediction.show(false)
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/PowerIterationClusteringExample.scala" 中找到完整的示例代码。

有关更多详细信息,请参阅 Java API 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.clustering.PowerIterationClustering;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0L, 1L, 1.0),
  RowFactory.create(0L, 2L, 1.0),
  RowFactory.create(1L, 2L, 1.0),
  RowFactory.create(3L, 4L, 1.0),
  RowFactory.create(4L, 0L, 0.1)
);

StructType schema = new StructType(new StructField[]{
  new StructField("src", DataTypes.LongType, false, Metadata.empty()),
  new StructField("dst", DataTypes.LongType, false, Metadata.empty()),
  new StructField("weight", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

PowerIterationClustering model = new PowerIterationClustering()
  .setK(2)
  .setMaxIter(10)
  .setInitMode("degree")
  .setWeightCol("weight");

Dataset<Row> result = model.assignClusters(df);
result.show(false);
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaPowerIterationClusteringExample.java" 中找到完整的示例代码。

有关更多详细信息,请参阅 R API 文档

df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
                           list(4L, 0L, 0.1)),
                      schema = c("src", "dst", "weight"))
# assign clusters
clusters <- spark.assignClusters(df, k = 2L, maxIter = 20L,
                                 initMode = "degree", weightCol = "weight")

showDF(arrange(clusters, clusters$id))
在 Spark 仓库中的 "examples/src/main/r/ml/powerIterationClustering.R" 中找到完整的示例代码。