优化 - 基于 RDD 的 API

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

数学描述

梯度下降

解决形式为 $\min_{\wv \in\R^d} \; f(\wv)$ 的优化问题的最简单方法是梯度下降。这类一阶优化方法(包括梯度下降及其随机变体)非常适合大规模和分布式计算。

梯度下降方法旨在通过沿最速下降方向迭代地迈步来寻找函数的局部最小值,该方向是函数在当前点(即当前参数值)处导数(称为梯度)的负方向。如果目标函数 $f$ 在所有自变量处都不可微,但仍是凸的,那么次梯度是梯度的自然推广,并扮演步长方向的角色。无论如何,计算 $f$ 的梯度或次梯度是昂贵的——它需要完整遍历整个数据集,才能计算所有损失项的贡献。

随机梯度下降 (SGD)

目标函数 $f$ 以求和形式表示的优化问题特别适合使用随机梯度下降 (SGD) 解决。在我们的例子中,对于监督机器学习中常用的优化公式,\begin{equation} f(\wv) := \lambda\, R(\wv) + \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) \label{eq:regPrimal} \ . \end{equation} 这样做尤其自然,因为损失被写成每个数据点产生的个体损失的平均值。

随机次梯度是向量的随机选择,使得在期望意义上,我们获得了原始目标函数的真实次梯度。通过在 $[1..n]$ 中均匀随机选择一个数据点 $i$,我们获得了 $\eqref{eq:regPrimal}$ 关于 $\wv$ 的随机次梯度,如下所示:\[ f'_{\wv,i} := L'_{\wv,i} + \lambda\, R'_\wv \ , \] 其中 $L'_{\wv,i} \in \R^d$ 是由第 $i$ 个数据点确定的损失函数部分的次梯度,即 $L'_{\wv,i} \in \frac{\partial}{\partial \wv} L(\wv;\x_i,y_i)$。此外,$R'_\wv$ 是正则项 $R(\wv)$ 的次梯度,即 $R'_\wv \in \frac{\partial}{\partial \wv} R(\wv)$。项 $R'_\wv$ 不依赖于选择了哪个随机数据点。显然,在对 $i\in[1..n]$ 的随机选择进行期望时,我们有 $f'_{\wv,i}$ 是原始目标函数 $f$ 的一个次梯度,这意味着 $\E\left[f'_{\wv,i}\right] \in \frac{\partial}{\partial \wv} f(\wv)$

现在运行 SGD 简单地变成沿着负随机次梯度 $f'_{\wv,i}$ 的方向移动,即 \begin{equation}\label{eq:SGDupdate} \wv^{(t+1)} := \wv^{(t)} - \gamma \; f'_{\wv,i} \ . \end{equation} 步长。 参数 $\gamma$ 是步长,在默认实现中,它随着迭代计数器的平方根递减,即在第 $t$ 次迭代中,$\gamma := \frac{s}{\sqrt{t}}$,其中输入参数 $s=$ stepSize。请注意,为 SGD 方法选择最佳步长在实践中通常很棘手,并且是一个活跃的研究领域。

梯度。 spark.mllib 中实现的机器学习方法的(次)梯度表可在分类与回归部分找到。

近端更新。 除了在步长方向上直接使用正则项的次梯度 $R'(\wv)$ 之外,在某些情况下,通过使用近端算子可以获得更好的更新。对于 L1 正则项,近端算子由软阈值法给出,如 L1Updater 中所实现。

分布式 SGD 的更新方案

GradientDescent 中的 SGD 实现使用简单(分布式)的数据示例采样。我们回顾一下,优化问题 $\eqref{eq:regPrimal}$ 的损失部分是 $\frac1n \sum_{i=1}^n L(\wv;\x_i,y_i)$,因此 $\frac1n \sum_{i=1}^n L'_{\wv,i}$ 将是真实的(次)梯度。由于这将需要访问完整数据集,参数 miniBatchFraction 指定了要使用完整数据的哪一部分。该子集上梯度的平均值,即 \[ \frac1{|S|} \sum_{i\in S} L'_{\wv,i} \ , \] 是一个随机梯度。这里 $S$ 是大小为 $|S|=$ miniBatchFraction $\cdot n$ 的采样子集。

在每次迭代中,对分布式数据集 (RDD) 的采样以及从每个工作机器计算部分结果的总和都是由标准的 Spark 例程执行的。

如果点分数 miniBatchFraction 设置为 1(默认值),那么每次迭代中产生的步长是精确的(次)梯度下降。在这种情况下,所使用的步长方向没有随机性,也没有方差。在另一个极端,如果 miniBatchFraction 选择得非常小,以至于只采样一个点,即 $|S|=$ miniBatchFraction $\cdot n = 1$,那么该算法等同于标准 SGD。在这种情况下,步长方向取决于点的均匀随机采样。

有限内存 BFGS (L-BFGS)

