Ensembles - RDD-based API

An ensemble method is a learning algorithm which creates a model composed of a set of other base models. spark.mllib supports two major ensemble algorithms: GradientBoostedTrees and RandomForest. Both use decision trees as their base models.
Gradient-Boosted Trees vs. Random Forests

Both Gradient-Boosted Trees (GBTs) and Random Forests are algorithms for learning ensembles of trees, but the training processes are different. There are several practical trade-offs:
- GBTs train one tree at a time, so they can take longer to train than Random Forests, which can train multiple trees in parallel.
- On the other hand, it is often reasonable to use smaller (shallower) trees with GBTs than with Random Forests, and training smaller trees takes less time.
- Random Forests can be less prone to overfitting. Training more trees in a Random Forest reduces the likelihood of overfitting, but training more trees with GBTs increases it. (In statistical language, Random Forests reduce variance by using more trees, whereas GBTs reduce bias by using more trees.)
- Random Forests can be easier to tune since performance improves monotonically with the number of trees (whereas performance can start to decrease for GBTs if the number of trees grows too large).

In short, both algorithms can be effective, and the choice should be based on the particular dataset.
Random Forests

Random forests are ensembles of decision trees, and are among the most successful machine learning models for classification and regression. They combine many decision trees in order to reduce the risk of overfitting. Like decision trees, random forests handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions.

spark.mllib supports random forests for binary and multiclass classification and for regression, using both continuous and categorical features. spark.mllib implements random forests using the existing decision tree implementation. Please see the decision tree guide for more information on trees.
Basic algorithm

Random forests train a set of decision trees separately, so the training can be done in parallel. The algorithm injects randomness into the training process so that each decision tree is a bit different. Combining the predictions from each tree reduces the variance of the predictions, improving the performance on test data.
Training

The randomness injected into the training process includes:
- Subsampling the original dataset on each iteration to get a different training set (a.k.a. bootstrapping).
- Considering different random subsets of features to split on at each tree node.

Apart from these randomizations, decision tree training is done in the same way as for individual decision trees. A conceptual sketch of the two sources of randomness follows.
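The Scala sketch below is only an illustration of these two ideas, assuming an input data: RDD[LabeledPoint]; MLlib performs both internally during training, so this is not how you would call the library.

import scala.util.Random
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

def illustrateRandomness(data: RDD[LabeledPoint]): Unit = {
  // (1) Bootstrapping: sample the dataset with replacement for one tree.
  val bootstrapSample = data.sample(withReplacement = true, fraction = 1.0, seed = 42L)
  // (2) Feature subsetting: pick a random subset of features to consider at a node,
  // here sqrt(#features), a common choice for classification.
  val numFeatures = data.first().features.size
  val subsetSize = math.ceil(math.sqrt(numFeatures)).toInt
  val featureSubset = new Random(42).shuffle((0 until numFeatures).toList).take(subsetSize)
  println(s"bootstrap size: ${bootstrapSample.count()}, feature subset: $featureSubset")
}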
Prediction

To make a prediction on a new instance, a random forest must aggregate the predictions from its set of decision trees. This aggregation is done differently for classification and regression.

Classification: Majority vote. Each tree's prediction is counted as a vote for one class. The label is predicted to be the class which receives the most votes.

Regression: Averaging. Each tree predicts a real value. The label is predicted to be the average of the tree predictions.
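As a minimal sketch, both aggregation rules can be reproduced in Scala from the individual trees of a trained model (the trees member of RandomForestModel), assuming a model and a feature vector are in scope:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.tree.model.RandomForestModel

// Classification: each tree casts one vote; the most common label wins.
def majorityVote(model: RandomForestModel, features: Vector): Double =
  model.trees.map(_.predict(features)).groupBy(identity).maxBy(_._2.length)._1

// Regression: average the real-valued predictions of all trees.
def averagePrediction(model: RandomForestModel, features: Vector): Double = {
  val preds = model.trees.map(_.predict(features))
  preds.sum / preds.length
}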
Usage tips

We include a few guidelines for using random forests by discussing the various parameters. We omit some decision tree parameters since those are covered in the decision tree guide.

The first two parameters we mention are the most important, and tuning them can often improve performance (a small tuning sketch follows this list):
- numTrees: Number of trees in the forest.
  - Increasing the number of trees will decrease the variance in predictions, improving the model's test-time accuracy.
  - Training time increases roughly linearly in the number of trees.
- maxDepth: Maximum depth of each tree in the forest.
  - Increasing the depth makes the model more expressive and powerful. However, deep trees take longer to train and are also more prone to overfitting.
  - In general, it is acceptable to train deeper trees when using random forests than when using a single decision tree. One tree is more likely to overfit than a random forest (because of the variance reduction from averaging multiple trees in the forest).
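A hypothetical tuning sweep over these two parameters could look like the following Scala sketch, assuming trainingData and testData RDDs of LabeledPoint as in the examples below; the grid values are illustrative only.

import org.apache.spark.mllib.tree.RandomForest

for (numTrees <- Seq(10, 50, 100); maxDepth <- Seq(4, 8)) {
  // Train with the candidate settings (binary task, all-continuous features).
  val model = RandomForest.trainClassifier(trainingData, 2, Map[Int, Int](),
    numTrees, "auto", "gini", maxDepth, 32)
  // Fraction of misclassified test instances.
  val testErr = testData.map(p => if (model.predict(p.features) != p.label) 1.0 else 0.0).mean()
  println(s"numTrees=$numTrees maxDepth=$maxDepth testErr=$testErr")
}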
The next two parameters generally do not require tuning. However, they can be tuned to speed up training; a sketch setting both follows this list.
- subsamplingRate: This parameter specifies the size of the dataset used for training each tree in the forest, as a fraction of the size of the original dataset. The default (1.0) is recommended, but decreasing this fraction can speed up training.
- featureSubsetStrategy: Number of features to use as candidates for splitting at each tree node. The number is specified as a fraction or function of the total number of features. Decreasing this number will speed up training, but can sometimes impact performance if too low.
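In the RDD-based Scala API, subsamplingRate lives on the tree Strategy, while featureSubsetStrategy is passed directly to the train methods. A minimal sketch, assuming a trainingData RDD and illustrative parameter values:

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.configuration.Strategy

val strategy = Strategy.defaultStrategy("Classification")
strategy.numClasses = 2
strategy.subsamplingRate = 0.7 // each tree trains on ~70% of the data
// "sqrt" considers sqrt(#features) split candidates at each node.
val model = RandomForest.trainClassifier(trainingData, strategy,
  numTrees = 50, featureSubsetStrategy = "sqrt", seed = 12345)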
Examples

Classification

The example below demonstrates how to load a LIBSVM data file, parse it as an RDD of LabeledPoint, and then perform classification using a Random Forest. The test error is calculated to measure the algorithm's accuracy.

Refer to the RandomForest Python docs and RandomForestModel Python docs for more details on the API.
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
# Note: Use larger numTrees in practice.
# Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
numTrees=3, featureSubsetStrategy="auto",
impurity='gini', maxDepth=4, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myRandomForestClassificationModel")
sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestClassificationModel")
Refer to the RandomForest Scala docs and RandomForestModel Scala docs for details on the API.
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a RandomForest model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 3 // Use more in practice.
val featureSubsetStrategy = "auto" // Let the algorithm choose.
val impurity = "gini"
val maxDepth = 4
val maxBins = 32
val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count().toDouble / testData.count()
println(s"Test Error = $testErr")
println(s"Learned classification forest model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myRandomForestClassificationModel")
val sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestClassificationModel")
Refer to the RandomForest Java docs and RandomForestModel Java docs for details on the API.
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.RandomForest;
import org.apache.spark.mllib.tree.model.RandomForestModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf().setAppName("JavaRandomForestClassificationExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Train a RandomForest model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
int numClasses = 2;
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
int numTrees = 3; // Use more in practice.
String featureSubsetStrategy = "auto"; // Let the algorithm choose.
String impurity = "gini";
int maxDepth = 5;
int maxBins = 32;
int seed = 12345;
RandomForestModel model = RandomForest.trainClassifier(trainingData, numClasses,
categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins,
seed);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testErr =
predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("Test Error: " + testErr);
System.out.println("Learned classification forest model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myRandomForestClassificationModel");
RandomForestModel sameModel = RandomForestModel.load(jsc.sc(),
"target/tmp/myRandomForestClassificationModel");
Regression

The example below demonstrates how to load a LIBSVM data file, parse it as an RDD of LabeledPoint, and then perform regression using a Random Forest. The Mean Squared Error (MSE) is computed at the end to evaluate goodness of fit.

Refer to the RandomForest Python docs and RandomForestModel Python docs for more details on the API.
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
# Note: Use larger numTrees in practice.
# Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo={},
numTrees=3, featureSubsetStrategy="auto",
impurity='variance', maxDepth=4, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression forest model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myRandomForestRegressionModel")
sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestRegressionModel")
Refer to the RandomForest Scala docs and RandomForestModel Scala docs for details on the API.
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a RandomForest model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 3 // Use more in practice.
val featureSubsetStrategy = "auto" // Let the algorithm choose.
val impurity = "variance"
val maxDepth = 4
val maxBins = 32
val model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo,
numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
// Evaluate model on test instances and compute test error
val labelsAndPredictions = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean()
println(s"Test Mean Squared Error = $testMSE")
println(s"Learned regression forest model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myRandomForestRegressionModel")
val sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestRegressionModel")
Refer to the RandomForest Java docs and RandomForestModel Java docs for details on the API.
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.RandomForest;
import org.apache.spark.mllib.tree.model.RandomForestModel;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.SparkConf;
SparkConf sparkConf = new SparkConf().setAppName("JavaRandomForestRegressionExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Set parameters.
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
int numTrees = 3; // Use more in practice.
String featureSubsetStrategy = "auto"; // Let the algorithm choose.
String impurity = "variance";
int maxDepth = 4;
int maxBins = 32;
int seed = 12345;
// Train a RandomForest model.
RandomForestModel model = RandomForest.trainRegressor(trainingData,
categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins, seed);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testMSE = predictionAndLabel.mapToDouble(pl -> {
double diff = pl._1() - pl._2();
return diff * diff;
}).mean();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression forest model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myRandomForestRegressionModel");
RandomForestModel sameModel = RandomForestModel.load(jsc.sc(),
"target/tmp/myRandomForestRegressionModel");
Gradient-Boosted Trees (GBTs)

Gradient-Boosted Trees (GBTs) are ensembles of decision trees. GBTs iteratively train decision trees in order to minimize a loss function. Like decision trees, GBTs handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions.

spark.mllib supports GBTs for binary classification and for regression, using both continuous and categorical features. spark.mllib implements GBTs using the existing decision tree implementation. Please see the decision tree guide for more information on trees.

Note: GBTs do not yet support multiclass classification. For multiclass problems, please use decision trees or Random Forests.
Basic algorithm

Gradient boosting iteratively trains a sequence of decision trees. On each iteration, the algorithm uses the current ensemble to predict the label of each training instance and then compares the prediction with the true label. The dataset is re-labeled to put more emphasis on training instances with poor predictions. Thus, in the next iteration, the decision tree will help correct for previous mistakes.

The specific mechanism for re-labeling instances is defined by a loss function (discussed below). With each iteration, GBTs further reduce this loss function on the training data. A conceptual sketch of a single boosting step follows.
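The Scala sketch below illustrates one boosting step for the squared-error case, where re-labeling amounts to fitting the next tree to the residuals (the negative gradient of the loss). This is a conceptual sketch, not MLlib's internal implementation; currentF stands for the current ensemble's prediction function, and the tree parameters are illustrative.

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.rdd.RDD

def boostingStep(data: RDD[LabeledPoint], currentF: Vector => Double) = {
  // For squared error, the negative gradient at each instance is the residual y - F(x),
  // so poorly predicted instances get the largest "new labels".
  val relabeled = data.map(lp => LabeledPoint(lp.label - currentF(lp.features), lp.features))
  // Fit the next tree to the residuals; the ensemble then adds it,
  // scaled by the learning rate.
  DecisionTree.trainRegressor(relabeled, Map[Int, Int](), "variance", 5, 32)
}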
Losses

The table below lists the losses currently supported by GBTs in spark.mllib. Note that each loss is applicable to one of classification or regression, not both.

Notation: $N$ = number of instances. $y_i$ = label of instance $i$. $x_i$ = features of instance $i$. $F(x_i)$ = model's predicted label for instance $i$.

Loss | Task | Formula | Description |
---|---|---|---|
Log Loss | Classification | $2 \sum_{i=1}^{N} \log\left(1 + \exp\left(-2 y_i F(x_i)\right)\right)$ | Twice binomial negative log likelihood. |
Squared Error | Regression | $\sum_{i=1}^{N} \left(y_i - F(x_i)\right)^2$ | Also called L2 loss. Default loss for regression tasks. |
Absolute Error | Regression | $\sum_{i=1}^{N} \lvert y_i - F(x_i) \rvert$ | Also called L1 loss. Can be more robust to outliers than Squared Error. |
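As a sanity check, these sums can be computed by hand from an RDD of (label, prediction) pairs, as in the Scala sketch below. Note that the log-loss formula above assumes labels in {-1, +1}, so 0/1 labels would need to be mapped first; this, and the labelsAndPredictions name, are assumptions for illustration.

// Assuming labelsAndPredictions: RDD[(Double, Double)] of (y_i, F(x_i)) pairs.
val logLoss = 2.0 * labelsAndPredictions
  .map { case (y, f) => math.log1p(math.exp(-2.0 * y * f)) }.sum()
val squaredError = labelsAndPredictions
  .map { case (y, f) => (y - f) * (y - f) }.sum()
val absoluteError = labelsAndPredictions
  .map { case (y, f) => math.abs(y - f) }.sum()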
Usage tips

We include a few guidelines for using GBTs by discussing the various parameters. We omit some decision tree parameters since those are covered in the decision tree guide. A configuration sketch follows this list.

- loss: See the section above for information on losses and their applicability to tasks (classification vs. regression). Different losses can give significantly different results, depending on the dataset.
- numIterations: This sets the number of trees in the ensemble. Each iteration produces one tree. Increasing this number makes the model more expressive, improving training data accuracy. However, test-time accuracy may suffer if this is too large.
- learningRate: This parameter should not need to be tuned. If the algorithm behavior seems unstable, decreasing this value may improve stability.
- algo: The algorithm or task (classification vs. regression) is set using the tree [Strategy] parameter.
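Putting these together, a hedged Scala configuration sketch (the parameter values are illustrative, and AbsoluteError is swapped in only to show how a non-default loss is set):

import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.loss.AbsoluteError

val boostingStrategy = BoostingStrategy.defaultParams("Regression")
boostingStrategy.numIterations = 50 // number of trees in the ensemble
boostingStrategy.learningRate = 0.05 // smaller step size for more stable behavior
boostingStrategy.loss = AbsoluteError // L1 loss instead of the default Squared Error
boostingStrategy.treeStrategy.maxDepth = 3
val model = GradientBoostedTrees.train(trainingData, boostingStrategy)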
Validation while training

Gradient boosting can overfit when trained with more trees. In order to prevent overfitting, it is useful to validate while training. The method runWithValidation has been provided to make use of this option. It takes a pair of RDDs as arguments, the first one being the training dataset and the second being the validation dataset.

The training is stopped when the improvement in the validation error is not more than a certain tolerance (supplied by the validationTol parameter in BoostingStrategy). In practice, the validation error decreases initially and later increases. There might be cases in which the validation error does not change monotonically; the user is advised to set a large enough negative tolerance and examine the validation curve using evaluateEachIteration (which gives the error or loss per iteration) to tune the number of iterations. A sketch of this workflow is shown below.
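A minimal Scala sketch of this workflow, assuming trainingData and validationData RDDs of LabeledPoint (the tolerance and iteration count are illustrative):

import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.loss.LogLoss

val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 100
boostingStrategy.validationTol = -1e-6 // negative tolerance: rely on the curve instead
val model = new GradientBoostedTrees(boostingStrategy)
  .runWithValidation(trainingData, validationData)
// Error/loss after each iteration, for examining the validation curve.
val curve = model.evaluateEachIteration(validationData, LogLoss)
curve.zipWithIndex.foreach { case (loss, i) => println(s"iteration $i: loss $loss") }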
Examples

Classification

The example below demonstrates how to load a LIBSVM data file, parse it as an RDD of LabeledPoint, and then perform classification using Gradient-Boosted Trees with log loss. The test error is calculated to measure the algorithm's accuracy.

Refer to the GradientBoostedTrees Python docs and GradientBoostedTreesModel Python docs for more details on the API.
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file.
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a GradientBoostedTrees model.
# Notes: (a) Empty categoricalFeaturesInfo indicates all features are continuous.
# (b) Use more iterations in practice.
model = GradientBoostedTrees.trainClassifier(trainingData,
categoricalFeaturesInfo={}, numIterations=3)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification GBT model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myGradientBoostingClassificationModel")
sameModel = GradientBoostedTreesModel.load(sc,
"target/tmp/myGradientBoostingClassificationModel")
Refer to the GradientBoostedTrees Scala docs and GradientBoostedTreesModel Scala docs for details on the API.
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a GradientBoostedTrees model.
// The defaultParams for Classification use LogLoss by default.
val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 3 // Note: Use more iterations in practice.
boostingStrategy.treeStrategy.numClasses = 2
boostingStrategy.treeStrategy.maxDepth = 5
// Empty categoricalFeaturesInfo indicates all features are continuous.
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
val model = GradientBoostedTrees.train(trainingData, boostingStrategy)
// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count().toDouble / testData.count()
println(s"Test Error = $testErr")
println(s"Learned classification GBT model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myGradientBoostingClassificationModel")
val sameModel = GradientBoostedTreesModel.load(sc,
"target/tmp/myGradientBoostingClassificationModel")
Refer to the GradientBoostedTrees Java docs and GradientBoostedTreesModel Java docs for details on the API.
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.GradientBoostedTrees;
import org.apache.spark.mllib.tree.configuration.BoostingStrategy;
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf()
.setAppName("JavaGradientBoostedTreesClassificationExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Train a GradientBoostedTrees model.
// The defaultParams for Classification use LogLoss by default.
BoostingStrategy boostingStrategy = BoostingStrategy.defaultParams("Classification");
boostingStrategy.setNumIterations(3); // Note: Use more iterations in practice.
boostingStrategy.getTreeStrategy().setNumClasses(2);
boostingStrategy.getTreeStrategy().setMaxDepth(5);
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);
GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testErr =
predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("Test Error: " + testErr);
System.out.println("Learned classification GBT model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myGradientBoostingClassificationModel");
GradientBoostedTreesModel sameModel = GradientBoostedTreesModel.load(jsc.sc(),
"target/tmp/myGradientBoostingClassificationModel");
Regression

The example below demonstrates how to load a LIBSVM data file, parse it as an RDD of LabeledPoint, and then perform regression using Gradient-Boosted Trees with Squared Error as the loss. The Mean Squared Error (MSE) is computed at the end to evaluate goodness of fit.

Refer to the GradientBoostedTrees Python docs and GradientBoostedTreesModel Python docs for more details on the API.
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file.
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a GradientBoostedTrees model.
# Notes: (a) Empty categoricalFeaturesInfo indicates all features are continuous.
# (b) Use more iterations in practice.
model = GradientBoostedTrees.trainRegressor(trainingData,
categoricalFeaturesInfo={}, numIterations=3)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression GBT model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myGradientBoostingRegressionModel")
sameModel = GradientBoostedTreesModel.load(sc, "target/tmp/myGradientBoostingRegressionModel")
Refer to the GradientBoostedTrees Scala docs and GradientBoostedTreesModel Scala docs for details on the API.
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a GradientBoostedTrees model.
// The defaultParams for Regression use SquaredError by default.
val boostingStrategy = BoostingStrategy.defaultParams("Regression")
boostingStrategy.numIterations = 3 // Note: Use more iterations in practice.
boostingStrategy.treeStrategy.maxDepth = 5
// Empty categoricalFeaturesInfo indicates all features are continuous.
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
val model = GradientBoostedTrees.train(trainingData, boostingStrategy)
// Evaluate model on test instances and compute test error
val labelsAndPredictions = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean()
println(s"Test Mean Squared Error = $testMSE")
println(s"Learned regression GBT model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myGradientBoostingRegressionModel")
val sameModel = GradientBoostedTreesModel.load(sc,
"target/tmp/myGradientBoostingRegressionModel")
Refer to the GradientBoostedTrees Java docs and GradientBoostedTreesModel Java docs for details on the API.
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.GradientBoostedTrees;
import org.apache.spark.mllib.tree.configuration.BoostingStrategy;
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf()
.setAppName("JavaGradientBoostedTreesRegressionExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Train a GradientBoostedTrees model.
// The defaultParams for Regression use SquaredError by default.
BoostingStrategy boostingStrategy = BoostingStrategy.defaultParams("Regression");
boostingStrategy.setNumIterations(3); // Note: Use more iterations in practice.
boostingStrategy.getTreeStrategy().setMaxDepth(5);
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);
GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testMSE = predictionAndLabel.mapToDouble(pl -> {
double diff = pl._1() - pl._2();
return diff * diff;
}).mean();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression GBT model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myGradientBoostingRegressionModel");
GradientBoostedTreesModel sameModel = GradientBoostedTreesModel.load(jsc.sc(),
"target/tmp/myGradientBoostingRegressionModel");