优化 - 基于 RDD 的 API

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{\0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{\0}} \]

数学描述

梯度下降

解决 $\min_{\wv \in\R^d} \; f(\wv)$ 形式的优化问题最简单的方法是 梯度下降。 这种一阶优化方法(包括梯度下降及其随机变体)非常适合大规模和分布式计算。

梯度下降法旨在通过迭代地沿最陡下降方向(即函数在当前点的导数的负数,称为 梯度)迈出步长来找到函数的局部最小值,即在当前参数值处。 如果目标函数 $f$ 在所有参数处都不可微,但仍然是凸的,那么次梯度是梯度的自然推广,并承担步长方向的角色。 在任何情况下,计算 $f$ 的梯度或次梯度都是昂贵的——它需要完整地遍历整个数据集,以便计算所有损失项的贡献。

随机梯度下降 (SGD)

目标函数 $f$ 写成和的优化问题特别适合使用随机梯度下降 (SGD) 解决。 在我们的例子中,对于 监督机器学习中常用的优化公式,\begin{equation} f(\wv) := \lambda\, R(\wv) + \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) \label{eq:regPrimal} \ . \end{equation} 这尤其自然,因为损失被写成来自每个数据点的各个损失的平均值。

随机次梯度是向量的随机选择,使得在期望中,我们获得原始目标函数的真实次梯度。 统一随机地选择一个数据点 $i\in[1..n]$,我们获得 $\eqref{eq:regPrimal}$ 的随机次梯度,关于 $\wv$ 如下: \[ f'_{\wv,i} := L'_{\wv,i} + \lambda\, R'_\wv \ , \] 其中 $L'_{\wv,i} \in \R^d$ 是损失函数的一部分的次梯度,该部分由第 $i$ 个数据点确定,即 $L'_{\wv,i} \in \frac{\partial}{\partial \wv} L(\wv;\x_i,y_i)$。 此外,$R'_\wv$ 是正则化项 $R(\wv)$ 的次梯度,即 $R'_\wv \in \frac{\partial}{\partial \wv} R(\wv)$。 项 $R'_\wv$ 不依赖于选择哪个随机数据点。 显然,在对 $i\in[1..n]$ 的随机选择的期望中,我们得到 $f'_{\wv,i}$ 是原始目标 $f$ 的次梯度,这意味着 $\E\left[f'_{\wv,i}\right] \in \frac{\partial}{\partial \wv} f(\wv)$

现在运行 SGD 只是沿着负随机次梯度 $f'_{\wv,i}$ 的方向行走,即 \begin{equation}\label{eq:SGDupdate} \wv^{(t+1)} := \wv^{(t)} - \gamma \; f'_{\wv,i} \ . \end{equation} 步长。 参数 $\gamma$ 是步长,在默认实现中,选择步长随着迭代计数器的平方根减小,即在第 $t$ 次迭代中,$\gamma := \frac{s}{\sqrt{t}}$,其中输入参数 $s=$ stepSize。 请注意,在实践中,为 SGD 方法选择最佳步长通常很微妙,并且是一个活跃的研究主题。

梯度。 spark.mllib 中实现的机器学习方法的(次)梯度表可在分类和回归部分中找到。

近端更新。 作为仅使用步长方向上的正则化项的次梯度 $R'(\wv)$ 的替代方案,对于某些情况,可以通过使用近端算子来获得改进的更新。 对于 L1 正则化项,近端算子由软阈值给出,如 L1Updater 中所实现的那样。

分布式 SGD 的更新方案

GradientDescent 中的 SGD 实现使用数据示例的简单(分布式)采样。 我们回顾一下优化问题 $\eqref{eq:regPrimal}$ 的损失部分是 $\frac1n \sum_{i=1}^n L(\wv;\x_i,y_i)$,因此 $\frac1n \sum_{i=1}^n L'_{\wv,i}$ 将是真正的(次)梯度。 由于这将需要访问完整的数据集,因此参数 miniBatchFraction 指定要使用的完整数据的哪个比例。 此子集上梯度的平均值,即 \[ \frac1{|S|} \sum_{i\in S} L'_{\wv,i} \ , \] 是随机梯度。 这里 $S$ 是大小为 $|S|=$ miniBatchFraction $\cdot n$ 的采样子集。

在每次迭代中,分布式数据集 (RDD) 上的采样,以及来自每个工作机器的部分结果的总和的计算由标准 spark 例程执行。

如果点数比例 miniBatchFraction 设置为 1(默认值),则每次迭代中生成的步长都是精确的(次)梯度下降。 在这种情况下,所使用的步长方向没有随机性,也没有方差。 另一方面,如果 miniBatchFraction 选择得非常小,以至于只采样一个点,即 $|S|=$ miniBatchFraction $\cdot n = 1$,则该算法等效于标准 SGD。 在这种情况下,步长方向取决于点的均匀随机采样。

有限内存 BFGS (L-BFGS)

