Classification and Regression

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

This page covers algorithms for classification and regression. It also includes sections discussing specific classes of algorithms, such as linear methods, trees, and ensembles.

Table of Contents

Classification

Logistic regression

Logistic regression is a popular method for predicting a categorical response. It is a special case of Generalized Linear Models that predicts the probability of the outcomes. In spark.ml, logistic regression can be used to predict a binary outcome by using binomial logistic regression, or it can be used to predict a multiclass outcome by using multinomial logistic regression. Use the family parameter to select between these two algorithms, or leave it unset and Spark will infer the correct variant.

Multinomial logistic regression can be used for binary classification by setting the family parameter to "multinomial". It will produce two sets of coefficients and two intercepts.

When fitting a LogisticRegressionModel without an intercept on a dataset with constant nonzero columns, Spark MLlib outputs zero coefficients for the constant nonzero columns. This behavior is the same as R glmnet but different from LIBSVM.

Binomial logistic regression

For more background and implementation details on binomial logistic regression, refer to the documentation of logistic regression in spark.mllib.

Examples

The following example shows how to train binomial and multinomial logistic regression models for binary classification with elastic net regularization. elasticNetParam corresponds to $\alpha$ and regParam corresponds to $\lambda$.
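For reference, with elasticNetParam $= \alpha$ and regParam $= \lambda$, the elastic-net penalty added to the loss function takes the form used later in the multinomial section:

\[ \lambda \left[\alpha ||\boldsymbol{\beta}||_1 + \frac{1}{2}\left(1 - \alpha\right)||\boldsymbol{\beta}||_2^2\right] \]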

For more details on parameters, refer to the Python API documentation.

from pyspark.ml.classification import LogisticRegression

# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# We can also use the multinomial family for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# Fit the model
mlrModel = mlr.fit(training)

# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))
Find full example code at "examples/src/main/python/ml/logistic_regression_with_elastic_net.py" in the Spark repo.

For more details on parameters, refer to the Scala API documentation.

import org.apache.spark.ml.classification.LogisticRegression

// Load training data
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Fit the model
val lrModel = lr.fit(training)

// Print the coefficients and intercept for logistic regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

// We can also use the multinomial family for binary classification
val mlr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
  .setFamily("multinomial")

val mlrModel = mlr.fit(training)

// Print the coefficients and intercepts for logistic regression with multinomial family
println(s"Multinomial coefficients: ${mlrModel.coefficientMatrix}")
println(s"Multinomial intercepts: ${mlrModel.interceptVector}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionWithElasticNetExample.scala" in the Spark repo.

For more details on parameters, refer to the Java API documentation.

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data
Dataset<Row> training = spark.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8);

// Fit the model
LogisticRegressionModel lrModel = lr.fit(training);

// Print the coefficients and intercept for logistic regression
System.out.println("Coefficients: "
  + lrModel.coefficients() + " Intercept: " + lrModel.intercept());

// We can also use the multinomial family for binary classification
LogisticRegression mlr = new LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.3)
        .setElasticNetParam(0.8)
        .setFamily("multinomial");

// Fit the model
LogisticRegressionModel mlrModel = mlr.fit(training);

// Print the coefficients and intercepts for logistic regression with multinomial family
System.out.println("Multinomial coefficients: " + mlrModel.coefficientMatrix()
  + "\nMultinomial intercepts: " + mlrModel.interceptVector());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLogisticRegressionWithElasticNetExample.java" in the Spark repo.

For more details on parameters, refer to the R API documentation.

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a binomial logistic regression model with spark.logit
model <- spark.logit(training, label ~ features, maxIter = 10, regParam = 0.3, elasticNetParam = 0.8)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/logit.R" in the Spark repo.

The spark.ml implementation of logistic regression also supports extracting a summary of the model over the training set. Note that the predictions and metrics which are stored as a DataFrame in LogisticRegressionSummary are annotated @transient and hence only available on the driver.

LogisticRegressionTrainingSummary provides a summary for a LogisticRegressionModel. In the case of binary classification, certain additional metrics are available, e.g. the ROC curve. See BinaryLogisticRegressionTrainingSummary.

Continuing the earlier example:

from pyspark.ml.classification import LogisticRegression

# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
lr.setThreshold(bestThreshold)
Find full example code at "examples/src/main/python/ml/logistic_regression_summary_example.py" in the Spark repo.

LogisticRegressionTrainingSummary provides a summary for a LogisticRegressionModel. In the case of binary classification, certain additional metrics are available, e.g. the ROC curve. The binary summary can be accessed via the binarySummary method. See BinaryLogisticRegressionTrainingSummary.

Continuing the earlier example:

import org.apache.spark.ml.classification.LogisticRegression

// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier
// example
val trainingSummary = lrModel.binarySummary

// Obtain the objective per iteration.
val objectiveHistory = trainingSummary.objectiveHistory
println("objectiveHistory:")
objectiveHistory.foreach(loss => println(loss))

// Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
val roc = trainingSummary.roc
roc.show()
println(s"areaUnderROC: ${trainingSummary.areaUnderROC}")

// Set the model threshold to maximize F-Measure
val fMeasure = trainingSummary.fMeasureByThreshold
val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
  .select("threshold").head().getDouble(0)
lrModel.setThreshold(bestThreshold)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionSummaryExample.scala" in the Spark repo.

LogisticRegressionTrainingSummary provides a summary for a LogisticRegressionModel. In the case of binary classification, certain additional metrics are available, e.g. the ROC curve. The binary summary can be accessed via the binarySummary method. See BinaryLogisticRegressionTrainingSummary.

Continuing the earlier example:

import org.apache.spark.ml.classification.BinaryLogisticRegressionTrainingSummary;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier
// example
BinaryLogisticRegressionTrainingSummary trainingSummary = lrModel.binarySummary();

// Obtain the loss per iteration.
double[] objectiveHistory = trainingSummary.objectiveHistory();
for (double lossPerIteration : objectiveHistory) {
  System.out.println(lossPerIteration);
}

// Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
Dataset<Row> roc = trainingSummary.roc();
roc.show();
roc.select("FPR").show();
System.out.println(trainingSummary.areaUnderROC());

// Get the threshold corresponding to the maximum F-Measure and rerun LogisticRegression with
// this selected threshold.
Dataset<Row> fMeasure = trainingSummary.fMeasureByThreshold();
double maxFMeasure = fMeasure.select(functions.max("F-Measure")).head().getDouble(0);
double bestThreshold = fMeasure.where(fMeasure.col("F-Measure").equalTo(maxFMeasure))
  .select("threshold").head().getDouble(0);
lrModel.setThreshold(bestThreshold);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLogisticRegressionSummaryExample.java" in the Spark repo.

Multinomial logistic regression

Multiclass classification is supported via multinomial logistic (softmax) regression. In multinomial logistic regression, the algorithm produces $K$ sets of coefficients, or a matrix of dimension $K \times J$, where $K$ is the number of outcome classes and $J$ is the number of features. If the algorithm is fit with an intercept term, then a length-$K$ vector of intercepts is available.

Multinomial coefficients are available as coefficientMatrix and intercepts are available as interceptVector.

The coefficients and intercept methods on a logistic regression model trained with the multinomial family are not supported. Use coefficientMatrix and interceptVector instead.

The conditional probabilities of the outcome classes $k \in \{1, 2, \ldots, K\}$ are modeled using the softmax function.

\[ P(Y=k|\mathbf{X}, \boldsymbol{\beta}_k, \beta_{0k}) = \frac{e^{\boldsymbol{\beta}_k \cdot \mathbf{X} + \beta_{0k}}}{\sum_{k'=0}^{K-1} e^{\boldsymbol{\beta}_{k'} \cdot \mathbf{X} + \beta_{0k'}}} \]

We minimize the weighted negative log-likelihood, using a multinomial response model, with an elastic-net penalty to control for overfitting.

\[ \min_{\beta, \beta_0} -\left[\sum_{i=1}^L w_i \cdot \log P(Y = y_i|\mathbf{x}_i)\right] + \lambda \left[\frac{1}{2}\left(1 - \alpha\right)||\boldsymbol{\beta}||_2^2 + \alpha ||\boldsymbol{\beta}||_1\right] \]

For a detailed derivation please see here.
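As a quick illustration of the softmax formula above, the following minimal NumPy sketch (not part of the Spark examples; the coefficient matrix and intercepts are made-up values for $K = 3$ classes and $J = 2$ features) computes the class probabilities for a single feature vector:

import numpy as np

# Hypothetical multinomial model: K = 3 classes, J = 2 features.
beta = np.array([[ 0.5, -1.0],   # coefficients for class 0
                 [ 0.0,  0.3],   # coefficients for class 1
                 [-0.5,  0.7]])  # coefficients for class 2
beta0 = np.array([0.1, 0.0, -0.1])  # intercepts, one per class
x = np.array([1.0, 2.0])            # a single feature vector

margins = beta @ x + beta0           # beta_k . x + beta_{0k} for each class k
probs = np.exp(margins) / np.exp(margins).sum()
print(probs)                         # class probabilities; they sum to 1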

Examples

The following example shows how to train a multiclass logistic regression model with elastic net regularization, as well as how to extract the multiclass training summary for evaluating the model.

from pyspark.ml.classification import LogisticRegression

# Load training data
training = spark \
    .read \
    .format("libsvm") \
    .load("data/mllib/sample_multiclass_classification_data.txt")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for multinomial logistic regression
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))

trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# for multiclass, we can inspect metrics on a per-label basis
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))
Find full example code at "examples/src/main/python/ml/multiclass_logistic_regression_with_elastic_net.py" in the Spark repo.
import org.apache.spark.ml.classification.LogisticRegression

// Load training data
val training = spark
  .read
  .format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Fit the model
val lrModel = lr.fit(training)

// Print the coefficients and intercept for multinomial logistic regression
println(s"Coefficients: \n${lrModel.coefficientMatrix}")
println(s"Intercepts: \n${lrModel.interceptVector}")

val trainingSummary = lrModel.summary

// Obtain the objective per iteration
val objectiveHistory = trainingSummary.objectiveHistory
println("objectiveHistory:")
objectiveHistory.foreach(println)

// for multiclass, we can inspect metrics on a per-label basis
println("False positive rate by label:")
trainingSummary.falsePositiveRateByLabel.zipWithIndex.foreach { case (rate, label) =>
  println(s"label $label: $rate")
}

println("True positive rate by label:")
trainingSummary.truePositiveRateByLabel.zipWithIndex.foreach { case (rate, label) =>
  println(s"label $label: $rate")
}

println("Precision by label:")
trainingSummary.precisionByLabel.zipWithIndex.foreach { case (prec, label) =>
  println(s"label $label: $prec")
}

println("Recall by label:")
trainingSummary.recallByLabel.zipWithIndex.foreach { case (rec, label) =>
  println(s"label $label: $rec")
}


println("F-measure by label:")
trainingSummary.fMeasureByLabel.zipWithIndex.foreach { case (f, label) =>
  println(s"label $label: $f")
}

val accuracy = trainingSummary.accuracy
val falsePositiveRate = trainingSummary.weightedFalsePositiveRate
val truePositiveRate = trainingSummary.weightedTruePositiveRate
val fMeasure = trainingSummary.weightedFMeasure
val precision = trainingSummary.weightedPrecision
val recall = trainingSummary.weightedRecall
println(s"Accuracy: $accuracy\nFPR: $falsePositiveRate\nTPR: $truePositiveRate\n" +
  s"F-measure: $fMeasure\nPrecision: $precision\nRecall: $recall")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/MulticlassLogisticRegressionWithElasticNetExample.scala" in the Spark repo.
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.classification.LogisticRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data
Dataset<Row> training = spark.read().format("libsvm")
        .load("data/mllib/sample_multiclass_classification_data.txt");

LogisticRegression lr = new LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.3)
        .setElasticNetParam(0.8);

// Fit the model
LogisticRegressionModel lrModel = lr.fit(training);

// Print the coefficients and intercept for multinomial logistic regression
System.out.println("Coefficients: \n"
        + lrModel.coefficientMatrix() + " \nIntercept: " + lrModel.interceptVector());
LogisticRegressionTrainingSummary trainingSummary = lrModel.summary();

// Obtain the loss per iteration.
double[] objectiveHistory = trainingSummary.objectiveHistory();
for (double lossPerIteration : objectiveHistory) {
    System.out.println(lossPerIteration);
}

// for multiclass, we can inspect metrics on a per-label basis
System.out.println("False positive rate by label:");
int i = 0;
double[] fprLabel = trainingSummary.falsePositiveRateByLabel();
for (double fpr : fprLabel) {
    System.out.println("label " + i + ": " + fpr);
    i++;
}

System.out.println("True positive rate by label:");
i = 0;
double[] tprLabel = trainingSummary.truePositiveRateByLabel();
for (double tpr : tprLabel) {
    System.out.println("label " + i + ": " + tpr);
    i++;
}

System.out.println("Precision by label:");
i = 0;
double[] precLabel = trainingSummary.precisionByLabel();
for (double prec : precLabel) {
    System.out.println("label " + i + ": " + prec);
    i++;
}

System.out.println("Recall by label:");
i = 0;
double[] recLabel = trainingSummary.recallByLabel();
for (double rec : recLabel) {
    System.out.println("label " + i + ": " + rec);
    i++;
}

System.out.println("F-measure by label:");
i = 0;
double[] fLabel = trainingSummary.fMeasureByLabel();
for (double f : fLabel) {
    System.out.println("label " + i + ": " + f);
    i++;
}

double accuracy = trainingSummary.accuracy();
double falsePositiveRate = trainingSummary.weightedFalsePositiveRate();
double truePositiveRate = trainingSummary.weightedTruePositiveRate();
double fMeasure = trainingSummary.weightedFMeasure();
double precision = trainingSummary.weightedPrecision();
double recall = trainingSummary.weightedRecall();
System.out.println("Accuracy: " + accuracy);
System.out.println("FPR: " + falsePositiveRate);
System.out.println("TPR: " + truePositiveRate);
System.out.println("F-measure: " + fMeasure);
System.out.println("Precision: " + precision);
System.out.println("Recall: " + recall);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaMulticlassLogisticRegressionWithElasticNetExample.java" in the Spark repo.

For more details on parameters, refer to the R API documentation.

# Load training data
df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a multinomial logistic regression model with spark.logit
model <- spark.logit(training, label ~ features, maxIter = 10, regParam = 0.3, elasticNetParam = 0.8)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/logit.R" in the Spark repo.

Decision tree classifier

Decision trees are a popular family of classification and regression methods. More information about the spark.ml implementation can be found in the section on decision trees further below.

Examples

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame that the decision tree algorithm can recognize.

For more details on parameters, refer to the Python API documentation.

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

treeModel = model.stages[2]
# summary only
print(treeModel)
Find full example code at "examples/src/main/python/ml/decision_tree_classification_example.py" in the Spark repo.

For more details on parameters, refer to the Scala API documentation.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a DecisionTree model.
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))

// Chain indexers and tree in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${(1.0 - accuracy)}")

val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
println(s"Learned classification tree model:\n ${treeModel.toDebugString}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeClassificationExample.scala" in the Spark repo.

For more details on parameters, refer to the Java API documentation.

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.DecisionTreeClassifier;
import org.apache.spark.ml.classification.DecisionTreeClassificationModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load the data stored in LIBSVM format as a DataFrame.
Dataset<Row> data = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);

// Automatically identify categorical features, and index them.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
  .fit(data);

// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a DecisionTree model.
DecisionTreeClassifier dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures");

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray()[0]);

// Chain indexers and tree in a Pipeline.
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[]{labelIndexer, featureIndexer, dt, labelConverter});

// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));

DecisionTreeClassificationModel treeModel =
  (DecisionTreeClassificationModel) (model.stages()[2]);
System.out.println("Learned classification tree model:\n" + treeModel.toDebugString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java" in the Spark repo.

Refer to the R API documentation for more details.

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a DecisionTree classification model with spark.decisionTree
model <- spark.decisionTree(training, label ~ features, "classification")

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/decisionTree.R" in the Spark repo.

Random forest classifier

Random forests are a popular family of classification and regression methods. More information about the spark.ml implementation can be found in the section on random forests further below.

Examples

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame that tree-based algorithms can recognize.

Refer to the Python API documentation for more details.

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only
Find full example code at "examples/src/main/python/ml/random_forest_classifier_example.py" in the Spark repo.

Refer to the Scala API documentation for more details.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))

// Chain indexers and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${(1.0 - accuracy)}")

val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
println(s"Learned classification forest model:\n ${rfModel.toDebugString}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/RandomForestClassifierExample.scala" in the Spark repo.

Refer to the Java API documentation for more details.

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.RandomForestClassificationModel;
import org.apache.spark.ml.classification.RandomForestClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a RandomForest model.
RandomForestClassifier rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures");

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray()[0]);

// Chain indexers and forest in a Pipeline
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {labelIndexer, featureIndexer, rf, labelConverter});

// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test error
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));

RandomForestClassificationModel rfModel = (RandomForestClassificationModel)(model.stages()[2]);
System.out.println("Learned classification forest model:\n" + rfModel.toDebugString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestClassifierExample.java" in the Spark repo.

Refer to the R API documentation for more details.

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a random forest classification model with spark.randomForest
model <- spark.randomForest(training, label ~ features, "classification", numTrees = 10)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/randomForest.R" in the Spark repo.

Gradient-boosted tree classifier

Gradient-boosted trees (GBTs) are a popular classification and regression method using ensembles of decision trees. More information about the spark.ml implementation can be found in the section on GBTs further below.

Examples

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame that tree-based algorithms can recognize.

Refer to the Python API documentation for more details.

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only
Find full example code at "examples/src/main/python/ml/gradient_boosted_tree_classifier_example.py" in the Spark repo.

Refer to the Scala API documentation for more details.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a GBT model.
val gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)
  .setFeatureSubsetStrategy("auto")

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))

// Chain indexers and GBT in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${1.0 - accuracy}")

val gbtModel = model.stages(2).asInstanceOf[GBTClassificationModel]
println(s"Learned classification GBT model:\n ${gbtModel.toDebugString}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/GradientBoostedTreeClassifierExample.scala" in the Spark repo.

Refer to the Java API documentation for more details.

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.GBTClassificationModel;
import org.apache.spark.ml.classification.GBTClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a GBT model.
GBTClassifier gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray()[0]);

// Chain indexers and GBT in a Pipeline.
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {labelIndexer, featureIndexer, gbt, labelConverter});

// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));

GBTClassificationModel gbtModel = (GBTClassificationModel)(model.stages()[2]);
System.out.println("Learned classification GBT model:\n" + gbtModel.toDebugString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaGradientBoostedTreeClassifierExample.java" in the Spark repo.

Refer to the R API documentation for more details.

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a GBT classification model with spark.gbt
model <- spark.gbt(training, label ~ features, "classification", maxIter = 10)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/gbt.R" in the Spark repo.

Multilayer perceptron classifier

The multilayer perceptron classifier (MLPC) is a classifier based on the feedforward artificial neural network. An MLPC consists of multiple layers of nodes. Each layer is fully connected to the next layer in the network. Nodes in the input layer represent the input data. All other nodes map inputs to outputs by a linear combination of the inputs with the node's weights $\wv$ and bias $\bv$ and by applying an activation function. For an MLPC with $K+1$ layers this can be written in matrix form as follows: \[ \mathrm{y}(\x) = \mathrm{f_K}(...\mathrm{f_2}(\wv_2^T\mathrm{f_1}(\wv_1^T \x+b_1)+b_2)...+b_K) \] Nodes in the intermediate layers use the sigmoid (logistic) function: \[ \mathrm{f}(z_i) = \frac{1}{1 + e^{-z_i}} \] Nodes in the output layer use the softmax function: \[ \mathrm{f}(z_i) = \frac{e^{z_i}}{\sum_{k=1}^N e^{z_k}} \] The number of nodes $N$ in the output layer corresponds to the number of classes.

MLPC employs backpropagation for learning the model. We use the logistic loss function for optimization and L-BFGS as the optimization routine.
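The following minimal NumPy sketch (purely illustrative, using randomly initialized placeholder weights rather than anything MLlib learns) traces the forward pass described above for layers [4, 5, 4, 3], applying the sigmoid on the intermediate layers and the softmax on the output layer:

import numpy as np

rng = np.random.default_rng(0)
layers = [4, 5, 4, 3]
# Random placeholder weights and biases, one pair per layer transition.
weights = [rng.normal(size=(m, n)) for m, n in zip(layers[:-1], layers[1:])]
biases = [rng.normal(size=n) for n in layers[1:]]

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def softmax(z):
    e = np.exp(z - z.max())
    return e / e.sum()

x = rng.normal(size=layers[0])              # one input vector with 4 features
h = x
for W, b in zip(weights[:-1], biases[:-1]):
    h = sigmoid(h @ W + b)                  # intermediate layers
y = softmax(h @ weights[-1] + biases[-1])   # output layer: class probabilities
print(y)                                    # 3 probabilities summing to 1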

Examples

Refer to the Python API documentation for more details.

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load training data
data = spark.read.format("libsvm")\
    .load("data/mllib/sample_multiclass_classification_data.txt")

# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = [4, 5, 4, 3]

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# train the model
model = trainer.fit(train)

# compute accuracy on the test set
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
Find full example code at "examples/src/main/python/ml/multilayer_perceptron_classification.py" in the Spark repo.

Refer to the Scala API documentation for more details.

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")

// Split the data into train and test
val splits = data.randomSplit(Array(0.6, 0.4), seed = 1234L)
val train = splits(0)
val test = splits(1)

// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
val layers = Array[Int](4, 5, 4, 3)

// create the trainer and set its parameters
val trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100)

// train the model
val model = trainer.fit(train)

// compute accuracy on the test set
val result = model.transform(test)
val predictionAndLabels = result.select("prediction", "label")
val evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy")

println(s"Test set accuracy = ${evaluator.evaluate(predictionAndLabels)}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/MultilayerPerceptronClassifierExample.scala" in the Spark repo.

Refer to the Java API documentation for more details.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel;
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;

// Load training data
String path = "data/mllib/sample_multiclass_classification_data.txt";
Dataset<Row> dataFrame = spark.read().format("libsvm").load(path);

// Split the data into train and test
Dataset<Row>[] splits = dataFrame.randomSplit(new double[]{0.6, 0.4}, 1234L);
Dataset<Row> train = splits[0];
Dataset<Row> test = splits[1];

// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
int[] layers = new int[] {4, 5, 4, 3};

// create the trainer and set its parameters
MultilayerPerceptronClassifier trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100);

// train the model
MultilayerPerceptronClassificationModel model = trainer.fit(train);

// compute accuracy on the test set
Dataset<Row> result = model.transform(test);
Dataset<Row> predictionAndLabels = result.select("prediction", "label");
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy");

System.out.println("Test set accuracy = " + evaluator.evaluate(predictionAndLabels));
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaMultilayerPerceptronClassifierExample.java" in the Spark repo.

Refer to the R API documentation for more details.

# Load training data
df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training <- df
test <- df

# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = c(4, 5, 4, 3)

# Fit a multi-layer perceptron neural network model with spark.mlp
model <- spark.mlp(training, label ~ features, maxIter = 100,
                   layers = layers, blockSize = 128, seed = 1234)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/mlp.R" in the Spark repo.

Linear Support Vector Machines

A support vector machine constructs a hyperplane or set of hyperplanes in a high- or infinite-dimensional space, which can be used for classification, regression, or other tasks. Intuitively, a good separation is achieved by the hyperplane that has the largest distance to the nearest training-data points of any class (the so-called functional margin), since in general the larger the margin the lower the generalization error of the classifier. LinearSVC in Spark ML supports binary classification with a linear SVM. Internally, it optimizes the hinge loss using the OWLQN optimizer.
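For reference, the hinge loss it minimizes has the standard form, with labels encoded as $y \in \{-1, +1\}$:

\[ L(\wv; \x, y) := \max \{0, 1 - y \wv^T \x \} \]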

Examples

Refer to the Python API documentation for more details.

from pyspark.ml.classification import LinearSVC

# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Fit the model
lsvcModel = lsvc.fit(training)

# Print the coefficients and intercept for linear SVC
print("Coefficients: " + str(lsvcModel.coefficients))
print("Intercept: " + str(lsvcModel.intercept))
Find full example code at "examples/src/main/python/ml/linearsvc.py" in the Spark repo.

Refer to the Scala API documentation for more details.

import org.apache.spark.ml.classification.LinearSVC

// Load training data
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val lsvc = new LinearSVC()
  .setMaxIter(10)
  .setRegParam(0.1)

// Fit the model
val lsvcModel = lsvc.fit(training)

// Print the coefficients and intercept for linear svc
println(s"Coefficients: ${lsvcModel.coefficients} Intercept: ${lsvcModel.intercept}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LinearSVCExample.scala" in the Spark repo.

Refer to the Java API documentation for more details.

import org.apache.spark.ml.classification.LinearSVC;
import org.apache.spark.ml.classification.LinearSVCModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data
Dataset<Row> training = spark.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

LinearSVC lsvc = new LinearSVC()
  .setMaxIter(10)
  .setRegParam(0.1);

// Fit the model
LinearSVCModel lsvcModel = lsvc.fit(training);

// Print the coefficients and intercept for LinearSVC
System.out.println("Coefficients: "
  + lsvcModel.coefficients() + " Intercept: " + lsvcModel.intercept());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLinearSVCExample.java" in the Spark repo.

Refer to the R API documentation for more details.

# load training data
t <- as.data.frame(Titanic)
training <- createDataFrame(t)

# fit Linear SVM model
model <- spark.svmLinear(training,  Survived ~ ., regParam = 0.01, maxIter = 10)

# Model summary
summary(model)

# Prediction
prediction <- predict(model, training)
showDF(prediction)
Find full example code at "examples/src/main/r/ml/svmLinear.R" in the Spark repo.

One-vs-Rest classifier (a.k.a. One-vs-All)

OneVsRest is an example of a machine learning reduction for performing multiclass classification given a base classifier that can perform binary classification efficiently. It is also known as "One-vs-All".

OneVsRest is implemented as an Estimator. For the base classifier, it takes instances of Classifier and creates a binary classification problem for each of the k classes. The classifier for class i is trained to predict whether the label is i or not, distinguishing class i from all the other classes.

Predictions are done by evaluating each binary classifier, and the index of the most confident classifier is output as the label.
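The following framework-free NumPy sketch (purely illustrative; the least-squares stub below stands in for any binary base classifier that exposes a confidence score) shows the reduction: relabel the data as "class k vs. rest" for each of the k classes, train one binary scorer per class, and predict the class whose scorer is most confident.

import numpy as np

def fit_binary_scorer(features, is_class_k):
    # Stand-in "training": a least-squares linear scorer on +/-1 targets.
    # Any binary classifier returning a confidence score could be used here.
    w = np.linalg.lstsq(features, is_class_k * 2.0 - 1.0, rcond=None)[0]
    return lambda x: x @ w                   # higher score = more confident

X = np.array([[1.0, 0.1], [0.9, 0.2], [0.1, 1.0], [0.2, 0.9], [0.5, 0.5]])
y = np.array([0, 0, 1, 1, 2])                # k = 3 classes

# One binary problem per class: class k vs. all other classes.
scorers = [fit_binary_scorer(X, (y == k).astype(float)) for k in range(3)]
scores = np.stack([score(X) for score in scorers], axis=1)
pred = scores.argmax(axis=1)                 # index of the most confident scorer
print(pred)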

Examples

The example below demonstrates how to load the Iris dataset, parse it as a DataFrame, and perform multiclass classification using OneVsRest. The test error is calculated to measure the algorithm's accuracy.

Refer to the Python API documentation for more details.

from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# load data file.
inputData = spark.read.format("libsvm") \
    .load("data/mllib/sample_multiclass_classification_data.txt")

# generate the train/test split.
(train, test) = inputData.randomSplit([0.8, 0.2])

# instantiate the base classifier.
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)

# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=lr)

# train the multiclass model.
ovrModel = ovr.fit(train)

# score the model on test data.
predictions = ovrModel.transform(test)

# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
Find full example code at "examples/src/main/python/ml/one_vs_rest_example.py" in the Spark repo.

Refer to the Scala API documentation for more details.

import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// load data file.
val inputData = spark.read.format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")

// generate the train/test split.
val Array(train, test) = inputData.randomSplit(Array(0.8, 0.2))

// instantiate the base classifier
val classifier = new LogisticRegression()
  .setMaxIter(10)
  .setTol(1E-6)
  .setFitIntercept(true)

// instantiate the One Vs Rest Classifier.
val ovr = new OneVsRest().setClassifier(classifier)

// train the multiclass model.
val ovrModel = ovr.fit(train)

// score the model on test data.
val predictions = ovrModel.transform(test)

// obtain evaluator.
val evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy")

// compute the classification error on test data.
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${1 - accuracy}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala" in the Spark repo.

Refer to the Java API documentation for more details.

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.OneVsRest;
import org.apache.spark.ml.classification.OneVsRestModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// load data file.
Dataset<Row> inputData = spark.read().format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt");

// generate the train/test split.
Dataset<Row>[] tmp = inputData.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> train = tmp[0];
Dataset<Row> test = tmp[1];

// configure the base classifier.
LogisticRegression classifier = new LogisticRegression()
  .setMaxIter(10)
  .setTol(1E-6)
  .setFitIntercept(true);

// instantiate the One Vs Rest Classifier.
OneVsRest ovr = new OneVsRest().setClassifier(classifier);

// train the multiclass model.
OneVsRestModel ovrModel = ovr.fit(train);

// score the model on test data.
Dataset<Row> predictions = ovrModel.transform(test)
  .select("prediction", "label");

// obtain evaluator.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
        .setMetricName("accuracy");

// compute the classification error on test data.
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1 - accuracy));
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java" in the Spark repo.

Naive Bayes

Naive Bayes classifiers are a family of simple probabilistic, multiclass classifiers based on applying Bayes' theorem with strong (naive) independence assumptions between every pair of features.

Naive Bayes can be trained very efficiently. Within a single pass over the training data, it computes the conditional probability distribution of each feature given each label. For prediction, it applies Bayes' theorem to compute the conditional probability distribution of each label given an observation.

MLlib supports Multinomial naive Bayes, Complement naive Bayes, Bernoulli naive Bayes, and Gaussian naive Bayes.

Input data: The Multinomial, Complement and Bernoulli models are typically used for document classification. Within that context, each observation is a document and each feature represents a term. A feature's value is the frequency of the term (in Multinomial or Complement naive Bayes) or a zero or one indicating whether the term was found in the document (in Bernoulli naive Bayes). Feature values for the Multinomial and Bernoulli models must be non-negative. The model type is selected with an optional parameter "multinomial", "complement", "bernoulli" or "gaussian", with "multinomial" as the default. For document classification, the input feature vectors should usually be sparse vectors. Since the training data is only used once, it is not necessary to cache it.

Additive smoothing can be used by setting the parameter $\lambda$ (default to $1.0$).
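As a purely illustrative sketch of the additive-smoothing idea (not MLlib internals; the tiny count matrix below is made up), the smoothed conditional probability of term $t$ given class $c$ in the multinomial model is $(N_{ct} + \lambda) / (N_{c} + \lambda V)$, where $V$ is the vocabulary size:

import numpy as np

lam = 1.0                              # additive smoothing parameter (lambda)
counts = np.array([[3.0, 0.0, 1.0],    # term counts observed in class 0
                   [0.0, 2.0, 5.0]])   # term counts observed in class 1
V = counts.shape[1]                    # vocabulary size
theta = (counts + lam) / (counts.sum(axis=1, keepdims=True) + lam * V)
print(theta)                           # smoothed P(term | class); no zeros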

Examples

Refer to the Python API documentation for more details.

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load training data
data = spark.read.format("libsvm") \
    .load("data/mllib/sample_libsvm_data.txt")

# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(train)

# select example rows to display.
predictions = model.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
Find full example code at "examples/src/main/python/ml/naive_bayes_example.py" in the Spark repo.

Refer to the Scala API documentation for more details.

import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Split the data into training and test sets (30% held out for testing)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)

// Train a NaiveBayes model.
val model = new NaiveBayes()
  .fit(trainingData)

// Select example rows to display.
val predictions = model.transform(testData)
predictions.show()

// Select (prediction, true label) and compute test error
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy = $accuracy")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/NaiveBayesExample.scala" in the Spark repo.

Refer to the Java API documentation for more details.

import org.apache.spark.ml.classification.NaiveBayes;
import org.apache.spark.ml.classification.NaiveBayesModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data
Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// Split the data into train and test
Dataset<Row>[] splits = dataFrame.randomSplit(new double[]{0.6, 0.4}, 1234L);
Dataset<Row> train = splits[0];
Dataset<Row> test = splits[1];

// create the trainer and set its parameters
NaiveBayes nb = new NaiveBayes();

// train the model
NaiveBayesModel model = nb.fit(train);

// Select example rows to display.
Dataset<Row> predictions = model.transform(test);
predictions.show();

// compute accuracy on the test set
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test set accuracy = " + accuracy);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaNaiveBayesExample.java" in the Spark repo.

Refer to the R API documentation for more details.

# Fit a Bernoulli naive Bayes model with spark.naiveBayes
titanic <- as.data.frame(Titanic)
titanicDF <- createDataFrame(titanic[titanic$Freq > 0, -5])
nbDF <- titanicDF
nbTestDF <- titanicDF
nbModel <- spark.naiveBayes(nbDF, Survived ~ Class + Sex + Age)

# Model summary
summary(nbModel)

# Prediction
nbPredictions <- predict(nbModel, nbTestDF)
head(nbPredictions)
在 Spark 仓库中的“examples/src/main/r/ml/naiveBayes.R”中查找完整的示例代码。

因子分解机分类器

有关因子分解机的更多背景信息和更多详细信息,请参阅 因子分解机部分

示例

以下示例加载 LibSVM 格式的数据集,将其拆分为训练集和测试集,在第一个数据集上进行训练,然后在保留的测试集上进行评估。我们将特征缩放到 0 到 1 之间,以防止梯度爆炸问题。

有关更多详细信息,请参阅Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.classification import FMClassifier
from pyspark.ml.feature import MinMaxScaler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Scale features.
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a FM model.
fm = FMClassifier(labelCol="indexedLabel", featuresCol="scaledFeatures", stepSize=0.001)

# Create a Pipeline.
pipeline = Pipeline(stages=[labelIndexer, featureScaler, fm])

# Train model.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = %g" % accuracy)

fmModel = model.stages[2]
print("Factors: " + str(fmModel.factors))  # type: ignore
print("Linear: " + str(fmModel.linear))  # type: ignore
print("Intercept: " + str(fmModel.intercept))  # type: ignore
在 Spark 仓库的 "examples/src/main/python/ml/fm_classifier_example.py" 中查找完整的示例代码。

有关更多详细信息,请参阅Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{FMClassificationModel, FMClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, MinMaxScaler, StringIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Scale features.
val featureScaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a FM model.
val fm = new FMClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("scaledFeatures")
  .setStepSize(0.001)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))

// Create a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureScaler, fm, labelConverter))

// Train model.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test accuracy.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy = $accuracy")

val fmModel = model.stages(2).asInstanceOf[FMClassificationModel]
println(s"Factors: ${fmModel.factors} Linear: ${fmModel.linear} " +
  s"Intercept: ${fmModel.intercept}")
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/FMClassifierExample.scala" 中查找完整的示例代码。

有关更多详细信息,请参阅Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.FMClassificationModel;
import org.apache.spark.ml.classification.FMClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark
    .read()
    .format("libsvm")
    .load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("indexedLabel")
    .fit(data);
// Scale features.
MinMaxScalerModel featureScaler = new MinMaxScaler()
    .setInputCol("features")
    .setOutputCol("scaledFeatures")
    .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a FM model.
FMClassifier fm = new FMClassifier()
    .setLabelCol("indexedLabel")
    .setFeaturesCol("scaledFeatures")
    .setStepSize(0.001);

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
    .setInputCol("prediction")
    .setOutputCol("predictedLabel")
    .setLabels(labelIndexer.labelsArray()[0]);

// Create a Pipeline.
Pipeline pipeline = new Pipeline()
    .setStages(new PipelineStage[] {labelIndexer, featureScaler, fm, labelConverter});

// Train model.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test accuracy.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
    .setLabelCol("indexedLabel")
    .setPredictionCol("prediction")
    .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Accuracy = " + accuracy);

FMClassificationModel fmModel = (FMClassificationModel)(model.stages()[2]);
System.out.println("Factors: " + fmModel.factors());
System.out.println("Linear: " + fmModel.linear());
System.out.println("Intercept: " + fmModel.intercept());
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaFMClassifierExample.java" 中查找完整的示例代码。

有关更多详细信息,请参阅R API 文档

注意:目前 SparkR 不支持特征缩放。

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a FM classification model
model <- spark.fmClassifier(training, label ~ features)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 仓库的 "examples/src/main/r/ml/fmClassifier.R" 中查找完整的示例代码。

回归

线性回归

用于处理线性回归模型和模型摘要的接口与逻辑回归的情况类似。

当在具有恒定非零列的数据集上,使用 "l-bfgs" 求解器拟合没有截距的 LinearRegressionModel 时,Spark MLlib 会为恒定的非零列输出零系数。此行为与 R glmnet 相同,但与 LIBSVM 不同。

示例

以下示例演示了如何训练弹性网络正则化的线性回归模型并提取模型摘要统计信息。

有关参数的更多详细信息,请参见Python API 文档

from pyspark.ml.regression import LinearRegression

# Load training data
training = spark.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
在 Spark 仓库的 "examples/src/main/python/ml/linear_regression_with_elastic_net.py" 中查找完整的示例代码。

有关参数的更多详细信息,请参见Scala API 文档

import org.apache.spark.ml.regression.LinearRegression

// Load training data
val training = spark.read.format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt")

val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Fit the model
val lrModel = lr.fit(training)

// Print the coefficients and intercept for linear regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

// Summarize the model over the training set and print out some metrics
val trainingSummary = lrModel.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/LinearRegressionWithElasticNetExample.scala" 中查找完整的示例代码。

有关参数的更多详细信息,请参见Java API 文档

import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.ml.regression.LinearRegressionTrainingSummary;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data.
Dataset<Row> training = spark.read().format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt");

LinearRegression lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8);

// Fit the model.
LinearRegressionModel lrModel = lr.fit(training);

// Print the coefficients and intercept for linear regression.
System.out.println("Coefficients: "
  + lrModel.coefficients() + " Intercept: " + lrModel.intercept());

// Summarize the model over the training set and print out some metrics.
LinearRegressionTrainingSummary trainingSummary = lrModel.summary();
System.out.println("numIterations: " + trainingSummary.totalIterations());
System.out.println("objectiveHistory: " + Vectors.dense(trainingSummary.objectiveHistory()));
trainingSummary.residuals().show();
System.out.println("RMSE: " + trainingSummary.rootMeanSquaredError());
System.out.println("r2: " + trainingSummary.r2());
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaLinearRegressionWithElasticNetExample.java" 中查找完整的示例代码。

有关参数的更多详细信息,请参见R API 文档

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a linear regression model
model <- spark.lm(training, label ~ features, regParam = 0.3, elasticNetParam = 0.8)

# Prediction
predictions <- predict(model, test)
head(predictions)

# Summarize
summary(model)
在 Spark 仓库的 "examples/src/main/r/ml/lm_with_elastic_net.R" 中查找完整的示例代码。

广义线性回归

与假设输出服从高斯分布的线性回归不同,广义线性模型 (GLM) 是一类线性模型的规范形式,其中响应变量 $Y_i$ 服从指数分布族中的某个分布。Spark 的 GeneralizedLinearRegression 接口允许灵活地指定 GLM,可用于各种类型的预测问题,包括线性回归、泊松回归、逻辑回归等。目前在 spark.ml 中,仅支持指数分布族的一个子集,它们在下面列出。

注意:Spark 目前通过其 GeneralizedLinearRegression 接口最多支持 4096 个特征,超过此限制将抛出异常。有关更多详细信息,请参阅高级部分。尽管如此,对于线性回归和逻辑回归,仍可以使用 LinearRegression 和 LogisticRegression 估计器来训练具有更多特征的模型。

GLM 要求指数分布族可以写成其“规范”或“自然”形式,也就是自然指数分布族。自然指数分布族的形式如下:

\[f_Y(y|\theta, \tau) = h(y, \tau)\exp{\left( \frac{\theta \cdot y - A(\theta)}{d(\tau)} \right)}\]

其中 $\theta$ 是感兴趣的参数,$\tau$ 是散布(dispersion)参数。在 GLM 中,假设响应变量 $Y_i$ 取自自然指数分布族:

\[Y_i \sim f\left(\cdot|\theta_i, \tau \right)\]

其中感兴趣的参数 $\theta_i$ 通过以下方式与响应变量的期望值 $\mu_i$ 相关:

\[\mu_i = A'(\theta_i)\]

这里,$A'(\theta_i)$ 由所选分布的形式决定。GLM 还允许指定链接函数,它定义了响应变量的期望值 $\mu_i$ 与所谓的线性预测变量 $\eta_i$ 之间的关系:

\[g(\mu_i) = \eta_i = \vec{x_i}^T \cdot \vec{\beta}\]

通常,选择链接函数使得 $A' = g^{-1}$,从而使感兴趣的参数 $\theta$ 与线性预测变量 $\eta$ 之间的关系得到简化。在这种情况下,链接函数 $g(\mu)$ 被称为“规范”链接函数。

\[\theta_i = A'^{-1}(\mu_i) = g(g^{-1}(\eta_i)) = \eta_i\]

GLM 找到使似然函数最大化的回归系数 $\vec{\beta}$。

\[\max_{\vec{\beta}} \mathcal{L}(\vec{\theta}|\vec{y},X) = \prod_{i=1}^{N} h(y_i, \tau) \exp{\left(\frac{y_i\theta_i - A(\theta_i)}{d(\tau)}\right)}\]

其中感兴趣的参数 $\theta_i$ 通过以下方式与回归系数 $\vec{\beta}$ 相关

\[\theta_i = A'^{-1}(g^{-1}(\vec{x_i} \cdot \vec{\beta}))\]
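作为上述记号的一个简短推导示例,考虑均值为 $\lambda$ 的泊松分布:把其概率质量函数改写为自然指数族形式,即可直接读出规范链接。

\[ f(y|\lambda) = \frac{\lambda^{y}e^{-\lambda}}{y!} = \frac{1}{y!}\exp\left(y\log\lambda - \lambda\right), \qquad \theta = \log\lambda,\quad A(\theta) = e^{\theta},\quad d(\tau) = 1 \]

于是 $\mu = A'(\theta) = e^{\theta} = \lambda$,规范链接为 $g(\mu) = A'^{-1}(\mu) = \log\mu$,与下表中泊松族的对数链接一致。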

Spark 的广义线性回归接口还提供摘要统计信息,用于诊断 GLM 模型的拟合,包括残差、p 值、偏差、Akaike 信息准则等。

有关 GLM 及其应用的更全面回顾,请参见此处

可用族

族 响应类型 支持的链接
高斯 连续 恒等*,对数,倒数
二项式 二元 Logit*,Probit,CLogLog
泊松 计数 对数*,恒等,Sqrt
伽马 连续 倒数*,恒等,对数
Tweedie 零膨胀连续 幂链接函数
* 规范链接
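下面给出一个简短的 Python 配置示意(参数取值仅作演示,并假定数据已按后文示例的方式加载),说明如何按上表选择 family 与 link;对于 Tweedie 族,不直接设置 link,而是通过 variancePower 与 linkPower 指定幂链接函数。

from pyspark.ml.regression import GeneralizedLinearRegression

# 泊松回归:计数型响应,使用规范的对数链接
poisson_glr = GeneralizedLinearRegression(family="poisson", link="log", maxIter=10, regParam=0.3)

# Tweedie 回归:通过 variancePower 与 linkPower 指定幂链接函数(linkPower=0.0 即对数链接)
tweedie_glr = GeneralizedLinearRegression(family="tweedie", variancePower=1.2, linkPower=0.0)

# 假设 dataset 已如下文示例那样加载,则可直接拟合:
# model = poisson_glr.fit(dataset)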

示例

以下示例演示了如何训练具有高斯响应和恒等式链接函数的 GLM 并提取模型摘要统计信息。

有关更多详细信息,请参阅Python API 文档

from pyspark.ml.regression import GeneralizedLinearRegression

# Load training data
dataset = spark.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")

glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)

# Fit the model
model = glr.fit(dataset)

# Print the coefficients and intercept for generalized linear regression model
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

# Summarize the model over the training set and print out some metrics
summary = model.summary
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues))
print("P Values: " + str(summary.pValues))
print("Dispersion: " + str(summary.dispersion))
print("Null Deviance: " + str(summary.nullDeviance))
print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary.deviance))
print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
print("AIC: " + str(summary.aic))
print("Deviance Residuals: ")
summary.residuals().show()
在 Spark 仓库的 "examples/src/main/python/ml/generalized_linear_regression_example.py" 中查找完整的示例代码。

有关更多详细信息,请参阅Scala API 文档

import org.apache.spark.ml.regression.GeneralizedLinearRegression

// Load training data
val dataset = spark.read.format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt")

val glr = new GeneralizedLinearRegression()
  .setFamily("gaussian")
  .setLink("identity")
  .setMaxIter(10)
  .setRegParam(0.3)

// Fit the model
val model = glr.fit(dataset)

// Print the coefficients and intercept for generalized linear regression model
println(s"Coefficients: ${model.coefficients}")
println(s"Intercept: ${model.intercept}")

// Summarize the model over the training set and print out some metrics
val summary = model.summary
println(s"Coefficient Standard Errors: ${summary.coefficientStandardErrors.mkString(",")}")
println(s"T Values: ${summary.tValues.mkString(",")}")
println(s"P Values: ${summary.pValues.mkString(",")}")
println(s"Dispersion: ${summary.dispersion}")
println(s"Null Deviance: ${summary.nullDeviance}")
println(s"Residual Degree Of Freedom Null: ${summary.residualDegreeOfFreedomNull}")
println(s"Deviance: ${summary.deviance}")
println(s"Residual Degree Of Freedom: ${summary.residualDegreeOfFreedom}")
println(s"AIC: ${summary.aic}")
println("Deviance Residuals: ")
summary.residuals().show()
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/GeneralizedLinearRegressionExample.scala" 中查找完整的示例代码。

有关更多详细信息,请参阅Java API 文档

import java.util.Arrays;

import org.apache.spark.ml.regression.GeneralizedLinearRegression;
import org.apache.spark.ml.regression.GeneralizedLinearRegressionModel;
import org.apache.spark.ml.regression.GeneralizedLinearRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Load training data
Dataset<Row> dataset = spark.read().format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt");

GeneralizedLinearRegression glr = new GeneralizedLinearRegression()
  .setFamily("gaussian")
  .setLink("identity")
  .setMaxIter(10)
  .setRegParam(0.3);

// Fit the model
GeneralizedLinearRegressionModel model = glr.fit(dataset);

// Print the coefficients and intercept for generalized linear regression model
System.out.println("Coefficients: " + model.coefficients());
System.out.println("Intercept: " + model.intercept());

// Summarize the model over the training set and print out some metrics
GeneralizedLinearRegressionTrainingSummary summary = model.summary();
System.out.println("Coefficient Standard Errors: "
  + Arrays.toString(summary.coefficientStandardErrors()));
System.out.println("T Values: " + Arrays.toString(summary.tValues()));
System.out.println("P Values: " + Arrays.toString(summary.pValues()));
System.out.println("Dispersion: " + summary.dispersion());
System.out.println("Null Deviance: " + summary.nullDeviance());
System.out.println("Residual Degree Of Freedom Null: " + summary.residualDegreeOfFreedomNull());
System.out.println("Deviance: " + summary.deviance());
System.out.println("Residual Degree Of Freedom: " + summary.residualDegreeOfFreedom());
System.out.println("AIC: " + summary.aic());
System.out.println("Deviance Residuals: ");
summary.residuals().show();
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaGeneralizedLinearRegressionExample.java" 中查找完整的示例代码。

有关更多详细信息,请参阅R API 文档

training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7, 3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")

# Model summary
summary(gaussianGLM)

# Prediction
gaussianPredictions <- predict(gaussianGLM, gaussianTestDF)
head(gaussianPredictions)

# Fit a generalized linear model with glm (R-compliant)
gaussianGLM2 <- glm(label ~ features, gaussianDF, family = "gaussian")
summary(gaussianGLM2)

# Fit a generalized linear model of family "binomial" with spark.glm
training2 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training2 <- transform(training2, label = cast(training2$label > 1, "integer"))
df_list2 <- randomSplit(training2, c(7, 3), 2)
binomialDF <- df_list2[[1]]
binomialTestDF <- df_list2[[2]]
binomialGLM <- spark.glm(binomialDF, label ~ features, family = "binomial")

# Model summary
summary(binomialGLM)

# Prediction
binomialPredictions <- predict(binomialGLM, binomialTestDF)
head(binomialPredictions)

# Fit a generalized linear model of family "tweedie" with spark.glm
training3 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
tweedieDF <- transform(training3, label = training3$label * exp(randn(10)))
tweedieGLM <- spark.glm(tweedieDF, label ~ features, family = "tweedie",
                        var.power = 1.2, link.power = 0)

# Model summary
summary(tweedieGLM)
在 Spark 仓库的 "examples/src/main/r/ml/glm.R" 中查找完整的示例代码。

决策树回归

决策树是分类和回归方法的常用族。有关 spark.ml 实现的更多信息,请在后面的 决策树部分中找到。

示例

以下示例加载 LibSVM 格式的数据集,将其拆分为训练集和测试集,在训练集上进行训练,然后在保留的测试集上进行评估。我们使用特征转换器对分类特征进行索引,向 DataFrame 中添加决策树算法能够识别的元数据。

有关参数的更多详细信息,请参见Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

treeModel = model.stages[1]
# summary only
print(treeModel)
在 Spark 仓库的 "examples/src/main/python/ml/decision_tree_regression_example.py" 中查找完整的示例代码。

有关参数的更多详细信息,请参见Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Here, we treat features with > 4 distinct values as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a DecisionTree model.
val dt = new DecisionTreeRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")

// Chain indexer and tree in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, dt))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val treeModel = model.stages(1).asInstanceOf[DecisionTreeRegressionModel]
println(s"Learned regression tree model:\n ${treeModel.toDebugString}")
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeRegressionExample.scala" 中查找完整的示例代码。

有关参数的更多详细信息,请参见Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
import org.apache.spark.ml.regression.DecisionTreeRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load the data stored in LIBSVM format as a DataFrame.
Dataset<Row> data = spark.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a DecisionTree model.
DecisionTreeRegressor dt = new DecisionTreeRegressor()
  .setFeaturesCol("indexedFeatures");

// Chain indexer and tree in a Pipeline.
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[]{featureIndexer, dt});

// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("label", "features").show(5);

// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

DecisionTreeRegressionModel treeModel =
  (DecisionTreeRegressionModel) (model.stages()[1]);
System.out.println("Learned regression tree model:\n" + treeModel.toDebugString());
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeRegressionExample.java" 中查找完整的示例代码。

有关更多详细信息,请参阅R API 文档

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a DecisionTree regression model with spark.decisionTree
model <- spark.decisionTree(training, label ~ features, "regression")

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 仓库的 "examples/src/main/r/ml/decisionTree.R" 中查找完整的示例代码。

随机森林回归

随机森林是分类和回归方法的常用族。有关 spark.ml 实现的更多信息,请在后面的 随机森林部分中找到。

示例

以下示例加载 LibSVM 格式的数据集,将其拆分为训练集和测试集,在训练集上进行训练,然后在保留的测试集上进行评估。我们使用特征转换器对分类特征进行索引,向 DataFrame 中添加基于树的算法能够识别的元数据。

有关更多详细信息,请参阅Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rfModel = model.stages[1]
print(rfModel)  # summary only
在 Spark 仓库的 "examples/src/main/python/ml/random_forest_regressor_example.py" 中查找完整的示例代码。

有关更多详细信息,请参阅Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")

// Chain indexer and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, rf))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
println(s"Learned regression forest model:\n ${rfModel.toDebugString}")
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/RandomForestRegressorExample.scala" 中查找完整的示例代码。

有关更多详细信息,请参阅Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.RandomForestRegressionModel;
import org.apache.spark.ml.regression.RandomForestRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a RandomForest model.
RandomForestRegressor rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures");

// Chain indexer and forest in a Pipeline
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {featureIndexer, rf});

// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);

// Select (prediction, true label) and compute test error
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

RandomForestRegressionModel rfModel = (RandomForestRegressionModel)(model.stages()[1]);
System.out.println("Learned regression forest model:\n" + rfModel.toDebugString());
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestRegressorExample.java" 中查找完整的示例代码。

有关更多详细信息,请参阅R API 文档

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a random forest regression model with spark.randomForest
model <- spark.randomForest(training, label ~ features, "regression", numTrees = 10)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 仓库中的“examples/src/main/r/ml/randomForest.R”中查找完整的示例代码。

梯度提升树回归

梯度提升树 (GBT) 是一种流行的回归方法,它使用决策树的集成。有关 spark.ml 实现的更多信息,可以在有关 GBT 的部分中找到。

示例

注意:对于此示例数据集,GBTRegressor 实际上只需要 1 次迭代,但一般情况下并非如此。

有关更多详细信息,请参阅Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

gbtModel = model.stages[1]
print(gbtModel)  # summary only
在 Spark 仓库的 "examples/src/main/python/ml/gradient_boosted_tree_regressor_example.py" 中查找完整的示例代码。

有关更多详细信息,请参阅Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a GBT model.
val gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)

// Chain indexer and GBT in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, gbt))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
println(s"Learned regression GBT model:\n ${gbtModel.toDebugString}")
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/GradientBoostedTreeRegressorExample.scala" 中查找完整的示例代码。

有关更多详细信息,请参阅Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.GBTRegressionModel;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a GBT model.
GBTRegressor gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);

// Chain indexer and GBT in a Pipeline.
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureIndexer, gbt});

// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

GBTRegressionModel gbtModel = (GBTRegressionModel)(model.stages()[1]);
System.out.println("Learned regression GBT model:\n" + gbtModel.toDebugString());
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaGradientBoostedTreeRegressorExample.java" 中查找完整的示例代码。

有关更多详细信息,请参阅R API 文档

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a GBT regression model with spark.gbt
model <- spark.gbt(training, label ~ features, "regression", maxIter = 10)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 仓库中的“examples/src/main/r/ml/gbt.R”中查找完整的示例代码。

生存回归

spark.ml 中,我们实现了 加速失效时间 (AFT) 模型,它是一种用于删失数据的参数生存回归模型。它描述了生存时间对数的模型,因此通常称为生存分析的对数线性模型。与为此目的设计的比例风险模型不同,AFT 模型更容易并行化,因为每个实例独立地对目标函数做出贡献。

给定协变量的值 $x^{'}$,对于受试者 i = 1, …, n 的随机生存时间 $t_{i}$(可能存在右删失),AFT 模型下的似然函数如下:\[ L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})]^{\delta_{i}}S_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})^{1-\delta_{i}} \] 其中 $\delta_{i}$ 是事件已发生(即未被删失)的指示变量。使用 $\epsilon_{i}=\frac{\log{t_{i}}-x^{'}\beta}{\sigma}$,对数似然函数的形式为:\[ \iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+\delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}] \] 其中 $S_{0}(\epsilon_{i})$ 是基线生存函数,而 $f_{0}(\epsilon_{i})$ 是相应的密度函数。

最常用的 AFT 模型基于生存时间的 Weibull 分布。生存时间的 Weibull 分布对应于生存时间对数的极值分布,并且 $S_{0}(\epsilon)$ 函数为:\[ S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}}) \] $f_{0}(\epsilon_{i})$ 函数为:\[ f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}}) \] Weibull 分布生存时间的 AFT 模型的对数似然函数为:\[ \iota(\beta,\sigma)= -\sum_{i=1}^n[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}] \] 由于最小化负对数似然等同于最大后验概率,因此我们用于优化的损失函数是 $-\iota(\beta,\sigma)$。 $\beta$ 和 $\log\sigma$ 的梯度函数分别为:\[ \frac{\partial (-\iota)}{\partial \beta}=\sum_{i=1}^{n}[\delta_{i}-e^{\epsilon_{i}}]\frac{x_{i}}{\sigma} \] \[ \frac{\partial (-\iota)}{\partial (\log\sigma)}=\sum_{i=1}^{n}[\delta_{i}+(\delta_{i}-e^{\epsilon_{i}})\epsilon_{i}] \]
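上面的对数似然可以由一般形式直接代入得到(这里给出推导核对):将 $\log f_{0}(\epsilon_{i})=\epsilon_{i}-e^{\epsilon_{i}}$ 与 $\log S_{0}(\epsilon_{i})=-e^{\epsilon_{i}}$ 代入每个观测的贡献项,\[ -\delta_{i}\log\sigma+\delta_{i}\log f_{0}(\epsilon_{i})+(1-\delta_{i})\log S_{0}(\epsilon_{i}) = -\delta_{i}\log\sigma+\delta_{i}\epsilon_{i}-e^{\epsilon_{i}} = -\left[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}\right] \] 对 $i$ 求和即得到上述 Weibull AFT 模型的对数似然。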

AFT 模型可以被公式化为一个凸优化问题,即寻找凸函数 $-\iota(\beta,\sigma)$ 的最小值,该凸函数取决于系数向量 $\beta$ 和尺度参数的对数 $\log\sigma$。实现所基于的优化算法是 L-BFGS。 该实现与 R 的生存函数 survreg 的结果相匹配

当在具有常数非零列的数据集上拟合不带截距的 AFTSurvivalRegressionModel 时,Spark MLlib 会为常数非零列输出零系数。 此行为与 R survival::survreg 不同。

示例

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.regression import AFTSurvivalRegression
from pyspark.ml.linalg import Vectors

training = spark.createDataFrame([
    (1.218, 1.0, Vectors.dense(1.560, -0.605)),
    (2.949, 0.0, Vectors.dense(0.346, 2.158)),
    (3.627, 0.0, Vectors.dense(1.380, 0.231)),
    (0.273, 1.0, Vectors.dense(0.520, 1.151)),
    (4.199, 0.0, Vectors.dense(0.795, -0.226))], ["label", "censor", "features"])
quantileProbabilities = [0.3, 0.6]
aft = AFTSurvivalRegression(quantileProbabilities=quantileProbabilities,
                            quantilesCol="quantiles")

model = aft.fit(training)

# Print the coefficients, intercept and scale parameter for AFT survival regression
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))
print("Scale: " + str(model.scale))
model.transform(training).show(truncate=False)
在 Spark 仓库的 "examples/src/main/python/ml/aft_survival_regression.py" 中查找完整的示例代码。

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.AFTSurvivalRegression

val training = spark.createDataFrame(Seq(
  (1.218, 1.0, Vectors.dense(1.560, -0.605)),
  (2.949, 0.0, Vectors.dense(0.346, 2.158)),
  (3.627, 0.0, Vectors.dense(1.380, 0.231)),
  (0.273, 1.0, Vectors.dense(0.520, 1.151)),
  (4.199, 0.0, Vectors.dense(0.795, -0.226))
)).toDF("label", "censor", "features")
val quantileProbabilities = Array(0.3, 0.6)
val aft = new AFTSurvivalRegression()
  .setQuantileProbabilities(quantileProbabilities)
  .setQuantilesCol("quantiles")

val model = aft.fit(training)

// Print the coefficients, intercept and scale parameter for AFT survival regression
println(s"Coefficients: ${model.coefficients}")
println(s"Intercept: ${model.intercept}")
println(s"Scale: ${model.scale}")
model.transform(training).show(false)
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/AFTSurvivalRegressionExample.scala" 中查找完整的示例代码。

有关更多详细信息,请参阅 Java API 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.regression.AFTSurvivalRegression;
import org.apache.spark.ml.regression.AFTSurvivalRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(1.218, 1.0, Vectors.dense(1.560, -0.605)),
  RowFactory.create(2.949, 0.0, Vectors.dense(0.346, 2.158)),
  RowFactory.create(3.627, 0.0, Vectors.dense(1.380, 0.231)),
  RowFactory.create(0.273, 1.0, Vectors.dense(0.520, 1.151)),
  RowFactory.create(4.199, 0.0, Vectors.dense(0.795, -0.226))
);
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("censor", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(data, schema);
double[] quantileProbabilities = new double[]{0.3, 0.6};
AFTSurvivalRegression aft = new AFTSurvivalRegression()
  .setQuantileProbabilities(quantileProbabilities)
  .setQuantilesCol("quantiles");

AFTSurvivalRegressionModel model = aft.fit(training);

// Print the coefficients, intercept and scale parameter for AFT survival regression
System.out.println("Coefficients: " + model.coefficients());
System.out.println("Intercept: " + model.intercept());
System.out.println("Scale: " + model.scale());
model.transform(training).show(false);
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java" 中查找完整的示例代码。

有关更多详细信息,请参阅 R API 文档

# Use the ovarian dataset available in R survival package
library(survival)

# Fit an accelerated failure time (AFT) survival regression model with spark.survreg
ovarianDF <- suppressWarnings(createDataFrame(ovarian))
aftDF <- ovarianDF
aftTestDF <- ovarianDF
aftModel <- spark.survreg(aftDF, Surv(futime, fustat) ~ ecog_ps + rx)

# Model summary
summary(aftModel)

# Prediction
aftPredictions <- predict(aftModel, aftTestDF)
head(aftPredictions)
在 Spark 仓库的 "examples/src/main/r/ml/survreg.R" 中查找完整的示例代码。

保序回归

保序回归属于回归算法家族。形式上,保序回归问题是:给定一组有限的实数 $Y = \{y_1, y_2, ..., y_n\}$ 表示观察到的响应,以及 $X = \{x_1, x_2, ..., x_n\}$ 表示要拟合的未知响应值,找到一个使下式最小化的函数:

\begin{equation} f(x) = \sum_{i=1}^n w_i (y_i - x_i)^2 \end{equation}

同时满足全序约束 $x_1\le x_2\le ...\le x_n$,其中 $w_i$ 为正权重。所得函数称为保序回归,并且它是唯一的。它可以被视为顺序约束下的最小二乘问题。本质上,保序回归是最能拟合原始数据点的单调函数。

我们实现了一个 pool adjacent violators 算法,该算法使用一种 并行化保序回归的方法。 训练输入是一个 DataFrame,其中包含三列:标签、特征和权重。 此外,IsotonicRegression 算法有一个可选参数,称为 $isotonic$,默认为 true。 此参数指定保序回归是保序的(单调递增)还是反序的(单调递减)。
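为便于理解,下面给出 pool adjacent violators 思想的一个最小化单机 Python 示意(对应 isotonic = true 的单调不减情形;函数名与实现细节均为示例,并非 spark.ml 的内部实现):

def pava(y, w=None):
    # Pool Adjacent Violators:逐个吞入观测,凡违反单调不减约束的相邻块
    # 就合并为一个块,块的拟合值取块内观测的加权平均。
    w = [1.0] * len(y) if w is None else list(w)
    blocks = []  # 每个块记录 [加权和, 权重和, 块内元素个数]
    for yi, wi in zip(y, w):
        blocks.append([yi * wi, wi, 1])
        # 若新块的均值小于前一块的均值,则向前合并,直到恢复单调性
        while len(blocks) > 1 and blocks[-1][0] / blocks[-1][1] < blocks[-2][0] / blocks[-2][1]:
            s, ws, c = blocks.pop()
            blocks[-1][0] += s
            blocks[-1][1] += ws
            blocks[-1][2] += c
    fitted = []
    for s, ws, c in blocks:
        fitted.extend([s / ws] * c)
    return fitted

print(pava([1.0, 3.0, 2.0, 4.0, 3.0]))  # [1.0, 2.5, 2.5, 3.5, 3.5]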

训练返回一个 IsotonicRegressionModel,可用于对已知和未知特征进行标签预测。保序回归的结果被视为分段线性函数,因此预测规则为:如果预测输入与某个训练特征完全匹配,则返回与其关联的预测值(若同一特征对应多个预测值,则返回其中之一);如果预测输入低于(或高于)所有训练特征,则分别返回最低(或最高)特征对应的预测值;如果预测输入落在两个训练特征之间,则按分段线性函数在两个最近特征的预测值之间进行插值。

示例

有关 API 的更多详细信息,请参阅 IsotonicRegression Python 文档

from pyspark.ml.regression import IsotonicRegression

# Loads data.
dataset = spark.read.format("libsvm")\
    .load("data/mllib/sample_isotonic_regression_libsvm_data.txt")

# Trains an isotonic regression model.
model = IsotonicRegression().fit(dataset)
print("Boundaries in increasing order: %s\n" % str(model.boundaries))
print("Predictions associated with the boundaries: %s\n" % str(model.predictions))

# Makes predictions.
model.transform(dataset).show()
在 Spark 仓库的 "examples/src/main/python/ml/isotonic_regression_example.py" 中查找完整的示例代码。

有关 API 的详细信息,请参阅 IsotonicRegression Scala 文档

import org.apache.spark.ml.regression.IsotonicRegression

// Loads data.
val dataset = spark.read.format("libsvm")
  .load("data/mllib/sample_isotonic_regression_libsvm_data.txt")

// Trains an isotonic regression model.
val ir = new IsotonicRegression()
val model = ir.fit(dataset)

println(s"Boundaries in increasing order: ${model.boundaries}\n")
println(s"Predictions associated with the boundaries: ${model.predictions}\n")

// Makes predictions.
model.transform(dataset).show()
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/IsotonicRegressionExample.scala" 中查找完整的示例代码。

有关 API 的详细信息,请参阅 IsotonicRegression Java 文档

import org.apache.spark.ml.regression.IsotonicRegression;
import org.apache.spark.ml.regression.IsotonicRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm")
  .load("data/mllib/sample_isotonic_regression_libsvm_data.txt");

// Trains an isotonic regression model.
IsotonicRegression ir = new IsotonicRegression();
IsotonicRegressionModel model = ir.fit(dataset);

System.out.println("Boundaries in increasing order: " + model.boundaries() + "\n");
System.out.println("Predictions associated with the boundaries: " + model.predictions() + "\n");

// Makes predictions.
model.transform(dataset).show();
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaIsotonicRegressionExample.java" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 IsotonicRegression R API 文档

# Load training data
df <- read.df("data/mllib/sample_isotonic_regression_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit an isotonic regression model with spark.isoreg
model <- spark.isoreg(training, label ~ features, isotonic = FALSE)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 仓库的 "examples/src/main/r/ml/isoreg.R" 中查找完整的示例代码。

因子分解机回归器

有关因子分解机的更多背景信息和更多详细信息,请参阅 因子分解机部分

示例

以下示例加载 LibSVM 格式的数据集,将其拆分为训练集和测试集,在第一个数据集上进行训练,然后在保留的测试集上进行评估。我们将特征缩放到 0 到 1 之间,以防止梯度爆炸问题。

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.regression import FMRegressor
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Scale features.
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a FM model.
fm = FMRegressor(featuresCol="scaledFeatures", stepSize=0.001)

# Create a Pipeline.
pipeline = Pipeline(stages=[featureScaler, fm])

# Train model.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

fmModel = model.stages[1]
print("Factors: " + str(fmModel.factors))  # type: ignore
print("Linear: " + str(fmModel.linear))  # type: ignore
print("Intercept: " + str(fmModel.intercept))  # type: ignore
在 Spark 仓库的 "examples/src/main/python/ml/fm_regressor_example.py" 中查找完整的示例代码。

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.regression.{FMRegressionModel, FMRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Scale features.
val featureScaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a FM model.
val fm = new FMRegressor()
  .setLabelCol("label")
  .setFeaturesCol("scaledFeatures")
  .setStepSize(0.001)

// Create a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureScaler, fm))

// Train model.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val fmModel = model.stages(1).asInstanceOf[FMRegressionModel]
println(s"Factors: ${fmModel.factors} Linear: ${fmModel.linear} " +
  s"Intercept: ${fmModel.intercept}")
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/FMRegressorExample.scala" 中查找完整的示例代码。

有关更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.regression.FMRegressionModel;
import org.apache.spark.ml.regression.FMRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Scale features.
MinMaxScalerModel featureScaler = new MinMaxScaler()
    .setInputCol("features")
    .setOutputCol("scaledFeatures")
    .fit(data);

// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a FM model.
FMRegressor fm = new FMRegressor()
    .setLabelCol("label")
    .setFeaturesCol("scaledFeatures")
    .setStepSize(0.001);

// Create a Pipeline.
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureScaler, fm});

// Train model.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

FMRegressionModel fmModel = (FMRegressionModel)(model.stages()[1]);
System.out.println("Factors: " + fmModel.factors());
System.out.println("Linear: " + fmModel.linear());
System.out.println("Intercept: " + fmModel.intercept());
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaFMRegressorExample.java" 中查找完整的示例代码。

有关更多详细信息,请参阅 R API 文档

注意:目前 SparkR 不支持特征缩放。

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training_test <- randomSplit(df, c(0.7, 0.3))
training <- training_test[[1]]
test <- training_test[[2]]

# Fit a FM regression model
model <- spark.fmRegressor(training, label ~ features)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 仓库的 "examples/src/main/r/ml/fmRegressor.R" 中查找完整的示例代码。

线性方法

我们实现了流行的线性方法,例如具有 $L_1$ 或 $L_2$ 正则化的 Logistic 回归和线性最小二乘法。 有关实现和调整的详细信息,请参阅 基于 RDD 的 API 的线性方法指南;此信息仍然相关。

我们还包括用于 Elastic net 的 DataFrame API,它是 Zou 等人,《Regularization and variable selection via the elastic net》中提出的 $L_1$ 和 $L_2$ 正则化的混合。 在数学上,它被定义为 $L_1$ 和 $L_2$ 正则化项的凸组合:\[ \alpha \left( \lambda \|\wv\|_1 \right) + (1-\alpha) \left( \frac{\lambda}{2}\|\wv\|_2^2 \right) , \alpha \in [0, 1], \lambda \geq 0 \] 通过正确设置 $\alpha$,Elastic net 包含 $L_1$ 和 $L_2$ 正则化作为特例。 例如,如果使用设置为 $1$ 的 Elastic net 参数 $\alpha$ 训练 线性回归模型,则它等效于 Lasso 模型。 另一方面,如果 $\alpha$ 设置为 $0$,则训练后的模型将简化为 岭回归模型。 我们为具有 Elastic net 正则化的线性回归和 Logistic 回归实现 Pipelines API。
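下面用一个简短的 Python 示意说明该参数化(regParam 与 elasticNetParam 的取值仅作演示,并假定训练数据已按前文示例加载):elasticNetParam 对应 $\alpha$,regParam 对应 $\lambda$。

from pyspark.ml.regression import LinearRegression

# alpha = 1:纯 L1 正则化,等价于 Lasso
lasso = LinearRegression(regParam=0.1, elasticNetParam=1.0)

# alpha = 0:纯 L2 正则化,等价于岭回归
ridge = LinearRegression(regParam=0.1, elasticNetParam=0.0)

# 0 < alpha < 1:L1 与 L2 的凸组合,即 Elastic net
enet = LinearRegression(regParam=0.1, elasticNetParam=0.5)
# model = enet.fit(training)  # 假设 training 已按前文线性回归示例加载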

因子分解机

因子分解机即使在具有巨大稀疏性的问题中(如广告和推荐系统)也能够估计特征之间的交互。spark.ml 实现支持用于二元分类和回归的因子分解机。

因子分解机公式为

\[\hat{y} = w_0 + \sum\limits^n_{i=1} w_i x_i + \sum\limits^n_{i=1} \sum\limits^n_{j=i+1} \langle v_i, v_j \rangle x_i x_j\]

前两项表示截距和线性项(与线性回归中相同),最后一项表示成对交互项。\(v_i\) 用 k 个因子描述第 i 个变量。

FM 可用于回归,优化准则为均方误差;也可以通过 Sigmoid 函数用于二元分类,此时优化准则为 Logistic 损失。

成对交互可以重新表述为

\[\sum\limits^n_{i=1} \sum\limits^n_{j=i+1} \langle v_i, v_j \rangle x_i x_j = \frac{1}{2}\sum\limits^k_{f=1} \left(\left( \sum\limits^n_{i=1}v_{i,f}x_i \right)^2 - \sum\limits^n_{i=1}v_{i,f}^2x_i^2 \right)\]

该等式对 k 和 n 都只有线性复杂度,即其计算复杂度为 \(O(kn)\)。
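下面用一段简短的 NumPy 代码对这一恒等式做数值核对(随机数据与变量名仅作演示,与 spark.ml 的内部实现无关):

import numpy as np

rng = np.random.default_rng(0)
n, k = 6, 3                       # n 个特征,k 个因子
x = rng.normal(size=n)            # 特征向量
w0, w = 0.1, rng.normal(size=n)   # 截距与线性项系数
V = rng.normal(size=(n, k))       # 第 i 行为 v_i

# 按定义直接计算成对交互项:O(k * n^2)
pairwise = sum(V[i] @ V[j] * x[i] * x[j] for i in range(n) for j in range(i + 1, n))

# 按线性复杂度的恒等式计算:O(k * n)
s = V.T @ x                                    # 对每个因子 f 计算 sum_i v_{i,f} x_i
pairwise_fast = 0.5 * np.sum(s ** 2 - (V ** 2).T @ (x ** 2))

y_hat = w0 + w @ x + pairwise_fast             # 完整的 FM 预测值
print(np.isclose(pairwise, pairwise_fast), y_hat)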

通常,为了防止梯度爆炸问题,最好将连续特征缩放到 0 和 1 之间,或者对连续特征进行分箱并进行 One-Hot 编码。

决策树

决策树及其集成是用于分类和回归的机器学习任务的流行方法。 决策树被广泛使用,因为它们易于解释、处理分类特征、扩展到多类分类设置、不需要特征缩放,并且能够捕获非线性特征和特征交互。 诸如随机森林和 Boosting 之类的树集成算法是分类和回归任务中最出色的算法之一。

spark.ml 实现支持使用连续和分类特征的二元和多类分类以及回归的决策树。 该实现按行划分数据,从而允许使用数百万甚至数十亿个实例进行分布式训练。

用户可以在 MLlib 决策树指南中找到有关决策树算法的更多信息。此 API 与原始 MLlib 决策树 API 之间的主要区别在于:支持 ML Pipelines、将用于分类与回归的决策树分离,以及使用 DataFrame 元数据来区分连续特征和分类特征。

用于决策树的 Pipelines API 比原始 API 提供了更多的功能。特别是,对于分类,用户可以获得每个类的预测概率(也称为类条件概率);对于回归,用户可以获得预测的有偏样本方差。
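下面给出一个简短的 Python 示意(数据加载方式沿用前文示例,假定 spark 会话已存在;输出列名 "variance" 为示例取值),演示如何获取分类的类条件概率与回归的预测方差:

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.regression import DecisionTreeRegressor

# 与前文示例相同的 LIBSVM 数据
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# 分类:transform 之后可直接选取每个类的预测概率(类条件概率)
dtc_model = DecisionTreeClassifier(labelCol="label", featuresCol="features").fit(data)
dtc_model.transform(data).select("prediction", "probability").show(5)

# 回归:通过 varianceCol 额外输出预测的有偏样本方差
dtr_model = DecisionTreeRegressor(featuresCol="features", varianceCol="variance").fit(data)
dtr_model.transform(data).select("prediction", "variance").show(5)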

树的集成(随机森林和梯度提升树)在下面的树的集成章节中描述。

输入和输出

我们在此列出输入和输出(预测)列的类型。所有输出列都是可选的;要排除一个输出列,请将其对应的Param设置为空字符串。

输入列

Param 名称 类型 默认值 描述
labelCol Double "label" 要预测的标签
featuresCol Vector "features" 特征向量

输出列

Param 名称 类型 默认值 描述 注释
predictionCol Double "prediction" 预测标签
rawPredictionCol Vector "rawPrediction" 长度等于类别数的向量,包含做出预测的树节点处各类别训练实例的计数 仅限分类
probabilityCol Vector "probability" 长度等于类别数的向量,等于将 rawPrediction 归一化为多项分布后的结果 仅限分类
varianceCol Double 预测的有偏样本方差 仅限回归

树集成

DataFrame API支持两种主要的树集成算法:随机森林梯度提升树(GBT)。两者都使用spark.ml 决策树作为其基本模型。

用户可以在MLlib集成指南中找到关于集成算法的更多信息。在本节中,我们将演示用于集成的DataFrame API。

此 API 和原始 MLlib 集成 API 之间的主要区别在于:支持 DataFrame 和 ML Pipelines、将分类与回归分离、使用 DataFrame 元数据区分连续特征和分类特征,以及随机森林具有更多功能(例如特征重要性估计,以及用于分类的每个类的预测概率,即类条件概率)。

随机森林

随机森林决策树的集合。 随机森林结合了许多决策树,以降低过度拟合的风险。 spark.ml实现支持使用连续和分类特征的二元和多类分类以及回归的随机森林。

有关算法本身的更多信息,请参阅spark.mllib关于随机森林的文档
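下面是一个简短的 Python 参数配置示意(numTrees、featureSubsetStrategy、subsamplingRate 的取值仅作演示),说明随机森林通过增加树的数量、对特征与样本进行子抽样来降低过拟合风险:

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.regression import RandomForestRegressor

# 分类:更多的树 + 每次分裂只考察 sqrt(#features) 个特征
rfc = RandomForestClassifier(numTrees=50, featureSubsetStrategy="sqrt", subsamplingRate=0.8)

# 回归:特征子集策略常用 "onethird",或保持默认的 "auto"
rfr = RandomForestRegressor(numTrees=50, featureSubsetStrategy="onethird", subsamplingRate=0.8)
# model = rfc.fit(trainingData)  # 假设 trainingData 已按前文示例准备好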

输入和输出

我们在此列出输入和输出(预测)列的类型。所有输出列都是可选的;要排除一个输出列,请将其对应的Param设置为空字符串。

输入列

Param 名称 类型 默认值 描述
labelCol Double "label" 要预测的标签
featuresCol Vector "features" 特征向量

输出列(预测)

Param 名称 类型 默认值 描述 注释
predictionCol Double "prediction" 预测标签
rawPredictionCol Vector "rawPrediction" 长度等于类别数的向量,包含做出预测的树节点处各类别训练实例的计数 仅限分类
probabilityCol Vector "probability" 长度等于类别数的向量,等于将 rawPrediction 归一化为多项分布后的结果 仅限分类

梯度提升树 (GBT)

梯度提升树 (GBT) 是决策树的集成。GBT 迭代地训练决策树以最小化损失函数。spark.ml 实现支持使用连续和分类特征的二元分类和回归的 GBT。

有关算法本身的更多信息,请参阅 spark.mllib 关于 GBT 的文档
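下面是一个简短的 Python 参数配置示意(maxIter、maxDepth、stepSize 等取值仅作演示):maxIter 即迭代(树)的数量,stepSize 为学习率;GBTRegressor 可通过 lossType 在 "squared"(默认)与 "absolute" 损失之间选择。

from pyspark.ml.classification import GBTClassifier
from pyspark.ml.regression import GBTRegressor

# 分类:GBTClassifier 目前仅支持二元标签
gbtc = GBTClassifier(maxIter=20, maxDepth=5, stepSize=0.1)

# 回归:逐棵树拟合上一轮的残差,以最小化所选损失
gbtr = GBTRegressor(maxIter=20, maxDepth=5, stepSize=0.1, lossType="squared")
# model = gbtr.fit(trainingData)  # 假设 trainingData 已按前文示例准备好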

输入和输出

我们在此列出输入和输出(预测)列的类型。所有输出列都是可选的;要排除一个输出列,请将其对应的Param设置为空字符串。

输入列

Param 名称 类型 默认值 描述
labelCol Double "label" 要预测的标签
featuresCol Vector "features" 特征向量

请注意,GBTClassifier 当前仅支持二元标签。

输出列(预测)

Param 名称 类型 默认值 描述 注释
predictionCol Double "prediction" 预测标签

将来,GBTClassifier 也会像 RandomForestClassifier 一样,输出 rawPredictionprobability 列。