Ensembles - RDD-based API
An ensemble method is a learning algorithm which creates a model composed of a set of other base models. spark.mllib supports two major ensemble algorithms: GradientBoostedTrees and RandomForest. Both use decision trees as their base models.
Gradient-Boosted Trees vs. Random Forests
Both Gradient-Boosted Trees (GBTs) and Random Forests are algorithms for learning ensembles of trees, but the training processes are different. There are several practical trade-offs:
- GBTs train one tree at a time, so they can take longer to train than random forests. Random forests can train multiple trees in parallel.
- On the other hand, it is often reasonable to use smaller (shallower) trees with GBTs than with random forests, and training smaller trees takes less time.
- Random forests can be less prone to overfitting. Training more trees in a random forest reduces the likelihood of overfitting, but training more trees with GBTs increases the likelihood of overfitting. (In statistical language, random forests reduce variance by using more trees, whereas GBTs reduce bias by using more trees.)
- Random forests can be easier to tune since performance improves monotonically with the number of trees (whereas performance for GBTs can start to decrease if the number of trees grows too large).
In short, both algorithms can be effective, and the choice should be based on the particular dataset.
Random Forests
Random forests are ensembles of decision trees. Random forests are one of the most successful machine learning models for classification and regression. They combine many decision trees in order to reduce the risk of overfitting. Like decision trees, random forests handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions.
spark.mllib supports random forests for binary and multiclass classification and for regression, using both continuous and categorical features. spark.mllib implements random forests using the existing decision tree implementation. Please see the decision tree guide for more information on trees.
Basic algorithm
Random forests train a set of decision trees separately, so the training can be done in parallel. The algorithm injects randomness into the training process so that each decision tree is a bit different. Combining the predictions from each tree reduces the variance of the predictions, improving the performance on test data.
Training
The randomness injected into the training process includes:
- Subsampling the original dataset on each iteration to get a different training set (a.k.a. bootstrapping).
- Considering different random subsets of features to split on at each tree node.
Apart from these randomizations, decision tree training is done in the same way as for individual decision trees. Both sources of randomness are sketched below.
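The following is a minimal Scala sketch of these two sources of randomness; it is illustrative only, not the spark.mllib internals, and it assumes `data` is an `RDD[LabeledPoint]` as in the examples later in this guide, with a hypothetical feature count of 100:

```scala
import scala.util.Random

// (1) Bootstrapping: each tree sees a sample of the data drawn with replacement,
// on average the same size as the original dataset.
val bootstrapSample = data.sample(withReplacement = true, fraction = 1.0, seed = 42)

// (2) Feature subsetting: at each node, only a random subset of feature indices is
// considered as split candidates ("sqrt" is a common choice for classification).
val numFeatures = 100  // hypothetical feature count
val subsetSize = math.sqrt(numFeatures).ceil.toInt
val candidateFeatures = Random.shuffle((0 until numFeatures).toList).take(subsetSize)
```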
Prediction
To make a prediction on a new instance, a random forest must aggregate the predictions from its set of decision trees. This aggregation is done differently for classification and regression.
Classification: Majority vote. Each tree's prediction is counted as a vote for one class. The label is predicted to be the class which receives the most votes.
Regression: Averaging. Each tree predicts a real value. The label is predicted to be the average of the tree predictions.
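As a small illustration of the two aggregation rules (again a sketch, not the library internals), given hypothetical per-tree predictions for a single test instance:

```scala
// Hypothetical predictions from a 5-tree forest for one instance.
val treePredictions = Array(0.0, 1.0, 1.0, 1.0, 0.0)

// Classification: majority vote; here class 1.0 wins with 3 of 5 votes.
val votedClass = treePredictions.groupBy(identity).maxBy(_._2.length)._1

// Regression: average of the per-tree predictions; here 0.6.
val averagePrediction = treePredictions.sum / treePredictions.length
```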
Usage tips
We include a few guidelines for using random forests by discussing the various parameters. We omit some decision tree parameters since those are covered in the decision tree guide.
The first two parameters we mention are the most important, and tuning them can often improve performance:
- numTrees: Number of trees in the forest.
  - Increasing the number of trees will decrease the variance in predictions, improving the model's test-time accuracy.
  - Training time increases roughly linearly in the number of trees.
- maxDepth: Maximum depth of each tree in the forest.
  - Increasing the depth makes the model more expressive and powerful. However, deep trees take longer to train and are also more prone to overfitting.
  - In general, it is acceptable to train deeper trees when using random forests than when using a single decision tree. One tree is more likely to overfit than a random forest (because of the variance reduction from averaging multiple trees in the forest).
The next two parameters generally do not require tuning. However, they can be tuned to speed up training; both appear in the sketch after this list.
- subsamplingRate: This parameter specifies the size of the dataset used for training each tree in the forest, as a fraction of the size of the original dataset. The default (1.0) is recommended, but decreasing this fraction can speed up training.
- featureSubsetStrategy: Number of features to use as candidates for splitting at each tree node. The number is specified as a fraction or function of the total number of features. Decreasing this number will speed up training, but can sometimes impact performance if too low.
Examples
Classification
The example below demonstrates how to load a LIBSVM data file, parse it as an RDD of LabeledPoint and then perform classification using a random forest. The test error is calculated to measure the algorithm accuracy.
Refer to the RandomForest Python docs and RandomForestModel Python docs for more details on the API.
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
# Note: Use larger numTrees in practice.
# Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=3, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
    lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myRandomForestClassificationModel")
sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestClassificationModel")
Refer to the RandomForest Scala docs and RandomForestModel Scala docs for details on the API.
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a RandomForest model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 3 // Use more in practice.
val featureSubsetStrategy = "auto" // Let the algorithm choose.
val impurity = "gini"
val maxDepth = 4
val maxBins = 32
val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
println(s"Test Error = $testErr")
println(s"Learned classification forest model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myRandomForestClassificationModel")
val sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestClassificationModel")
Refer to the RandomForest Java docs and RandomForestModel Java docs for details on the API.
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.RandomForest;
import org.apache.spark.mllib.tree.model.RandomForestModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf().setAppName("JavaRandomForestClassificationExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Train a RandomForest model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
int numClasses = 2;
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
int numTrees = 3; // Use more in practice.
String featureSubsetStrategy = "auto"; // Let the algorithm choose.
String impurity = "gini";
int maxDepth = 5;
int maxBins = 32;
int seed = 12345;
RandomForestModel model = RandomForest.trainClassifier(trainingData, numClasses,
  categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins,
  seed);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
  testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testErr =
  predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("Test Error: " + testErr);
System.out.println("Learned classification forest model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myRandomForestClassificationModel");
RandomForestModel sameModel = RandomForestModel.load(jsc.sc(),
  "target/tmp/myRandomForestClassificationModel");
Regression
The example below demonstrates how to load a LIBSVM data file, parse it as an RDD of LabeledPoint and then perform regression using a random forest. The Mean Squared Error (MSE) is computed at the end to evaluate goodness of fit.
Refer to the RandomForest Python docs and RandomForestModel Python docs for more details on the API.
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a RandomForest model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
# Note: Use larger numTrees in practice.
# Setting featureSubsetStrategy="auto" lets the algorithm choose.
model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo={},
                                    numTrees=3, featureSubsetStrategy="auto",
                                    impurity='variance', maxDepth=4, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
    float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression forest model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myRandomForestRegressionModel")
sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestRegressionModel")
Refer to the RandomForest Scala docs and RandomForestModel Scala docs for details on the API.
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a RandomForest model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 3 // Use more in practice.
val featureSubsetStrategy = "auto" // Let the algorithm choose.
val impurity = "variance"
val maxDepth = 4
val maxBins = 32
val model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo,
  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
// Evaluate model on test instances and compute test error
val labelsAndPredictions = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean()
println(s"Test Mean Squared Error = $testMSE")
println(s"Learned regression forest model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myRandomForestRegressionModel")
val sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestRegressionModel")
Refer to the RandomForest Java docs and RandomForestModel Java docs for details on the API.
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.RandomForest;
import org.apache.spark.mllib.tree.model.RandomForestModel;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.SparkConf;
SparkConf sparkConf = new SparkConf().setAppName("JavaRandomForestRegressionExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Set parameters.
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
int numTrees = 3; // Use more in practice.
String featureSubsetStrategy = "auto"; // Let the algorithm choose.
String impurity = "variance";
int maxDepth = 4;
int maxBins = 32;
int seed = 12345;
// Train a RandomForest model.
RandomForestModel model = RandomForest.trainRegressor(trainingData,
  categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins, seed);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
  testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testMSE = predictionAndLabel.mapToDouble(pl -> {
  double diff = pl._1() - pl._2();
  return diff * diff;
}).mean();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression forest model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myRandomForestRegressionModel");
RandomForestModel sameModel = RandomForestModel.load(jsc.sc(),
  "target/tmp/myRandomForestRegressionModel");
Gradient-Boosted Trees (GBTs)
Gradient-Boosted Trees (GBTs) are ensembles of decision trees. GBTs iteratively train decision trees in order to minimize a loss function. Like decision trees, GBTs handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions.
spark.mllib supports GBTs for binary classification and for regression, using both continuous and categorical features. spark.mllib implements GBTs using the existing decision tree implementation. Please see the decision tree guide for more information on trees.
Note: GBTs do not yet support multiclass classification. For multiclass problems, please use decision trees or random forests.
Basic algorithm
Gradient boosting iteratively trains a sequence of decision trees. On each iteration, the algorithm uses the current ensemble to predict the label of each training instance and then compares the prediction with the true label. The dataset is re-labeled to put more emphasis on training instances with poor predictions. Thus, in the next iteration, the decision tree will help correct for previous mistakes.
The specific mechanism for re-labeling instances is defined by a loss function (discussed below). With each iteration, GBTs further reduce this loss function on the training data.
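To make the re-labeling idea concrete, here is a conceptual Scala sketch of boosting with the squared-error loss, where each iteration fits a regression tree to the current pseudo-residuals $y_i - F(x_i)$. This is a simplification of the real implementation (which, among other things, weights and initializes the trees differently); `data` is assumed to be an `RDD[LabeledPoint]` as in the examples below:

```scala
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel

val learningRate = 0.1
var trees = Array.empty[DecisionTreeModel]
for (_ <- 0 until 3) {
  val currentTrees = trees  // stable reference for the closure below
  // Re-label each instance with its pseudo-residual under squared error: y - F(x).
  val residuals = data.map { lp =>
    val ensemblePrediction = currentTrees.map(t => learningRate * t.predict(lp.features)).sum
    LabeledPoint(lp.label - ensemblePrediction, lp.features)
  }
  // The next tree is trained to correct the mistakes of the current ensemble.
  trees = trees :+ DecisionTree.trainRegressor(residuals, Map[Int, Int](), "variance", 3, 32)
}
```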
Losses
The table below lists the losses currently supported by GBTs in spark.mllib. Note that each loss is applicable to one of classification or regression, not both.
Notation: $N$ = number of instances. $y_i$ = label of instance $i$. $x_i$ = features of instance $i$. $F(x_i)$ = model's predicted label for instance $i$.
Loss | Task | Formula | Description
---|---|---|---
Log Loss | Classification | $2 \sum_{i=1}^{N} \log(1+\exp(-2 y_i F(x_i)))$ | Twice binomial negative log likelihood.
Squared Error | Regression | $\sum_{i=1}^{N} (y_i - F(x_i))^2$ | Also called L2 loss. Default loss for regression tasks.
Absolute Error | Regression | $\sum_{i=1}^{N} |y_i - F(x_i)|$ | Also called L1 loss. Can be more robust to outliers than Squared Error.
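A tiny worked example of the formulas above in plain Scala (no Spark required); labels for the classification loss are assumed to be in $\{-1, +1\}$, matching the log-loss formula:

```scala
val y = Array(1.0, -1.0, 1.0)  // true labels y_i
val f = Array(0.8, 0.3, -0.2)  // model predictions F(x_i)

// Log loss: 2 * sum over i of log(1 + exp(-2 * y_i * F(x_i))).
val logLoss = (y zip f).map { case (yi, fi) => 2.0 * math.log1p(math.exp(-2.0 * yi * fi)) }.sum
// Squared error (L2) and absolute error (L1).
val squaredError = (y zip f).map { case (yi, fi) => (yi - fi) * (yi - fi) }.sum
val absoluteError = (y zip f).map { case (yi, fi) => math.abs(yi - fi) }.sum
```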
Usage tips
We include a few guidelines for using GBTs by discussing the various parameters. We omit some decision tree parameters since those are covered in the decision tree guide. A sketch of setting these parameters follows the list.
- loss: See the section above for information on losses and their applicability to tasks (classification vs. regression). Different losses can give significantly different results, depending on the dataset.
- numIterations: This sets the number of trees in the ensemble. Each iteration produces one tree. Increasing this number makes the model more expressive, improving training data accuracy. However, test-time accuracy may suffer if this is too large.
- learningRate: This parameter should not need to be tuned. If the algorithm behavior seems unstable, decreasing this value may improve stability.
- algo: The algorithm or task (classification vs. regression) is set using the tree [Strategy] parameter.
Validation while training
Gradient boosting can overfit when trained with more trees. In order to prevent overfitting, it is useful to validate while training. The method runWithValidation has been provided to make use of this option. It takes a pair of RDDs as arguments, the first one being the training dataset and the second being the validation dataset.
The training is stopped when the improvement in the validation error is not more than a certain tolerance (supplied by the validationTol argument in BoostingStrategy). In practice, the validation error decreases initially and later increases. There might be cases in which the validation error does not change monotonically, and the user is advised to set a large enough negative tolerance and examine the validation curve using evaluateEachIteration (which gives the error or loss per iteration) to tune the number of iterations.
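A sketch of this workflow in Scala, assuming `data` is an `RDD[LabeledPoint]` as in the examples below:

```scala
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy

val Array(train, validation) = data.randomSplit(Array(0.8, 0.2))
val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 100
boostingStrategy.validationTol = -1e-6  // negative tolerance: rely on the curve instead
val model = new GradientBoostedTrees(boostingStrategy).runWithValidation(train, validation)
// Per-iteration error or loss on the validation set, useful for tuning numIterations.
val validationCurve = model.evaluateEachIteration(validation, boostingStrategy.loss)
```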
Examples
Classification
The example below demonstrates how to load a LIBSVM data file, parse it as an RDD of LabeledPoint and then perform classification using Gradient-Boosted Trees with log loss. The test error is calculated to measure the algorithm accuracy.
Refer to the GradientBoostedTrees Python docs and GradientBoostedTreesModel Python docs for more details on the API.
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file.
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a GradientBoostedTrees model.
# Notes: (a) Empty categoricalFeaturesInfo indicates all features are continuous.
# (b) Use more iterations in practice.
model = GradientBoostedTrees.trainClassifier(trainingData,
                                             categoricalFeaturesInfo={}, numIterations=3)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
    lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification GBT model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myGradientBoostingClassificationModel")
sameModel = GradientBoostedTreesModel.load(sc,
                                           "target/tmp/myGradientBoostingClassificationModel")
Refer to the GradientBoostedTrees Scala docs and GradientBoostedTreesModel Scala docs for details on the API.
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a GradientBoostedTrees model.
// The defaultParams for Classification use LogLoss by default.
val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 3 // Note: Use more iterations in practice.
boostingStrategy.treeStrategy.numClasses = 2
boostingStrategy.treeStrategy.maxDepth = 5
// Empty categoricalFeaturesInfo indicates all features are continuous.
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
val model = GradientBoostedTrees.train(trainingData, boostingStrategy)
// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
println(s"Test Error = $testErr")
println(s"Learned classification GBT model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myGradientBoostingClassificationModel")
val sameModel = GradientBoostedTreesModel.load(sc,
  "target/tmp/myGradientBoostingClassificationModel")
Refer to the GradientBoostedTrees Java docs and GradientBoostedTreesModel Java docs for details on the API.
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.GradientBoostedTrees;
import org.apache.spark.mllib.tree.configuration.BoostingStrategy;
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf()
  .setAppName("JavaGradientBoostedTreesClassificationExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Train a GradientBoostedTrees model.
// The defaultParams for Classification use LogLoss by default.
BoostingStrategy boostingStrategy = BoostingStrategy.defaultParams("Classification");
boostingStrategy.setNumIterations(3); // Note: Use more iterations in practice.
boostingStrategy.getTreeStrategy().setNumClasses(2);
boostingStrategy.getTreeStrategy().setMaxDepth(5);
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);
GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
  testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testErr =
  predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("Test Error: " + testErr);
System.out.println("Learned classification GBT model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myGradientBoostingClassificationModel");
GradientBoostedTreesModel sameModel = GradientBoostedTreesModel.load(jsc.sc(),
  "target/tmp/myGradientBoostingClassificationModel");
Regression
The example below demonstrates how to load a LIBSVM data file, parse it as an RDD of LabeledPoint and then perform regression using Gradient-Boosted Trees with Squared Error as the loss. The Mean Squared Error (MSE) is computed at the end to evaluate goodness of fit.
Refer to the GradientBoostedTrees Python docs and GradientBoostedTreesModel Python docs for more details on the API.
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file.
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a GradientBoostedTrees model.
# Notes: (a) Empty categoricalFeaturesInfo indicates all features are continuous.
# (b) Use more iterations in practice.
model = GradientBoostedTrees.trainRegressor(trainingData,
                                            categoricalFeaturesInfo={}, numIterations=3)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
    float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression GBT model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myGradientBoostingRegressionModel")
sameModel = GradientBoostedTreesModel.load(sc, "target/tmp/myGradientBoostingRegressionModel")
Refer to the GradientBoostedTrees Scala docs and GradientBoostedTreesModel Scala docs for details on the API.
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a GradientBoostedTrees model.
// The defaultParams for Regression use SquaredError by default.
val boostingStrategy = BoostingStrategy.defaultParams("Regression")
boostingStrategy.numIterations = 3 // Note: Use more iterations in practice.
boostingStrategy.treeStrategy.maxDepth = 5
// Empty categoricalFeaturesInfo indicates all features are continuous.
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
val model = GradientBoostedTrees.train(trainingData, boostingStrategy)
// Evaluate model on test instances and compute test error
val labelsAndPredictions = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean()
println(s"Test Mean Squared Error = $testMSE")
println(s"Learned regression GBT model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myGradientBoostingRegressionModel")
val sameModel = GradientBoostedTreesModel.load(sc,
  "target/tmp/myGradientBoostingRegressionModel")
Refer to the GradientBoostedTrees Java docs and GradientBoostedTreesModel Java docs for details on the API.
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.GradientBoostedTrees;
import org.apache.spark.mllib.tree.configuration.BoostingStrategy;
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf()
  .setAppName("JavaGradientBoostedTreesRegressionExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Train a GradientBoostedTrees model.
// The defaultParams for Regression use SquaredError by default.
BoostingStrategy boostingStrategy = BoostingStrategy.defaultParams("Regression");
boostingStrategy.setNumIterations(3); // Note: Use more iterations in practice.
boostingStrategy.getTreeStrategy().setMaxDepth(5);
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);
GradientBoostedTreesModel model = GradientBoostedTrees.train(trainingData, boostingStrategy);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
  testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testMSE = predictionAndLabel.mapToDouble(pl -> {
  double diff = pl._1() - pl._2();
  return diff * diff;
}).mean();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression GBT model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myGradientBoostingRegressionModel");
GradientBoostedTreesModel sameModel = GradientBoostedTreesModel.load(jsc.sc(),
  "target/tmp/myGradientBoostingRegressionModel");