L-BFGS 是一种准牛顿方法家族中的优化算法,用于解决形式为 $\min_{\wv \in\R^d} \; f(\wv)$ 的优化问题。L-BFGS 方法在不评估目标函数的二阶偏导数来构建 Hessian 矩阵的情况下,将目标函数局部近似为二次函数。Hessian 矩阵通过先前的梯度评估进行近似,因此在牛顿法中显式计算 Hessian 矩阵时,没有垂直扩展性问题(训练特征的数量)。因此,L-BFGS 通常比其他一阶优化方法收敛更快。

选择优化方法

线性方法内部使用优化,spark.mllib 中的一些线性方法同时支持 SGD 和 L-BFGS。不同的优化方法根据目标函数的特性可能具有不同的收敛保证,我们在此无法涵盖所有文献。通常,当 L-BFGS 可用时,我们建议使用它而不是 SGD,因为 L-BFGS 倾向于收敛更快(迭代次数更少)。

MLlib 中的实现

梯度下降和随机梯度下降

梯度下降方法,包括随机次梯度下降 (SGD),作为低级原语包含在 MLlib 中,在此基础上开发了各种 ML 算法,例如请参阅线性方法部分。

SGD 类 GradientDescent 设置以下参数:

L-BFGS

L-BFGS 目前只是 MLlib 中的一个低级优化原语。如果你想在各种机器学习算法(如线性回归和逻辑回归)中使用 L-BFGS,你必须自己将目标函数的梯度和更新器传递给优化器,而不是使用像 LogisticRegressionWithSGD 这样的训练 API。请参阅下面的示例。这将在下一个版本中解决。

使用 L1Updater 进行 L1 正则化将不起作用,因为 L1Updater 中的软阈值逻辑是为梯度下降设计的。请参阅开发者注意事项。

L-BFGS 方法 LBFGS.runLBFGS 具有以下参数:

返回值是一个包含两个元素的元组。第一个元素是包含每个特征权重的列矩阵,第二个元素是包含每次迭代计算的损失的数组。

以下是使用 L-BFGS 优化器训练带有 L2 正则化的二元逻辑回归的示例。

有关 API 详情,请参阅 LBFGS Scala 文档SquaredL2Updater Scala 文档

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val numFeatures = data.take(1)(0).features.size

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)

// Append 1 into the training data as intercept.
val training = splits(0).map(x => (x.label, MLUtils.appendBias(x.features))).cache()

val test = splits(1)

// Run training algorithm to build the model
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))

val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept)

val model = new LogisticRegressionModel(
  Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
  weightsWithIntercept(weightsWithIntercept.size - 1))

// Clear the default threshold.
model.clearThreshold()

// Compute raw scores on the test set.
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println("Loss of each step in training process")
loss.foreach(println)
println(s"Area under ROC = $auROC")
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/mllib/LBFGSExample.scala" 查找完整的示例代码。

有关 API 详情,请参阅 LBFGS Java 文档SquaredL2Updater Java 文档

import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.optimization.*;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();
int numFeatures = data.take(1).get(0).features().size();

// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint> trainingInit = data.sample(false, 0.6, 11L);
JavaRDD<LabeledPoint> test = data.subtract(trainingInit);

// Append 1 into the training data as intercept.
JavaPairRDD<Object, Vector> training = data.mapToPair(p ->
  new Tuple2<>(p.label(), MLUtils.appendBias(p.features())));
training.cache();

// Run training algorithm to build the model.
int numCorrections = 10;
double convergenceTol = 1e-4;
int maxNumIterations = 20;
double regParam = 0.1;
Vector initialWeightsWithIntercept = Vectors.dense(new double[numFeatures + 1]);

Tuple2<Vector, double[]> result = LBFGS.runLBFGS(
  training.rdd(),
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept);
Vector weightsWithIntercept = result._1();
double[] loss = result._2();

LogisticRegressionModel model = new LogisticRegressionModel(
  Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)),
  (weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]);

// Clear the default threshold.
model.clearThreshold();

// Compute raw scores on the test set.
JavaPairRDD<Object, Object> scoreAndLabels = test.mapToPair(p ->
  new Tuple2<>(model.predict(p.features()), p.label()));

// Get evaluation metrics.
BinaryClassificationMetrics metrics =
  new BinaryClassificationMetrics(scoreAndLabels.rdd());
double auROC = metrics.areaUnderROC();

System.out.println("Loss of each step in training process");
for (double l : loss) {
  System.out.println(l);
}
System.out.println("Area under ROC = " + auROC);
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java" 查找完整的示例代码。

开发者注意事项

由于 Hessian 是根据先前的梯度评估近似构建的,因此在优化过程中目标函数无法更改。因此,随机 L-BFGS 无法通过简单使用 miniBatch 来直接工作;因此,在我们有更好的理解之前,我们不提供此功能。

Updater 是一个最初为梯度下降设计的类,它计算实际的梯度下降步长。然而,我们能够通过忽略仅适用于梯度下降的逻辑部分(例如自适应步长等)来获取 L-BFGS 正则化部分的目标函数的梯度和损失。我们将在以后将其重构为 regularizer 以替代 updater,从而分离正则化和步长更新之间的逻辑。