Clustering

This page describes clustering algorithms in MLlib. The guide for clustering in the RDD-based API also has relevant information about these algorithms.

Table of Contents

- K-means
- Latent Dirichlet allocation (LDA)
- Bisecting k-means
- Gaussian Mixture Model (GMM)
- Power Iteration Clustering (PIC)

K-means

k-means is one of the most commonly used clustering algorithms, clustering the data points into a predefined number of clusters. The MLlib implementation includes a parallelized variant of the k-means++ method called kmeans||.

KMeans is implemented as an Estimator and generates a KMeansModel as the base model.
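
As a quick illustration of the Estimator API, the following minimal sketch configures the main k-means parameters, including the kmeans|| initialization mode mentioned above (the parameter values are arbitrary and an active SparkSession is assumed, as in the examples below):

from pyspark.ml.clustering import KMeans

# Configure the estimator: number of clusters, kmeans|| initialization,
# an iteration cap and a fixed seed for reproducibility.
kmeans = KMeans(k=3, initMode="k-means||", maxIter=20, seed=1)

# Calling fit() on a DataFrame with a "features" vector column returns a KMeansModel.
# model = kmeans.fit(dataset)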

Input Columns

Param name      Type(s)   Default       Description
featuresCol     Vector    "features"    Feature vector

Output Columns

Param name       Type(s)   Default         Description
predictionCol    Int       "prediction"    Predicted cluster center
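
As a hypothetical follow-up to the Python example below (reusing its model and dataset variables), the prediction column itself can be inspected like this:

# Each input row gains an integer "prediction" column holding its cluster index.
model.transform(dataset).select("features", "prediction").show(5, truncate=False)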

Examples

Refer to the Python API docs for more details.

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
Find full example code at "examples/src/main/python/ml/kmeans_example.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains a k-means model.
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)

// Make predictions
val predictions = model.transform(dataset)

// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Shows the result.
println("Cluster Centers: ")
model.clusterCenters.foreach(println)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/KMeansExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a k-means model.
KMeans kmeans = new KMeans().setK(2).setSeed(1L);
KMeansModel model = kmeans.fit(dataset);

// Make predictions
Dataset<Row> predictions = model.transform(dataset);

// Evaluate clustering by computing Silhouette score
ClusteringEvaluator evaluator = new ClusteringEvaluator();

double silhouette = evaluator.evaluate(predictions);
System.out.println("Silhouette with squared euclidean distance = " + silhouette);

// Shows the result.
Vector[] centers = model.clusterCenters();
System.out.println("Cluster Centers: ");
for (Vector center: centers) {
  System.out.println(center);
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java" in the Spark repo.

Refer to the R API docs for more details.

# Fit a k-means model with spark.kmeans
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
df_list <- randomSplit(training, c(7,3), 2)
kmeansDF <- df_list[[1]]
kmeansTestDF <- df_list[[2]]
kmeansModel <- spark.kmeans(kmeansDF, ~ Class + Sex + Age + Freq,
                            k = 3)

# Model summary
summary(kmeansModel)

# Get fitted result from the k-means model
head(fitted(kmeansModel))

# Prediction
kmeansPredictions <- predict(kmeansModel, kmeansTestDF)
head(kmeansPredictions)
Find full example code at "examples/src/main/r/ml/kmeans.R" in the Spark repo.

Latent Dirichlet allocation (LDA)

LDA is implemented as an Estimator that supports both EMLDAOptimizer and OnlineLDAOptimizer, and generates a LDAModel as the base model. Expert users may cast a LDAModel generated by EMLDAOptimizer to a DistributedLDAModel if needed.
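
For illustration, a minimal Python sketch of choosing the optimizer (in the Python API no explicit cast is needed: fitting with the "em" optimizer directly returns a DistributedLDAModel, which can be converted to a local model with toLocal(); an active SparkSession named spark is assumed, as in the examples below):

from pyspark.ml.clustering import LDA

# Loads the same sample data used in the examples below.
dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")

# optimizer="em" selects EMLDAOptimizer; the default, "online", selects OnlineLDAOptimizer.
lda = LDA(k=10, maxIter=10, optimizer="em")
model = lda.fit(dataset)        # a DistributedLDAModel when optimizer="em"
print(model.isDistributed())    # True
local_model = model.toLocal()   # convert to a local (in-memory) model if desired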

Examples

Refer to the Python API docs for more details.

from pyspark.ml.clustering import LDA

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")

# Trains a LDA model.
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)

ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(dataset)
transformed.show(truncate=False)
Find full example code at "examples/src/main/python/ml/lda_example.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.clustering.LDA

// Loads data.
val dataset = spark.read.format("libsvm")
  .load("data/mllib/sample_lda_libsvm_data.txt")

// Trains a LDA model.
val lda = new LDA().setK(10).setMaxIter(10)
val model = lda.fit(dataset)

val ll = model.logLikelihood(dataset)
val lp = model.logPerplexity(dataset)
println(s"The lower bound on the log likelihood of the entire corpus: $ll")
println(s"The upper bound on perplexity: $lp")

// Describe topics.
val topics = model.describeTopics(3)
println("The topics described by their top-weighted terms:")
topics.show(false)

// Shows the result.
val transformed = model.transform(dataset)
transformed.show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import org.apache.spark.ml.clustering.LDA;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm")
  .load("data/mllib/sample_lda_libsvm_data.txt");

// Trains a LDA model.
LDA lda = new LDA().setK(10).setMaxIter(10);
LDAModel model = lda.fit(dataset);

double ll = model.logLikelihood(dataset);
double lp = model.logPerplexity(dataset);
System.out.println("The lower bound on the log likelihood of the entire corpus: " + ll);
System.out.println("The upper bound on perplexity: " + lp);

// Describe topics.
Dataset<Row> topics = model.describeTopics(3);
System.out.println("The topics described by their top-weighted terms:");
topics.show(false);

// Shows the result.
Dataset<Row> transformed = model.transform(dataset);
transformed.show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java" in the Spark repo.

Refer to the R API docs for more details.

# Load training data
df <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a latent dirichlet allocation model with spark.lda
model <- spark.lda(training, k = 10, maxIter = 10)

# Model summary
summary(model)

# Posterior probabilities
posterior <- spark.posterior(model, test)
head(posterior)

# The log perplexity of the LDA model
logPerplexity <- spark.perplexity(model, test)
print(paste0("The upper bound bound on perplexity: ", logPerplexity))
Find full example code at "examples/src/main/r/ml/lda.R" in the Spark repo.

Bisecting k-means

Bisecting k-means is a kind of hierarchical clustering using a divisive (or "top-down") approach: all observations start in one cluster, and splits are performed recursively as one moves down the hierarchy.

Bisecting k-means can often be much faster than regular k-means, but it will generally produce a different clustering.

BisectingKMeans is implemented as an Estimator and generates a BisectingKMeansModel as the base model.
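
As a small sketch of the parameters controlling the top-down splitting (parameter names are those of pyspark.ml.clustering.BisectingKMeans; the values are arbitrary and an active SparkSession is assumed, as in the examples below):

from pyspark.ml.clustering import BisectingKMeans

# k is the target number of leaf clusters; minDivisibleClusterSize is the minimum
# number of points (if >= 1.0) or minimum proportion of points (if < 1.0) that a
# cluster must contain to be eligible for a further split.
bkm = BisectingKMeans(k=4, maxIter=20, minDivisibleClusterSize=1.0, seed=1)
# model = bkm.fit(dataset)   # returns a BisectingKMeansModel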

Examples

Refer to the Python API docs for more details.

from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a bisecting k-means model.
bkm = BisectingKMeans().setK(2).setSeed(1)
model = bkm.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
    print(center)
Find full example code at "examples/src/main/python/ml/bisecting_k_means_example.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.clustering.BisectingKMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains a bisecting k-means model.
val bkm = new BisectingKMeans().setK(2).setSeed(1)
val model = bkm.fit(dataset)

// Make predictions
val predictions = model.transform(dataset)

// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Shows the result.
println("Cluster Centers: ")
val centers = model.clusterCenters
centers.foreach(println)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/BisectingKMeansExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import org.apache.spark.ml.clustering.BisectingKMeans;
import org.apache.spark.ml.clustering.BisectingKMeansModel;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a bisecting k-means model.
BisectingKMeans bkm = new BisectingKMeans().setK(2).setSeed(1);
BisectingKMeansModel model = bkm.fit(dataset);

// Make predictions
Dataset<Row> predictions = model.transform(dataset);

// Evaluate clustering by computing Silhouette score
ClusteringEvaluator evaluator = new ClusteringEvaluator();

double silhouette = evaluator.evaluate(predictions);
System.out.println("Silhouette with squared euclidean distance = " + silhouette);

// Shows the result.
System.out.println("Cluster Centers: ");
Vector[] centers = model.clusterCenters();
for (Vector center : centers) {
  System.out.println(center);
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java" in the Spark repo.

Refer to the R API docs for more details.

t <- as.data.frame(Titanic)
training <- createDataFrame(t)

# Fit bisecting k-means model with four centers
model <- spark.bisectingKmeans(training, Class ~ Survived, k = 4)

# get fitted result from a bisecting k-means model
fitted.model <- fitted(model, "centers")

# Model summary
head(summary(fitted.model))

# fitted values on training data
fitted <- predict(model, training)
head(select(fitted, "Class", "prediction"))
Find full example code at "examples/src/main/r/ml/bisectingKmeans.R" in the Spark repo.

Gaussian Mixture Model (GMM)

A Gaussian Mixture Model represents a composite distribution whereby points are drawn from one of *k* Gaussian sub-distributions, each with its own probability. The spark.ml implementation uses the expectation-maximization algorithm to induce the maximum-likelihood model given a set of samples.

GaussianMixture is implemented as an Estimator and generates a GaussianMixtureModel as the base model.

Input Columns

Param name      Type(s)   Default       Description
featuresCol     Vector    "features"    Feature vector

Output Columns

Param name        Type(s)   Default          Description
predictionCol     Int       "prediction"     Predicted cluster center
probabilityCol    Vector    "probability"    Probability of each cluster
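
The examples below only print the fitted Gaussians; the following minimal sketch (reusing the same sample data, with an active SparkSession named spark assumed) shows how transform() fills in the prediction and probability columns described above:

from pyspark.ml.clustering import GaussianMixture

dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

gmm = GaussianMixture(k=2, seed=538009335)
model = gmm.fit(dataset)

# "prediction" holds the most likely cluster index; "probability" holds the
# per-cluster membership probabilities as a vector.
model.transform(dataset).select("features", "prediction", "probability").show(truncate=False)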

Examples

Refer to the Python API docs for more details.

from pyspark.ml.clustering import GaussianMixture

# loads data
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

gmm = GaussianMixture().setK(2).setSeed(538009335)
model = gmm.fit(dataset)

print("Gaussians shown as a DataFrame: ")
model.gaussiansDF.show(truncate=False)
Find full example code at "examples/src/main/python/ml/gaussian_mixture_example.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.clustering.GaussianMixture

// Loads data
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains Gaussian Mixture Model
val gmm = new GaussianMixture()
  .setK(2)
val model = gmm.fit(dataset)

// output parameters of the mixture model
for (i <- 0 until model.getK) {
  println(s"Gaussian $i:\nweight=${model.weights(i)}\n" +
      s"mu=${model.gaussians(i).mean}\nsigma=\n${model.gaussians(i).cov}\n")
}
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import org.apache.spark.ml.clustering.GaussianMixture;
import org.apache.spark.ml.clustering.GaussianMixtureModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a GaussianMixture model
GaussianMixture gmm = new GaussianMixture()
  .setK(2);
GaussianMixtureModel model = gmm.fit(dataset);

// Output the parameters of the mixture model
for (int i = 0; i < model.getK(); i++) {
  System.out.printf("Gaussian %d:\nweight=%f\nmu=%s\nsigma=\n%s\n\n",
          i, model.weights()[i], model.gaussians()[i].mean(), model.gaussians()[i].cov());
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java" in the Spark repo.

Refer to the R API docs for more details.

# Load training data
df <- read.df("data/mllib/sample_kmeans_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a gaussian mixture clustering model with spark.gaussianMixture
model <- spark.gaussianMixture(training, ~ features, k = 2)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/gaussianMixture.R" in the Spark repo.

Power Iteration Clustering (PIC)

Power Iteration Clustering (PIC) is a scalable graph clustering algorithm developed by Lin and Cohen. From the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise similarity matrix of the data.

spark.ml's PowerIterationClustering implementation takes the following parameters:

- k: the number of clusters to create
- initMode: the initialization algorithm ("random" or "degree")
- maxIter: the maximum number of iterations
- srcCol: name of the input column for source vertex IDs
- dstCol: name of the input column for destination vertex IDs
- weightCol: name of the weight column
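
The examples below start from a hard-coded edge list; purely as an illustration (not part of the original guide), the following sketch builds the expected (src, dst, weight) similarity input from raw points with a Gaussian kernel and clusters it, assuming an active SparkSession named spark:

import math

from pyspark.ml.clustering import PowerIterationClustering

# Two obvious groups of points on the real line.
points = [0.0, 0.1, 0.2, 5.0, 5.1]
# One row per vertex pair (src, dst), weighted by a Gaussian similarity.
edges = [(i, j, math.exp(-(points[i] - points[j]) ** 2))
         for i in range(len(points)) for j in range(i)]
similarities = spark.createDataFrame(edges, ["src", "dst", "weight"])

pic = PowerIterationClustering(k=2, maxIter=20, weightCol="weight")
pic.assignClusters(similarities).show()   # output columns: id, cluster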

Examples

Refer to the Python API docs for more details.

from pyspark.ml.clustering import PowerIterationClustering

df = spark.createDataFrame([
    (0, 1, 1.0),
    (0, 2, 1.0),
    (1, 2, 1.0),
    (3, 4, 1.0),
    (4, 0, 0.1)
], ["src", "dst", "weight"])

pic = PowerIterationClustering(k=2, maxIter=20, initMode="degree", weightCol="weight")

# Shows the cluster assignment
pic.assignClusters(df).show()
Find full example code at "examples/src/main/python/ml/power_iteration_clustering_example.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.clustering.PowerIterationClustering

val dataset = spark.createDataFrame(Seq(
  (0L, 1L, 1.0),
  (0L, 2L, 1.0),
  (1L, 2L, 1.0),
  (3L, 4L, 1.0),
  (4L, 0L, 0.1)
)).toDF("src", "dst", "weight")

val model = new PowerIterationClustering()
  .setK(2)
  .setMaxIter(20)
  .setInitMode("degree")
  .setWeightCol("weight")

val prediction = model.assignClusters(dataset).select("id", "cluster")

//  Shows the cluster assignment
prediction.show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/PowerIterationClusteringExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.clustering.PowerIterationClustering;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0L, 1L, 1.0),
  RowFactory.create(0L, 2L, 1.0),
  RowFactory.create(1L, 2L, 1.0),
  RowFactory.create(3L, 4L, 1.0),
  RowFactory.create(4L, 0L, 0.1)
);

StructType schema = new StructType(new StructField[]{
  new StructField("src", DataTypes.LongType, false, Metadata.empty()),
  new StructField("dst", DataTypes.LongType, false, Metadata.empty()),
  new StructField("weight", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

PowerIterationClustering model = new PowerIterationClustering()
  .setK(2)
  .setMaxIter(10)
  .setInitMode("degree")
  .setWeightCol("weight");

Dataset<Row> result = model.assignClusters(df);
result.show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaPowerIterationClusteringExample.java" in the Spark repo.

Refer to the R API docs for more details.

df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
                           list(4L, 0L, 0.1)),
                      schema = c("src", "dst", "weight"))
# assign clusters
clusters <- spark.assignClusters(df, k = 2L, maxIter = 20L,
                                 initMode = "degree", weightCol = "weight")

showDF(arrange(clusters, clusters$id))
Find full example code at "examples/src/main/r/ml/powerIterationClustering.R" in the Spark repo.