优化 - 基于 RDD 的 API
\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{\0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{\0}} \]
数学描述
梯度下降
解决 $\min_{\wv \in\R^d} \; f(\wv)$
形式的优化问题最简单的方法是 梯度下降。 这种一阶优化方法(包括梯度下降及其随机变体)非常适合大规模和分布式计算。
梯度下降法旨在通过迭代地沿最陡下降方向(即函数在当前点的导数的负数,称为 梯度)迈出步长来找到函数的局部最小值,即在当前参数值处。 如果目标函数 $f$
在所有参数处都不可微,但仍然是凸的,那么次梯度是梯度的自然推广,并承担步长方向的角色。 在任何情况下,计算 $f$
的梯度或次梯度都是昂贵的——它需要完整地遍历整个数据集,以便计算所有损失项的贡献。
随机梯度下降 (SGD)
目标函数 $f$
写成和的优化问题特别适合使用随机梯度下降 (SGD) 解决。 在我们的例子中,对于 监督机器学习中常用的优化公式,\begin{equation} f(\wv) := \lambda\, R(\wv) + \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) \label{eq:regPrimal} \ . \end{equation}
这尤其自然,因为损失被写成来自每个数据点的各个损失的平均值。
随机次梯度是向量的随机选择,使得在期望中,我们获得原始目标函数的真实次梯度。 统一随机地选择一个数据点 $i\in[1..n]$
,我们获得 $\eqref{eq:regPrimal}$
的随机次梯度,关于 $\wv$
如下: \[ f'_{\wv,i} := L'_{\wv,i} + \lambda\, R'_\wv \ , \]
其中 $L'_{\wv,i} \in \R^d$
是损失函数的一部分的次梯度,该部分由第 $i$
个数据点确定,即 $L'_{\wv,i} \in \frac{\partial}{\partial \wv} L(\wv;\x_i,y_i)$
。 此外,$R'_\wv$
是正则化项 $R(\wv)$
的次梯度,即 $R'_\wv \in \frac{\partial}{\partial \wv} R(\wv)$
。 项 $R'_\wv$
不依赖于选择哪个随机数据点。 显然,在对 $i\in[1..n]$
的随机选择的期望中,我们得到 $f'_{\wv,i}$
是原始目标 $f$
的次梯度,这意味着 $\E\left[f'_{\wv,i}\right] \in \frac{\partial}{\partial \wv} f(\wv)$
。
现在运行 SGD 只是沿着负随机次梯度 $f'_{\wv,i}$
的方向行走,即 \begin{equation}\label{eq:SGDupdate} \wv^{(t+1)} := \wv^{(t)} - \gamma \; f'_{\wv,i} \ . \end{equation}
步长。 参数 $\gamma$
是步长,在默认实现中,选择步长随着迭代计数器的平方根减小,即在第 $t$
次迭代中,$\gamma := \frac{s}{\sqrt{t}}$
,其中输入参数 $s=$ stepSize
。 请注意,在实践中,为 SGD 方法选择最佳步长通常很微妙,并且是一个活跃的研究主题。
梯度。 spark.mllib
中实现的机器学习方法的(次)梯度表可在分类和回归部分中找到。
近端更新。 作为仅使用步长方向上的正则化项的次梯度 $R'(\wv)$
的替代方案,对于某些情况,可以通过使用近端算子来获得改进的更新。 对于 L1 正则化项,近端算子由软阈值给出,如 L1Updater 中所实现的那样。
分布式 SGD 的更新方案
GradientDescent 中的 SGD 实现使用数据示例的简单(分布式)采样。 我们回顾一下优化问题 $\eqref{eq:regPrimal}$
的损失部分是 $\frac1n \sum_{i=1}^n L(\wv;\x_i,y_i)$
,因此 $\frac1n \sum_{i=1}^n L'_{\wv,i}$
将是真正的(次)梯度。 由于这将需要访问完整的数据集,因此参数 miniBatchFraction
指定要使用的完整数据的哪个比例。 此子集上梯度的平均值,即 \[ \frac1{|S|} \sum_{i\in S} L'_{\wv,i} \ , \]
是随机梯度。 这里 $S$
是大小为 $|S|=$ miniBatchFraction $\cdot n$
的采样子集。
在每次迭代中,分布式数据集 (RDD) 上的采样,以及来自每个工作机器的部分结果的总和的计算由标准 spark 例程执行。
如果点数比例 miniBatchFraction
设置为 1(默认值),则每次迭代中生成的步长都是精确的(次)梯度下降。 在这种情况下,所使用的步长方向没有随机性,也没有方差。 另一方面,如果 miniBatchFraction
选择得非常小,以至于只采样一个点,即 $|S|=$ miniBatchFraction $\cdot n = 1$
,则该算法等效于标准 SGD。 在这种情况下,步长方向取决于点的均匀随机采样。
有限内存 BFGS (L-BFGS)
L-BFGS 是拟牛顿方法系列中的一种优化算法,用于解决 $\min_{\wv \in\R^d} \; f(\wv)$
形式的优化问题。 L-BFGS 方法将目标函数局部近似为二次函数,而不评估目标函数的二阶偏导数来构造 Hessian 矩阵。 Hessian 矩阵通过先前的梯度评估来近似,因此在牛顿方法中显式计算 Hessian 矩阵时不存在垂直可伸缩性问题(训练特征的数量)。 因此,与其他一阶优化相比,L-BFGS 通常可以实现更快的收敛。
选择优化方法
线性方法 在内部使用优化,并且 spark.mllib
中的一些线性方法支持 SGD 和 L-BFGS。 不同的优化方法可能具有不同的收敛保证,具体取决于目标函数的属性,我们无法在此处涵盖文献。 一般来说,当 L-BFGS 可用时,我们建议使用它而不是 SGD,因为 L-BFGS 往往收敛得更快(在更少的迭代中)。
MLlib 中的实现
梯度下降和随机梯度下降
梯度下降法(包括随机次梯度下降 (SGD))作为 MLlib
中的低级原语包含在内,各种 ML 算法都基于该原语开发,例如,请参见线性方法部分。
SGD 类 GradientDescent 设置以下参数
Gradient
是一个类,用于计算正在优化的函数的随机梯度,即,相对于单个训练示例,在当前参数值处。 MLlib 包括用于常见损失函数的梯度类,例如,hinge、logistic、最小二乘法。 梯度类将训练示例、其标签和当前参数值作为输入。Updater
是一个类,用于执行实际的梯度下降步骤,即,在每次迭代中更新权重,对于损失部分的给定梯度。 更新器还负责执行来自正则化部分进行的更新。 MLlib 包括用于没有正则化、以及 L1 和 L2 正则化项的情况下的更新器。stepSize
是一个标量值,表示梯度下降的初始步长。 MLlib 中的所有更新器在第 t 步使用的步长等于stepSize $/ \sqrt{t}$
。numIterations
是要运行的迭代次数。regParam
是使用 L1 或 L2 正则化时的正则化参数。miniBatchFraction
是每次迭代中采样的总数据的比例,用于计算梯度方向。- 采样仍然需要遍历整个 RDD,因此降低
miniBatchFraction
可能不会显著加快优化速度。当梯度计算成本很高时,用户将会看到最大的加速效果,因为只有选定的样本才用于计算梯度。
- 采样仍然需要遍历整个 RDD,因此降低
L-BFGS
L-BFGS 目前只是 MLlib
中的一个底层优化原语。如果您想在诸如线性回归和逻辑回归等各种 ML 算法中使用 L-BFGS,您必须将目标函数的梯度和更新器传递给优化器,而不是使用像 LogisticRegressionWithSGD 这样的训练 API。 请参阅下面的示例。 这将在下一个版本中解决。
使用 L1Updater 进行 L1 正则化将不起作用,因为 L1Updater 中的软阈值逻辑是为梯度下降设计的。请参阅开发人员的注释。
L-BFGS 方法 LBFGS.runLBFGS 具有以下参数
Gradient
是一个类,用于计算被优化目标函数的梯度,即相对于当前参数值的单个训练样本。MLlib 包括用于常见损失函数的梯度类,例如,hinge,logistic,最小二乘。 梯度类将训练样本,其标签和当前参数值作为输入。Updater
是一个类,用于计算 L-BFGS 的正则化部分的目标函数的梯度和损失。 MLlib 包括用于无正则化以及 L2 正则化器的更新器。numCorrections
是 L-BFGS 更新中使用的校正次数。 建议为 10。maxNumIterations
是 L-BFGS 可以运行的最大迭代次数。regParam
是使用正则化时的正则化参数。convergenceTol
控制 L-BFGS 被认为收敛时允许的相对变化量。 这必须是非负的。 较低的值容忍度较低,因此通常会导致运行更多迭代。 此值查看 Breeze LBFGS 内部的平均改进和梯度范数。
return
是一个包含两个元素的元组。 第一个元素是包含每个特征权重的列矩阵,第二个元素是包含为每次迭代计算的损失的数组。
这是一个使用 L-BFGS 优化器训练具有 L2 正则化的二元逻辑回归的示例。
有关 API 的详细信息,请参阅 LBFGS
Scala 文档 和 SquaredL2Updater
Scala 文档。
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val numFeatures = data.take(1)(0).features.size
// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
// Append 1 into the training data as intercept.
val training = splits(0).map(x => (x.label, MLUtils.appendBias(x.features))).cache()
val test = splits(1)
// Run training algorithm to build the model
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))
val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
training,
new LogisticGradient(),
new SquaredL2Updater(),
numCorrections,
convergenceTol,
maxNumIterations,
regParam,
initialWeightsWithIntercept)
val model = new LogisticRegressionModel(
Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
weightsWithIntercept(weightsWithIntercept.size - 1))
// Clear the default threshold.
model.clearThreshold()
// Compute raw scores on the test set.
val scoreAndLabels = test.map { point =>
val score = model.predict(point.features)
(score, point.label)
}
// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()
println("Loss of each step in training process")
loss.foreach(println)
println(s"Area under ROC = $auROC")
有关 API 的详细信息,请参阅 LBFGS
Java 文档 和 SquaredL2Updater
Java 文档。
import java.util.Arrays;
import scala.Tuple2;
import org.apache.spark.api.java.*;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.optimization.*;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();
int numFeatures = data.take(1).get(0).features().size();
// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint> trainingInit = data.sample(false, 0.6, 11L);
JavaRDD<LabeledPoint> test = data.subtract(trainingInit);
// Append 1 into the training data as intercept.
JavaPairRDD<Object, Vector> training = data.mapToPair(p ->
new Tuple2<>(p.label(), MLUtils.appendBias(p.features())));
training.cache();
// Run training algorithm to build the model.
int numCorrections = 10;
double convergenceTol = 1e-4;
int maxNumIterations = 20;
double regParam = 0.1;
Vector initialWeightsWithIntercept = Vectors.dense(new double[numFeatures + 1]);
Tuple2<Vector, double[]> result = LBFGS.runLBFGS(
training.rdd(),
new LogisticGradient(),
new SquaredL2Updater(),
numCorrections,
convergenceTol,
maxNumIterations,
regParam,
initialWeightsWithIntercept);
Vector weightsWithIntercept = result._1();
double[] loss = result._2();
LogisticRegressionModel model = new LogisticRegressionModel(
Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)),
(weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]);
// Clear the default threshold.
model.clearThreshold();
// Compute raw scores on the test set.
JavaPairRDD<Object, Object> scoreAndLabels = test.mapToPair(p ->
new Tuple2<>(model.predict(p.features()), p.label()));
// Get evaluation metrics.
BinaryClassificationMetrics metrics =
new BinaryClassificationMetrics(scoreAndLabels.rdd());
double auROC = metrics.areaUnderROC();
System.out.println("Loss of each step in training process");
for (double l : loss) {
System.out.println(l);
}
System.out.println("Area under ROC = " + auROC);
开发者说明
由于 Hessian 是从先前的梯度评估中近似构建的,因此在优化过程中无法更改目标函数。 因此,通过仅使用 miniBatch 来天真地使用随机 L-BFGS 是行不通的。因此,在我们有更好的了解之前,我们不会提供它。
Updater
最初是一个为梯度下降设计的类,它计算实际的梯度下降步长。 但是,通过忽略仅针对梯度下降的逻辑部分,例如自适应步长之类的内容,我们可以获取 L-BFGS 正则化的目标函数的梯度和损失。 稍后,我们将把此内容重构为正则化器以替换更新器,以分隔正则化和步长更新之间的逻辑。