\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]
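Logistic regression is a popular method for predicting a categorical response. It is a special case of generalized linear models that predicts the probability of the outcomes. In spark.ml, logistic regression can predict a binary outcome by using binomial logistic regression, or a multiclass outcome by using multinomial logistic regression. Use the family parameter to select between these two algorithms, or leave it unset and Spark will infer the correct variant.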
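When fitting a LogisticRegressionModel without an intercept on a dataset with a constant nonzero column, Spark MLlib outputs zero coefficients for the constant nonzero columns. This behavior is the same as R glmnet but different from LIBSVM.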
elasticNetParam corresponds to $\alpha$ and regParam corresponds to $\lambda$.
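To make the roles of the two parameters concrete, here is a minimal plain-Python sketch of an elastic-net penalty. It assumes the usual form $\lambda\left[\alpha\|\wv\|_1 + \frac{1}{2}(1-\alpha)\|\wv\|_2^2\right]$; the function name `elastic_net_penalty` is hypothetical and is not part of the Spark API.

```python
def elastic_net_penalty(coefficients, reg_param, elastic_net_param):
    """Illustrative helper (not a Spark function).

    reg_param plays the role of lambda and elastic_net_param the role of alpha:
    alpha = 1 gives a pure L1 (lasso) penalty, alpha = 0 a pure L2 (ridge) penalty.
    """
    l1_norm = sum(abs(w) for w in coefficients)
    l2_norm_squared = sum(w * w for w in coefficients)
    mixed = elastic_net_param * l1_norm + 0.5 * (1.0 - elastic_net_param) * l2_norm_squared
    return reg_param * mixed


weights = [0.5, -1.0, 0.0]
print(elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8))
```

With the values used in the examples (regParam=0.3, elasticNetParam=0.8), most of the penalty weight falls on the L1 term, which tends to push some coefficients to exactly zero.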
More details on parameters can be found in the Python API documentation.
from pyspark.ml.classification import LogisticRegression
# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
# We can also use the multinomial family for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")
# Fit the model
mlrModel = mlr.fit(training)
# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))
More details on parameters can be found in the Scala API documentation.
import org.apache.spark.ml.classification.LogisticRegression
// Load training data
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
// Fit the model
val lrModel = lr.fit(training)
// Print the coefficients and intercept for logistic regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
// We can also use the multinomial family for binary classification
val mlr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
  .setFamily("multinomial")
val mlrModel = mlr.fit(training)
// Print the coefficients and intercepts for logistic regression with multinomial family
println(s"Multinomial coefficients: ${mlrModel.coefficientMatrix}")
println(s"Multinomial intercepts: ${mlrModel.interceptVector}")
More details on parameters can be found in the Java API documentation.
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load training data
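Dataset<Row> training = spark.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");
LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8);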
// Fit the model
LogisticRegressionModel lrModel = lr.fit(training);
// Print the coefficients and intercept for logistic regression
System.out.println("Coefficients: "
+ lrModel.coefficients() + " Intercept: " + lrModel.intercept());
// We can also use the multinomial family for binary classification
LogisticRegression mlr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
  .setFamily("multinomial");
// Fit the model
LogisticRegressionModel mlrModel = mlr.fit(training);
// Print the coefficients and intercepts for logistic regression with multinomial family
System.out.println("Multinomial coefficients: " + mlrModel.coefficientMatrix()
+ "\nMultinomial intercepts: " + mlrModel.interceptVector());
More details on parameters can be found in the R API documentation.
# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# Fit a binomial logistic regression model with spark.logit
model <- spark.logit(training, label ~ features, maxIter = 10, regParam = 0.3, elasticNetParam = 0.8)
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
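The spark.ml implementation of logistic regression also supports extracting a summary of the model over the training set. Note that the predictions and metrics which are stored as DataFrame in LogisticRegressionSummary are annotated @transient and hence only available on the driver.
LogisticRegressionTrainingSummary provides a summary for a LogisticRegressionModel. In the case of binary classification, certain additional metrics are available, e.g. the ROC curve. See BinaryLogisticRegressionTrainingSummary.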
from pyspark.ml.classification import LogisticRegression
# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = lrModel.summary
# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
for objective in objectiveHistory:
    print(objective)
# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))
# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
lr.setThreshold(bestThreshold)
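For intuition about what selecting the threshold that maximizes F-Measure involves, the following is a small plain-Python sketch that does not use Spark. The helper names are hypothetical, and treating `score >= threshold` as a positive prediction is an assumption of this sketch rather than a statement about Spark's internals.

```python
def f_measure_by_threshold(scored_labels, beta=1.0):
    """Return (threshold, F-measure) pairs for (score, label) input, label in {0, 1}.

    Illustrative only: every distinct score is tried as a candidate threshold.
    """
    results = []
    for threshold in sorted({score for score, _ in scored_labels}, reverse=True):
        tp = sum(1 for s, y in scored_labels if s >= threshold and y == 1)
        fp = sum(1 for s, y in scored_labels if s >= threshold and y == 0)
        fn = sum(1 for s, y in scored_labels if s < threshold and y == 1)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        denominator = beta * beta * precision + recall
        f = (1 + beta * beta) * precision * recall / denominator if denominator else 0.0
        results.append((threshold, f))
    return results


def best_threshold(scored_labels):
    """Pick the candidate threshold with the highest F-measure."""
    return max(f_measure_by_threshold(scored_labels), key=lambda pair: pair[1])[0]


scored = [(0.9, 1), (0.8, 1), (0.6, 0), (0.4, 1), (0.2, 0)]
print(best_threshold(scored))
```

In the Spark example the same idea is expressed with DataFrame operations on the fMeasureByThreshold summary.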
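LogisticRegressionTrainingSummary provides a summary for a LogisticRegressionModel. In the case of binary classification, certain additional metrics are available, e.g. the ROC curve. The binary summary can be accessed via the binarySummary method. See BinaryLogisticRegressionTrainingSummary.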
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.functions.max
// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier
// example
val trainingSummary = lrModel.binarySummary
// Obtain the objective per iteration.
val objectiveHistory = trainingSummary.objectiveHistory
objectiveHistory.foreach(loss => println(loss))
// Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
val roc = trainingSummary.roc
println(s"areaUnderROC: ${trainingSummary.areaUnderROC}")
// Set the model threshold to maximize F-Measure
val fMeasure = trainingSummary.fMeasureByThreshold
val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
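// The $"..." column syntax requires `import spark.implicits._`
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
  .select("threshold").head().getDouble(0)
lrModel.setThreshold(bestThreshold)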
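LogisticRegressionTrainingSummary provides a summary for a LogisticRegressionModel. In the case of binary classification, certain additional metrics are available, e.g. the ROC curve. The binary summary can be accessed via the binarySummary method. See BinaryLogisticRegressionTrainingSummary.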
import org.apache.spark.ml.classification.BinaryLogisticRegressionTrainingSummary;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier
// example
BinaryLogisticRegressionTrainingSummary trainingSummary = lrModel.binarySummary();
// Obtain the loss per iteration.
double[] objectiveHistory = trainingSummary.objectiveHistory();
for (double lossPerIteration : objectiveHistory) {
  System.out.println(lossPerIteration);
}
// Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
Dataset<Row> roc = trainingSummary.roc();
roc.show();
System.out.println(trainingSummary.areaUnderROC());
// Get the threshold corresponding to the maximum F-Measure and rerun LogisticRegression with
// this selected threshold.
Dataset<Row> fMeasure = trainingSummary.fMeasureByThreshold();
double maxFMeasure = fMeasure.select(functions.max("F-Measure")).head().getDouble(0);
double bestThreshold = fMeasure.where(fMeasure.col("F-Measure").equalTo(maxFMeasure))
  .select("threshold").head().getDouble(0);
lrModel.setThreshold(bestThreshold);
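Multiclass classification is supported via multinomial logistic (softmax) regression. In multinomial logistic regression, the algorithm produces $K$ sets of coefficients, or a matrix of dimension $K \times J$, where $K$ is the number of outcome classes and $J$ is the number of features. If the algorithm is fit with an intercept term, then a length-$K$ vector of intercepts is available.
The conditional probabilities of the outcome classes $k \in \{0, 1, \ldots, K-1\}$ are modeled using the softmax function.
\[ P(Y=k|\mathbf{X}, \boldsymbol{\beta}_k, \beta_{0k}) = \frac{e^{\boldsymbol{\beta}_k \cdot \mathbf{X} + \beta_{0k}}}{\sum_{k'=0}^{K-1} e^{\boldsymbol{\beta}_{k'} \cdot \mathbf{X} + \beta_{0k'}}} \]
We minimize the weighted negative log-likelihood, using a multinomial response model, with an elastic-net penalty to control for overfitting.
\[ \min_{\beta, \beta_0} -\left[\sum_{i=1}^L w_i \cdot \log P(Y = y_i|\mathbf{x}_i)\right] + \lambda \left[\frac{1}{2}\left(1 - \alpha\right)||\boldsymbol{\beta}||_2^2 + \alpha ||\boldsymbol{\beta}||_1\right] \]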
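The softmax probabilities and the weighted negative log-likelihood term of the objective can be sketched in plain Python as follows. This is only an illustration of the formulas, not Spark's implementation; the function names and the row layout `(features, label, weight)` are assumptions made for this sketch, and the penalty term is omitted.

```python
import math


def softmax_probabilities(coefficient_rows, intercepts, features):
    """P(Y = k | x) for k = 0..K-1, given a K x J coefficient matrix and K intercepts."""
    margins = [
        sum(b * x for b, x in zip(row, features)) + b0
        for row, b0 in zip(coefficient_rows, intercepts)
    ]
    # Subtracting the largest margin leaves the ratios unchanged but avoids overflow.
    largest = max(margins)
    exps = [math.exp(m - largest) for m in margins]
    total = sum(exps)
    return [e / total for e in exps]


def weighted_negative_log_likelihood(coefficient_rows, intercepts, rows):
    """Sum of -w_i * log P(Y = y_i | x_i) over rows of (features, label, weight)."""
    loss = 0.0
    for features, label, weight in rows:
        probabilities = softmax_probabilities(coefficient_rows, intercepts, features)
        loss -= weight * math.log(probabilities[label])
    return loss


coefficients = [[0.2, -0.1], [0.0, 0.3], [-0.4, 0.1]]  # K = 3 classes, J = 2 features
intercepts = [0.0, 0.1, -0.1]
print(softmax_probabilities(coefficients, intercepts, [1.0, 2.0]))
```

Each row of `coefficients` corresponds to one row of the $K \times J$ coefficient matrix, and `intercepts` to the length-$K$ intercept vector.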
from pyspark.ml.classification import LogisticRegression
# Load training data
training = spark \
    .read \
    .format("libsvm") \
    .load("data/mllib/sample_multiclass_classification_data.txt")
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Print the coefficients and intercept for multinomial logistic regression
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))
trainingSummary = lrModel.summary
# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
for objective in objectiveHistory:
    print(objective)
# for multiclass, we can inspect metrics on a per-label basis
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))
print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))
print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))
print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))
print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
% (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))
import org.apache.spark.ml.classification.LogisticRegression
// Load training data
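val training = spark
  .read
  .format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)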
// Fit the model
val lrModel = lr.fit(training)
// Print the coefficients and intercept for multinomial logistic regression
println(s"Coefficients: \n${lrModel.coefficientMatrix}")
println(s"Intercepts: \n${lrModel.interceptVector}")
val trainingSummary = lrModel.summary
// Obtain the objective per iteration
val objectiveHistory = trainingSummary.objectiveHistory
objectiveHistory.foreach(println)
// for multiclass, we can inspect metrics on a per-label basis
println("False positive rate by label:")
trainingSummary.falsePositiveRateByLabel.zipWithIndex.foreach { case (rate, label) =>
  println(s"label $label: $rate")
}
println("True positive rate by label:")
trainingSummary.truePositiveRateByLabel.zipWithIndex.foreach { case (rate, label) =>
  println(s"label $label: $rate")
}
println("Precision by label:")
trainingSummary.precisionByLabel.zipWithIndex.foreach { case (prec, label) =>
  println(s"label $label: $prec")
}
println("Recall by label:")
trainingSummary.recallByLabel.zipWithIndex.foreach { case (rec, label) =>
  println(s"label $label: $rec")
}
println("F-measure by label:")
trainingSummary.fMeasureByLabel.zipWithIndex.foreach { case (f, label) =>
  println(s"label $label: $f")
}
val accuracy = trainingSummary.accuracy
val falsePositiveRate = trainingSummary.weightedFalsePositiveRate
val truePositiveRate = trainingSummary.weightedTruePositiveRate
val fMeasure = trainingSummary.weightedFMeasure
val precision = trainingSummary.weightedPrecision
val recall = trainingSummary.weightedRecall
println(s"Accuracy: $accuracy\nFPR: $falsePositiveRate\nTPR: $truePositiveRate\n" +
s"F-measure: $fMeasure\nPrecision: $precision\nRecall: $recall")
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.classification.LogisticRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load training data
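Dataset<Row> training = spark.read().format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt");
LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8);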
// Fit the model
LogisticRegressionModel lrModel = lr.fit(training);
// Print the coefficients and intercept for multinomial logistic regression
System.out.println("Coefficients: \n"
+ lrModel.coefficientMatrix() + " \nIntercept: " + lrModel.interceptVector());
LogisticRegressionTrainingSummary trainingSummary = lrModel.summary();
// Obtain the loss per iteration.
double[] objectiveHistory = trainingSummary.objectiveHistory();
for (double lossPerIteration : objectiveHistory) {
  System.out.println(lossPerIteration);
}
// for multiclass, we can inspect metrics on a per-label basis
System.out.println("False positive rate by label:");
int i = 0;
double[] fprLabel = trainingSummary.falsePositiveRateByLabel();
for (double fpr : fprLabel) {
  System.out.println("label " + i + ": " + fpr);
  i++;
}
System.out.println("True positive rate by label:");
i = 0;
double[] tprLabel = trainingSummary.truePositiveRateByLabel();
for (double tpr : tprLabel) {
  System.out.println("label " + i + ": " + tpr);
  i++;
}
System.out.println("Precision by label:");
i = 0;
double[] precLabel = trainingSummary.precisionByLabel();
for (double prec : precLabel) {
  System.out.println("label " + i + ": " + prec);
  i++;
}
System.out.println("Recall by label:");
i = 0;
double[] recLabel = trainingSummary.recallByLabel();
for (double rec : recLabel) {
  System.out.println("label " + i + ": " + rec);
  i++;
}
System.out.println("F-measure by label:");
i = 0;
double[] fLabel = trainingSummary.fMeasureByLabel();
for (double f : fLabel) {
  System.out.println("label " + i + ": " + f);
  i++;
}
double accuracy = trainingSummary.accuracy();
double falsePositiveRate = trainingSummary.weightedFalsePositiveRate();
double truePositiveRate = trainingSummary.weightedTruePositiveRate();
double fMeasure = trainingSummary.weightedFMeasure();
double precision = trainingSummary.weightedPrecision();
double recall = trainingSummary.weightedRecall();
System.out.println("Accuracy: " + accuracy);
System.out.println("FPR: " + falsePositiveRate);
System.out.println("TPR: " + truePositiveRate);
System.out.println("F-measure: " + fMeasure);
System.out.println("Precision: " + precision);
System.out.println("Recall: " + recall);
More details on parameters can be found in the R API documentation.
# Load training data
df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training <- df
test <- df
# Fit a multinomial logistic regression model with spark.logit
model <- spark.logit(training, label ~ features, maxIter = 10, regParam = 0.3, elasticNetParam = 0.8)
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
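Decision trees are a popular family of classification and regression methods. More information about the spark.ml implementation can be found further in the section on decision trees.
The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame which the decision tree algorithm can recognize.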
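As a rough plain-Python illustration of what the two feature transformers decide, the sketch below indexes labels and classifies a feature column as categorical or continuous. The helper names are hypothetical. Ordering labels by descending frequency (ties broken by label text) and treating a feature as categorical when it has at most maxCategories distinct values are assumptions of this sketch about the default behavior of the label and feature indexers.

```python
from collections import Counter


def index_labels(labels):
    """Map each distinct label to an index, most frequent label first (illustrative)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], str(label)))
    return {label: float(position) for position, label in enumerate(ordered)}


def is_categorical(column_values, max_categories):
    """A feature with at most max_categories distinct values is treated as categorical."""
    return len(set(column_values)) <= max_categories


print(index_labels(["b", "a", "b", "c", "a", "b"]))
print(is_categorical([0.0, 1.0, 0.0, 1.0], max_categories=4))
print(is_categorical([0.1, 0.2, 0.3, 0.4, 0.5], max_categories=4))
```

The index assigned to each label, and the categorical or continuous flag for each feature, is the kind of metadata the tree learner reads from the DataFrame.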
More details on parameters can be found in the Python API documentation.
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))
treeModel = model.stages[2]
# summary only
print(treeModel)
More details on parameters can be found in the Scala API documentation.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
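val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
  .fit(data)
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a DecisionTree model.
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))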
// Chain indexers and tree in a Pipeline.
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)
// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${(1.0 - accuracy)}")
val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
println(s"Learned classification tree model:\n ${treeModel.toDebugString}")
More details on parameters can be found in the Java API documentation.
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.DecisionTreeClassifier;
import org.apache.spark.ml.classification.DecisionTreeClassificationModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load the data stored in LIBSVM format as a DataFrame.
Dataset<Row> data = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");
// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);
// Automatically identify categorical features, and index them.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
  .fit(data);
// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// Train a DecisionTree model.
DecisionTreeClassifier dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures");
// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray()[0]);
// Chain indexers and tree in a Pipeline.
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[]{labelIndexer, featureIndexer, dt, labelConverter});
// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);
// Make predictions.
Dataset<Row> predictions = model.transform(testData);
// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);
// Select (prediction, true label) and compute test error.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));
DecisionTreeClassificationModel treeModel =
(DecisionTreeClassificationModel) (model.stages()[2]);
System.out.println("Learned classification tree model:\n" + treeModel.toDebugString());
Refer to the R API documentation for more details.
# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# Fit a DecisionTree classification model with spark.decisionTree
model <- spark.decisionTree(training, label ~ features, "classification")
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
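Random forests are a popular family of classification and regression methods. More information about the spark.ml implementation can be found further in the section on random forests.
The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame which the tree-based algorithms can recognize.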
Refer to the Python API documentation for more details.
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
rfModel = model.stages[2]
print(rfModel) # summary only
Refer to the Scala API documentation for more details.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
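val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)
// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))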
// Chain indexers and forest in a Pipeline.
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)
// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${(1.0 - accuracy)}")
val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
println(s"Learned classification forest model:\n ${rfModel.toDebugString}")
Refer to the Java API documentation for more details.
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.RandomForestClassificationModel;
import org.apache.spark.ml.classification.RandomForestClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
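StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);
// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// Train a RandomForest model.
RandomForestClassifier rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10);
// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray()[0]);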
// Chain indexers and forest in a Pipeline
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {labelIndexer, featureIndexer, rf, labelConverter});
// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);
// Make predictions.
Dataset<Row> predictions = model.transform(testData);
// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);
// Select (prediction, true label) and compute test error
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));
RandomForestClassificationModel rfModel = (RandomForestClassificationModel)(model.stages()[2]);
System.out.println("Learned classification forest model:\n" + rfModel.toDebugString());
Refer to the R API documentation for more details.
# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# Fit a random forest classification model with spark.randomForest
model <- spark.randomForest(training, label ~ features, "classification", numTrees = 10)
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
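Gradient-boosted trees (GBTs) are a popular classification and regression method using ensembles of decision trees. More information about the spark.ml implementation can be found further in the section on GBTs.
The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame which the tree-based algorithms can recognize.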
Refer to the Python API documentation for more details.
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)
# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
gbtModel = model.stages[2]
print(gbtModel) # summary only
Refer to the Scala API documentation for more details.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
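val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a GBT model.
val gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)
// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))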
// Chain indexers and GBT in a Pipeline.
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))
// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)
// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${1.0 - accuracy}")
val gbtModel = model.stages(2).asInstanceOf[GBTClassificationModel]
println(s"Learned classification GBT model:\n ${gbtModel.toDebugString}")
Refer to the Java API documentation for more details.
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.GBTClassificationModel;
import org.apache.spark.ml.classification.GBTClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load and parse the data file, converting it to a DataFrame.
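Dataset<Row> data = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");
// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);
// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// Train a GBT model.
GBTClassifier gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);
// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray()[0]);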
// Chain indexers and GBT in a Pipeline.
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {labelIndexer, featureIndexer, gbt, labelConverter});
// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);
// Make predictions.
Dataset<Row> predictions = model.transform(testData);
// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);
// Select (prediction, true label) and compute test error.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));
GBTClassificationModel gbtModel = (GBTClassificationModel)(model.stages()[2]);
System.out.println("Learned classification GBT model:\n" + gbtModel.toDebugString());
Refer to the R API docs for more details.
# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# Fit a GBT classification model with spark.gbt
model <- spark.gbt(training, label ~ features, "classification", maxIter = 10)
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
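Multilayer perceptron classifier (MLPC) is a classifier based on the feedforward artificial neural network. MLPC consists of multiple layers of nodes. Each layer is fully connected to the next layer in the network. Nodes in the input layer represent the input data. All other nodes map inputs to outputs by a linear combination of the inputs with the node's weights $\wv$ and bias $\bv$, followed by an activation function. For an MLPC with $K+1$ layers, this can be written in matrix form as follows:
\[ \mathrm{y}(\x) = \mathrm{f_K}(...\mathrm{f_2}(\wv_2^T\mathrm{f_1}(\wv_1^T \x+b_1)+b_2)...+b_K) \]
Nodes in intermediate layers use the sigmoid (logistic) function:
\[ \mathrm{f}(z_i) = \frac{1}{1 + e^{-z_i}} \]
Nodes in the output layer use the softmax function:
\[ \mathrm{f}(z_i) = \frac{e^{z_i}}{\sum_{k=1}^N e^{z_k}} \]
The number of nodes $N$ in the output layer corresponds to the number of classes.
MLPC employs backpropagation for learning the model. We use the logistic loss function for optimization and L-BFGS as an optimization routine.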
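The forward pass described above can be sketched in a few lines of plain Python. This is only an illustration of the formula, not Spark's implementation: the helper names (`sigmoid`, `softmax`, `affine`, `forward`) and the toy weights are hypothetical, and each weight matrix is assumed to be stored as one row of input weights per output node.

```python
import math


def sigmoid(z):
    """Logistic activation used by the intermediate layers."""
    return [1.0 / (1.0 + math.exp(-v)) for v in z]


def softmax(z):
    """Softmax activation used by the output layer (shifted for numerical stability)."""
    m = max(z)
    exps = [math.exp(v - m) for v in z]
    total = sum(exps)
    return [e / total for e in exps]


def affine(weights, bias, x):
    """Compute W^T x + b, with one row of input weights per output node."""
    return [sum(w * xi for w, xi in zip(row, x)) + b for row, b in zip(weights, bias)]


def forward(layers, x):
    """Apply every layer in turn: sigmoid on hidden layers, softmax on the last one."""
    for weights, bias in layers[:-1]:
        x = sigmoid(affine(weights, bias, x))
    weights, bias = layers[-1]
    return softmax(affine(weights, bias, x))


# Hypothetical toy network: 2 inputs -> 2 hidden nodes -> 3 output classes.
toy_layers = [
    ([[0.5, -0.2], [0.1, 0.4]], [0.0, 0.1]),
    ([[1.0, -1.0], [0.3, 0.3], [-0.5, 0.8]], [0.0, 0.0, 0.1]),
]
probs = forward(toy_layers, [1.0, 2.0])
print(probs)  # three class probabilities that sum to 1
```

Because the last layer is a softmax, the output can be read as a probability distribution over the $N$ classes, and the predicted class is the index of the largest entry.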
Refer to the Python API docs for more details.
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Load training data
data = spark.read.format("libsvm")\
    .load("data/mllib/sample_multiclass_classification_data.txt")
# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]
# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = [4, 5, 4, 3]
# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
# train the model
model = trainer.fit(train)
# compute accuracy on the test set
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
Refer to the Scala API docs for more details.
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")
// Split the data into train and test
val splits = data.randomSplit(Array(0.6, 0.4), seed = 1234L)
val train = splits(0)
val test = splits(1)
// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
val layers = Array[Int](4, 5, 4, 3)
// create the trainer and set its parameters
val trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100)
// train the model
val model = trainer.fit(train)
// compute accuracy on the test set
val result = model.transform(test)
val predictionAndLabels = result.select("prediction", "label")
val evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy")
println(s"Test set accuracy = ${evaluator.evaluate(predictionAndLabels)}")
Refer to the Java API docs for more details.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel;
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
// Load training data
String path = "data/mllib/sample_multiclass_classification_data.txt";
Dataset<Row> dataFrame = spark.read().format("libsvm").load(path);
// Split the data into train and test
Dataset<Row>[] splits = dataFrame.randomSplit(new double[]{0.6, 0.4}, 1234L);
Dataset<Row> train = splits[0];
Dataset<Row> test = splits[1];
// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
int[] layers = new int[] {4, 5, 4, 3};
// create the trainer and set its parameters
MultilayerPerceptronClassifier trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100);
// train the model
MultilayerPerceptronClassificationModel model = trainer.fit(train);
// compute accuracy on the test set
Dataset<Row> result = model.transform(test);
Dataset<Row> predictionAndLabels = result.select("prediction", "label");
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy");
System.out.println("Test set accuracy = " + evaluator.evaluate(predictionAndLabels));
Refer to the R API docs for more details.
# Load training data
df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training <- df
test <- df
# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = c(4, 5, 4, 3)
# Fit a multi-layer perceptron neural network model with spark.mlp
model <- spark.mlp(training, label ~ features, maxIter = 100,
layers = layers, blockSize = 128, seed = 1234)
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
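A support vector machine constructs a hyperplane or set of hyperplanes in a high- or infinite-dimensional space, which can be used for classification, regression, or other tasks. Intuitively, a good separation is achieved by the hyperplane that has the largest distance to the nearest training-data points of any class (the so-called functional margin), since in general the larger the margin, the lower the generalization error of the classifier. LinearSVC in Spark ML supports binary classification with a linear SVM. Internally, it optimizes the hinge loss using the OWLQN optimizer.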
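To make the hinge loss concrete, here is a minimal sketch in plain Python. It assumes labels encoded as +1 and -1 (Spark's own labels are 0 and 1, so a mapping step is implied), it leaves out the regularization term, and the function names are hypothetical rather than part of any Spark API.

```python
def hinge_loss(weights, intercept, features, label):
    """Hinge loss for one example; label is assumed to be +1 or -1."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    return max(0.0, 1.0 - label * margin)


def mean_hinge_loss(weights, intercept, rows):
    """Average hinge loss over (features, label) pairs."""
    return sum(hinge_loss(weights, intercept, x, y) for x, y in rows) / len(rows)


# Toy data: the second point is classified correctly but lies inside the margin.
rows = [([2.0, 0.0], 1), ([0.5, 0.0], 1), ([-1.0, 0.0], -1)]
w, b = [1.0, 0.0], 0.0
loss = mean_hinge_loss(w, b, rows)
print(loss)
```

Only points that are misclassified or that fall inside the margin contribute to the loss, which is why maximizing the margin and minimizing the hinge loss pull in the same direction.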
Refer to the Python API docs for more details.
from pyspark.ml.classification import LinearSVC
# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
lsvc = LinearSVC(maxIter=10, regParam=0.1)
# Fit the model
lsvcModel = lsvc.fit(training)
# Print the coefficients and intercept for linear SVC
print("Coefficients: " + str(lsvcModel.coefficients))
print("Intercept: " + str(lsvcModel.intercept))
Refer to the Scala API docs for more details.
import org.apache.spark.ml.classification.LinearSVC
// Load training data
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val lsvc = new LinearSVC()
  .setMaxIter(10)
  .setRegParam(0.1)
// Fit the model
val lsvcModel = lsvc.fit(training)
// Print the coefficients and intercept for linear svc
println(s"Coefficients: ${lsvcModel.coefficients} Intercept: ${lsvcModel.intercept}")
Refer to the Java API docs for more details.
import org.apache.spark.ml.classification.LinearSVC;
import org.apache.spark.ml.classification.LinearSVCModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load training data
Dataset<Row> training = spark.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");
LinearSVC lsvc = new LinearSVC()
  .setMaxIter(10)
  .setRegParam(0.1);
// Fit the model
LinearSVCModel lsvcModel = lsvc.fit(training);
// Print the coefficients and intercept for LinearSVC
System.out.println("Coefficients: "
+ lsvcModel.coefficients() + " Intercept: " + lsvcModel.intercept());
Refer to the R API docs for more details.
# load training data
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# fit Linear SVM model
model <- spark.svmLinear(training, Survived ~ ., regParam = 0.01, maxIter = 10)
# Model summary
summary(model)
# Prediction
prediction <- predict(model, training)
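OneVsRest is an example of a machine learning reduction for performing multiclass classification given a base classifier that can perform binary classification efficiently. It is also known as "One-vs-All."
OneVsRest is implemented as an Estimator. For the base classifier, it takes instances of Classifier and creates a binary classification problem for each of the k classes. The classifier for class i is trained to predict whether the label is i or not, distinguishing class i from all other classes.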
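The reduction can be sketched without Spark. In the sketch below, the base learner (`train_centroid_scorer`) is a deliberately simple, hypothetical stand-in for a real binary classifier, and choosing the class whose binary scorer is most confident is the usual way one-vs-rest predictions are combined; none of these function names belong to the Spark API.

```python
def train_centroid_scorer(rows):
    """Hypothetical base binary learner: scores a point by how much closer it is
    to the positive-class centroid than to the negative-class centroid."""
    def centroid(points):
        n = len(points)
        return [sum(p[i] for p in points) / n for i in range(len(points[0]))]

    pos = centroid([x for x, y in rows if y == 1])
    neg = centroid([x for x, y in rows if y == 0])

    def sq_dist(a, b):
        return sum((ai - bi) ** 2 for ai, bi in zip(a, b))

    return lambda x: sq_dist(x, neg) - sq_dist(x, pos)


def fit_one_vs_rest(rows, train_binary):
    """Train one binary scorer per class: class c versus all other classes."""
    classes = sorted({y for _, y in rows})
    return {c: train_binary([(x, 1 if y == c else 0) for x, y in rows]) for c in classes}


def predict_one_vs_rest(models, x):
    """Return the class whose binary scorer is most confident."""
    return max(models, key=lambda c: models[c](x))


data = [([0.0, 0.0], 0), ([0.2, 0.1], 0),
        ([5.0, 5.0], 1), ([5.1, 4.9], 1),
        ([0.0, 5.0], 2), ([0.1, 5.2], 2)]
models = fit_one_vs_rest(data, train_centroid_scorer)
predictions = [predict_one_vs_rest(models, x) for x, _ in data]
print(predictions)
```

With k classes, k binary models are trained, each on the full dataset with relabeled targets, so training cost grows linearly in the number of classes.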
The example below demonstrates how to load the Iris dataset, parse it as a DataFrame, and perform multiclass classification using OneVsRest.
Refer to the Python API docs for more details.
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# load data file.
inputData = spark.read.format("libsvm") \
    .load("data/mllib/sample_multiclass_classification_data.txt")
# generate the train/test split.
(train, test) = inputData.randomSplit([0.8, 0.2])
# instantiate the base classifier.
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)
# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=lr)
# train the multiclass model.
ovrModel = ovr.fit(train)
# score the model on test data.
predictions = ovrModel.transform(test)
# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
Refer to the Scala API docs for more details.
import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// load data file.
val inputData = spark.read.format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")
// generate the train/test split.
val Array(train, test) = inputData.randomSplit(Array(0.8, 0.2))
// instantiate the base classifier
val classifier = new LogisticRegression()
  .setMaxIter(10)
  .setTol(1E-6)
  .setFitIntercept(true)
// instantiate the One Vs Rest Classifier.
val ovr = new OneVsRest().setClassifier(classifier)
// train the multiclass model.
val ovrModel = ovr.fit(train)
// score the model on test data.
val predictions = ovrModel.transform(test)
// obtain evaluator.
val evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy")
// compute the classification error on test data.
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${1 - accuracy}")
Refer to the Java API docs for more details.
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.OneVsRest;
import org.apache.spark.ml.classification.OneVsRestModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// load data file.
Dataset<Row> inputData = spark.read().format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt");
// generate the train/test split.
Dataset<Row>[] tmp = inputData.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> train = tmp[0];
Dataset<Row> test = tmp[1];
// configure the base classifier.
LogisticRegression classifier = new LogisticRegression()
  .setMaxIter(10)
  .setTol(1E-6)
  .setFitIntercept(true);
// instantiate the One Vs Rest Classifier.
OneVsRest ovr = new OneVsRest().setClassifier(classifier);
// train the multiclass model.
OneVsRestModel ovrModel = ovr.fit(train);
// score the model on test data.
Dataset<Row> predictions = ovrModel.transform(test)
.select("prediction", "label");
// obtain evaluator.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy");
// compute the classification error on test data.
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1 - accuracy));
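MLlib supports Multinomial naive Bayes, Complement naive Bayes, Bernoulli naive Bayes, and Gaussian naive Bayes.
Additive smoothing can be used by setting the parameter $\lambda$ (default to $1.0$).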
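The effect of additive smoothing is easiest to see on raw counts. The sketch below is the textbook formula, $(c_j + \lambda) / (\sum_k c_k + \lambda n)$ for $n$ features, and is not claimed to match Spark's internals exactly; the function name `smoothed_probabilities` is hypothetical.

```python
def smoothed_probabilities(counts, smoothing=1.0):
    """Turn per-feature counts for one class into probabilities with additive smoothing."""
    total = sum(counts) + smoothing * len(counts)
    return [(c + smoothing) / total for c in counts]


counts = [3, 0, 1]  # hypothetical feature counts observed for one class
probs = smoothed_probabilities(counts)                      # lambda = 1.0
unsmoothed = smoothed_probabilities(counts, smoothing=0.0)  # lambda = 0.0
print(probs)
print(unsmoothed)
```

Without smoothing, the unseen second feature gets probability zero, which would zero out the likelihood of any example containing it; with $\lambda = 1.0$ it receives a small positive probability instead.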
Refer to the Python API docs for more details.
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Load training data
data = spark.read.format("libsvm") \
    .load("data/mllib/sample_libsvm_data.txt")
# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]
# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
# train the model
model = nb.fit(train)
# select example rows to display.
predictions = model.transform(test)
# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
Refer to the Scala API docs for more details.
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)
// Train a NaiveBayes model.
val model = new NaiveBayes()
  .fit(trainingData)
// Select example rows to display.
val predictions = model.transform(testData)
// Select (prediction, true label) and compute test error
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy = $accuracy")
Refer to the Java API docs for more details.
import org.apache.spark.ml.classification.NaiveBayes;
import org.apache.spark.ml.classification.NaiveBayesModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load training data
Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// Split the data into train and test
Dataset<Row>[] splits = dataFrame.randomSplit(new double[]{0.6, 0.4}, 1234L);
Dataset<Row> train = splits[0];
Dataset<Row> test = splits[1];
// create the trainer and set its parameters
NaiveBayes nb = new NaiveBayes();
// train the model
NaiveBayesModel model = nb.fit(train);
// Select example rows to display.
Dataset<Row> predictions = model.transform(test);
// compute accuracy on the test set
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test set accuracy = " + accuracy);
Refer to the R API docs for more details.
# Fit a Bernoulli naive Bayes model with spark.naiveBayes
titanic <- as.data.frame(Titanic)
titanicDF <- createDataFrame(titanic[titanic$Freq > 0, -5])
nbDF <- titanicDF
nbTestDF <- titanicDF
nbModel <- spark.naiveBayes(nbDF, Survived ~ Class + Sex + Age)
# Model summary
summary(nbModel)
# Prediction
nbPredictions <- predict(nbModel, nbTestDF)
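The following example loads a dataset in LibSVM format, splits it into training and test sets, trains on the first dataset, and then evaluates on the held-out test set. We scale features to be between 0 and 1 to prevent the exploding gradient problem.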
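The scaling step amounts to a per-feature min-max rescaling. A minimal sketch for a single feature column follows; the function name `min_max_scale` is hypothetical, and mapping a constant column to the midpoint 0.5 is a choice made for this sketch.

```python
def min_max_scale(column):
    """Rescale one feature column linearly so its values fall in [0, 1]."""
    lo, hi = min(column), max(column)
    if hi == lo:
        # A constant column carries no spread; map it to the midpoint of the range.
        return [0.5 for _ in column]
    return [(v - lo) / (hi - lo) for v in column]


scaled = min_max_scale([2.0, 4.0, 6.0])
constant = min_max_scale([7.0, 7.0])
print(scaled)
print(constant)
```

Keeping every feature in the same bounded range keeps the gradient contributions of individual features comparable in magnitude, which is the motivation given above for scaling before training.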
Refer to the Python API docs for more details.
from pyspark.ml import Pipeline
from pyspark.ml.classification import FMClassifier
from pyspark.ml.feature import MinMaxScaler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Scale features.
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a FM model.
fm = FMClassifier(labelCol="indexedLabel", featuresCol="scaledFeatures", stepSize=0.001)
# Create a Pipeline.
pipeline = Pipeline(stages=[labelIndexer, featureScaler, fm])
# Train model.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
# Select (prediction, true label) and compute test accuracy
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = %g" % accuracy)
fmModel = model.stages[2]
print("Factors: " + str(fmModel.factors)) # type: ignore
print("Linear: " + str(fmModel.linear)) # type: ignore
print("Intercept: " + str(fmModel.intercept)) # type: ignore
Refer to the Scala API docs for more details.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{FMClassificationModel, FMClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, MinMaxScaler, StringIndexer}
// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
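val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Scale features.
val featureScaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(data)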
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a FM model.
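val fm = new FMClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("scaledFeatures")
  .setStepSize(0.001)
// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))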
// Create a Pipeline.
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, featureScaler, fm, labelConverter))
// Train model.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)
// Select (prediction, true label) and compute test accuracy.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy = $accuracy")
val fmModel = model.stages(2).asInstanceOf[FMClassificationModel]
println(s"Factors: ${fmModel.factors} Linear: ${fmModel.linear} " +
s"Intercept: ${fmModel.intercept}")
Refer to the Java API docs for more details.
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.FMClassificationModel;
import org.apache.spark.ml.classification.FMClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load and parse the data file, converting it to a DataFrame.
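Dataset<Row> data = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");
// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);
// Scale features.
MinMaxScalerModel featureScaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(data);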
// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// Train a FM model.
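FMClassifier fm = new FMClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("scaledFeatures")
  .setStepSize(0.001);
// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray()[0]);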
// Create a Pipeline.
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {labelIndexer, featureScaler, fm, labelConverter});
// Train model.
PipelineModel model = pipeline.fit(trainingData);
// Make predictions.
Dataset<Row> predictions = model.transform(testData);
// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);
// Select (prediction, true label) and compute test accuracy.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Accuracy = " + accuracy);
FMClassificationModel fmModel = (FMClassificationModel)(model.stages()[2]);
System.out.println("Factors: " + fmModel.factors());
System.out.println("Linear: " + fmModel.linear());
System.out.println("Intercept: " + fmModel.intercept());
Refer to the R API docs for more details.
Note: SparkR currently doesn't support feature scaling.
# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# Fit a FM classification model
model <- spark.fmClassifier(training, label ~ features)
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
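When fitting a LinearRegressionModel without intercept on a dataset with a constant nonzero column using the "l-bfgs" solver, Spark MLlib outputs zero coefficients for constant nonzero columns. This behavior is the same as R glmnet but different from LIBSVM.
More details on parameters can be found in the Python API documentation.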
from pyspark.ml.regression import LinearRegression
# Load training data
training = spark.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(training)
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))
# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
More details on parameters can be found in the Scala API documentation.
import org.apache.spark.ml.regression.LinearRegression
// Load training data
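val training = spark.read.format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt")
val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)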
// Fit the model
val lrModel = lr.fit(training)
// Print the coefficients and intercept for linear regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
// Summarize the model over the training set and print out some metrics
val trainingSummary = lrModel.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")
More details on parameters can be found in the Java API documentation.
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.ml.regression.LinearRegressionTrainingSummary;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load training data.
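Dataset<Row> training = spark.read().format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt");
LinearRegression lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8);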
// Fit the model.
LinearRegressionModel lrModel = lr.fit(training);
// Print the coefficients and intercept for linear regression.
System.out.println("Coefficients: "
+ lrModel.coefficients() + " Intercept: " + lrModel.intercept());
// Summarize the model over the training set and print out some metrics.
LinearRegressionTrainingSummary trainingSummary = lrModel.summary();
System.out.println("numIterations: " + trainingSummary.totalIterations());
System.out.println("objectiveHistory: " + Vectors.dense(trainingSummary.objectiveHistory()));
System.out.println("RMSE: " + trainingSummary.rootMeanSquaredError());
System.out.println("r2: " + trainingSummary.r2());
More details on parameters can be found in the R API documentation.
# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df
# Fit a linear regression model
model <- spark.lm(training, label ~ features, regParam = 0.3, elasticNetParam = 0.8)
# Prediction
predictions <- predict(model, test)
# Summarize
summary(model)
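In contrast to linear regression, where the output is assumed to follow a Gaussian distribution, generalized linear models (GLMs) are specifications of linear models where the response variable $Y_i$ follows some distribution from the exponential family of distributions. Spark's GeneralizedLinearRegression interface allows for flexible specification of GLMs, which can be used for various types of prediction problems including linear regression, Poisson regression, logistic regression, and others. Currently in spark.ml, only a subset of the exponential family distributions is supported; they are listed below.
**NOTE**: Spark currently only supports up to 4096 features through its GeneralizedLinearRegression interface, and will throw an exception if this constraint is exceeded. See the advanced section for more details. Still, for linear and logistic regression, models with an increased number of features can be trained using the LinearRegression and LogisticRegression estimators.
GLMs require exponential family distributions that can be written in their "canonical" or "natural" form, also known as natural exponential family distributions. The form of a natural exponential family distribution is given as:
\[f_Y(y|\theta, \tau) = h(y, \tau)\exp{\left( \frac{\theta \cdot y - A(\theta)}{d(\tau)} \right)}\]
where $\theta$ is the parameter of interest and $\tau$ is a dispersion parameter. In a GLM, the response variable $Y_i$ is assumed to be drawn from a natural exponential family distribution:
\[Y_i \sim f\left(\cdot|\theta_i, \tau \right)\]
where the parameter of interest $\theta_i$ is related to the expected value of the response variable $\mu_i$ by
\[\mu_i = A'(\theta_i)\]
Here, $A'(\theta_i)$ is defined by the form of the distribution selected. GLMs also allow specification of a link function, which defines the relationship between the expected value of the response variable $\mu_i$ and the so-called *linear predictor* $\eta_i$:
\[g(\mu_i) = \eta_i = \vec{x_i}^T \cdot \vec{\beta}\]
Often, the link function is chosen such that $A' = g^{-1}$, which yields a simplified relationship between the parameter of interest $\theta$ and the linear predictor $\eta$. In this case, the link function $g(\mu)$ is said to be the "canonical" link function:
\[\theta_i = A'^{-1}(\mu_i) = g(g^{-1}(\eta_i)) = \eta_i\]
A GLM finds the regression coefficients $\vec{\beta}$ that maximize the likelihood function:
\[\max_{\vec{\beta}} \mathcal{L}(\vec{\theta}|\vec{y},X) = \prod_{i=1}^{N} h(y_i, \tau) \exp{\left(\frac{y_i\theta_i - A(\theta_i)}{d(\tau)}\right)}\]
where the parameter of interest $\theta_i$ is related to the regression coefficients $\vec{\beta}$ by
\[\theta_i = A'^{-1}(g^{-1}(\vec{x_i} \cdot \vec{\beta}))\]
Spark's generalized linear regression interface also provides summary statistics for diagnosing the fit of GLM models, including residuals, p-values, deviances, the Akaike information criterion, and others.
See here for a more comprehensive review of GLMs and their applications.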
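Family | Response Type | Supported Links
Gaussian | Continuous | Identity*, Log, Inverse
Binomial | Binary | Logit*, Probit, CLogLog
Poisson | Count | Log*, Identity, Sqrt
Gamma | Continuous | Inverse*, Identity, Log
Tweedie | Zero-inflated continuous | Power link function
* Canonical Link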
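As a worked illustration of the canonical-link condition $A' = g^{-1}$, the Poisson family can be written in natural exponential form. This is a standard textbook derivation rather than anything specific to Spark, and it takes $d(\tau) = 1$ for the Poisson case.

```latex
\begin{align*}
f_Y(y|\mu) &= \frac{\mu^{y} e^{-\mu}}{y!} = \frac{1}{y!}\exp\left(y\log\mu - \mu\right) \\
\theta &= \log\mu, \quad A(\theta) = e^{\theta}, \quad d(\tau) = 1, \quad h(y, \tau) = \frac{1}{y!} \\
A'(\theta) &= e^{\theta} = \mu \\
g(\mu) &= A'^{-1}(\mu) = \log\mu \quad\Rightarrow\quad \theta_i = \eta_i = \vec{x_i}^T \cdot \vec{\beta}
\end{align*}
```

This agrees with the table above, where the log link is starred as the canonical link for the Poisson family.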
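The following example demonstrates training a GLM with a Gaussian response and identity link function and extracting model summary statistics.
Refer to the Python API docs for more details.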
from pyspark.ml.regression import GeneralizedLinearRegression
# Load training data
dataset = spark.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")
glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)
# Fit the model
model = glr.fit(dataset)
# Print the coefficients and intercept for generalized linear regression model
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))
# Summarize the model over the training set and print out some metrics
summary = model.summary
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues))
print("P Values: " + str(summary.pValues))
print("Dispersion: " + str(summary.dispersion))
print("Null Deviance: " + str(summary.nullDeviance))
print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary.deviance))
print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
print("AIC: " + str(summary.aic))
print("Deviance Residuals: ")
summary.residuals().show()
Refer to the Scala API docs for more details.
import org.apache.spark.ml.regression.GeneralizedLinearRegression
// Load training data
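val dataset = spark.read.format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt")
val glr = new GeneralizedLinearRegression()
  .setFamily("gaussian")
  .setLink("identity")
  .setMaxIter(10)
  .setRegParam(0.3)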
// Fit the model
val model = glr.fit(dataset)
// Print the coefficients and intercept for generalized linear regression model
println(s"Coefficients: ${model.coefficients}")
println(s"Intercept: ${model.intercept}")
// Summarize the model over the training set and print out some metrics
val summary = model.summary
println(s"Coefficient Standard Errors: ${summary.coefficientStandardErrors.mkString(",")}")
println(s"T Values: ${summary.tValues.mkString(",")}")
println(s"P Values: ${summary.pValues.mkString(",")}")
println(s"Dispersion: ${summary.dispersion}")
println(s"Null Deviance: ${summary.nullDeviance}")
println(s"Residual Degree Of Freedom Null: ${summary.residualDegreeOfFreedomNull}")
println(s"Deviance: ${summary.deviance}")
println(s"Residual Degree Of Freedom: ${summary.residualDegreeOfFreedom}")
println(s"AIC: ${summary.aic}")
println("Deviance Residuals: ")
summary.residuals().show()
Refer to the Java API docs for more details.
import java.util.Arrays;
import org.apache.spark.ml.regression.GeneralizedLinearRegression;
import org.apache.spark.ml.regression.GeneralizedLinearRegressionModel;
import org.apache.spark.ml.regression.GeneralizedLinearRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Load training data
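Dataset<Row> dataset = spark.read().format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt");
GeneralizedLinearRegression glr = new GeneralizedLinearRegression()
  .setFamily("gaussian")
  .setLink("identity")
  .setMaxIter(10)
  .setRegParam(0.3);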
// Fit the model
GeneralizedLinearRegressionModel model = glr.fit(dataset);
// Print the coefficients and intercept for generalized linear regression model
System.out.println("Coefficients: " + model.coefficients());
System.out.println("Intercept: " + model.intercept());
// Summarize the model over the training set and print out some metrics
GeneralizedLinearRegressionTrainingSummary summary = model.summary();
System.out.println("Coefficient Standard Errors: "
+ Arrays.toString(summary.coefficientStandardErrors()));
System.out.println("T Values: " + Arrays.toString(summary.tValues()));
System.out.println("P Values: " + Arrays.toString(summary.pValues()));
System.out.println("Dispersion: " + summary.dispersion());
System.out.println("Null Deviance: " + summary.nullDeviance());
System.out.println("Residual Degree Of Freedom Null: " + summary.residualDegreeOfFreedomNull());
System.out.println("Deviance: " + summary.deviance());
System.out.println("Residual Degree Of Freedom: " + summary.residualDegreeOfFreedom());
System.out.println("AIC: " + summary.aic());
System.out.println("Deviance Residuals: ");
summary.residuals().show();
Refer to the R API docs for more details.
training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7, 3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
# Model summary
summary(gaussianGLM)
# Prediction
gaussianPredictions <- predict(gaussianGLM, gaussianTestDF)
# Fit a generalized linear model with glm (R-compliant)
gaussianGLM2 <- glm(label ~ features, gaussianDF, family = "gaussian")
# Fit a generalized linear model of family "binomial" with spark.glm
training2 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training2 <- transform(training2, label = cast(training2$label > 1, "integer"))
df_list2 <- randomSplit(training2, c(7, 3), 2)
binomialDF <- df_list2[[1]]
binomialTestDF <- df_list2[[2]]
binomialGLM <- spark.glm(binomialDF, label ~ features, family = "binomial")
# Model summary
summary(binomialGLM)
# Prediction
binomialPredictions <- predict(binomialGLM, binomialTestDF)
# Fit a generalized linear model of family "tweedie" with spark.glm
training3 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
tweedieDF <- transform(training3, label = training3$label * exp(randn(10)))
tweedieGLM <- spark.glm(tweedieDF, label ~ features, family = "tweedie",
var.power = 1.2, link.power = 0)
# Model summary
summary(tweedieGLM)
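Decision trees are a popular family of classification and regression methods. More information about the spark.ml implementation can be found further in the section on decision trees.
The following example loads a dataset in LibSVM format, splits it into training and test sets, trains on the first dataset, and then evaluates on the held-out test set. We use a feature transformer to index categorical features, adding metadata to the DataFrame which the Decision Tree algorithm can recognize.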
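The feature-indexing step can be pictured with a small sketch: a feature is treated as categorical when it has at most a given number of distinct values, and each of its values is then replaced by a 0-based index. This is a simplified illustration, not the VectorIndexer implementation; the function name `categorical_feature_maps` and the plain sorted ordering of categories are assumptions made here.

```python
def categorical_feature_maps(rows, max_categories):
    """Return {feature index: {value: category index}} for features that have
    at most max_categories distinct values; other features stay continuous."""
    maps = {}
    for i in range(len(rows[0])):
        distinct = sorted({row[i] for row in rows})
        if len(distinct) <= max_categories:
            maps[i] = {value: index for index, value in enumerate(distinct)}
    return maps


# Hypothetical data: feature 0 takes two values, feature 1 takes five.
rows = [[0.0, 1.5], [1.0, 2.5], [0.0, 3.5], [1.0, 4.5], [0.0, 5.5]]
maps = categorical_feature_maps(rows, max_categories=4)
print(maps)
```

Here feature 0 is indexed as categorical, while feature 1 exceeds the threshold and is left as a continuous feature for the tree to split on by value.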
More details on parameters can be found in the Python API documentation.
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")
# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])
# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
treeModel = model.stages[1]
# summary only
print(treeModel)
More details on parameters can be found in the Scala API documentation.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor
// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Automatically identify categorical features, and index them.
// Here, we treat features with > 4 distinct values as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a DecisionTree model.
val dt = new DecisionTreeRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
// Chain indexer and tree in a Pipeline.
val pipeline = new Pipeline()
.setStages(Array(featureIndexer, dt))
// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
val treeModel = model.stages(1).asInstanceOf[DecisionTreeRegressionModel]
println(s"Learned regression tree model:\n ${treeModel.toDebugString}")
Refer to the Java API docs for more details on the parameters.
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
import org.apache.spark.ml.regression.DecisionTreeRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load the data stored in LIBSVM format as a DataFrame.
Dataset<Row> data = spark.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);
// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// Train a DecisionTree model.
DecisionTreeRegressor dt = new DecisionTreeRegressor()
  .setFeaturesCol("indexedFeatures");
// Chain indexer and tree in a Pipeline.
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[]{featureIndexer, dt});
// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);
// Make predictions.
Dataset<Row> predictions = model.transform(testData);
// Select example rows to display.
predictions.select("label", "features").show(5);
// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
DecisionTreeRegressionModel treeModel =
(DecisionTreeRegressionModel) (model.stages()[1]);
System.out.println("Learned regression tree model:\n" + treeModel.toDebugString());
Refer to the R API docs for more details.
# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df
# Fit a DecisionTree regression model with spark.decisionTree
model <- spark.decisionTree(training, label ~ features, "regression")
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
head(predictions)
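Random forests are a popular family of classification and regression methods. More information about the spark.ml implementation can be found in the section on random forests.
The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use a feature transformer to index categorical features, adding metadata to the DataFrame.
Refer to the Python API docs for more details.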
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")
# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])
# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
rfModel = model.stages[1]
print(rfModel) # summary only
Refer to the Scala API docs for more details.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a RandomForest model.
val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
// Chain indexer and forest in a Pipeline.
val pipeline = new Pipeline()
.setStages(Array(featureIndexer, rf))
// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
println(s"Learned regression forest model:\n ${rfModel.toDebugString}")
Refer to the Java API docs for more details.
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.RandomForestRegressionModel;
import org.apache.spark.ml.regression.RandomForestRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);
// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// Train a RandomForest model.
RandomForestRegressor rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures");
// Chain indexer and forest in a Pipeline
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {featureIndexer, rf});
// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);
// Make predictions.
Dataset<Row> predictions = model.transform(testData);
// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);
// Select (prediction, true label) and compute test error
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
RandomForestRegressionModel rfModel = (RandomForestRegressionModel)(model.stages()[1]);
System.out.println("Learned regression forest model:\n" + rfModel.toDebugString());
Refer to the R API docs for more details.
# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df
# Fit a random forest regression model with spark.randomForest
model <- spark.randomForest(training, label ~ features, "regression", numTrees = 10)
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
head(predictions)
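Gradient-boosted trees (GBTs) are a popular regression method using ensembles of decision trees. More information about the spark.ml implementation can be found in the section on GBTs.
In this example, only 1 iteration is actually needed, but that will not be true in general.
Refer to the Python API docs for more details.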
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a GBT model.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)
# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])
# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
gbtModel = model.stages[1]
print(gbtModel) # summary only
Refer to the Scala API docs for more details.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a GBT model.
val gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)
// Chain indexer and GBT in a Pipeline.
val pipeline = new Pipeline()
.setStages(Array(featureIndexer, gbt))
// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
println(s"Learned regression GBT model:\n ${gbtModel.toDebugString}")
Refer to the Java API docs for more details.
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.GBTRegressionModel;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);
// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// Train a GBT model.
GBTRegressor gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);
// Chain indexer and GBT in a Pipeline.
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureIndexer, gbt});
// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);
// Make predictions.
Dataset<Row> predictions = model.transform(testData);
// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);
// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
GBTRegressionModel gbtModel = (GBTRegressionModel)(model.stages()[1]);
System.out.println("Learned regression GBT model:\n" + gbtModel.toDebugString());
Refer to the R API docs for more details.
# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df
# Fit a GBT regression model with spark.gbt
model <- spark.gbt(training, label ~ features, "regression", maxIter = 10)
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
head(predictions)
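In spark.ml, we implement the Accelerated failure time (AFT) model, a parametric survival regression model for censored data. It describes a model for the log of the survival time, so it is often called a log-linear model for survival analysis. Unlike a Proportional hazards model designed for the same purpose, the AFT model is easier to parallelize because each instance contributes to the objective function independently.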
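Given the values of the covariates $x^{'}$, for random lifetimes $t_{i}$ of subjects i = 1, ..., n, with possible right-censoring, the likelihood function under the AFT model is given as: \[ L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})]^{\delta_{i}}S_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})^{1-\delta_{i}} \]
where $\delta_{i}$ is the indicator that the event has occurred, i.e. whether the observation is uncensored. Using $\epsilon_{i}=\frac{\log{t_{i}}-x^{'}\beta}{\sigma}$, the log-likelihood function takes the form: \[ \iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+\delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}] \]
where $S_{0}(\epsilon_{i})$ is the baseline survivor function and $f_{0}(\epsilon_{i})$ is the corresponding density function.
The most commonly used AFT model is based on the Weibull distribution of the survival time. The Weibull distribution for the lifetime corresponds to the extreme value distribution for the log of the lifetime, and the $S_{0}(\epsilon)$ function is: \[ S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}}) \]
The $f_{0}(\epsilon_{i})$ function is: \[ f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}}) \]
The log-likelihood function for the AFT model with a Weibull distribution of lifetime is: \[ \iota(\beta,\sigma)= -\sum_{i=1}^n[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}] \]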
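Because minimizing the negative log-likelihood is equivalent to maximizing the likelihood (that is, maximum a posteriori estimation under a flat prior), the loss function we optimize is $-\iota(\beta,\sigma)$. The gradient functions for $\beta$ and $\log\sigma$ respectively are: \[ \frac{\partial (-\iota)}{\partial \beta}=\sum_{i=1}^{n}[\delta_{i}-e^{\epsilon_{i}}]\frac{x_{i}}{\sigma} \]
\[ \frac{\partial (-\iota)}{\partial (\log\sigma)}=\sum_{i=1}^{n}[\delta_{i}+(\delta_{i}-e^{\epsilon_{i}})\epsilon_{i}] \]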
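The AFT model can be formulated as a convex optimization problem, i.e. the task of finding a minimizer of a convex function $-\iota(\beta,\sigma)$ that depends on the coefficient vector $\beta$ and the log of the scale parameter $\log\sigma$. The optimization algorithm underlying the implementation is L-BFGS. The implementation matches the result from R's survival function survreg.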
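The Weibull loss and its two gradients can be checked numerically. The following is a minimal plain-Python sketch, not Spark's implementation: the function name `aft_weibull_loss_and_grad`, the starting values for `beta` and `log_sigma`, and the omission of an intercept are all assumptions made for illustration. The second field of each row is treated as $\delta_{i}$ (1.0 means the event occurred).

```python
import math

def aft_weibull_loss_and_grad(beta, log_sigma, rows):
    """Negative log-likelihood of the Weibull AFT model and its gradients.

    rows is a list of (t, delta, x): survival time t > 0, event indicator
    delta (1.0 = uncensored, 0.0 = censored) and covariates x.
    """
    sigma = math.exp(log_sigma)
    loss = 0.0
    grad_beta = [0.0] * len(beta)
    grad_log_sigma = 0.0
    for t, delta, x in rows:
        eps = (math.log(t) - sum(b * xi for b, xi in zip(beta, x))) / sigma
        exp_eps = math.exp(eps)
        # Contribution to -iota: delta*log(sigma) - delta*eps + e^eps
        loss += delta * log_sigma - delta * eps + exp_eps
        for j, xj in enumerate(x):
            grad_beta[j] += (delta - exp_eps) * xj / sigma
        grad_log_sigma += delta + (delta - exp_eps) * eps
    return loss, grad_beta, grad_log_sigma

# Illustrative data: (time, event indicator, covariates); no intercept term.
rows = [
    (1.218, 1.0, [1.560, -0.605]),
    (2.949, 0.0, [0.346, 2.158]),
    (3.627, 0.0, [1.380, 0.231]),
    (0.273, 1.0, [0.520, 1.151]),
    (4.199, 0.0, [0.795, -0.226]),
]
beta, log_sigma = [0.1, -0.2], 0.3  # arbitrary evaluation point
loss, grad_beta, grad_log_sigma = aft_weibull_loss_and_grad(beta, log_sigma, rows)

# Central finite differences as a sanity check on the analytic gradients.
h = 1e-6
num_grad_beta0 = (
    aft_weibull_loss_and_grad([beta[0] + h, beta[1]], log_sigma, rows)[0]
    - aft_weibull_loss_and_grad([beta[0] - h, beta[1]], log_sigma, rows)[0]
) / (2 * h)
num_grad_log_sigma = (
    aft_weibull_loss_and_grad(beta, log_sigma + h, rows)[0]
    - aft_weibull_loss_and_grad(beta, log_sigma - h, rows)[0]
) / (2 * h)
print(grad_beta[0], num_grad_beta0)
print(grad_log_sigma, num_grad_log_sigma)
```

Each printed pair should agree to several decimal places, which is a quick way to confirm the signs in the gradient formulas.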
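When fitting an AFTSurvivalRegressionModel without an intercept on a dataset with a constant nonzero column, Spark MLlib outputs zero coefficients for the constant nonzero columns. This behavior is different from R survival::survreg.
Refer to the Python API docs for more details.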
from pyspark.ml.regression import AFTSurvivalRegression
from pyspark.ml.linalg import Vectors
training = spark.createDataFrame([
(1.218, 1.0, Vectors.dense(1.560, -0.605)),
(2.949, 0.0, Vectors.dense(0.346, 2.158)),
(3.627, 0.0, Vectors.dense(1.380, 0.231)),
(0.273, 1.0, Vectors.dense(0.520, 1.151)),
(4.199, 0.0, Vectors.dense(0.795, -0.226))], ["label", "censor", "features"])
quantileProbabilities = [0.3, 0.6]
aft = AFTSurvivalRegression(quantileProbabilities=quantileProbabilities,
                            quantilesCol="quantiles")
model = aft.fit(training)
# Print the coefficients, intercept and scale parameter for AFT survival regression
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))
print("Scale: " + str(model.scale))
Refer to the Scala API docs for more details.
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.AFTSurvivalRegression
val training = spark.createDataFrame(Seq(
(1.218, 1.0, Vectors.dense(1.560, -0.605)),
(2.949, 0.0, Vectors.dense(0.346, 2.158)),
(3.627, 0.0, Vectors.dense(1.380, 0.231)),
(0.273, 1.0, Vectors.dense(0.520, 1.151)),
(4.199, 0.0, Vectors.dense(0.795, -0.226))
)).toDF("label", "censor", "features")
val quantileProbabilities = Array(0.3, 0.6)
val aft = new AFTSurvivalRegression()
  .setQuantileProbabilities(quantileProbabilities)
  .setQuantilesCol("quantiles")
val model = aft.fit(training)
// Print the coefficients, intercept and scale parameter for AFT survival regression
println(s"Coefficients: ${model.coefficients}")
println(s"Intercept: ${model.intercept}")
println(s"Scale: ${model.scale}")
Refer to the Java API docs for more details.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.regression.AFTSurvivalRegression;
import org.apache.spark.ml.regression.AFTSurvivalRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(1.218, 1.0, Vectors.dense(1.560, -0.605)),
RowFactory.create(2.949, 0.0, Vectors.dense(0.346, 2.158)),
RowFactory.create(3.627, 0.0, Vectors.dense(1.380, 0.231)),
RowFactory.create(0.273, 1.0, Vectors.dense(0.520, 1.151)),
RowFactory.create(4.199, 0.0, Vectors.dense(0.795, -0.226))
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("censor", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(data, schema);
double[] quantileProbabilities = new double[]{0.3, 0.6};
AFTSurvivalRegression aft = new AFTSurvivalRegression()
  .setQuantileProbabilities(quantileProbabilities)
  .setQuantilesCol("quantiles");
AFTSurvivalRegressionModel model = aft.fit(training);
// Print the coefficients, intercept and scale parameter for AFT survival regression
System.out.println("Coefficients: " + model.coefficients());
System.out.println("Intercept: " + model.intercept());
System.out.println("Scale: " + model.scale());
Refer to the R API docs for more details.
# Use the ovarian dataset available in R survival package
library(survival)
# Fit an accelerated failure time (AFT) survival regression model with spark.survreg
ovarianDF <- suppressWarnings(createDataFrame(ovarian))
aftDF <- ovarianDF
aftTestDF <- ovarianDF
aftModel <- spark.survreg(aftDF, Surv(futime, fustat) ~ ecog_ps + rx)
# Model summary
summary(aftModel)
# Prediction
aftPredictions <- predict(aftModel, aftTestDF)
head(aftPredictions)
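Isotonic regression belongs to the family of regression algorithms. Formally, isotonic regression is a problem where, given a finite set of real numbers $Y = \{y_1, y_2, ..., y_n\}$ representing observed responses and $X = \{x_1, x_2, ..., x_n\}$ the unknown response values to be fitted, we find a function that minimizes
\begin{equation} f(x) = \sum_{i=1}^n w_i (y_i - x_i)^2 \end{equation}
under the complete order constraint $x_1\le x_2\le ...\le x_n$, where $w_i$ are positive weights.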
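We implement a pool adjacent violators algorithm that uses an approach to parallelizing isotonic regression. The training input is a DataFrame containing three columns: label, features and weight. Additionally, the IsotonicRegression algorithm has one optional parameter called isotonic, which defaults to true. This argument specifies whether the isotonic regression is isotonic (monotonically increasing) or antitonic (monotonically decreasing).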
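To make the pooling step concrete, here is a minimal sequential sketch of pool adjacent violators in plain Python. It is not the parallel implementation described above: the function name `pava` is hypothetical, and the sketch assumes the responses are already sorted by feature value. The antitonic case is handled by negating the responses, fitting an increasing sequence, and negating back.

```python
def pava(y, w=None, increasing=True):
    """Weighted least-squares monotone fit to y (sequential PAVA sketch)."""
    w = [1.0] * len(y) if w is None else list(w)
    sign = 1.0 if increasing else -1.0
    # Each block stores [weighted mean, total weight, number of points].
    blocks = []
    for yi, wi in zip(y, w):
        blocks.append([sign * yi, wi, 1])
        # Pool backwards while the previous block violates the ordering.
        while len(blocks) > 1 and blocks[-2][0] > blocks[-1][0]:
            m2, w2, c2 = blocks.pop()
            m1, w1, c1 = blocks.pop()
            total = w1 + w2
            blocks.append([(m1 * w1 + m2 * w2) / total, total, c1 + c2])
    fitted = []
    for mean, _, count in blocks:
        fitted.extend([sign * mean] * count)
    return fitted

fit_up = pava([1.0, 3.0, 2.0, 4.0, 3.5])
fit_down = pava([1.0, 3.0, 2.0], increasing=False)
print(fit_up)    # [1.0, 2.5, 2.5, 3.75, 3.75]
print(fit_down)  # [2.0, 2.0, 2.0]
```

Each violating pair is replaced by its weighted mean, which is the least-squares solution for a block forced to share one value.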
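Training returns an IsotonicRegressionModel that can be used to predict labels for both known and unknown features. The result of isotonic regression is treated as a piecewise linear function. The rules for prediction therefore are:
- If the prediction input exactly matches a training feature, the associated prediction is returned. If there are multiple predictions with the same feature, one of them is returned. Which one is undefined (same as java.util.Arrays.binarySearch).
- If the prediction input is lower or higher than all training features, the prediction with the lowest or highest feature is returned, respectively. If there are multiple predictions with the same feature, the lowest or highest is returned, respectively.
- If the prediction input falls between two training features, the prediction is treated as a piecewise linear function and the interpolated value is calculated from the predictions of the two closest features. If there are multiple values with the same feature, the same rules as in the previous point are used.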
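The three rules can be sketched in a few lines of plain Python. This is an illustration under simplifying assumptions, not Spark's code: the function name `predict_isotonic` is hypothetical, and the boundaries are assumed to be sorted and distinct, so the tie-breaking cases in the rules above do not arise.

```python
from bisect import bisect_left

def predict_isotonic(boundaries, predictions, x):
    """Piecewise-linear prediction from sorted boundaries and their predictions."""
    # Rule 2: clamp inputs outside the range of training features.
    if x <= boundaries[0]:
        return predictions[0]
    if x >= boundaries[-1]:
        return predictions[-1]
    i = bisect_left(boundaries, x)
    # Rule 1: exact match with a training feature.
    if boundaries[i] == x:
        return predictions[i]
    # Rule 3: linear interpolation between the two closest features.
    x0, x1 = boundaries[i - 1], boundaries[i]
    y0, y1 = predictions[i - 1], predictions[i]
    return y0 + (y1 - y0) * (x - x0) / (x1 - x0)

boundaries = [0.0, 1.0, 3.0]
predictions = [1.0, 2.0, 4.0]
print(predict_isotonic(boundaries, predictions, -1.0))  # 1.0
print(predict_isotonic(boundaries, predictions, 1.0))   # 2.0
print(predict_isotonic(boundaries, predictions, 2.0))   # 3.0
print(predict_isotonic(boundaries, predictions, 5.0))   # 4.0
```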
Refer to the IsotonicRegression Python docs for more details on the API.
from pyspark.ml.regression import IsotonicRegression
# Loads data.
dataset = spark.read.format("libsvm")\
    .load("data/mllib/sample_isotonic_regression_libsvm_data.txt")
# Trains an isotonic regression model.
model = IsotonicRegression().fit(dataset)
print("Boundaries in increasing order: %s\n" % str(model.boundaries))
print("Predictions associated with the boundaries: %s\n" % str(model.predictions))
# Makes predictions.
model.transform(dataset).show()
Refer to the IsotonicRegression Scala docs for details on the API.
import org.apache.spark.ml.regression.IsotonicRegression
// Loads data.
val dataset = spark.read.format("libsvm")
  .load("data/mllib/sample_isotonic_regression_libsvm_data.txt")
// Trains an isotonic regression model.
val ir = new IsotonicRegression()
val model = ir.fit(dataset)
println(s"Boundaries in increasing order: ${model.boundaries}\n")
println(s"Predictions associated with the boundaries: ${model.predictions}\n")
// Makes predictions.
model.transform(dataset).show()
Refer to the IsotonicRegression Java docs for details on the API.
import org.apache.spark.ml.regression.IsotonicRegression;
import org.apache.spark.ml.regression.IsotonicRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm")
  .load("data/mllib/sample_isotonic_regression_libsvm_data.txt");
// Trains an isotonic regression model.
IsotonicRegression ir = new IsotonicRegression();
IsotonicRegressionModel model = ir.fit(dataset);
System.out.println("Boundaries in increasing order: " + model.boundaries() + "\n");
System.out.println("Predictions associated with the boundaries: " + model.predictions() + "\n");
// Makes predictions.
model.transform(dataset).show();
Refer to the IsotonicRegression R API docs for more details on the API.
# Load training data
df <- read.df("data/mllib/sample_isotonic_regression_libsvm_data.txt", source = "libsvm")
training <- df
test <- df
# Fit an isotonic regression model with spark.isoreg
model <- spark.isoreg(training, label ~ features, isotonic = FALSE)
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
head(predictions)
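The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We scale features to be between 0 and 1 to prevent the exploding gradient problem.
Refer to the Python API docs for more details.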
from pyspark.ml import Pipeline
from pyspark.ml.regression import FMRegressor
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.evaluation import RegressionEvaluator
# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Scale features.
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a FM model.
fm = FMRegressor(featuresCol="scaledFeatures", stepSize=0.001)
# Create a Pipeline.
pipeline = Pipeline(stages=[featureScaler, fm])
# Train model.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
fmModel = model.stages[1]
print("Factors: " + str(fmModel.factors)) # type: ignore
print("Linear: " + str(fmModel.linear)) # type: ignore
print("Intercept: " + str(fmModel.intercept)) # type: ignore
Refer to the Scala API docs for more details.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.regression.{FMRegressionModel, FMRegressor}
// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Scale features.
val featureScaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(data)
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a FM model.
val fm = new FMRegressor()
  .setLabelCol("label")
  .setFeaturesCol("scaledFeatures")
  .setStepSize(0.001)
// Create a Pipeline.
val pipeline = new Pipeline()
.setStages(Array(featureScaler, fm))
// Train model.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
val fmModel = model.stages(1).asInstanceOf[FMRegressionModel]
println(s"Factors: ${fmModel.factors} Linear: ${fmModel.linear} " +
s"Intercept: ${fmModel.intercept}")
Refer to the Java API docs for more details.
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.regression.FMRegressionModel;
import org.apache.spark.ml.regression.FMRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// Scale features.
MinMaxScalerModel featureScaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(data);
// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// Train a FM model.
FMRegressor fm = new FMRegressor()
  .setLabelCol("label")
  .setFeaturesCol("scaledFeatures")
  .setStepSize(0.001);
// Create a Pipeline.
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureScaler, fm});
// Train model.
PipelineModel model = pipeline.fit(trainingData);
// Make predictions.
Dataset<Row> predictions = model.transform(testData);
// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);
// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
FMRegressionModel fmModel = (FMRegressionModel)(model.stages()[1]);
System.out.println("Factors: " + fmModel.factors());
System.out.println("Linear: " + fmModel.linear());
System.out.println("Intercept: " + fmModel.intercept());
Refer to the R API docs for more details.
Note: At the moment SparkR does not support feature scaling.
# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training_test <- randomSplit(df, c(0.7, 0.3))
training <- training_test[[1]]
test <- training_test[[2]]
# Fit a FM regression model
model <- spark.fmRegressor(training, label ~ features)
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
head(predictions)
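We implement popular linear methods such as logistic regression and linear least squares with $L_1$ or $L_2$ regularization. Refer to the linear methods guide for the RDD-based API for details about implementation and tuning; this information is still relevant.
We also include a DataFrame API for Elastic net, a hybrid of $L_1$ and $L_2$ regularization proposed in Zou et al, Regularization and variable selection via the elastic net. Mathematically, it is defined as a convex combination of the $L_1$ and the $L_2$ regularization terms: \[ \alpha \left( \lambda \|\wv\|_1 \right) + (1-\alpha) \left( \frac{\lambda}{2}\|\wv\|_2^2 \right) , \alpha \in [0, 1], \lambda \geq 0 \]
By setting $\alpha$ properly, elastic net contains both $L_1$ and $L_2$ regularization as special cases. For example, if a linear regression model is trained with the elastic net parameter $\alpha$ set to $1$, it is equivalent to a Lasso model. On the other hand, if $\alpha$ is set to $0$, the trained model reduces to a ridge regression model. We implement the Pipelines API for both linear regression and logistic regression with elastic net regularization.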
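The penalty term is easy to evaluate directly. The sketch below is plain Python written for illustration only; the function name `elastic_net_penalty` and the sample weights are assumptions. In the estimators, $\alpha$ corresponds to `elasticNetParam` and $\lambda$ to `regParam`.

```python
def elastic_net_penalty(w, alpha, lam):
    """alpha * (lam * ||w||_1) + (1 - alpha) * (lam / 2 * ||w||_2^2)."""
    l1 = sum(abs(wi) for wi in w)
    l2_squared = sum(wi * wi for wi in w)
    return alpha * (lam * l1) + (1.0 - alpha) * (lam / 2.0 * l2_squared)

w = [3.0, -4.0]  # illustrative weights: ||w||_1 = 7, ||w||_2^2 = 25
lasso_penalty = elastic_net_penalty(w, alpha=1.0, lam=0.5)  # pure L1 term
ridge_penalty = elastic_net_penalty(w, alpha=0.0, lam=0.5)  # pure L2 term
mixed_penalty = elastic_net_penalty(w, alpha=0.5, lam=0.5)  # equal mix
print(lasso_penalty, ridge_penalty, mixed_penalty)  # 3.5 6.25 4.875
```

With $\alpha = 1$ only the $L_1$ term survives (Lasso), and with $\alpha = 0$ only the $L_2$ term survives (ridge), matching the special cases described above.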
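\[\hat{y} = w_0 + \sum\limits^n_{i=1} w_i x_i + \sum\limits^n_{i=1} \sum\limits^n_{j=i+1} \langle v_i, v_j \rangle x_i x_j\]The first two terms denote the intercept and the linear term (same as in linear regression), and the last term denotes the pairwise interactions term. \(v_i\) describes the i-th variable with k factors.
FM can be used for regression, where the optimization criterion is mean squared error. FM can also be used for binary classification through the sigmoid function, where the optimization criterion is logistic loss.
\[\sum\limits^n_{i=1} \sum\limits^n_{j=i+1} \langle v_i, v_j \rangle x_i x_j = \frac{1}{2}\sum\limits^k_{f=1} \left(\left( \sum\limits^n_{i=1}v_{i,f}x_i \right)^2 - \sum\limits^n_{i=1}v_{i,f}^2x_i^2 \right)\]This equation has only linear complexity in both k and n, i.e. its computation is in \(O(kn)\).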
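The identity can be verified numerically by comparing the direct double sum with the reformulated expression. The following plain-Python sketch is illustrative only; the function names and the small factor matrix `v` are assumptions, with `v[i][f]` standing for $v_{i,f}$.

```python
def pairwise_naive(v, x):
    """Direct double sum over i < j; quadratic in the number of features."""
    n = len(x)
    total = 0.0
    for i in range(n):
        for j in range(i + 1, n):
            dot = sum(a * b for a, b in zip(v[i], v[j]))
            total += dot * x[i] * x[j]
    return total

def pairwise_fast(v, x):
    """Reformulated sum; one pass over the features per factor, O(kn)."""
    n, k = len(x), len(v[0])
    total = 0.0
    for f in range(k):
        s = sum(v[i][f] * x[i] for i in range(n))
        s_squares = sum((v[i][f] * x[i]) ** 2 for i in range(n))
        total += s * s - s_squares
    return 0.5 * total

v = [[1.0, 2.0], [0.5, -1.0], [2.0, 0.0]]  # n = 3 features, k = 2 factors
x = [1.0, 2.0, 3.0]
naive_value = pairwise_naive(v, x)
fast_value = pairwise_fast(v, x)
print(naive_value, fast_value)  # both 9.0
```

The fast form avoids the inner loop over pairs, which is what makes the interaction term affordable for wide, sparse feature vectors.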
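In general, to prevent the exploding gradient problem, it is best to scale continuous features to be between 0 and 1, or to bin the continuous features and one-hot encode them.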
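Users can find more information about the decision tree algorithm in the MLlib Decision Tree guide. The main differences between this API and the original MLlib Decision Tree API are:
- support for ML Pipelines
- separation of Decision Trees for classification vs. regression
- use of DataFrame metadata to distinguish continuous and categorical features
The Pipelines API for Decision Trees offers a bit more functionality than the original API. In particular, for classification, users can get the predicted probability of each class (a.k.a. class conditional probabilities); for regression, users can get the biased sample variance of prediction.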
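Param name | Type(s) | Default | Description |
labelCol | Double | "label" | Label to predict |
featuresCol | Vector | "features" | Feature vector |
Param name | Type(s) | Default | Description | Notes |
predictionCol | Double | "prediction" | Predicted label | |
rawPredictionCol | Vector | "rawPrediction" | Vector of length # classes, with the counts of training instance labels at the tree node which makes the prediction | Classification only |
probabilityCol | Vector | "probability" | Vector of length # classes equal to rawPrediction normalized to a multinomial distribution | Classification only |
varianceCol | Double | | The biased sample variance of prediction | Regression only |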
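As a rough illustration of what the probability and variance columns contain, the plain-Python sketch below normalizes a vector of label counts and computes a biased (divide by n) sample variance. The function names and the sample values are assumptions for illustration; this is not Spark's code.

```python
def normalize_counts(raw_prediction):
    """Turn per-class label counts at a leaf into class probabilities."""
    total = sum(raw_prediction)
    return [count / total for count in raw_prediction]

def biased_variance(labels):
    """Sample variance that divides by n rather than n - 1."""
    n = len(labels)
    mean = sum(labels) / n
    return sum((y - mean) ** 2 for y in labels) / n

probability = normalize_counts([3.0, 1.0])          # hypothetical leaf counts
variance = biased_variance([1.0, 2.0, 3.0, 4.0])    # hypothetical leaf labels
print(probability)  # [0.75, 0.25]
print(variance)     # 1.25
```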
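The DataFrame API supports two major tree ensemble algorithms: Random Forests and Gradient-Boosted Trees (GBTs). Both use spark.ml decision trees as their base models.
Users can find more information about ensemble algorithms in the MLlib Ensemble guide. In this section, we demonstrate the DataFrame API for ensembles.
The main differences between this API and the original MLlib ensembles API are:
- support for DataFrames and ML Pipelines
- separation of classification vs. regression
- use of DataFrame metadata to distinguish continuous and categorical features
- more functionality for random forests: estimates of feature importance, as well as the predicted probability of each class (a.k.a. class conditional probabilities) for classification.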
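Param name | Type(s) | Default | Description |
labelCol | Double | "label" | Label to predict |
featuresCol | Vector | "features" | Feature vector |
Param name | Type(s) | Default | Description | Notes |
predictionCol | Double | "prediction" | Predicted label | |
rawPredictionCol | Vector | "rawPrediction" | Vector of length # classes, with the counts of training instance labels at the tree node which makes the prediction | Classification only |
probabilityCol | Vector | "probability" | Vector of length # classes equal to rawPrediction normalized to a multinomial distribution | Classification only |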
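Gradient-Boosted Trees (GBTs)
Gradient-Boosted Trees (GBTs) are ensembles of decision trees. GBTs iteratively train decision trees in order to minimize a loss function. The spark.ml implementation supports GBTs for binary classification and for regression, using both continuous and categorical features.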
For more information, see the documentation on GBTs.
Param name | Type(s) | Default | Description |
labelCol | Double | "label" | Label to predict |
featuresCol | Vector | "features" | Feature vector |
Param name | Type(s) | Default | Description | Notes |
predictionCol | Double | "prediction" | Predicted label | |
Columns for rawPrediction and probability will also be output, just as with RandomForestClassifier