线性方法 - 基于 RDD 的 API
\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]
数学公式
许多标准的*机器学习*方法可以表述为凸优化问题,即找到依赖于变量向量 $\wv$
(在代码中称为*权重*)的凸函数 $f$
的最小化器的任务,该向量有 $d$
个条目。形式上,我们可以将其写成优化问题 $\min_{\wv \in\R^d} \; f(\wv)$
,其中目标函数的形式为 \begin{equation} f(\wv) := \lambda\, R(\wv) + \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) \label{eq:regPrimal} \ . \end{equation}
这里向量 $\x_i\in\R^d$
是训练数据示例,对于 $1\le i\le n$
,而 $y_i\in\R$
是它们对应的标签,我们想要预测这些标签。如果 $L(\wv; \x, y)$ 可以表示为 $\wv^T x$ 和 $y$ 的函数,我们称该方法为*线性*方法。 spark.mllib
的几种分类和回归算法属于此类别,将在此处讨论。
目标函数 $f$
有两部分:控制模型复杂度的正则化器和衡量模型在训练数据上的误差的损失。损失函数 $L(\wv;.)$
通常是 $\wv$
中的凸函数。固定的正则化参数 $\lambda \ge 0$
(代码中的 regParam
)定义了最小化损失(即训练误差)和最小化模型复杂度(即避免过拟合)这两个目标之间的权衡。
损失函数
下表总结了 spark.mllib
支持的方法的损失函数及其梯度或次梯度
损失函数 $L(\wv; \x, y)$ | 梯度或次梯度 | |
---|---|---|
铰链损失 | $\max \{0, 1-y \wv^T \x \}, \quad y \in \{-1, +1\}$ | $\begin{cases}-y \cdot \x & \text{如果 $y \wv^T \x <1$}, \\ 0 & \text{否则}.\end{cases}$ |
逻辑损失 | $\log(1+\exp( -y \wv^T \x)), \quad y \in \{-1, +1\}$ | $-y \left(1-\frac1{1+\exp(-y \wv^T \x)} \right) \cdot \x$ |
平方损失 | $\frac{1}{2} (\wv^T \x - y)^2, \quad y \in \R$ | $(\wv^T \x - y) \cdot \x$ |
请注意,在上面的数学公式中,二进制标签 $y$ 表示为 $+1$(正)或 $-1$(负),这对于公式来说很方便。*但是*,为了与多类标签保持一致,负标签在 spark.mllib
中由 $0$ 表示,而不是 $-1$。
正则化器
正则化器 的目的是鼓励简单的模型并避免过拟合。我们在 spark.mllib
中支持以下正则化器
正则化器 $R(\wv)$ | 梯度或次梯度 | |
---|---|---|
零(未正则化) | 0 | $\0$ |
L2 | $\frac{1}{2}\|\wv\|_2^2$ | $\wv$ |
L1 | $\|\wv\|_1$ | $\mathrm{sign}(\wv)$ |
弹性网络 | $\alpha \|\wv\|_1 + (1-\alpha)\frac{1}{2}\|\wv\|_2^2$ | $\alpha \mathrm{sign}(\wv) + (1-\alpha) \wv$ |
这里 $\mathrm{sign}(\wv)$
是由 $\wv$
的所有条目的符号($\pm1$
)组成的向量。
由于平滑性,L2 正则化问题通常比 L1 正则化问题更容易解决。但是,L1 正则化可以帮助促进权重的稀疏性,从而导致更小、更易于解释的模型,后者可用于特征选择。 弹性网络 是 L1 和 L2 正则化的组合。不建议在没有任何正则化的情况下训练模型,尤其是在训练示例数量较少的情况下。
优化
在底层,线性方法使用凸优化方法来优化目标函数。 spark.mllib
使用两种方法,SGD 和 L-BFGS,在优化部分中进行了描述。目前,大多数算法 API 支持随机梯度下降 (SGD),少数支持 L-BFGS。有关在优化方法之间进行选择的指南,请参阅此优化部分。
分类
分类 旨在将项目分成几类。最常见的分类类型是二元分类,其中有两类,通常称为正类和负类。如果类别超过两个,则称为多类分类。 spark.mllib
支持两种用于分类的线性方法:线性支持向量机 (SVM) 和逻辑回归。线性 SVM 仅支持二元分类,而逻辑回归支持二元和多类分类问题。对于这两种方法,spark.mllib
都支持 L1 和 L2 正则化变体。训练数据集由 MLlib 中的 LabeledPoint 的 RDD 表示,其中标签是从零开始的类索引:$0, 1, 2, \ldots$。
线性支持向量机 (SVM)
线性 SVM 是大规模分类任务的标准方法。它是一种如公式 $\eqref{eq:regPrimal}$
中所述的线性方法,其公式中的损失函数由铰链损失给出
\[ L(\wv;\x,y) := \max \{0, 1-y \wv^T \x \}. \]
默认情况下,线性 SVM 使用 L2 正则化进行训练。我们也支持可选的 L1 正则化。在这种情况下,问题变成了一个线性规划问题。
线性 SVM 算法输出一个 SVM 模型。给定一个新的数据点,用 $\x$ 表示,模型根据 $\wv^T \x$ 的值进行预测。默认情况下,如果 $\wv^T \x \geq 0$,则结果为正,否则为负。
示例
以下示例显示了如何加载示例数据集、构建 SVM 模型以及使用生成的模型进行预测以计算训练误差。
有关 API 的更多详细信息,请参阅SVMWithSGD
Python 文档和SVMModel
Python 文档。
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint
# Load and parse the data
def parsePoint(line):
values = [float(x) for x in line.split(' ')]
return LabeledPoint(values[0], values[1:])
data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)
# Build the model
model = SVMWithSGD.train(parsedData, iterations=100)
# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
# Save and load model
model.save(sc, "target/tmp/pythonSVMWithSGDModel")
sameModel = SVMModel.load(sc, "target/tmp/pythonSVMWithSGDModel")
以下代码片段说明了如何加载示例数据集、使用算法对象中的静态方法对此训练数据执行训练算法,以及使用生成的模型进行预测以计算训练误差。
有关 API 的详细信息,请参阅SVMWithSGD
Scala 文档和SVMModel
Scala 文档。
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.util.MLUtils
// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)
// Run training algorithm to build the model
val numIterations = 100
val model = SVMWithSGD.train(training, numIterations)
// Clear the default threshold.
model.clearThreshold()
// Compute raw scores on the test set.
val scoreAndLabels = test.map { point =>
val score = model.predict(point.features)
(score, point.label)
}
// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()
println(s"Area under ROC = $auROC")
// Save and load model
model.save(sc, "target/tmp/scalaSVMWithSGDModel")
val sameModel = SVMModel.load(sc, "target/tmp/scalaSVMWithSGDModel")
SVMWithSGD.train()
方法默认使用正则化参数设置为 1.0 的 L2 正则化。如果我们想配置此算法,我们可以通过直接创建新对象并调用 setter 方法来进一步自定义 SVMWithSGD
。所有其他 spark.mllib
算法也支持以这种方式进行自定义。例如,以下代码生成正则化参数设置为 0.1 的 L1 正则化 SVM 变体,并运行训练算法 200 次迭代。
MLlib 的所有方法都使用 Java 友好类型,因此您可以在 Java 中以与在 Scala 中相同的方式导入和调用它们。唯一需要注意的是,这些方法采用 Scala RDD 对象,而 Spark Java API 使用单独的 JavaRDD
类。您可以通过在 JavaRDD
对象上调用 .rdd()
将 Java RDD 转换为 Scala RDD。下面给出了一个与 Scala 中提供的示例等效的自包含应用程序示例
有关 API 的详细信息,请参阅 SVMWithSGD
Java 文档 和 SVMModel
Java 文档。
import scala.Tuple2;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.SVMModel;
import org.apache.spark.mllib.classification.SVMWithSGD;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();
// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint> training = data.sample(false, 0.6, 11L);
training.cache();
JavaRDD<LabeledPoint> test = data.subtract(training);
// Run training algorithm to build the model.
int numIterations = 100;
SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);
// Clear the default threshold.
model.clearThreshold();
// Compute raw scores on the test set.
JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(p ->
new Tuple2<>(model.predict(p.features()), p.label()));
// Get evaluation metrics.
BinaryClassificationMetrics metrics =
new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
double auROC = metrics.areaUnderROC();
System.out.println("Area under ROC = " + auROC);
// Save and load model
model.save(sc, "target/tmp/javaSVMWithSGDModel");
SVMModel sameModel = SVMModel.load(sc, "target/tmp/javaSVMWithSGDModel");
SVMWithSGD.train()
方法默认使用正则化参数设置为 1.0 的 L2 正则化。如果我们想配置此算法,我们可以通过直接创建新对象并调用 setter 方法来进一步自定义 SVMWithSGD
。所有其他 spark.mllib
算法也支持以这种方式进行自定义。例如,以下代码生成正则化参数设置为 0.1 的 L1 正则化 SVM 变体,并运行训练算法 200 次迭代。
要运行上述应用程序,请按照 Spark 快速入门指南的自包含应用程序部分中提供的说明进行操作。请确保还将 spark-mllib 作为依赖项包含在您的构建文件中。
逻辑回归
逻辑回归被广泛用于预测二元响应。它是一种如上面等式 $\eqref{eq:regPrimal}$
中所述的线性方法,其公式中的损失函数由逻辑损失给出:\[ L(\wv;\x,y) := \log(1+\exp( -y \wv^T \x)). \]
对于二分类问题,该算法输出二元逻辑回归模型。给定一个新的数据点,用 $\x$ 表示,模型通过应用逻辑函数 \[ \mathrm{f}(z) = \frac{1}{1 + e^{-z}} \]
进行预测,其中 $z = \wv^T \x$。默认情况下,如果 $\mathrm{f}(\wv^T x) > 0.5$,则结果为正,否则为负,尽管与线性 SVM 不同,逻辑回归模型的原始输出 $\mathrm{f}(z)$ 具有概率解释(即,$\x$ 为正的概率)。
二元逻辑回归可以推广到多项逻辑回归,以训练和预测多类分类问题。例如,对于 $K$ 个可能的结果,可以选择其中一个结果作为“枢轴”,其他 $K - 1$ 个结果可以分别针对枢轴结果进行回归。在 spark.mllib
中,第一个类 $0$ 被选为“枢轴”类。有关参考文献,请参见 统计学习基础 的第 4.4 节。这是一个 详细的数学推导。
对于多类分类问题,该算法将输出一个多项逻辑回归模型,其中包含针对第一类回归的 $K - 1$ 个二元逻辑回归模型。给定一个新的数据点,将运行 $K - 1$ 个模型,概率最大的类将被选为预测类。
我们实现了两种算法来解决逻辑回归:小批量梯度下降和 L-BFGS。我们建议使用 L-BFGS 而不是小批量梯度下降,因为它收敛速度更快。
示例
以下示例显示了如何加载样本数据集、构建逻辑回归模型,以及使用生成的模型进行预测以计算训练误差。
请注意,Python API 尚不支持多类分类和模型保存/加载,但将来会支持。
有关 API 的更多详细信息,请参阅 LogisticRegressionWithLBFGS
Python 文档 和 LogisticRegressionModel
Python 文档。
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
# Load and parse the data
def parsePoint(line):
values = [float(x) for x in line.split(' ')]
return LabeledPoint(values[0], values[1:])
data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)
# Build the model
model = LogisticRegressionWithLBFGS.train(parsedData)
# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
# Save and load model
model.save(sc, "target/tmp/pythonLogisticRegressionWithLBFGSModel")
sameModel = LogisticRegressionModel.load(sc,
"target/tmp/pythonLogisticRegressionWithLBFGSModel")
以下代码说明了如何加载样本多类数据集,将其拆分为训练集和测试集,并使用 LogisticRegressionWithLBFGS 拟合逻辑回归模型。然后根据测试数据集评估模型并将其保存到磁盘。
有关 API 的详细信息,请参阅 LogisticRegressionWithLBFGS
Scala 文档 和 LogisticRegressionModel
Scala 文档。
import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)
// Run training algorithm to build the model
val model = new LogisticRegressionWithLBFGS()
.setNumClasses(10)
.run(training)
// Compute raw scores on the test set.
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
val prediction = model.predict(features)
(prediction, label)
}
// Get evaluation metrics.
val metrics = new MulticlassMetrics(predictionAndLabels)
val accuracy = metrics.accuracy
println(s"Accuracy = $accuracy")
// Save and load model
model.save(sc, "target/tmp/scalaLogisticRegressionWithLBFGSModel")
val sameModel = LogisticRegressionModel.load(sc,
"target/tmp/scalaLogisticRegressionWithLBFGSModel")
以下代码说明了如何加载样本多类数据集,将其拆分为训练集和测试集,并使用 LogisticRegressionWithLBFGS 拟合逻辑回归模型。然后根据测试数据集评估模型并将其保存到磁盘。
有关 API 的详细信息,请参阅 LogisticRegressionWithLBFGS
Java 文档 和 LogisticRegressionModel
Java 文档。
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();
// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[] {0.6, 0.4}, 11L);
JavaRDD<LabeledPoint> training = splits[0].cache();
JavaRDD<LabeledPoint> test = splits[1];
// Run training algorithm to build the model.
LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
.setNumClasses(10)
.run(training.rdd());
// Compute raw scores on the test set.
JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
new Tuple2<>(model.predict(p.features()), p.label()));
// Get evaluation metrics.
MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
double accuracy = metrics.accuracy();
System.out.println("Accuracy = " + accuracy);
// Save and load model
model.save(sc, "target/tmp/javaLogisticRegressionWithLBFGSModel");
LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc,
"target/tmp/javaLogisticRegressionWithLBFGSModel");
回归
线性最小二乘法、Lasso 和岭回归
线性最小二乘法是回归问题最常见的公式。它是一种如上面等式 $\eqref{eq:regPrimal}$
中所述的线性方法,其公式中的损失函数由平方损失给出:\[ L(\wv;\x,y) := \frac{1}{2} (\wv^T \x - y)^2. \]
通过使用不同类型的正则化,可以推导出各种相关的回归方法:普通最小二乘法 或 线性最小二乘法 不使用正则化;岭回归 使用 L2 正则化;Lasso 使用 L1 正则化。对于所有这些模型,平均损失或训练误差 $\frac{1}{n} \sum_{i=1}^n (\wv^T x_i - y_i)^2$ 被称为 均方误差。
流式线性回归
当数据以流式传输方式到达时,在线拟合回归模型非常有用,可以随着新数据的到达更新模型的参数。spark.mllib
目前支持使用普通最小二乘法进行流式线性回归。拟合与离线执行的拟合类似,只是拟合发生在每批数据上,因此模型会不断更新以反映来自流的数据。
示例
以下示例演示了如何从两个不同的文本文件输入流中加载训练和测试数据,将流解析为标记点,在线拟合第一个流的线性回归模型,以及对第二个流进行预测。
首先,我们导入解析输入数据和创建模型所需的类。
然后,我们为训练和测试数据创建输入流。我们假设已经创建了一个 StreamingContext ssc
,有关更多信息,请参见 Spark 流式编程指南。在本例中,我们在训练和测试流中使用标记点,但在实践中,您可能希望对测试数据使用未标记的向量。
我们通过将权重初始化为 0 来创建模型。
现在我们注册用于训练和测试的流并启动作业。
我们现在可以将包含数据的文本文件保存到训练或测试文件夹中。每行应该是一个格式为 (y,[x1,x2,x3])
的数据点,其中 y
是标签,x1,x2,x3
是特征。每当将文本文件放入 sys.argv[1]
时,模型都会更新。每当将文本文件放入 sys.argv[2]
时,您都会看到预测。随着您向训练目录提供更多数据,预测会变得更好!
这是一个完整的示例
import sys
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import StreamingLinearRegressionWithSGD
def parse(lp):
label = float(lp[lp.find('(') + 1: lp.find(',')])
vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
return LabeledPoint(label, vec)
trainingData = ssc.textFileStream(sys.argv[1]).map(parse).cache()
testData = ssc.textFileStream(sys.argv[2]).map(parse)
numFeatures = 3
model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0, 0.0, 0.0])
model.trainOn(trainingData)
print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))
ssc.start()
ssc.awaitTermination()
首先,我们导入解析输入数据和创建模型所需的类。
然后,我们为训练和测试数据创建输入流。我们假设已经创建了一个 StreamingContext ssc
,有关更多信息,请参见 Spark 流式编程指南。在本例中,我们在训练和测试流中使用标记点,但在实践中,您可能希望对测试数据使用未标记的向量。
我们通过将权重初始化为零来创建模型,并注册用于训练和测试的流,然后启动作业。将预测与真实标签一起打印,让我们可以轻松查看结果。
最后,我们可以在训练或测试文件夹中保存包含数据的文本文件。每行应该是一个格式为 (y,[x1,x2,x3])
的数据点,其中 y
是标签,x1,x2,x3
是特征。每当将文本文件放入 args(0)
时,模型都会更新。每当将文本文件放入 args(1)
时,您都会看到预测。随着您向训练目录提供更多数据,预测会变得更好!
这是一个完整的示例
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse).cache()
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
val numFeatures = 3
val model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.zeros(numFeatures))
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()
实现(开发者)
在幕后,spark.mllib
实现了随机梯度下降 (SGD) 的简单分布式版本,它建立在底层梯度下降原语之上(如优化部分所述)。所有提供的算法都将正则化参数 (regParam
) 以及与随机梯度下降相关的各种参数(stepSize
、numIterations
、miniBatchFraction
)作为输入。对于它们中的每一个,我们都支持所有三种可能的正则化(无、L1 或 L2)。
对于逻辑回归,L-BFGS 版本在 LogisticRegressionWithLBFGS 下实现,并且此版本支持二元和多项逻辑回归,而 SGD 版本仅支持二元逻辑回归。但是,L-BFGS 版本不支持 L1 正则化,而 SGD 版本支持 L1 正则化。当不需要 L1 正则化时,强烈建议使用 L-BFGS 版本,因为它通过使用拟牛顿法逼近逆 Hessian 矩阵,与 SGD 相比收敛速度更快、精度更高。
算法均在 Scala 中实现