L-BFGS 是拟牛顿方法系列中的一种优化算法,用于解决 $\min_{\wv \in\R^d} \; f(\wv)$ 形式的优化问题。 L-BFGS 方法将目标函数局部近似为二次函数,而不评估目标函数的二阶偏导数来构造 Hessian 矩阵。 Hessian 矩阵通过先前的梯度评估来近似,因此在牛顿方法中显式计算 Hessian 矩阵时不存在垂直可伸缩性问题(训练特征的数量)。 因此,与其他一阶优化相比,L-BFGS 通常可以实现更快的收敛。

选择优化方法

线性方法 在内部使用优化,并且 spark.mllib 中的一些线性方法支持 SGD 和 L-BFGS。 不同的优化方法可能具有不同的收敛保证,具体取决于目标函数的属性,我们无法在此处涵盖文献。 一般来说,当 L-BFGS 可用时,我们建议使用它而不是 SGD,因为 L-BFGS 往往收敛得更快(在更少的迭代中)。

MLlib 中的实现

梯度下降和随机梯度下降

梯度下降法(包括随机次梯度下降 (SGD))作为 MLlib 中的低级原语包含在内,各种 ML 算法都基于该原语开发,例如,请参见线性方法部分。

SGD 类 GradientDescent 设置以下参数

L-BFGS

L-BFGS 目前只是 MLlib 中的一个底层优化原语。如果您想在诸如线性回归和逻辑回归等各种 ML 算法中使用 L-BFGS,您必须将目标函数的梯度和更新器传递给优化器,而不是使用像 LogisticRegressionWithSGD 这样的训练 API。 请参阅下面的示例。 这将在下一个版本中解决。

使用 L1Updater 进行 L1 正则化将不起作用,因为 L1Updater 中的软阈值逻辑是为梯度下降设计的。请参阅开发人员的注释。

L-BFGS 方法 LBFGS.runLBFGS 具有以下参数

return 是一个包含两个元素的元组。 第一个元素是包含每个特征权重的列矩阵,第二个元素是包含为每次迭代计算的损失的数组。

这是一个使用 L-BFGS 优化器训练具有 L2 正则化的二元逻辑回归的示例。

有关 API 的详细信息,请参阅 LBFGS Scala 文档SquaredL2Updater Scala 文档

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val numFeatures = data.take(1)(0).features.size

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)

// Append 1 into the training data as intercept.
val training = splits(0).map(x => (x.label, MLUtils.appendBias(x.features))).cache()

val test = splits(1)

// Run training algorithm to build the model
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))

val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept)

val model = new LogisticRegressionModel(
  Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
  weightsWithIntercept(weightsWithIntercept.size - 1))

// Clear the default threshold.
model.clearThreshold()

// Compute raw scores on the test set.
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println("Loss of each step in training process")
loss.foreach(println)
println(s"Area under ROC = $auROC")
在 Spark repo 的 "examples/src/main/scala/org/apache/spark/examples/mllib/LBFGSExample.scala" 中查找完整的示例代码。

有关 API 的详细信息,请参阅 LBFGS Java 文档SquaredL2Updater Java 文档

import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.optimization.*;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();
int numFeatures = data.take(1).get(0).features().size();

// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint> trainingInit = data.sample(false, 0.6, 11L);
JavaRDD<LabeledPoint> test = data.subtract(trainingInit);

// Append 1 into the training data as intercept.
JavaPairRDD<Object, Vector> training = data.mapToPair(p ->
  new Tuple2<>(p.label(), MLUtils.appendBias(p.features())));
training.cache();

// Run training algorithm to build the model.
int numCorrections = 10;
double convergenceTol = 1e-4;
int maxNumIterations = 20;
double regParam = 0.1;
Vector initialWeightsWithIntercept = Vectors.dense(new double[numFeatures + 1]);

Tuple2<Vector, double[]> result = LBFGS.runLBFGS(
  training.rdd(),
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept);
Vector weightsWithIntercept = result._1();
double[] loss = result._2();

LogisticRegressionModel model = new LogisticRegressionModel(
  Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)),
  (weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]);

// Clear the default threshold.
model.clearThreshold();

// Compute raw scores on the test set.
JavaPairRDD<Object, Object> scoreAndLabels = test.mapToPair(p ->
  new Tuple2<>(model.predict(p.features()), p.label()));

// Get evaluation metrics.
BinaryClassificationMetrics metrics =
  new BinaryClassificationMetrics(scoreAndLabels.rdd());
double auROC = metrics.areaUnderROC();

System.out.println("Loss of each step in training process");
for (double l : loss) {
  System.out.println(l);
}
System.out.println("Area under ROC = " + auROC);
在 Spark repo 的 "examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java" 中查找完整的示例代码。

开发者说明

由于 Hessian 是从先前的梯度评估中近似构建的,因此在优化过程中无法更改目标函数。 因此,通过仅使用 miniBatch 来天真地使用随机 L-BFGS 是行不通的。因此,在我们有更好的了解之前,我们不会提供它。

Updater 最初是一个为梯度下降设计的类,它计算实际的梯度下降步长。 但是,通过忽略仅针对梯度下降的逻辑部分,例如自适应步长之类的内容,我们可以获取 L-BFGS 正则化的目标函数的梯度和损失。 稍后,我们将把此内容重构为正则化器以替换更新器,以分隔正则化和步长更新之间的逻辑。