Classification and regression

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

This page covers algorithms for classification and regression. It also includes sections discussing specific classes of algorithms, such as linear methods, trees, and ensembles.

Table of contents

Classification

Logistic regression

Logistic regression is a popular method for predicting a categorical response. It is a special case of generalized linear models that predicts the probability of the outcomes. In spark.ml, logistic regression can be used to predict a binary outcome by using binomial logistic regression, or it can be used to predict a multiclass outcome by using multinomial logistic regression. Use the family parameter to select between these two algorithms, or leave it unset and Spark will infer the correct variant.

Multinomial logistic regression can be used for binary classification by setting the family param to "multinomial". It will produce two sets of coefficients and two intercepts.

When fitting a LogisticRegressionModel without an intercept on a dataset with constant nonzero columns, Spark MLlib outputs zero coefficients for those constant nonzero columns. This behavior is the same as R glmnet but different from LIBSVM.

Binomial logistic regression

For more background and details about the implementation of binomial logistic regression, refer to the documentation of logistic regression in spark.mllib.

Examples

The following example shows how to train binomial and multinomial logistic regression models for binary classification with elastic net regularization. elasticNetParam corresponds to $\alpha$ and regParam corresponds to $\lambda$.
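
For reference, the elastic net penalty that these two parameters control can be written as follows (this is a restatement of the regularization term in the multinomial objective shown later on this page):

\[ \lambda \left[\frac{1}{2}\left(1 - \alpha\right) ||\wv||_2^2 + \alpha ||\wv||_1\right], \quad \alpha \in [0, 1],\ \lambda \geq 0 \]

Setting $\alpha = 0$ gives pure L2 (ridge) regularization, $\alpha = 1$ gives pure L1 (lasso) regularization, and values in between mix the two.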

More details on parameters can be found in the Python API documentation.

from pyspark.ml.classification import LogisticRegression

# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# We can also use the multinomial family for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# Fit the model
mlrModel = mlr.fit(training)

# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))
Find full example code at "examples/src/main/python/ml/logistic_regression_with_elastic_net.py" in the Spark repo.

More details on parameters can be found in the Scala API documentation.

import org.apache.spark.ml.classification.LogisticRegression

// Load training data
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Fit the model
val lrModel = lr.fit(training)

// Print the coefficients and intercept for logistic regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

// We can also use the multinomial family for binary classification
val mlr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
  .setFamily("multinomial")

val mlrModel = mlr.fit(training)

// Print the coefficients and intercepts for logistic regression with multinomial family
println(s"Multinomial coefficients: ${mlrModel.coefficientMatrix}")
println(s"Multinomial intercepts: ${mlrModel.interceptVector}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionWithElasticNetExample.scala" in the Spark repo.

More details on parameters can be found in the Java API documentation.

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data
Dataset<Row> training = spark.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8);

// Fit the model
LogisticRegressionModel lrModel = lr.fit(training);

// Print the coefficients and intercept for logistic regression
System.out.println("Coefficients: "
  + lrModel.coefficients() + " Intercept: " + lrModel.intercept());

// We can also use the multinomial family for binary classification
LogisticRegression mlr = new LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.3)
        .setElasticNetParam(0.8)
        .setFamily("multinomial");

// Fit the model
LogisticRegressionModel mlrModel = mlr.fit(training);

// Print the coefficients and intercepts for logistic regression with multinomial family
System.out.println("Multinomial coefficients: " + lrModel.coefficientMatrix()
  + "\nMultinomial intercepts: " + mlrModel.interceptVector());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLogisticRegressionWithElasticNetExample.java" in the Spark repo.

More details on parameters can be found in the R API documentation.

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a binomial logistic regression model with spark.logit
model <- spark.logit(training, label ~ features, maxIter = 10, regParam = 0.3, elasticNetParam = 0.8)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/logit.R" in the Spark repo.

The spark.ml implementation of logistic regression also supports extracting a summary of the model over the training set. Note that the predictions and metrics which are stored as a DataFrame in LogisticRegressionSummary are annotated @transient and hence only available on the driver.

LogisticRegressionTrainingSummary provides a summary for a LogisticRegressionModel. In the case of binary classification, certain additional metrics are available, e.g. the ROC curve. See BinaryLogisticRegressionTrainingSummary.

Continuing the earlier example:

from pyspark.ml.classification import LogisticRegression

# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
lr.setThreshold(bestThreshold)
Find full example code at "examples/src/main/python/ml/logistic_regression_summary_example.py" in the Spark repo.

LogisticRegressionTrainingSummary provides a summary for a LogisticRegressionModel. In the case of binary classification, certain additional metrics are available, e.g. the ROC curve. The binary summary can be accessed via the binarySummary method. See BinaryLogisticRegressionTrainingSummary.

Continuing the earlier example:

import org.apache.spark.ml.classification.LogisticRegression
// needed below for max() and the $"F-Measure" column syntax
import org.apache.spark.sql.functions.max
import spark.implicits._

// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier
// example
val trainingSummary = lrModel.binarySummary

// Obtain the objective per iteration.
val objectiveHistory = trainingSummary.objectiveHistory
println("objectiveHistory:")
objectiveHistory.foreach(loss => println(loss))

// Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
val roc = trainingSummary.roc
roc.show()
println(s"areaUnderROC: ${trainingSummary.areaUnderROC}")

// Set the model threshold to maximize F-Measure
val fMeasure = trainingSummary.fMeasureByThreshold
val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
  .select("threshold").head().getDouble(0)
lrModel.setThreshold(bestThreshold)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionSummaryExample.scala" in the Spark repo.

LogisticRegressionTrainingSummary provides a summary for a LogisticRegressionModel. In the case of binary classification, certain additional metrics are available, e.g. the ROC curve. The binary summary can be accessed via the binarySummary method. See BinaryLogisticRegressionTrainingSummary.

Continuing the earlier example:

import org.apache.spark.ml.classification.BinaryLogisticRegressionTrainingSummary;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier
// example
BinaryLogisticRegressionTrainingSummary trainingSummary = lrModel.binarySummary();

// Obtain the loss per iteration.
double[] objectiveHistory = trainingSummary.objectiveHistory();
for (double lossPerIteration : objectiveHistory) {
  System.out.println(lossPerIteration);
}

// Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
Dataset<Row> roc = trainingSummary.roc();
roc.show();
roc.select("FPR").show();
System.out.println(trainingSummary.areaUnderROC());

// Get the threshold corresponding to the maximum F-Measure and rerun LogisticRegression with
// this selected threshold.
Dataset<Row> fMeasure = trainingSummary.fMeasureByThreshold();
double maxFMeasure = fMeasure.select(functions.max("F-Measure")).head().getDouble(0);
double bestThreshold = fMeasure.where(fMeasure.col("F-Measure").equalTo(maxFMeasure))
  .select("threshold").head().getDouble(0);
lrModel.setThreshold(bestThreshold);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLogisticRegressionSummaryExample.java" in the Spark repo.

Multinomial logistic regression

Multiclass classification is supported via multinomial logistic (softmax) regression. In multinomial logistic regression, the algorithm produces $K$ sets of coefficients, or a matrix of dimension $K \times J$ where $K$ is the number of outcome classes and $J$ is the number of features. If the algorithm is fit with an intercept term, then a length-$K$ vector of intercepts is available.

Multinomial coefficients are available as coefficientMatrix and intercepts are available as interceptVector.

The coefficients and intercept methods are not supported on a logistic regression model trained with the multinomial family. Use coefficientMatrix and interceptVector instead.

The conditional probabilities of the outcome classes $k \in \{1, 2, \ldots, K\}$ are modeled using the softmax function.

\[ P(Y=k|\mathbf{X}, \boldsymbol{\beta}_k, \beta_{0k}) = \frac{e^{\boldsymbol{\beta}_k \cdot \mathbf{X} + \beta_{0k}}}{\sum_{k'=0}^{K-1} e^{\boldsymbol{\beta}_{k'} \cdot \mathbf{X} + \beta_{0k'}}} \]

We minimize the weighted negative log-likelihood, using a multinomial response model, with an elastic-net penalty to control for overfitting.

\[ \min_{\beta, \beta_0} -\left[\sum_{i=1}^L w_i \cdot \log P(Y = y_i|\mathbf{x}_i)\right] + \lambda \left[\frac{1}{2}\left(1 - \alpha\right)||\boldsymbol{\beta}||_2^2 + \alpha ||\boldsymbol{\beta}||_1\right] \]

For a detailed derivation please see here.

Examples

The following example shows how to train a multiclass logistic regression model with elastic net regularization, as well as how to extract the multiclass training summary for evaluating the model.

from pyspark.ml.classification import LogisticRegression

# Load training data
training = spark \
    .read \
    .format("libsvm") \
    .load("data/mllib/sample_multiclass_classification_data.txt")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for multinomial logistic regression
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))

trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# for multiclass, we can inspect metrics on a per-label basis
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))
Find full example code at "examples/src/main/python/ml/multiclass_logistic_regression_with_elastic_net.py" in the Spark repo.

More details on parameters can be found in the Scala API documentation.

import org.apache.spark.ml.classification.LogisticRegression

// Load training data
val training = spark
  .read
  .format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Fit the model
val lrModel = lr.fit(training)

// Print the coefficients and intercept for multinomial logistic regression
println(s"Coefficients: \n${lrModel.coefficientMatrix}")
println(s"Intercepts: \n${lrModel.interceptVector}")

val trainingSummary = lrModel.summary

// Obtain the objective per iteration
val objectiveHistory = trainingSummary.objectiveHistory
println("objectiveHistory:")
objectiveHistory.foreach(println)

// for multiclass, we can inspect metrics on a per-label basis
println("False positive rate by label:")
trainingSummary.falsePositiveRateByLabel.zipWithIndex.foreach { case (rate, label) =>
  println(s"label $label: $rate")
}

println("True positive rate by label:")
trainingSummary.truePositiveRateByLabel.zipWithIndex.foreach { case (rate, label) =>
  println(s"label $label: $rate")
}

println("Precision by label:")
trainingSummary.precisionByLabel.zipWithIndex.foreach { case (prec, label) =>
  println(s"label $label: $prec")
}

println("Recall by label:")
trainingSummary.recallByLabel.zipWithIndex.foreach { case (rec, label) =>
  println(s"label $label: $rec")
}


println("F-measure by label:")
trainingSummary.fMeasureByLabel.zipWithIndex.foreach { case (f, label) =>
  println(s"label $label: $f")
}

val accuracy = trainingSummary.accuracy
val falsePositiveRate = trainingSummary.weightedFalsePositiveRate
val truePositiveRate = trainingSummary.weightedTruePositiveRate
val fMeasure = trainingSummary.weightedFMeasure
val precision = trainingSummary.weightedPrecision
val recall = trainingSummary.weightedRecall
println(s"Accuracy: $accuracy\nFPR: $falsePositiveRate\nTPR: $truePositiveRate\n" +
  s"F-measure: $fMeasure\nPrecision: $precision\nRecall: $recall")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/MulticlassLogisticRegressionWithElasticNetExample.scala" in the Spark repo.

More details on parameters can be found in the Java API documentation.

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.classification.LogisticRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data
Dataset<Row> training = spark.read().format("libsvm")
        .load("data/mllib/sample_multiclass_classification_data.txt");

LogisticRegression lr = new LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.3)
        .setElasticNetParam(0.8);

// Fit the model
LogisticRegressionModel lrModel = lr.fit(training);

// Print the coefficients and intercept for multinomial logistic regression
System.out.println("Coefficients: \n"
        + lrModel.coefficientMatrix() + " \nIntercept: " + lrModel.interceptVector());
LogisticRegressionTrainingSummary trainingSummary = lrModel.summary();

// Obtain the loss per iteration.
double[] objectiveHistory = trainingSummary.objectiveHistory();
for (double lossPerIteration : objectiveHistory) {
    System.out.println(lossPerIteration);
}

// for multiclass, we can inspect metrics on a per-label basis
System.out.println("False positive rate by label:");
int i = 0;
double[] fprLabel = trainingSummary.falsePositiveRateByLabel();
for (double fpr : fprLabel) {
    System.out.println("label " + i + ": " + fpr);
    i++;
}

System.out.println("True positive rate by label:");
i = 0;
double[] tprLabel = trainingSummary.truePositiveRateByLabel();
for (double tpr : tprLabel) {
    System.out.println("label " + i + ": " + tpr);
    i++;
}

System.out.println("Precision by label:");
i = 0;
double[] precLabel = trainingSummary.precisionByLabel();
for (double prec : precLabel) {
    System.out.println("label " + i + ": " + prec);
    i++;
}

System.out.println("Recall by label:");
i = 0;
double[] recLabel = trainingSummary.recallByLabel();
for (double rec : recLabel) {
    System.out.println("label " + i + ": " + rec);
    i++;
}

System.out.println("F-measure by label:");
i = 0;
double[] fLabel = trainingSummary.fMeasureByLabel();
for (double f : fLabel) {
    System.out.println("label " + i + ": " + f);
    i++;
}

double accuracy = trainingSummary.accuracy();
double falsePositiveRate = trainingSummary.weightedFalsePositiveRate();
double truePositiveRate = trainingSummary.weightedTruePositiveRate();
double fMeasure = trainingSummary.weightedFMeasure();
double precision = trainingSummary.weightedPrecision();
double recall = trainingSummary.weightedRecall();
System.out.println("Accuracy: " + accuracy);
System.out.println("FPR: " + falsePositiveRate);
System.out.println("TPR: " + truePositiveRate);
System.out.println("F-measure: " + fMeasure);
System.out.println("Precision: " + precision);
System.out.println("Recall: " + recall);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaMulticlassLogisticRegressionWithElasticNetExample.java" in the Spark repo.

More details on parameters can be found in the R API documentation.

# Load training data
df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a multinomial logistic regression model with spark.logit
model <- spark.logit(training, label ~ features, maxIter = 10, regParam = 0.3, elasticNetParam = 0.8)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/logit.R" in the Spark repo.

Decision tree classifier

Decision trees are a popular family of classification and regression methods. More information about the spark.ml implementation can be found in the section on decision trees.

Examples

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame which the decision tree algorithm can recognize.

More details on parameters can be found in the Python API documentation.

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

treeModel = model.stages[2]
# summary only
print(treeModel)
Find full example code at "examples/src/main/python/ml/decision_tree_classification_example.py" in the Spark repo.

More details on parameters can be found in the Scala API documentation.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a DecisionTree model.
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))

// Chain indexers and tree in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${(1.0 - accuracy)}")

val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
println(s"Learned classification tree model:\n ${treeModel.toDebugString}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeClassificationExample.scala" in the Spark repo.

More details on parameters can be found in the Java API documentation.

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.DecisionTreeClassifier;
import org.apache.spark.ml.classification.DecisionTreeClassificationModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load the data stored in LIBSVM format as a DataFrame.
Dataset<Row> data = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);

// Automatically identify categorical features, and index them.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
  .fit(data);

// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a DecisionTree model.
DecisionTreeClassifier dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures");

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray()[0]);

// Chain indexers and tree in a Pipeline.
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[]{labelIndexer, featureIndexer, dt, labelConverter});

// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));

DecisionTreeClassificationModel treeModel =
  (DecisionTreeClassificationModel) (model.stages()[2]);
System.out.println("Learned classification tree model:\n" + treeModel.toDebugString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java" in the Spark repo.

Refer to the R API docs for more details.

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a DecisionTree classification model with spark.decisionTree
model <- spark.decisionTree(training, label ~ features, "classification")

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/decisionTree.R" in the Spark repo.

Random forest classifier

Random forests are a popular family of classification and regression methods. More information about the spark.ml implementation can be found in the section on random forests.

Examples

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame which the tree-based algorithms can recognize.

Refer to the Python API docs for more details.

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only
Find full example code at "examples/src/main/python/ml/random_forest_classifier_example.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))

// Chain indexers and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${(1.0 - accuracy)}")

val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
println(s"Learned classification forest model:\n ${rfModel.toDebugString}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/RandomForestClassifierExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.RandomForestClassificationModel;
import org.apache.spark.ml.classification.RandomForestClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a RandomForest model.
RandomForestClassifier rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures");

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray()[0]);

// Chain indexers and forest in a Pipeline
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {labelIndexer, featureIndexer, rf, labelConverter});

// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test error
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));

RandomForestClassificationModel rfModel = (RandomForestClassificationModel)(model.stages()[2]);
System.out.println("Learned classification forest model:\n" + rfModel.toDebugString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestClassifierExample.java" in the Spark repo.

Refer to the R API docs for more details.

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a random forest classification model with spark.randomForest
model <- spark.randomForest(training, label ~ features, "classification", numTrees = 10)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/randomForest.R" in the Spark repo.

Gradient-boosted tree classifier

Gradient-boosted trees (GBTs) are a popular classification and regression method that uses ensembles of decision trees. More information about the spark.ml implementation can be found in the section on GBTs.

Examples

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame which the tree-based algorithms can recognize.

Refer to the Python API docs for more details.

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only
Find full example code at "examples/src/main/python/ml/gradient_boosted_tree_classifier_example.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a GBT model.
val gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)
  .setFeatureSubsetStrategy("auto")

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))

// Chain indexers and GBT in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${1.0 - accuracy}")

val gbtModel = model.stages(2).asInstanceOf[GBTClassificationModel]
println(s"Learned classification GBT model:\n ${gbtModel.toDebugString}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/GradientBoostedTreeClassifierExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.GBTClassificationModel;
import org.apache.spark.ml.classification.GBTClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a GBT model.
GBTClassifier gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray()[0]);

// Chain indexers and GBT in a Pipeline.
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {labelIndexer, featureIndexer, gbt, labelConverter});

// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));

GBTClassificationModel gbtModel = (GBTClassificationModel)(model.stages()[2]);
System.out.println("Learned classification GBT model:\n" + gbtModel.toDebugString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaGradientBoostedTreeClassifierExample.java" in the Spark repo.

Refer to the R API docs for more details.

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a GBT classification model with spark.gbt
model <- spark.gbt(training, label ~ features, "classification", maxIter = 10)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/gbt.R" in the Spark repo.

Multilayer perceptron classifier

The multilayer perceptron classifier (MLPC) is a classifier based on the feedforward artificial neural network. MLPC consists of multiple layers of nodes. Each layer is fully connected to the next layer in the network. Nodes in the input layer represent the input data. All other nodes map inputs to outputs by a linear combination of the inputs with the node's weights $\wv$ and bias $\bv$ and by applying an activation function. This can be written in matrix form for MLPC with $K+1$ layers as follows: \[ \mathrm{y}(\x) = \mathrm{f_K}(...\mathrm{f_2}(\wv_2^T\mathrm{f_1}(\wv_1^T \x+b_1)+b_2)...+b_K) \] Nodes in intermediate layers use the sigmoid (logistic) function: \[ \mathrm{f}(z_i) = \frac{1}{1 + e^{-z_i}} \] Nodes in the output layer use the softmax function: \[ \mathrm{f}(z_i) = \frac{e^{z_i}}{\sum_{k=1}^N e^{z_k}} \] The number of nodes $N$ in the output layer corresponds to the number of classes.

MLPC employs backpropagation for learning the model. We use the logistic loss function for optimization and L-BFGS as the optimization routine.

Examples

Refer to the Python API docs for more details.

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load training data
data = spark.read.format("libsvm")\
    .load("data/mllib/sample_multiclass_classification_data.txt")

# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = [4, 5, 4, 3]

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# train the model
model = trainer.fit(train)

# compute accuracy on the test set
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
Find full example code at "examples/src/main/python/ml/multilayer_perceptron_classification.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")

// Split the data into train and test
val splits = data.randomSplit(Array(0.6, 0.4), seed = 1234L)
val train = splits(0)
val test = splits(1)

// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
val layers = Array[Int](4, 5, 4, 3)

// create the trainer and set its parameters
val trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100)

// train the model
val model = trainer.fit(train)

// compute accuracy on the test set
val result = model.transform(test)
val predictionAndLabels = result.select("prediction", "label")
val evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy")

println(s"Test set accuracy = ${evaluator.evaluate(predictionAndLabels)}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/MultilayerPerceptronClassifierExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel;
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;

// Load training data
String path = "data/mllib/sample_multiclass_classification_data.txt";
Dataset<Row> dataFrame = spark.read().format("libsvm").load(path);

// Split the data into train and test
Dataset<Row>[] splits = dataFrame.randomSplit(new double[]{0.6, 0.4}, 1234L);
Dataset<Row> train = splits[0];
Dataset<Row> test = splits[1];

// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
int[] layers = new int[] {4, 5, 4, 3};

// create the trainer and set its parameters
MultilayerPerceptronClassifier trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100);

// train the model
MultilayerPerceptronClassificationModel model = trainer.fit(train);

// compute accuracy on the test set
Dataset<Row> result = model.transform(test);
Dataset<Row> predictionAndLabels = result.select("prediction", "label");
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy");

System.out.println("Test set accuracy = " + evaluator.evaluate(predictionAndLabels));
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaMultilayerPerceptronClassifierExample.java" in the Spark repo.

Refer to the R API docs for more details.

# Load training data
df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training <- df
test <- df

# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = c(4, 5, 4, 3)

# Fit a multi-layer perceptron neural network model with spark.mlp
model <- spark.mlp(training, label ~ features, maxIter = 100,
                   layers = layers, blockSize = 128, seed = 1234)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Find full example code at "examples/src/main/r/ml/mlp.R" in the Spark repo.

Linear Support Vector Machine

A support vector machine constructs a hyperplane or set of hyperplanes in a high- or infinite-dimensional space, which can be used for classification, regression, or other tasks. Intuitively, a good separation is achieved by the hyperplane that has the largest distance to the nearest training-data points of any class (the so-called functional margin), since in general the larger the margin the lower the generalization error of the classifier. LinearSVC in Spark ML supports binary classification with a linear SVM. Internally, it optimizes the hinge loss using the OWLQN optimizer.
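
As a reminder of the objective being optimized, the hinge loss for a training instance $(\x, y)$ with label $y \in \{-1, +1\}$ can be written as

\[ L(\wv; \x, y) := \max \{0, 1 - y \wv^T \x\} \]

and LinearSVC minimizes this loss over the training data together with an L2 regularization term controlled by regParam.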

Examples

Refer to the Python API docs for more details.

from pyspark.ml.classification import LinearSVC

# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Fit the model
lsvcModel = lsvc.fit(training)

# Print the coefficients and intercept for linear SVC
print("Coefficients: " + str(lsvcModel.coefficients))
print("Intercept: " + str(lsvcModel.intercept))
Find full example code at "examples/src/main/python/ml/linearsvc.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.classification.LinearSVC

// Load training data
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val lsvc = new LinearSVC()
  .setMaxIter(10)
  .setRegParam(0.1)

// Fit the model
val lsvcModel = lsvc.fit(training)

// Print the coefficients and intercept for linear svc
println(s"Coefficients: ${lsvcModel.coefficients} Intercept: ${lsvcModel.intercept}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/LinearSVCExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import org.apache.spark.ml.classification.LinearSVC;
import org.apache.spark.ml.classification.LinearSVCModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data
Dataset<Row> training = spark.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

LinearSVC lsvc = new LinearSVC()
  .setMaxIter(10)
  .setRegParam(0.1);

// Fit the model
LinearSVCModel lsvcModel = lsvc.fit(training);

// Print the coefficients and intercept for LinearSVC
System.out.println("Coefficients: "
  + lsvcModel.coefficients() + " Intercept: " + lsvcModel.intercept());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaLinearSVCExample.java" in the Spark repo.

Refer to the R API docs for more details.

# load training data
t <- as.data.frame(Titanic)
training <- createDataFrame(t)

# fit Linear SVM model
model <- spark.svmLinear(training,  Survived ~ ., regParam = 0.01, maxIter = 10)

# Model summary
summary(model)

# Prediction
prediction <- predict(model, training)
showDF(prediction)
Find full example code at "examples/src/main/r/ml/svmLinear.R" in the Spark repo.

One-vs-Rest classifier (a.k.a. One-vs-All)

OneVsRest is an example of a machine learning reduction for performing multiclass classification given a base classifier that can perform binary classification efficiently. It is also known as "One-vs-All".

OneVsRest is implemented as an Estimator. For the base classifier, it takes instances of Classifier and creates a binary classification problem for each of the k classes. The classifier for class i is trained to predict whether the label is i or not, distinguishing class i from all other classes.

Predictions are done by evaluating each binary classifier, and the index of the most confident classifier is output as the label, as sketched below.
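
To make the reduction concrete, here is a minimal, framework-independent sketch in Python. The fit_binary and score callables are hypothetical stand-ins for any base binary classifier; this is not the Spark implementation.

def fit_one_vs_rest(X, y, num_classes, fit_binary):
    # Train one binary model per class: class i versus everything else.
    return [fit_binary(X, [1 if label == i else 0 for label in y])
            for i in range(num_classes)]

def predict_one_vs_rest(models, x, score):
    # Evaluate every binary model on x and output the index of the most
    # confident one as the predicted label.
    return max(range(len(models)), key=lambda i: score(models[i], x))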

Examples

The example below demonstrates how to load the Iris dataset, parse it as a DataFrame, and perform multiclass classification using OneVsRest. The test error is calculated to measure the algorithm's accuracy.

Refer to the Python API docs for more details.

from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# load data file.
inputData = spark.read.format("libsvm") \
    .load("data/mllib/sample_multiclass_classification_data.txt")

# generate the train/test split.
(train, test) = inputData.randomSplit([0.8, 0.2])

# instantiate the base classifier.
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)

# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=lr)

# train the multiclass model.
ovrModel = ovr.fit(train)

# score the model on test data.
predictions = ovrModel.transform(test)

# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
Find full example code at "examples/src/main/python/ml/one_vs_rest_example.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// load data file.
val inputData = spark.read.format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")

// generate the train/test split.
val Array(train, test) = inputData.randomSplit(Array(0.8, 0.2))

// instantiate the base classifier
val classifier = new LogisticRegression()
  .setMaxIter(10)
  .setTol(1E-6)
  .setFitIntercept(true)

// instantiate the One Vs Rest Classifier.
val ovr = new OneVsRest().setClassifier(classifier)

// train the multiclass model.
val ovrModel = ovr.fit(train)

// score the model on test data.
val predictions = ovrModel.transform(test)

// obtain evaluator.
val evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy")

// compute the classification error on test data.
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${1 - accuracy}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.OneVsRest;
import org.apache.spark.ml.classification.OneVsRestModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// load data file.
Dataset<Row> inputData = spark.read().format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt");

// generate the train/test split.
Dataset<Row>[] tmp = inputData.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> train = tmp[0];
Dataset<Row> test = tmp[1];

// configure the base classifier.
LogisticRegression classifier = new LogisticRegression()
  .setMaxIter(10)
  .setTol(1E-6)
  .setFitIntercept(true);

// instantiate the One Vs Rest Classifier.
OneVsRest ovr = new OneVsRest().setClassifier(classifier);

// train the multiclass model.
OneVsRestModel ovrModel = ovr.fit(train);

// score the model on test data.
Dataset<Row> predictions = ovrModel.transform(test)
  .select("prediction", "label");

// obtain evaluator.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
        .setMetricName("accuracy");

// compute the classification error on test data.
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1 - accuracy));
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java" in the Spark repo.

Naive Bayes

Naive Bayes classifiers are a family of simple probabilistic, multiclass classifiers based on applying Bayes' theorem with strong (naive) independence assumptions between every pair of features.

Naive Bayes can be trained very efficiently. With a single pass over the training data, it computes the conditional probability distribution of each feature given each label. For prediction, it applies Bayes' theorem to compute the conditional probability distribution of each label given an observation.

MLlib supports Multinomial naive Bayes, Complement naive Bayes, Bernoulli naive Bayes, and Gaussian naive Bayes.

Input data: The Multinomial, Complement, and Bernoulli models are typically used for document classification. Within that context, each observation is a document and each feature represents a term. A feature's value is the frequency of the term (in Multinomial or Complement naive Bayes) or a zero or one indicating whether the term was found in the document (in Bernoulli naive Bayes). Feature values for the Multinomial and Bernoulli models must be *non-negative*. The model type is selected with the optional parameter "multinomial", "complement", "bernoulli", or "gaussian", with "multinomial" as the default. For document classification, the input feature vectors should usually be sparse vectors. Since the training data is only used once, it is not necessary to cache it.

Additive smoothing can be used by setting the parameter $\lambda$ (default $1.0$).
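
As a sketch of what that single training pass computes for the multinomial model (using standard naive Bayes notation rather than symbols defined elsewhere on this page): with $N_{ci}$ the count of feature $i$ in observations of class $c$, $N_c = \sum_{i} N_{ci}$, and $n$ the number of features, the smoothed conditional probabilities and the resulting prediction rule are

\[ \hat{\theta}_{ci} = \frac{N_{ci} + \lambda}{N_{c} + \lambda n}, \qquad \hat{y} = \arg\max_{c} \left[\log \hat{P}(c) + \sum_{i=1}^{n} x_i \log \hat{\theta}_{ci}\right] \]

where $\lambda$ is the smoothing parameter described above and $\hat{P}(c)$ is the estimated class prior.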

Examples

Refer to the Python API docs for more details.

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load training data
data = spark.read.format("libsvm") \
    .load("data/mllib/sample_libsvm_data.txt")

# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(train)

# select example rows to display.
predictions = model.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
Find full example code at "examples/src/main/python/ml/naive_bayes_example.py" in the Spark repo.

Refer to the Scala API docs for more details.

import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Split the data into training and test sets (30% held out for testing)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)

// Train a NaiveBayes model.
val model = new NaiveBayes()
  .fit(trainingData)

// Select example rows to display.
val predictions = model.transform(testData)
predictions.show()

// Select (prediction, true label) and compute test error
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy = $accuracy")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/NaiveBayesExample.scala" in the Spark repo.

Refer to the Java API docs for more details.

import org.apache.spark.ml.classification.NaiveBayes;
import org.apache.spark.ml.classification.NaiveBayesModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data
Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// Split the data into train and test
Dataset<Row>[] splits = dataFrame.randomSplit(new double[]{0.6, 0.4}, 1234L);
Dataset<Row> train = splits[0];
Dataset<Row> test = splits[1];

// create the trainer and set its parameters
NaiveBayes nb = new NaiveBayes();

// train the model
NaiveBayesModel model = nb.fit(train);

// Select example rows to display.
Dataset<Row> predictions = model.transform(test);
predictions.show();

// compute accuracy on the test set
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test set accuracy = " + accuracy);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaNaiveBayesExample.java" in the Spark repo.

Refer to the R API docs for more details.

# Fit a Bernoulli naive Bayes model with spark.naiveBayes
titanic <- as.data.frame(Titanic)
titanicDF <- createDataFrame(titanic[titanic$Freq > 0, -5])
nbDF <- titanicDF
nbTestDF <- titanicDF
nbModel <- spark.naiveBayes(nbDF, Survived ~ Class + Sex + Age)

# Model summary
summary(nbModel)

# Prediction
nbPredictions <- predict(nbModel, nbTestDF)
head(nbPredictions)
在 Spark 存储库的“examples/src/main/r/ml/naiveBayes.R”中查找完整的示例代码。

因子分解机分类器

有关因子分解机的更多背景信息和实现细节,请参阅因子分解机部分。

示例

以下示例加载 LibSVM 格式的数据集,将其拆分为训练集和测试集,在第一个数据集上进行训练,然后在保留的测试集上进行评估。我们将特征缩放至 0 到 1 之间,以防止出现梯度爆炸问题。

有关更多详细信息,请参阅Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.classification import FMClassifier
from pyspark.ml.feature import MinMaxScaler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Scale features.
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a FM model.
fm = FMClassifier(labelCol="indexedLabel", featuresCol="scaledFeatures", stepSize=0.001)

# Create a Pipeline.
pipeline = Pipeline(stages=[labelIndexer, featureScaler, fm])

# Train model.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = %g" % accuracy)

fmModel = model.stages[2]
print("Factors: " + str(fmModel.factors))  # type: ignore
print("Linear: " + str(fmModel.linear))  # type: ignore
print("Intercept: " + str(fmModel.intercept))  # type: ignore
在 Spark 存储库的“examples/src/main/python/ml/fm_classifier_example.py”中查找完整的示例代码。

有关更多详细信息,请参阅Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{FMClassificationModel, FMClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, MinMaxScaler, StringIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Scale features.
val featureScaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a FM model.
val fm = new FMClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("scaledFeatures")
  .setStepSize(0.001)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))

// Create a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureScaler, fm, labelConverter))

// Train model.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test accuracy.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy = $accuracy")

val fmModel = model.stages(2).asInstanceOf[FMClassificationModel]
println(s"Factors: ${fmModel.factors} Linear: ${fmModel.linear} " +
  s"Intercept: ${fmModel.intercept}")
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/ml/FMClassifierExample.scala”中查找完整的示例代码。

有关更多详细信息,请参阅Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.FMClassificationModel;
import org.apache.spark.ml.classification.FMClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark
    .read()
    .format("libsvm")
    .load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("indexedLabel")
    .fit(data);
// Scale features.
MinMaxScalerModel featureScaler = new MinMaxScaler()
    .setInputCol("features")
    .setOutputCol("scaledFeatures")
    .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a FM model.
FMClassifier fm = new FMClassifier()
    .setLabelCol("indexedLabel")
    .setFeaturesCol("scaledFeatures")
    .setStepSize(0.001);

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
    .setInputCol("prediction")
    .setOutputCol("predictedLabel")
    .setLabels(labelIndexer.labelsArray()[0]);

// Create a Pipeline.
Pipeline pipeline = new Pipeline()
    .setStages(new PipelineStage[] {labelIndexer, featureScaler, fm, labelConverter});

// Train model.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test accuracy.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
    .setLabelCol("indexedLabel")
    .setPredictionCol("prediction")
    .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Accuracy = " + accuracy);

FMClassificationModel fmModel = (FMClassificationModel)(model.stages()[2]);
System.out.println("Factors: " + fmModel.factors());
System.out.println("Linear: " + fmModel.linear());
System.out.println("Intercept: " + fmModel.intercept());
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaFMClassifierExample.java” 中查找完整的示例代码。

有关更多详细信息,请参阅 R API 文档

注意:SparkR 目前不支持特征缩放。

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a FM classification model
model <- spark.fmClassifier(training, label ~ features)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 代码库的 “examples/src/main/r/ml/fmClassifier.R” 中查找完整的示例代码。

回归

线性回归

用于处理线性回归模型和模型摘要的接口与逻辑回归的情况类似。

当使用“l-bfgs”求解器在具有常数非零列的数据集上拟合没有截距的 LinearRegressionModel 时,Spark MLlib 会为常数非零列输出零系数。此行为与 R glmnet 相同,但与 LIBSVM 不同。

示例

以下示例演示了如何训练弹性网络正则化线性回归模型并提取模型汇总统计信息。

有关参数的更多详细信息,请参阅 Python API 文档

from pyspark.ml.regression import LinearRegression

# Load training data
training = spark.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
在 Spark 代码库的 “examples/src/main/python/ml/linear_regression_with_elastic_net.py” 中查找完整的示例代码。

有关参数的更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.regression.LinearRegression

// Load training data
val training = spark.read.format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt")

val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Fit the model
val lrModel = lr.fit(training)

// Print the coefficients and intercept for linear regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

// Summarize the model over the training set and print out some metrics
val trainingSummary = lrModel.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/ml/LinearRegressionWithElasticNetExample.scala” 中查找完整的示例代码。

有关参数的更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.ml.regression.LinearRegressionTrainingSummary;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data.
Dataset<Row> training = spark.read().format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt");

LinearRegression lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8);

// Fit the model.
LinearRegressionModel lrModel = lr.fit(training);

// Print the coefficients and intercept for linear regression.
System.out.println("Coefficients: "
  + lrModel.coefficients() + " Intercept: " + lrModel.intercept());

// Summarize the model over the training set and print out some metrics.
LinearRegressionTrainingSummary trainingSummary = lrModel.summary();
System.out.println("numIterations: " + trainingSummary.totalIterations());
System.out.println("objectiveHistory: " + Vectors.dense(trainingSummary.objectiveHistory()));
trainingSummary.residuals().show();
System.out.println("RMSE: " + trainingSummary.rootMeanSquaredError());
System.out.println("r2: " + trainingSummary.r2());
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaLinearRegressionWithElasticNetExample.java” 中查找完整的示例代码。

有关参数的更多详细信息,请参阅 R API 文档

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a linear regression model
model <- spark.lm(training, label ~ features, regParam = 0.3, elasticNetParam = 0.8)

# Prediction
predictions <- predict(model, test)
head(predictions)

# Summarize
summary(model)
在 Spark 代码库的 “examples/src/main/r/ml/lm_with_elastic_net.R” 中查找完整的示例代码。

广义线性回归

与假设输出服从高斯分布的线性回归不同,广义线性模型 (GLM) 是线性模型的推广,其响应变量 $Y_i$ 服从指数族分布中的某个分布。Spark 的 GeneralizedLinearRegression 接口允许灵活地指定 GLM,可用于各种类型的预测问题,包括线性回归、泊松回归、逻辑回归等。目前 spark.ml 仅支持指数族分布的一个子集,如下所列。

**注意**:Spark 目前仅通过其 GeneralizedLinearRegression 接口支持最多 4096 个特征,如果超过此限制,将抛出异常。有关更多详细信息,请参阅高级部分。尽管如此,对于线性和逻辑回归,可以使用 LinearRegressionLogisticRegression 估计器来训练具有更多特征的模型。

GLM 需要可以用其“规范”或“自然”形式编写的指数族分布,也称为 自然指数族分布。自然指数族分布的形式如下:

\[f_Y(y|\theta, \tau) = h(y, \tau)\exp{\left( \frac{\theta \cdot y - A(\theta)}{d(\tau)} \right)}\]

其中 $\theta$ 是感兴趣的参数,$\tau$ 是散布参数(dispersion parameter)。在 GLM 中,假设响应变量 $Y_i$ 服从自然指数族分布

\[Y_i \sim f\left(\cdot|\theta_i, \tau \right)\]

其中,感兴趣的参数 $\theta_i$ 与响应变量的期望值 $\mu_i$ 相关,关系如下:

\[\mu_i = A'(\theta_i)\]

这里,$A'(\theta_i)$ 由所选分布的形式决定。GLM 还允许指定链接函数,该函数定义响应变量的期望值 $\mu_i$ 与所谓的*线性预测器* $\eta_i$ 之间的关系

\[g(\mu_i) = \eta_i = \vec{x_i}^T \cdot \vec{\beta}\]

通常,选择链接函数以使 $A' = g^{-1}$,这会在感兴趣的参数 $\theta$ 和线性预测器 $\eta$ 之间产生简化的关系。在这种情况下,链接函数 $g(\mu)$ 被称为“规范”链接函数。

\[\theta_i = A'^{-1}(\mu_i) = g(g^{-1}(\eta_i)) = \eta_i\]
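举一个具体的例子来说明规范链接:泊松分布的概率质量函数可以写成自然指数族的形式

\[ f(y|\theta) = \frac{1}{y!}\exp\left(y\theta - e^{\theta}\right), \qquad \theta = \log\lambda \]

其中 $h(y,\tau)=1/y!$、$A(\theta)=e^{\theta}$、$d(\tau)=1$。于是 $\mu = A'(\theta) = e^{\theta}$,规范链接函数为 $g(\mu) = A'^{-1}(\mu) = \log\mu$,即下文“可用族”表中泊松族对应的对数链接。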

GLM 查找最大化似然函数的回归系数 $\vec{\beta}$。

\[\max_{\vec{\beta}} \mathcal{L}(\vec{\theta}|\vec{y},X) = \prod_{i=1}^{N} h(y_i, \tau) \exp{\left(\frac{y_i\theta_i - A(\theta_i)}{d(\tau)}\right)}\]

其中,感兴趣的参数 $\theta_i$ 与回归系数 $\vec{\beta}$ 相关,如下所示:

\[\theta_i = A'^{-1}(g^{-1}(\vec{x_i} \cdot \vec{\beta}))\]

Spark 的广义线性回归接口还提供用于诊断 GLM 模型拟合的汇总统计信息,包括残差、p 值、偏差、赤池信息准则等。

有关 GLM 及其应用的更全面回顾,请参见此处

可用族

| 族 | 响应类型 | 支持的链接 |
| --- | --- | --- |
| 高斯 | 连续 | 恒等*、对数、倒数 |
| 二项式 | 二元 | Logit*、Probit、CLogLog |
| 泊松 | 计数 | 对数*、恒等、平方根 |
| 伽玛 | 连续 | 倒数*、恒等、对数 |
| Tweedie | 零膨胀连续 | 幂链接函数 |

* 规范链接
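下面给出一个简短的示意片段(并非官方示例,假设已有可用的 SparkSession),展示如何通过 family 与 link 参数选择上表中的族;其中 Tweedie 族不设置 link,而是通过 variancePower 与 linkPower 指定幂链接:

from pyspark.ml.regression import GeneralizedLinearRegression

# 泊松族 + 对数链接(规范链接)
poissonGlr = GeneralizedLinearRegression(family="poisson", link="log",
                                         maxIter=10, regParam=0.3)

# Tweedie 族:用 variancePower / linkPower 指定幂链接(linkPower = 0 即对数链接)
tweedieGlr = GeneralizedLinearRegression(family="tweedie",
                                         variancePower=1.2, linkPower=0.0)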

示例

以下示例演示了如何训练具有高斯响应和恒等链接函数的 GLM 并提取模型汇总统计信息。

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.regression import GeneralizedLinearRegression

# Load training data
dataset = spark.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")

glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)

# Fit the model
model = glr.fit(dataset)

# Print the coefficients and intercept for generalized linear regression model
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

# Summarize the model over the training set and print out some metrics
summary = model.summary
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues))
print("P Values: " + str(summary.pValues))
print("Dispersion: " + str(summary.dispersion))
print("Null Deviance: " + str(summary.nullDeviance))
print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary.deviance))
print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
print("AIC: " + str(summary.aic))
print("Deviance Residuals: ")
summary.residuals().show()
在 Spark 代码库的 “examples/src/main/python/ml/generalized_linear_regression_example.py” 中查找完整的示例代码。

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.regression.GeneralizedLinearRegression

// Load training data
val dataset = spark.read.format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt")

val glr = new GeneralizedLinearRegression()
  .setFamily("gaussian")
  .setLink("identity")
  .setMaxIter(10)
  .setRegParam(0.3)

// Fit the model
val model = glr.fit(dataset)

// Print the coefficients and intercept for generalized linear regression model
println(s"Coefficients: ${model.coefficients}")
println(s"Intercept: ${model.intercept}")

// Summarize the model over the training set and print out some metrics
val summary = model.summary
println(s"Coefficient Standard Errors: ${summary.coefficientStandardErrors.mkString(",")}")
println(s"T Values: ${summary.tValues.mkString(",")}")
println(s"P Values: ${summary.pValues.mkString(",")}")
println(s"Dispersion: ${summary.dispersion}")
println(s"Null Deviance: ${summary.nullDeviance}")
println(s"Residual Degree Of Freedom Null: ${summary.residualDegreeOfFreedomNull}")
println(s"Deviance: ${summary.deviance}")
println(s"Residual Degree Of Freedom: ${summary.residualDegreeOfFreedom}")
println(s"AIC: ${summary.aic}")
println("Deviance Residuals: ")
summary.residuals().show()
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/ml/GeneralizedLinearRegressionExample.scala” 中查找完整的示例代码。

有关更多详细信息,请参阅 Java API 文档

import java.util.Arrays;

import org.apache.spark.ml.regression.GeneralizedLinearRegression;
import org.apache.spark.ml.regression.GeneralizedLinearRegressionModel;
import org.apache.spark.ml.regression.GeneralizedLinearRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Load training data
Dataset<Row> dataset = spark.read().format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt");

GeneralizedLinearRegression glr = new GeneralizedLinearRegression()
  .setFamily("gaussian")
  .setLink("identity")
  .setMaxIter(10)
  .setRegParam(0.3);

// Fit the model
GeneralizedLinearRegressionModel model = glr.fit(dataset);

// Print the coefficients and intercept for generalized linear regression model
System.out.println("Coefficients: " + model.coefficients());
System.out.println("Intercept: " + model.intercept());

// Summarize the model over the training set and print out some metrics
GeneralizedLinearRegressionTrainingSummary summary = model.summary();
System.out.println("Coefficient Standard Errors: "
  + Arrays.toString(summary.coefficientStandardErrors()));
System.out.println("T Values: " + Arrays.toString(summary.tValues()));
System.out.println("P Values: " + Arrays.toString(summary.pValues()));
System.out.println("Dispersion: " + summary.dispersion());
System.out.println("Null Deviance: " + summary.nullDeviance());
System.out.println("Residual Degree Of Freedom Null: " + summary.residualDegreeOfFreedomNull());
System.out.println("Deviance: " + summary.deviance());
System.out.println("Residual Degree Of Freedom: " + summary.residualDegreeOfFreedom());
System.out.println("AIC: " + summary.aic());
System.out.println("Deviance Residuals: ");
summary.residuals().show();
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaGeneralizedLinearRegressionExample.java” 中查找完整的示例代码。

有关更多详细信息,请参阅 R API 文档

training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7, 3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")

# Model summary
summary(gaussianGLM)

# Prediction
gaussianPredictions <- predict(gaussianGLM, gaussianTestDF)
head(gaussianPredictions)

# Fit a generalized linear model with glm (R-compliant)
gaussianGLM2 <- glm(label ~ features, gaussianDF, family = "gaussian")
summary(gaussianGLM2)

# Fit a generalized linear model of family "binomial" with spark.glm
training2 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training2 <- transform(training2, label = cast(training2$label > 1, "integer"))
df_list2 <- randomSplit(training2, c(7, 3), 2)
binomialDF <- df_list2[[1]]
binomialTestDF <- df_list2[[2]]
binomialGLM <- spark.glm(binomialDF, label ~ features, family = "binomial")

# Model summary
summary(binomialGLM)

# Prediction
binomialPredictions <- predict(binomialGLM, binomialTestDF)
head(binomialPredictions)

# Fit a generalized linear model of family "tweedie" with spark.glm
training3 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
tweedieDF <- transform(training3, label = training3$label * exp(randn(10)))
tweedieGLM <- spark.glm(tweedieDF, label ~ features, family = "tweedie",
                        var.power = 1.2, link.power = 0)

# Model summary
summary(tweedieGLM)
在 Spark 代码库的 “examples/src/main/r/ml/glm.R” 中查找完整的示例代码。

决策树回归

决策树是一系列流行的分类和回归方法。您可以在决策树部分中找到有关 spark.ml 实现的更多信息。

示例

以下示例加载 LibSVM 格式的数据集,将其拆分为训练集和测试集,在第一个数据集上进行训练,然后在保留的测试集上进行评估。我们使用特征转换器对分类特征进行索引,将元数据添加到 DataFrame 中,以便决策树算法可以识别。

有关参数的更多详细信息,请参阅 Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

treeModel = model.stages[1]
# summary only
print(treeModel)
在 Spark 代码库的 “examples/src/main/python/ml/decision_tree_regression_example.py” 中查找完整的示例代码。

有关参数的更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Here, we treat features with > 4 distinct values as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a DecisionTree model.
val dt = new DecisionTreeRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")

// Chain indexer and tree in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, dt))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val treeModel = model.stages(1).asInstanceOf[DecisionTreeRegressionModel]
println(s"Learned regression tree model:\n ${treeModel.toDebugString}")
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeRegressionExample.scala” 中查找完整的示例代码。

有关参数的更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
import org.apache.spark.ml.regression.DecisionTreeRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load the data stored in LIBSVM format as a DataFrame.
Dataset<Row> data = spark.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a DecisionTree model.
DecisionTreeRegressor dt = new DecisionTreeRegressor()
  .setFeaturesCol("indexedFeatures");

// Chain indexer and tree in a Pipeline.
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[]{featureIndexer, dt});

// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("label", "features").show(5);

// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

DecisionTreeRegressionModel treeModel =
  (DecisionTreeRegressionModel) (model.stages()[1]);
System.out.println("Learned regression tree model:\n" + treeModel.toDebugString());
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeRegressionExample.java” 中查找完整的示例代码。

有关更多详细信息,请参阅R API 文档

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a DecisionTree regression model with spark.decisionTree
model <- spark.decisionTree(training, label ~ features, "regression")

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 存储库的“examples/src/main/r/ml/decisionTree.R”中查找完整的示例代码。

随机森林回归

随机森林是一系列流行的分类和回归方法。您可以在随机森林部分中找到有关 spark.ml 实现的更多信息。

示例

以下示例加载 LibSVM 格式的数据集,将其拆分为训练集和测试集,在第一个数据集上进行训练,然后在保留的测试集上进行评估。我们使用特征转换器对分类特征进行索引,将元数据添加到 DataFrame 中,以便基于树的算法可以识别。

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rfModel = model.stages[1]
print(rfModel)  # summary only
在 Spark 代码库的 “examples/src/main/python/ml/random_forest_regressor_example.py” 中查找完整的示例代码。

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")

// Chain indexer and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, rf))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
println(s"Learned regression forest model:\n ${rfModel.toDebugString}")
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/ml/RandomForestRegressorExample.scala” 中查找完整的示例代码。

有关更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.RandomForestRegressionModel;
import org.apache.spark.ml.regression.RandomForestRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a RandomForest model.
RandomForestRegressor rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures");

// Chain indexer and forest in a Pipeline
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {featureIndexer, rf});

// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);

// Select (prediction, true label) and compute test error
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

RandomForestRegressionModel rfModel = (RandomForestRegressionModel)(model.stages()[1]);
System.out.println("Learned regression forest model:\n" + rfModel.toDebugString());
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestRegressorExample.java” 中查找完整的示例代码。

有关更多详细信息,请参阅R API 文档

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a random forest regression model with spark.randomForest
model <- spark.randomForest(training, label ~ features, "regression", numTrees = 10)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 存储库的“examples/src/main/r/ml/randomForest.R”中查找完整的示例代码。

梯度提升树回归

梯度提升树 (GBT) 是一种流行的回归方法,它使用决策树的集合。有关 spark.ml 实现的更多信息,请参阅关于 GBT 的部分

示例

注意:对于此示例数据集,GBTRegressor 实际上只需要 1 次迭代,但这在一般情况下并非如此。

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

gbtModel = model.stages[1]
print(gbtModel)  # summary only
在 Spark 代码库的 “examples/src/main/python/ml/gradient_boosted_tree_regressor_example.py” 中查找完整的示例代码。

有关更多详细信息,请参阅 Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a GBT model.
val gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)

// Chain indexer and GBT in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, gbt))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
println(s"Learned regression GBT model:\n ${gbtModel.toDebugString}")
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/ml/GradientBoostedTreeRegressorExample.scala” 中查找完整的示例代码。

有关更多详细信息,请参阅 Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.GBTRegressionModel;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a GBT model.
GBTRegressor gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);

// Chain indexer and GBT in a Pipeline.
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureIndexer, gbt});

// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

GBTRegressionModel gbtModel = (GBTRegressionModel)(model.stages()[1]);
System.out.println("Learned regression GBT model:\n" + gbtModel.toDebugString());
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaGradientBoostedTreeRegressorExample.java” 中查找完整的示例代码。

有关更多详细信息,请参阅R API 文档

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a GBT regression model with spark.gbt
model <- spark.gbt(training, label ~ features, "regression", maxIter = 10)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 存储库的“examples/src/main/r/ml/gbt.R”中查找完整的示例代码。

生存回归

spark.ml 中,我们实现了 加速失效时间 (AFT) 模型,该模型是用于删失数据的参数生存回归模型。它描述了生存时间对数的模型,因此它通常被称为生存分析的对数线性模型。与为相同目的设计的 比例风险 模型不同,AFT 模型更容易并行化,因为每个实例都独立地贡献于目标函数。

给定协变量 $x'$ 的值,对于受试者 i = 1, …, n 的随机生存时间 $t_{i}$(可能存在右删失),AFT 模型下的似然函数如下:\[ L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0}(\frac{\log{t_{i}}-x'\beta}{\sigma})]^{\delta_{i}}S_{0}(\frac{\log{t_{i}}-x'\beta}{\sigma})^{1-\delta_{i}} \] 其中 $\delta_{i}$ 是事件已发生(即数据未被删失)的指示变量。记 $\epsilon_{i}=\frac{\log{t_{i}}-x'\beta}{\sigma}$,则对数似然函数的形式如下:\[ \iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+\delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}] \] 其中 $S_{0}(\epsilon_{i})$ 是基线生存函数,$f_{0}(\epsilon_{i})$ 是相应的密度函数。

最常用的 AFT 模型是基于生存时间的威布尔分布。生存时间的威布尔分布对应于生存时间对数的极值分布,$S_{0}(\epsilon)$ 函数为:\[ S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}}) \] $f_{0}(\epsilon_{i})$ 函数为:\[ f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}}) \] 具有威布尔生存时间分布的 AFT 模型的对数似然函数为:\[ \iota(\beta,\sigma)= -\sum_{i=1}^n[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}] \] 由于最小化负对数似然等效于最大后验概率,因此我们用于优化的损失函数为 $-\iota(\beta,\sigma)$。$\beta$ 和 $\log\sigma$ 的梯度函数分别为:\[ \frac{\partial (-\iota)}{\partial \beta}=\sum_{i=1}^{n}[\delta_{i}-e^{\epsilon_{i}}]\frac{x_{i}}{\sigma} \] \[ \frac{\partial (-\iota)}{\partial (\log\sigma)}=\sum_{i=1}^{n}[\delta_{i}+(\delta_{i}-e^{\epsilon_{i}})\epsilon_{i}] \]

AFT 模型可以表述为一个凸优化问题,即求使凸函数 $-\iota(\beta,\sigma)$(它依赖于系数向量 $\beta$ 和尺度参数的对数 $\log\sigma$)取得最小值的解。实现所采用的优化算法是 L-BFGS。该实现的结果与 R 中 survival 包的 survreg 函数相匹配。

当在具有常数非零列的数据集上拟合没有截距的 AFTSurvivalRegressionModel 时,Spark MLlib 会为常数非零列输出零系数。此行为与 R survival::survreg 不同。

示例

有关更多详细信息,请参阅 Python API 文档

from pyspark.ml.regression import AFTSurvivalRegression
from pyspark.ml.linalg import Vectors

training = spark.createDataFrame([
    (1.218, 1.0, Vectors.dense(1.560, -0.605)),
    (2.949, 0.0, Vectors.dense(0.346, 2.158)),
    (3.627, 0.0, Vectors.dense(1.380, 0.231)),
    (0.273, 1.0, Vectors.dense(0.520, 1.151)),
    (4.199, 0.0, Vectors.dense(0.795, -0.226))], ["label", "censor", "features"])
quantileProbabilities = [0.3, 0.6]
aft = AFTSurvivalRegression(quantileProbabilities=quantileProbabilities,
                            quantilesCol="quantiles")

model = aft.fit(training)

# Print the coefficients, intercept and scale parameter for AFT survival regression
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))
print("Scale: " + str(model.scale))
model.transform(training).show(truncate=False)
在 Spark 代码库的 “examples/src/main/python/ml/aft_survival_regression.py” 中查找完整的示例代码。

有关详细信息,请参阅Scala API 文档

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.AFTSurvivalRegression

val training = spark.createDataFrame(Seq(
  (1.218, 1.0, Vectors.dense(1.560, -0.605)),
  (2.949, 0.0, Vectors.dense(0.346, 2.158)),
  (3.627, 0.0, Vectors.dense(1.380, 0.231)),
  (0.273, 1.0, Vectors.dense(0.520, 1.151)),
  (4.199, 0.0, Vectors.dense(0.795, -0.226))
)).toDF("label", "censor", "features")
val quantileProbabilities = Array(0.3, 0.6)
val aft = new AFTSurvivalRegression()
  .setQuantileProbabilities(quantileProbabilities)
  .setQuantilesCol("quantiles")

val model = aft.fit(training)

// Print the coefficients, intercept and scale parameter for AFT survival regression
println(s"Coefficients: ${model.coefficients}")
println(s"Intercept: ${model.intercept}")
println(s"Scale: ${model.scale}")
model.transform(training).show(false)
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/ml/AFTSurvivalRegressionExample.scala”中查找完整的示例代码。

有关详细信息,请参阅Java API 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.regression.AFTSurvivalRegression;
import org.apache.spark.ml.regression.AFTSurvivalRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(1.218, 1.0, Vectors.dense(1.560, -0.605)),
  RowFactory.create(2.949, 0.0, Vectors.dense(0.346, 2.158)),
  RowFactory.create(3.627, 0.0, Vectors.dense(1.380, 0.231)),
  RowFactory.create(0.273, 1.0, Vectors.dense(0.520, 1.151)),
  RowFactory.create(4.199, 0.0, Vectors.dense(0.795, -0.226))
);
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("censor", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(data, schema);
double[] quantileProbabilities = new double[]{0.3, 0.6};
AFTSurvivalRegression aft = new AFTSurvivalRegression()
  .setQuantileProbabilities(quantileProbabilities)
  .setQuantilesCol("quantiles");

AFTSurvivalRegressionModel model = aft.fit(training);

// Print the coefficients, intercept and scale parameter for AFT survival regression
System.out.println("Coefficients: " + model.coefficients());
System.out.println("Intercept: " + model.intercept());
System.out.println("Scale: " + model.scale());
model.transform(training).show(false);
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java”中查找完整的示例代码。

有关详细信息,请参阅R API 文档

# Use the ovarian dataset available in R survival package
library(survival)

# Fit an accelerated failure time (AFT) survival regression model with spark.survreg
ovarianDF <- suppressWarnings(createDataFrame(ovarian))
aftDF <- ovarianDF
aftTestDF <- ovarianDF
aftModel <- spark.survreg(aftDF, Surv(futime, fustat) ~ ecog_ps + rx)

# Model summary
summary(aftModel)

# Prediction
aftPredictions <- predict(aftModel, aftTestDF)
head(aftPredictions)
在 Spark 存储库的“examples/src/main/r/ml/survreg.R”中查找完整的示例代码。

保序回归

保序回归属于回归算法家族。形式上,保序回归问题是:给定一个有限的实数集合 $Y = \{y_1, y_2, ..., y_n\}$ 表示观测到的响应,以及 $X = \{x_1, x_2, ..., x_n\}$ 表示待拟合的未知响应值,寻找一个函数,使

\begin{equation} f(x) = \sum_{i=1}^n w_i (y_i - x_i)^2 \end{equation}

在 $x_1\le x_2\le ...\le x_n$ 的全序约束下最小化,其中 $w_i$ 是正权重。得到的函数称为保序回归函数,并且它是唯一的。它可以看作是带顺序约束的最小二乘问题。本质上,保序回归就是最贴合原始数据点的单调函数。

我们实现了池相邻违反者算法 (pool adjacent violators algorithm),并采用了一种并行化保序回归的方法。训练输入是一个 DataFrame,它包含标签、特征和权重三列。此外,IsotonicRegression 算法还有一个名为 isotonic 的可选参数,默认为 true。该参数指定保序回归是保序的(单调递增)还是反序的(单调递减)。

训练返回一个 IsotonicRegressionModel,可用于对已知和未知特征进行预测。保序回归的结果被视为分段线性函数,因此预测规则如下:

- 如果预测输入与某个训练特征完全匹配,则返回与之关联的预测值。如果存在多个具有相同特征的预测值,则返回其中之一,具体返回哪一个未作规定(与 java.util.Arrays.binarySearch 的行为相同)。
- 如果预测输入低于(或高于)所有训练特征,则分别返回最低(或最高)训练特征对应的预测值。如果存在多个具有相同特征的预测值,则分别返回其中最低或最高者。
- 如果预测输入落在两个训练特征之间,则按分段线性函数处理,由最接近的两个训练特征的预测值插值得到结果。如果存在多个具有相同特征的值,则使用与上一条相同的规则。
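下面是一个小的示意性片段(非官方示例,假设 spark 会话已创建),用于说明上述插值规则:落在两个训练特征之间的输入会按分段线性函数插值得到预测值。

from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import IsotonicRegression

# 三个训练点,标签并非单调,拟合时会被合并成单调序列
toy = spark.createDataFrame([
    (1.0, Vectors.dense(1.0)),
    (3.0, Vectors.dense(2.0)),
    (2.0, Vectors.dense(3.0)),
], ["label", "features"])

toyModel = IsotonicRegression().fit(toy)

# 1.5 与 2.5 落在相邻训练特征之间,预测值由两侧边界的预测线性插值得到
toyModel.transform(spark.createDataFrame(
    [(Vectors.dense(1.5),), (Vectors.dense(2.5),)], ["features"])).show()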

示例

有关 API 的更多详细信息,请参阅IsotonicRegression Python 文档

from pyspark.ml.regression import IsotonicRegression

# Loads data.
dataset = spark.read.format("libsvm")\
    .load("data/mllib/sample_isotonic_regression_libsvm_data.txt")

# Trains an isotonic regression model.
model = IsotonicRegression().fit(dataset)
print("Boundaries in increasing order: %s\n" % str(model.boundaries))
print("Predictions associated with the boundaries: %s\n" % str(model.predictions))

# Makes predictions.
model.transform(dataset).show()
在 Spark 存储库的“examples/src/main/python/ml/isotonic_regression_example.py”中查找完整的示例代码。

有关 API 的详细信息,请参阅IsotonicRegression Scala 文档

import org.apache.spark.ml.regression.IsotonicRegression

// Loads data.
val dataset = spark.read.format("libsvm")
  .load("data/mllib/sample_isotonic_regression_libsvm_data.txt")

// Trains an isotonic regression model.
val ir = new IsotonicRegression()
val model = ir.fit(dataset)

println(s"Boundaries in increasing order: ${model.boundaries}\n")
println(s"Predictions associated with the boundaries: ${model.predictions}\n")

// Makes predictions.
model.transform(dataset).show()
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/ml/IsotonicRegressionExample.scala”中查找完整的示例代码。

有关 API 的详细信息,请参阅IsotonicRegression Java 文档

import org.apache.spark.ml.regression.IsotonicRegression;
import org.apache.spark.ml.regression.IsotonicRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm")
  .load("data/mllib/sample_isotonic_regression_libsvm_data.txt");

// Trains an isotonic regression model.
IsotonicRegression ir = new IsotonicRegression();
IsotonicRegressionModel model = ir.fit(dataset);

System.out.println("Boundaries in increasing order: " + model.boundaries() + "\n");
System.out.println("Predictions associated with the boundaries: " + model.predictions() + "\n");

// Makes predictions.
model.transform(dataset).show();
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/ml/JavaIsotonicRegressionExample.java”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅IsotonicRegression R API 文档

# Load training data
df <- read.df("data/mllib/sample_isotonic_regression_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit an isotonic regression model with spark.isoreg
model <- spark.isoreg(training, label ~ features, isotonic = FALSE)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 存储库的“examples/src/main/r/ml/isoreg.R”中查找完整的示例代码。

因子分解机回归器

有关因子分解机的更多背景信息和实现细节,请参阅因子分解机部分。

示例

以下示例加载 LibSVM 格式的数据集,将其拆分为训练集和测试集,在第一个数据集上进行训练,然后在保留的测试集上进行评估。我们将特征缩放至 0 到 1 之间,以防止出现梯度爆炸问题。

有关详细信息,请参阅Python API 文档

from pyspark.ml import Pipeline
from pyspark.ml.regression import FMRegressor
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Scale features.
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a FM model.
fm = FMRegressor(featuresCol="scaledFeatures", stepSize=0.001)

# Create a Pipeline.
pipeline = Pipeline(stages=[featureScaler, fm])

# Train model.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

fmModel = model.stages[1]
print("Factors: " + str(fmModel.factors))  # type: ignore
print("Linear: " + str(fmModel.linear))  # type: ignore
print("Intercept: " + str(fmModel.intercept))  # type: ignore
在 Spark 存储库的“examples/src/main/python/ml/fm_regressor_example.py”中查找完整的示例代码。

有关详细信息,请参阅Scala API 文档

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.regression.{FMRegressionModel, FMRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Scale features.
val featureScaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a FM model.
val fm = new FMRegressor()
  .setLabelCol("label")
  .setFeaturesCol("scaledFeatures")
  .setStepSize(0.001)

// Create a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureScaler, fm))

// Train model.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val fmModel = model.stages(1).asInstanceOf[FMRegressionModel]
println(s"Factors: ${fmModel.factors} Linear: ${fmModel.linear} " +
  s"Intercept: ${fmModel.intercept}")
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/ml/FMRegressorExample.scala”中查找完整的示例代码。

有关详细信息,请参阅Java API 文档

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.regression.FMRegressionModel;
import org.apache.spark.ml.regression.FMRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Scale features.
MinMaxScalerModel featureScaler = new MinMaxScaler()
    .setInputCol("features")
    .setOutputCol("scaledFeatures")
    .fit(data);

// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a FM model.
FMRegressor fm = new FMRegressor()
    .setLabelCol("label")
    .setFeaturesCol("scaledFeatures")
    .setStepSize(0.001);

// Create a Pipeline.
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureScaler, fm});

// Train model.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

FMRegressionModel fmModel = (FMRegressionModel)(model.stages()[1]);
System.out.println("Factors: " + fmModel.factors());
System.out.println("Linear: " + fmModel.linear());
System.out.println("Intercept: " + fmModel.intercept());
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/ml/JavaFMRegressorExample.java”中查找完整的示例代码。

有关详细信息,请参阅R API 文档

注意:SparkR 目前不支持特征缩放。

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training_test <- randomSplit(df, c(0.7, 0.3))
training <- training_test[[1]]
test <- training_test[[2]]

# Fit a FM regression model
model <- spark.fmRegressor(training, label ~ features)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
在 Spark 存储库的“examples/src/main/r/ml/fmRegressor.R”中查找完整的示例代码。

线性方法

我们实现了流行的线性方法,例如具有 $L_1$ 或 $L_2$ 正则化的逻辑回归和线性最小二乘法。有关实现和调整的详细信息,请参阅基于 RDD 的 API 的线性方法指南;这些信息仍然适用。

我们还为弹性网络提供了一个 DataFrame API,它是Zou 等人在“通过弹性网络进行正则化和变量选择”中提出的 $L_1$ 和 $L_2$ 正则化的混合。在数学上,它被定义为 $L_1$ 和 $L_2$ 正则化项的凸组合:\[ \alpha \left( \lambda \|\wv\|_1 \right) + (1-\alpha) \left( \frac{\lambda}{2}\|\wv\|_2^2 \right) , \alpha \in [0, 1], \lambda \geq 0 \] 通过正确设置 $\alpha$,弹性网络包含 $L_1$ 和 $L_2$ 正则化作为特例。例如,如果使用弹性网络参数 $\alpha$ 设置为 $1$ 训练线性回归模型,则它等效于Lasso模型。另一方面,如果将 $\alpha$ 设置为 $0$,则训练后的模型将简化为岭回归模型。我们为具有弹性网络正则化的线性回归和逻辑回归实现了 Pipelines API。
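下面的小片段(仅作示意)展示了如何在线性回归中通过弹性网络参数在这几种特例之间切换:

from pyspark.ml.regression import LinearRegression

# elasticNetParam 设置上式中的 α,regParam 设置 λ
lasso = LinearRegression(regParam=0.3, elasticNetParam=1.0)  # α = 1:等效于 Lasso
ridge = LinearRegression(regParam=0.3, elasticNetParam=0.0)  # α = 0:等效于岭回归
enet = LinearRegression(regParam=0.3, elasticNetParam=0.5)   # 0 < α < 1:两者的凸组合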

因子分解机

因子分解机能够估计特征之间的交互作用,即使在具有巨大稀疏性的问题中(如广告和推荐系统)。spark.ml 实现支持用于二元分类和回归的因子分解机。

因子分解机公式为

\[\hat{y} = w_0 + \sum\limits^n_{i=1} w_i x_i + \sum\limits^n_{i=1} \sum\limits^n_{j=i+1} \langle v_i, v_j \rangle x_i x_j\]

前两项表示截距和线性项(与线性回归相同),最后一项表示成对交互项。\(v_i\) 描述了具有 k 个因子的第 i 个变量。

FM 可用于回归,其优化准则为均方误差;也可以通过 sigmoid 函数用于二元分类,此时优化准则为逻辑损失。

成对交互可以重新表述为

\[\sum\limits^n_{i=1} \sum\limits^n_{j=i+1} \langle v_i, v_j \rangle x_i x_j = \frac{1}{2}\sum\limits^k_{f=1} \left(\left( \sum\limits^n_{i=1}v_{i,f}x_i \right)^2 - \sum\limits^n_{i=1}v_{i,f}^2x_i^2 \right)\]

该方程在 k 和 n 中都只有线性复杂度,即其计算复杂度为 \(O(kn)\)。
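下面用一小段 NumPy 代码(仅作示意)验证这一重写:对随机生成的特征向量和因子矩阵,朴素的 $O(kn^2)$ 双重求和与上式的 $O(kn)$ 形式给出相同结果。

import numpy as np

# 随机构造 n 个特征 x_i 和每个特征对应的 k 维因子 v_i
rng = np.random.default_rng(0)
n, k = 8, 4
x = rng.normal(size=n)
V = rng.normal(size=(n, k))

# 朴素计算:对所有 i < j 的特征对求 <v_i, v_j> x_i x_j
naive = sum(V[i] @ V[j] * x[i] * x[j]
            for i in range(n) for j in range(i + 1, n))

# O(kn) 重写:对每个因子维度 f,先求 sum_i v_{i,f} x_i 与 sum_i v_{i,f}^2 x_i^2
vx = V.T @ x
fast = 0.5 * (np.sum(vx ** 2) - np.sum((V ** 2).T @ (x ** 2)))

assert np.isclose(naive, fast)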

通常,为了防止梯度爆炸问题,最好将连续特征缩放至 0 到 1 之间,或者对连续特征进行分箱并进行独热编码。

决策树

决策树及其集成是机器学习中分类和回归任务的常用方法。决策树应用广泛,因为它们易于解释、能够处理分类特征、可扩展到多分类设置、不需要特征缩放,并且能够捕获非线性和特征交互。树集成算法(如随机森林和提升算法)是分类和回归任务中性能最佳的算法之一。

spark.ml 的实现支持将决策树用于二元分类、多类别分类以及回归,并同时支持连续特征和分类特征。该实现按行对数据进行分区,从而可以使用数百万甚至数十亿个实例进行分布式训练。

用户可以在MLlib 决策树指南中找到有关决策树算法的更多信息。此 API 与原始 MLlib 决策树 API 之间的主要区别在于:

- 支持 ML Pipelines
- 将用于分类的决策树与用于回归的决策树分离
- 使用 DataFrame 元数据来区分连续特征和分类特征

决策树的 Pipelines API 比原始 API 提供了更多功能。特别是,对于分类,用户可以获得每个类别的预测概率(也称为类条件概率);对于回归,用户可以获得预测的有偏样本方差。

树的集成(随机森林和梯度提升树)将在下面的树集成部分中介绍。

输入和输出

我们在此列出了输入和输出(预测)列类型。所有输出列都是可选的;要排除输出列,请将其对应的参数设置为空字符串。

输入列

| 参数名称 | 类型 | 默认值 | 说明 |
| --- | --- | --- | --- |
| labelCol | Double | “label” | 要预测的标签 |
| featuresCol | Vector | “features” | 特征向量 |

输出列

| 参数名称 | 类型 | 默认值 | 说明 | 备注 |
| --- | --- | --- | --- | --- |
| predictionCol | Double | “prediction” | 预测的标签 | |
| rawPredictionCol | Vector | “rawPrediction” | 长度等于类别数的向量,包含做出预测的树节点处各类别训练实例的计数 | 仅限分类 |
| probabilityCol | Vector | “probability” | 长度等于类别数的向量,等于将 rawPrediction 归一化为多项分布后的结果 | 仅限分类 |
| varianceCol | Double | | 预测的有偏样本方差 | 仅限回归 |
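下面的示意片段(列名沿用上表的默认值)展示了这些输出列的典型配置:分类器默认输出 rawPrediction 与 probability,而回归器的方差列需要显式设置 varianceCol 才会输出。

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.regression import DecisionTreeRegressor

# 分类:transform 之后会在输出 DataFrame 中附加 prediction、rawPrediction、probability 列
dtc = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# 回归:通过 varianceCol 开启有偏样本方差列的输出
dtr = DecisionTreeRegressor(labelCol="label", featuresCol="features",
                            varianceCol="variance")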

树集成

DataFrame API 支持两种主要的树集成算法:随机森林梯度提升树 (GBT)。两者都使用spark.ml 决策树作为其基本模型。

用户可以在MLlib 集成指南中找到有关集成算法的更多信息。在本节中,我们将演示集成的 DataFrame API。

此 API 与原始 MLlib 集成 API 之间的主要区别在于:

- 支持 DataFrame 和 ML Pipelines
- 将分类与回归分离
- 使用 DataFrame 元数据来区分连续特征和分类特征
- 为随机森林提供了更多功能:特征重要性估计,以及分类时每个类别的预测概率(即类条件概率)

随机森林

随机森林是决策树的集成。随机森林结合了许多决策树,以降低过度拟合的风险。spark.ml 的实现支持将随机森林用于二元分类、多类别分类以及回归,并同时支持连续特征和分类特征。

有关算法本身的更多信息,请参阅spark.mllib 文档中的随机森林

输入和输出

我们在此列出了输入和输出(预测)列类型。所有输出列都是可选的;要排除输出列,请将其对应的参数设置为空字符串。

输入列

| 参数名称 | 类型 | 默认值 | 说明 |
| --- | --- | --- | --- |
| labelCol | Double | “label” | 要预测的标签 |
| featuresCol | Vector | “features” | 特征向量 |

输出列(预测)

| 参数名称 | 类型 | 默认值 | 说明 | 备注 |
| --- | --- | --- | --- | --- |
| predictionCol | Double | “prediction” | 预测的标签 | |
| rawPredictionCol | Vector | “rawPrediction” | 长度等于类别数的向量,包含做出预测的树节点处各类别训练实例的计数 | 仅限分类 |
| probabilityCol | Vector | “probability” | 长度等于类别数的向量,等于将 rawPrediction 归一化为多项分布后的结果 | 仅限分类 |

梯度提升树 (GBT)

梯度提升树 (GBT) 是决策树的集成。GBT 迭代地训练决策树,以最小化损失函数。spark.ml 的实现支持将 GBT 用于二元分类和回归,并同时支持连续特征和分类特征。

有关算法本身的更多信息,请参阅spark.mllib 文档中的 GBT

输入和输出

我们在此列出了输入和输出(预测)列类型。所有输出列都是可选的;要排除输出列,请将其对应的参数设置为空字符串。

输入列

| 参数名称 | 类型 | 默认值 | 说明 |
| --- | --- | --- | --- |
| labelCol | Double | “label” | 要预测的标签 |
| featuresCol | Vector | “features” | 特征向量 |

请注意,GBTClassifier 当前仅支持二元标签。

输出列(预测)

| 参数名称 | 类型 | 默认值 | 说明 | 备注 |
| --- | --- | --- | --- | --- |
| predictionCol | Double | “prediction” | 预测的标签 | |

未来,GBTClassifier 也将输出 rawPredictionprobability 列,就像 RandomForestClassifier 一样。