优化 - 基于 RDD 的 API

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

数学描述

梯度下降

解决形式为 $\min_{\wv \in\R^d} \; f(\wv)$ 的优化问题的最简单方法是 梯度下降。这种一阶优化方法(包括梯度下降及其随机变体)非常适合大规模和分布式计算。

梯度下降方法旨在通过迭代地在最陡下降方向上迈出步长来找到函数的局部最小值,最陡下降方向是函数在当前点(即当前参数值)处的导数(称为 梯度)的负值。如果目标函数 $f$ 在所有参数处不可微,但仍然是凸的,那么次梯度是梯度的自然推广,并承担了步长方向的作用。无论哪种情况,计算 $f$ 的梯度或次梯度都是昂贵的——它需要遍历整个数据集,才能计算所有损失项的贡献。

随机梯度下降 (SGD)

目标函数 $f$ 写成求和形式的优化问题特别适合使用随机梯度下降 (SGD) 来解决。在我们的例子中,对于 监督机器学习 中常用的优化公式,\begin{equation} f(\wv) := \lambda\, R(\wv) + \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) \label{eq:regPrimal} \ . \end{equation} 这是特别自然的,因为损失被写成来自每个数据点的各个损失的平均值。

随机次梯度是向量的随机选择,使得在期望中,我们获得了原始目标函数的真实次梯度。均匀随机地选择一个数据点 $i\in[1..n]$,我们获得了 $\eqref{eq:regPrimal}$ 关于 $\wv$ 的随机次梯度,如下所示:\[ f'_{\wv,i} := L'_{\wv,i} + \lambda\, R'_\wv \ , \] 其中 $L'_{\wv,i} \in \R^d$ 是由第 $i$ 个数据点确定的损失函数部分的次梯度,即 $L'_{\wv,i} \in \frac{\partial}{\partial \wv} L(\wv;\x_i,y_i)$。此外,$R'_\wv$ 是正则化器 $R(\wv)$ 的次梯度,即 $R'_\wv \in \frac{\partial}{\partial \wv} R(\wv)$。项 $R'_\wv$ 不依赖于选择哪个随机数据点。显然,在随机选择 $i\in[1..n]$ 的期望中,我们有 $f'_{\wv,i}$ 是原始目标 $f$ 的次梯度,这意味着 $\E\left[f'_{\wv,i}\right] \in \frac{\partial}{\partial \wv} f(\wv)$

现在运行 SGD 只是沿着负随机次梯度 $f'_{\wv,i}$ 的方向行走,即 \begin{equation}\label{eq:SGDupdate} \wv^{(t+1)} := \wv^{(t)} - \gamma \; f'_{\wv,i} \ . \end{equation} 步长。 参数 $\gamma$ 是步长,在默认实现中,它随着迭代计数器的平方根而递减,即在第 $t$ 次迭代中,$\gamma := \frac{s}{\sqrt{t}}$,其中输入参数 $s=$ stepSize。请注意,在实践中,为 SGD 方法选择最佳步长通常很微妙,也是一个活跃的研究领域。

梯度。 spark.mllib 中实现的机器学习方法的(次)梯度表,可以在 分类和回归 部分找到。

近端更新。 作为仅使用正则化器 $R'(\wv)$ 的次梯度作为步长方向的替代方案,在某些情况下,可以使用近端算子来获得改进的更新。对于 L1 正则化器,近端算子由软阈值给出,如 L1Updater 中所实现。

分布式 SGD 的更新方案

GradientDescent 中的 SGD 实现使用了一种简单的(分布式)数据示例采样。我们回顾一下,优化问题 $\eqref{eq:regPrimal}$ 的损失部分是 $\frac1n \sum_{i=1}^n L(\wv;\x_i,y_i)$,因此 $\frac1n \sum_{i=1}^n L'_{\wv,i}$ 将是真实的(次)梯度。由于这将需要访问整个数据集,因此参数 miniBatchFraction 指定了要使用多少比例的完整数据。在该子集上梯度的平均值,即 \[ \frac1{|S|} \sum_{i\in S} L'_{\wv,i} \ , \] 是一个随机梯度。这里 $S$ 是大小为 $|S|=$ miniBatchFraction $\cdot n$ 的采样子集。

在每次迭代中,对分布式数据集 (RDD) 的采样,以及来自每个工作机器的部分结果的总和的计算,都是由标准的 spark 例程执行的。

如果点比例 miniBatchFraction 设置为 1(默认值),那么每次迭代中产生的步长就是精确的(次)梯度下降。在这种情况下,没有随机性,也没有使用步长方向的方差。在另一个极端情况下,如果 miniBatchFraction 选择得非常小,使得只采样一个点,即 $|S|=$ miniBatchFraction $\cdot n = 1$,那么该算法等效于标准 SGD。在这种情况下,步长方向取决于点的均匀随机采样。

有限内存 BFGS (L-BFGS)

