集成 - 基于RDD的API
集成方法是一种学习算法,它创建一个由一组其他基本模型组成的模型。 spark.mllib
支持两种主要的集成算法:GradientBoostedTrees
和 RandomForest
。 两者都使用决策树作为其基本模型。
梯度提升树 vs. 随机森林
梯度提升树 (GBTs) 和 随机森林都是用于学习树的集成的算法,但训练过程不同。 有几个实际的权衡。
- GBTs 一次训练一棵树,因此它们比随机森林需要更长的训练时间。 随机森林可以并行训练多棵树。
- 另一方面,使用 GBTs 比随机森林通常更合理地使用更小(更浅)的树,并且训练较小的树需要更少的时间。
- 随机森林不太容易过度拟合。 在随机森林中训练更多树会降低过度拟合的可能性,但是使用 GBTs 训练更多树会增加过度拟合的可能性。 (用统计语言来说,随机森林通过使用更多树来减少方差,而 GBTs 通过使用更多树来减少偏差。)
- 随机森林更容易调整,因为性能随树的数量单调提高(如果树的数量变得太大,GBTs 的性能可能会开始下降)。
简而言之,两种算法都可能有效,并且选择应基于特定的数据集。
随机森林
随机森林 是 决策树的集成。 随机森林是用于分类和回归的最成功的机器学习模型之一。 它们结合了许多决策树,以降低过度拟合的风险。 像决策树一样,随机森林处理分类特征,扩展到多类分类设置,不需要特征缩放,并且能够捕获非线性和特征交互。
spark.mllib
支持用于二元和多类分类以及回归的随机森林,同时使用连续和分类特征。 spark.mllib
使用现有的决策树实现来实现随机森林。 请参阅决策树指南,以获取有关树的更多信息。
基本算法
随机森林分别训练一组决策树,因此可以并行完成训练。 该算法将随机性注入到训练过程中,以使每个决策树都略有不同。 组合来自每棵树的预测会减少预测的方差,从而提高测试数据的性能。
训练
注入到训练过程中的随机性包括
- 在每次迭代中对原始数据集进行子采样以获得不同的训练集(也称为引导)。
- 在每个树节点上考虑不同的特征随机子集进行拆分。
除了这些随机化之外,决策树训练的完成方式与单个决策树相同。
预测
为了对新实例进行预测,随机森林必须汇总其决策树集中的预测。 对于分类和回归,此聚合的完成方式有所不同。
分类:多数投票。 每棵树的预测都计为一类的一票。 该标签预计为获得最多选票的类别。
回归:平均。 每棵树都预测一个实际值。 该标签预计为树预测的平均值。
使用技巧
我们通过讨论各种参数来包括一些使用随机森林的指南。 我们省略了一些决策树参数,因为这些参数已在决策树指南中介绍。
我们提到的前两个参数是最重要的,调整它们通常可以提高性能
numTrees
:森林中的树数。- 增加树的数量将减少预测的方差,从而提高模型的测试时间准确性。
- 训练时间大致与树的数量线性增加。
maxDepth
:森林中每棵树的最大深度。- 增加深度会使模型更具表现力和强大。 但是,深层树的训练时间更长,并且更容易过度拟合。
- 通常,在使用随机森林时,比使用单个决策树时训练更深的树是可以接受的。 一棵树比随机森林更容易过度拟合(由于森林中平均多棵树导致的方差减少)。
接下来的两个参数通常不需要调整。 但是,可以调整它们以加快训练速度。
-
subsamplingRate
:此参数指定用于训练森林中每棵树的数据集的大小,占原始数据集大小的一部分。 建议使用默认值 (1.0),但减小此比例可以加快训练速度。 -
featureSubsetStrategy
:用作每个树节点拆分候选者的特征数。 该数字指定为特征总数的一部分或函数。 减少此数量将加快训练速度,但如果太低,有时会影响性能。
示例
分类
下面的示例演示了如何加载LIBSVM 数据文件,将其解析为 LabeledPoint
的 RDD,然后使用随机森林执行分类。 计算测试误差以衡量算法的准确性。
有关API的更多详细信息,请参阅RandomForest
Python 文档和RandomForest
Python 文档。
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
# Note: Use larger numTrees in practice.
# Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
numTrees=3, featureSubsetStrategy="auto",
impurity='gini', maxDepth=4, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myRandomForestClassificationModel")
sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestClassificationModel")
有关API的详细信息,请参阅RandomForest
Scala 文档和RandomForestModel
Scala 文档。
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a RandomForest model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 3 // Use more in practice.
val featureSubsetStrategy = "auto" // Let the algorithm choose.
val impurity = "gini"
val maxDepth = 4
val maxBins = 32
val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
println(s"Test Error = $testErr")
println(s"Learned classification forest model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myRandomForestClassificationModel")
val sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestClassificationModel")
有关API的详细信息,请参阅RandomForest
Java 文档和RandomForestModel
Java 文档。
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.RandomForest;
import org.apache.spark.mllib.tree.model.RandomForestModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf().setAppName("JavaRandomForestClassificationExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Train a RandomForest model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
int numClasses = 2;
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
int numTrees = 3; // Use more in practice.
String featureSubsetStrategy = "auto"; // Let the algorithm choose.
String impurity = "gini";
int maxDepth = 5;
int maxBins = 32;
int seed = 12345;
RandomForestModel model = RandomForest.trainClassifier(trainingData, numClasses,
categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins,
seed);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testErr =
predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("Test Error: " + testErr);
System.out.println("Learned classification forest model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myRandomForestClassificationModel");
RandomForestModel sameModel = RandomForestModel.load(jsc.sc(),
"target/tmp/myRandomForestClassificationModel");
回归
下面的示例演示了如何加载LIBSVM 数据文件,将其解析为 LabeledPoint
的 RDD,然后使用随机森林执行回归。 最后计算均方误差 (MSE) 以评估拟合优度。
有关API的更多详细信息,请参阅RandomForest
Python 文档和RandomForest
Python 文档。
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
# Note: Use larger numTrees in practice.
# Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo={},
numTrees=3, featureSubsetStrategy="auto",
impurity='variance', maxDepth=4, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression forest model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myRandomForestRegressionModel")
sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestRegressionModel")
有关API的详细信息,请参阅RandomForest
Scala 文档和RandomForestModel
Scala 文档。
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a RandomForest model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 3 // Use more in practice.
val featureSubsetStrategy = "auto" // Let the algorithm choose.
val impurity = "variance"
val maxDepth = 4
val maxBins = 32
val model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo,
numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
// Evaluate model on test instances and compute test error
val labelsAndPredictions = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean()
println(s"Test Mean Squared Error = $testMSE")
println(s"Learned regression forest model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myRandomForestRegressionModel")
val sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestRegressionModel")
有关API的详细信息,请参阅RandomForest
Java 文档和RandomForestModel
Java 文档。
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.RandomForest;
import org.apache.spark.mllib.tree.model.RandomForestModel;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.SparkConf;
SparkConf sparkConf = new SparkConf().setAppName("JavaRandomForestRegressionExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Set parameters.
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
int numTrees = 3; // Use more in practice.
String featureSubsetStrategy = "auto"; // Let the algorithm choose.
String impurity = "variance";
int maxDepth = 4;
int maxBins = 32;
int seed = 12345;
// Train a RandomForest model.
RandomForestModel model = RandomForest.trainRegressor(trainingData,
categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins, seed);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testMSE = predictionAndLabel.mapToDouble(pl -> {
double diff = pl._1() - pl._2();
return diff * diff;
}).mean();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression forest model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myRandomForestRegressionModel");
RandomForestModel sameModel = RandomForestModel.load(jsc.sc(),
"target/tmp/myRandomForestRegressionModel");
梯度提升树 (GBTs)
梯度提升树 (GBTs) 是 决策树的集成。 GBTs 迭代地训练决策树,以最大程度地减少损失函数。 像决策树一样,GBTs 处理分类特征,扩展到多类分类设置,不需要特征缩放,并且能够捕获非线性和特征交互。
spark.mllib
支持使用连续和分类特征的二元分类和回归的 GBTs。 spark.mllib
使用现有的决策树实现来实现 GBTs。 请参阅决策树指南,以获取有关树的更多信息。
注意:GBTs 尚不支持多类分类。 对于多类问题,请使用决策树或随机森林。
基本算法
梯度提升算法迭代地训练一系列决策树。在每次迭代中,该算法使用当前的集成模型来预测每个训练实例的标签,然后将预测结果与真实标签进行比较。数据集会被重新标记,以更加强调预测效果差的训练实例。因此,在下一次迭代中,决策树将有助于纠正之前的错误。
重新标记实例的具体机制由损失函数定义(如下所述)。每次迭代时,GBTs 会进一步降低训练数据上的此损失函数。
损失
下表列出了 spark.mllib
中 GBTs 当前支持的损失函数。 请注意,每个损失函数都适用于分类或回归中的一种,而非两者都适用。
符号:$N$ = 实例数。$y_i$ = 实例 $i$ 的标签。$x_i$ = 实例 $i$ 的特征。$F(x_i)$ = 模型对实例 $i$ 的预测标签。
损失 | 任务 | 公式 | 描述 |
---|---|---|---|
Log 损失 | 分类 | $2 \sum_{i=1}^{N} \log(1+\exp(-2 y_i F(x_i)))$ | 二项负对数似然的两倍。 |
平方误差 | 回归 | $\sum_{i=1}^{N} (y_i - F(x_i))^2$ | 也称为 L2 损失。回归任务的默认损失函数。 |
绝对误差 | 回归 | $\sum_{i=1}^{N} |y_i - F(x_i)|$ | 也称为 L1 损失。与平方误差相比,它对异常值更具鲁棒性。 |
使用技巧
我们通过讨论各种参数来提供一些使用 GBTs 的指南。我们省略了一些决策树参数,因为这些参数已在决策树指南中介绍。
-
loss
:有关损失及其对任务(分类与回归)的适用性的信息,请参见以上部分。不同的损失函数可能会给出截然不同的结果,具体取决于数据集。 -
numIterations
:这设置了集成模型中树的数量。每次迭代生成一棵树。增加此数字会使模型更具表达能力,从而提高训练数据的准确性。但是,如果该值过大,则测试时的准确性可能会受到影响。 -
learningRate
:此参数应该不需要调整。如果算法行为看起来不稳定,则减小此值可能会提高稳定性。 -
algo
:算法或任务(分类与回归)是使用树[策略]参数设置的。
训练时验证
当使用更多树进行训练时,梯度提升可能会过拟合。为了防止过拟合,在训练时进行验证很有用。已经提供了方法 runWithValidation 来使用此选项。它采用一对 RDD 作为参数,第一个是训练数据集,第二个是验证数据集。
当验证误差的改进不超过某个容差(由 BoostingStrategy
中的 validationTol
参数提供)时,训练将停止。实际上,验证误差最初会降低,然后会升高。在某些情况下,验证误差可能不会单调变化,建议用户设置足够大的负容差,并使用 evaluateEachIteration
(它给出每次迭代的误差或损失)来检查验证曲线,以调整迭代次数。
示例
分类
下面的示例演示了如何加载 LIBSVM 数据文件,将其解析为 LabeledPoint
的 RDD,然后使用具有 log 损失的梯度提升树执行分类。计算测试误差以衡量算法的准确性。
有关 API 的更多详细信息,请参阅 GradientBoostedTrees
Python 文档 和 GradientBoostedTreesModel
Python 文档。
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file.
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a GradientBoostedTrees model.
# Notes: (a) Empty categoricalFeaturesInfo indicates all features are continuous.
# (b) Use more iterations in practice.
model = GradientBoostedTrees.trainClassifier(trainingData,
categoricalFeaturesInfo={}, numIterations=3)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification GBT model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myGradientBoostingClassificationModel")
sameModel = GradientBoostedTreesModel.load(sc,
"target/tmp/myGradientBoostingClassificationModel")
有关 API 的详细信息,请参阅 GradientBoostedTrees
Scala 文档 和 GradientBoostedTreesModel
Scala 文档。
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a GradientBoostedTrees model.
// The defaultParams for Classification use LogLoss by default.
val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 3 // Note: Use more iterations in practice.
boostingStrategy.treeStrategy.numClasses = 2
boostingStrategy.treeStrategy.maxDepth = 5
// Empty categoricalFeaturesInfo indicates all features are continuous.
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
val model = GradientBoostedTrees.train(trainingData, boostingStrategy)
// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
println(s"Test Error = $testErr")
println(s"Learned classification GBT model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myGradientBoostingClassificationModel")
val sameModel = GradientBoostedTreesModel.load(sc,
"target/tmp/myGradientBoostingClassificationModel")
有关 API 的详细信息,请参阅 GradientBoostedTrees
Java 文档 和 GradientBoostedTreesModel
Java 文档。
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.GradientBoostedTrees;
import org.apache.spark.mllib.tree.configuration.BoostingStrategy;
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf()
.setAppName("JavaGradientBoostedTreesClassificationExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Train a GradientBoostedTrees model.
// The defaultParams for Classification use LogLoss by default.
BoostingStrategy boostingStrategy = BoostingStrategy.defaultParams("Classification");
boostingStrategy.setNumIterations(3); // Note: Use more iterations in practice.
boostingStrategy.getTreeStrategy().setNumClasses(2);
boostingStrategy.getTreeStrategy().setMaxDepth(5);
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);
GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testErr =
predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("Test Error: " + testErr);
System.out.println("Learned classification GBT model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myGradientBoostingClassificationModel");
GradientBoostedTreesModel sameModel = GradientBoostedTreesModel.load(jsc.sc(),
"target/tmp/myGradientBoostingClassificationModel");
回归
下面的示例演示了如何加载 LIBSVM 数据文件,将其解析为 LabeledPoint
的 RDD,然后使用以平方误差作为损失的梯度提升树执行回归。计算平均平方误差 (MSE) 以评估拟合优度。
有关 API 的更多详细信息,请参阅 GradientBoostedTrees
Python 文档 和 GradientBoostedTreesModel
Python 文档。
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file.
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a GradientBoostedTrees model.
# Notes: (a) Empty categoricalFeaturesInfo indicates all features are continuous.
# (b) Use more iterations in practice.
model = GradientBoostedTrees.trainRegressor(trainingData,
categoricalFeaturesInfo={}, numIterations=3)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression GBT model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myGradientBoostingRegressionModel")
sameModel = GradientBoostedTreesModel.load(sc, "target/tmp/myGradientBoostingRegressionModel")
有关 API 的详细信息,请参阅 GradientBoostedTrees
Scala 文档 和 GradientBoostedTreesModel
Scala 文档。
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a GradientBoostedTrees model.
// The defaultParams for Regression use SquaredError by default.
val boostingStrategy = BoostingStrategy.defaultParams("Regression")
boostingStrategy.numIterations = 3 // Note: Use more iterations in practice.
boostingStrategy.treeStrategy.maxDepth = 5
// Empty categoricalFeaturesInfo indicates all features are continuous.
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
val model = GradientBoostedTrees.train(trainingData, boostingStrategy)
// Evaluate model on test instances and compute test error
val labelsAndPredictions = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean()
println(s"Test Mean Squared Error = $testMSE")
println(s"Learned regression GBT model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myGradientBoostingRegressionModel")
val sameModel = GradientBoostedTreesModel.load(sc,
"target/tmp/myGradientBoostingRegressionModel")
有关 API 的详细信息,请参阅 GradientBoostedTrees
Java 文档 和 GradientBoostedTreesModel
Java 文档。
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.GradientBoostedTrees;
import org.apache.spark.mllib.tree.configuration.BoostingStrategy;
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf()
.setAppName("JavaGradientBoostedTreesRegressionExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Train a GradientBoostedTrees model.
// The defaultParams for Regression use SquaredError by default.
BoostingStrategy boostingStrategy = BoostingStrategy.defaultParams("Regression");
boostingStrategy.setNumIterations(3); // Note: Use more iterations in practice.
boostingStrategy.getTreeStrategy().setMaxDepth(5);
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);
GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testMSE = predictionAndLabel.mapToDouble(pl -> {
double diff = pl._1() - pl._2();
return diff * diff;
}).mean();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression GBT model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myGradientBoostingRegressionModel");
GradientBoostedTreesModel sameModel = GradientBoostedTreesModel.load(jsc.sc(),
"target/tmp/myGradientBoostingRegressionModel");