L-BFGS 是一种拟牛顿方法家族中的优化算法,用于解决形式为 $\min_{\wv \in\R^d} \; f(\wv)$ 的优化问题。L-BFGS 方法将目标函数局部近似为二次函数,而无需评估目标函数的二阶偏导数来构造 Hessian 矩阵。Hessian 矩阵通过之前的梯度评估来近似,因此在显式计算牛顿方法中的 Hessian 矩阵时,不存在垂直可扩展性问题(训练特征的数量)。因此,与其他一阶优化方法相比,L-BFGS 通常能实现更快的收敛速度。

选择优化方法

线性方法 在内部使用优化,spark.mllib 中的一些线性方法支持 SGD 和 L-BFGS。不同的优化方法可能具有不同的收敛保证,具体取决于目标函数的性质,我们在这里无法涵盖相关文献。一般来说,当 L-BFGS 可用时,我们建议使用它而不是 SGD,因为 L-BFGS 往往收敛更快(迭代次数更少)。

MLlib 中的实现

梯度下降和随机梯度下降

梯度下降方法,包括随机次梯度下降 (SGD),作为 MLlib 中的低级原语包含在内,各种 ML 算法都是基于它开发的,例如,请参阅 线性方法 部分。

SGD 类 GradientDescent 设置以下参数

L-BFGS

L-BFGS 目前在 MLlib 中只是一个低级优化原语。如果您想在各种 ML 算法(如线性回归和逻辑回归)中使用 L-BFGS,您必须自己将目标函数的梯度和更新器传递给优化器,而不是使用像 LogisticRegressionWithSGD 这样的训练 API。请参见下面的示例。这将在下一个版本中解决。

使用 L1Updater 的 L1 正则化将不起作用,因为 L1Updater 中的软阈值逻辑是为梯度下降设计的。请参见开发人员说明。

L-BFGS 方法 LBFGS.runLBFGS 具有以下参数

The return 是一个包含两个元素的元组。第一个元素是一个包含每个特征权重的列矩阵,第二个元素是一个包含每次迭代计算的损失的数组。

以下是如何使用 L-BFGS 优化器训练具有 L2 正则化的二元逻辑回归的示例。

有关 API 的详细信息,请参阅 LBFGS Scala 文档SquaredL2Updater Scala 文档

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val numFeatures = data.take(1)(0).features.size

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)

// Append 1 into the training data as intercept.
val training = splits(0).map(x => (x.label, MLUtils.appendBias(x.features))).cache()

val test = splits(1)

// Run training algorithm to build the model
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))

val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept)

val model = new LogisticRegressionModel(
  Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
  weightsWithIntercept(weightsWithIntercept.size - 1))

// Clear the default threshold.
model.clearThreshold()

// Compute raw scores on the test set.
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println("Loss of each step in training process")
loss.foreach(println)
println(s"Area under ROC = $auROC")
在 Spark 存储库中的“examples/src/main/scala/org/apache/spark/examples/mllib/LBFGSExample.scala”中找到完整的示例代码。

有关 API 的详细信息,请参阅 LBFGS Java 文档SquaredL2Updater Java 文档

import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.optimization.*;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();
int numFeatures = data.take(1).get(0).features().size();

// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint> trainingInit = data.sample(false, 0.6, 11L);
JavaRDD<LabeledPoint> test = data.subtract(trainingInit);

// Append 1 into the training data as intercept.
JavaPairRDD<Object, Vector> training = data.mapToPair(p ->
  new Tuple2<>(p.label(), MLUtils.appendBias(p.features())));
training.cache();

// Run training algorithm to build the model.
int numCorrections = 10;
double convergenceTol = 1e-4;
int maxNumIterations = 20;
double regParam = 0.1;
Vector initialWeightsWithIntercept = Vectors.dense(new double[numFeatures + 1]);

Tuple2<Vector, double[]> result = LBFGS.runLBFGS(
  training.rdd(),
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept);
Vector weightsWithIntercept = result._1();
double[] loss = result._2();

LogisticRegressionModel model = new LogisticRegressionModel(
  Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)),
  (weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]);

// Clear the default threshold.
model.clearThreshold();

// Compute raw scores on the test set.
JavaPairRDD<Object, Object> scoreAndLabels = test.mapToPair(p ->
  new Tuple2<>(model.predict(p.features()), p.label()));

// Get evaluation metrics.
BinaryClassificationMetrics metrics =
  new BinaryClassificationMetrics(scoreAndLabels.rdd());
double auROC = metrics.areaUnderROC();

System.out.println("Loss of each step in training process");
for (double l : loss) {
  System.out.println(l);
}
System.out.println("Area under ROC = " + auROC);
在 Spark 存储库中的“examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java”中找到完整的示例代码。

开发者笔记

由于 Hessian 是根据之前的梯度评估近似构建的,因此目标函数在优化过程中不能更改。因此,随机 L-BFGS 不会通过简单地使用 miniBatch 来天真地工作;因此,在我们有更好的理解之前,我们不会提供它。

Updater 是一个最初为梯度下降设计的类,它计算实际的梯度下降步骤。但是,我们能够通过忽略仅用于梯度下降的逻辑部分(例如自适应步长内容)来获取 L-BFGS 的正则化目标函数的梯度和损失。我们将在以后将此重构为正则化器以替换更新器,以将正则化和步骤更新之间的逻辑分开。