降维 - 基于 RDD 的 API
降维是减少考虑的变量数量的过程。它可以用于从原始和嘈杂的特征中提取潜在特征,或者在保持结构的同时压缩数据。spark.mllib
提供了对 RowMatrix 类进行降维的支持。
奇异值分解 (SVD)
奇异值分解 (SVD) 将一个矩阵分解成三个矩阵:$U$,$\Sigma$,和 $V$,使得
\[ A = U \Sigma V^T, \]
其中
- $U$ 是一个正交矩阵,其列被称为左奇异向量,
- $\Sigma$ 是一个对角矩阵,具有按降序排列的非负对角线,其对角线被称为奇异值,
- $V$ 是一个正交矩阵,其列被称为右奇异向量。
对于大型矩阵,通常我们不需要完全分解,而只需要顶部的奇异值及其相关的奇异向量。这可以节省存储空间,去噪并恢复矩阵的低秩结构。
如果我们保留顶部的 $k$ 个奇异值,那么得到的低秩矩阵的维度将是
$U$
:$m \times k$
,$\Sigma$
:$k \times k$
,$V$
:$n \times k$
.
性能
我们假设 $n$ 小于 $m$。奇异值和右奇异向量是从格拉姆矩阵 $A^T A$ 的特征值和特征向量推导出来的。存储左奇异向量 $U$ 的矩阵通过矩阵乘法计算为 $U = A (V S^{-1})$,如果用户通过 computeU 参数请求。实际使用的方法是根据计算成本自动确定的。
- 如果 $n$ 很小 ($n < 100$) 或者 $k$ 相对于 $n$ 较大 ($k > n / 2$),我们首先计算格拉姆矩阵,然后在驱动程序上本地计算其顶部的特征值和特征向量。这需要单次传递,每个执行器和驱动程序上需要 $O(n^2)$ 的存储空间,以及驱动程序上需要 $O(n^2 k)$ 的时间。
- 否则,我们以分布式方式计算 $(A^T A) v$,并将其发送到 ARPACK 以在驱动程序节点上计算 $(A^T A)$ 的顶部特征值和特征向量。这需要 $O(k)$ 次传递,每个执行器上需要 $O(n)$ 的存储空间,以及驱动程序上需要 $O(n k)$ 的存储空间。
SVD 示例
spark.mllib
向行式矩阵提供 SVD 功能,该功能在 RowMatrix 类中提供。
有关 API 的详细信息,请参阅 SingularValueDecomposition
Python 文档。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
mat = RowMatrix(rows)
# Compute the top 5 singular values and corresponding singular vectors.
svd = mat.computeSVD(5, computeU=True)
U = svd.U # The U factor is a RowMatrix.
s = svd.s # The singular values are stored in a local dense vector.
V = svd.V # The V factor is a local dense matrix.
如果 U
被定义为 IndexedRowMatrix
,则相同的代码适用于 IndexedRowMatrix
。
有关 API 的详细信息,请参阅 SingularValueDecomposition
Scala 文档。
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
val rows = sc.parallelize(data)
val mat: RowMatrix = new RowMatrix(rows)
// Compute the top 5 singular values and corresponding singular vectors.
val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(5, computeU = true)
val U: RowMatrix = svd.U // The U factor is a RowMatrix.
val s: Vector = svd.s // The singular values are stored in a local dense vector.
val V: Matrix = svd.V // The V factor is a local dense matrix.
如果 U
被定义为 IndexedRowMatrix
,则相同的代码适用于 IndexedRowMatrix
。
有关 API 的详细信息,请参阅 SingularValueDecomposition
Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.SingularValueDecomposition;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
List<Vector> data = Arrays.asList(
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);
JavaRDD<Vector> rows = jsc.parallelize(data);
// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Compute the top 5 singular values and corresponding singular vectors.
SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(5, true, 1.0E-9d);
RowMatrix U = svd.U(); // The U factor is a RowMatrix.
Vector s = svd.s(); // The singular values are stored in a local dense vector.
Matrix V = svd.V(); // The V factor is a local dense matrix.
如果 U
被定义为 IndexedRowMatrix
,则相同的代码适用于 IndexedRowMatrix
。
主成分分析 (PCA)
主成分分析 (PCA) 是一种统计方法,用于找到一种旋转,使得第一个坐标具有尽可能大的方差,并且每个后续坐标依次具有尽可能大的方差。旋转矩阵的列称为主成分。 PCA 广泛用于降维。
spark.mllib
支持存储在面向行的格式和任何 Vectors 中的高瘦矩阵的 PCA。
以下代码演示了如何在 RowMatrix
上计算主成分,并使用它们将向量投影到低维空间中。
有关 API 的详细信息,请参阅 RowMatrix
Python 文档。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
mat = RowMatrix(rows)
# Compute the top 4 principal components.
# Principal components are stored in a local dense matrix.
pc = mat.computePrincipalComponents(4)
# Project the rows to the linear space spanned by the top 4 principal components.
projected = mat.multiply(pc)
以下代码演示了如何在 RowMatrix
上计算主成分,并使用它们将向量投影到低维空间中。
有关 API 的详细信息,请参阅 RowMatrix
Scala 文档。
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
val rows = sc.parallelize(data)
val mat: RowMatrix = new RowMatrix(rows)
// Compute the top 4 principal components.
// Principal components are stored in a local dense matrix.
val pc: Matrix = mat.computePrincipalComponents(4)
// Project the rows to the linear space spanned by the top 4 principal components.
val projected: RowMatrix = mat.multiply(pc)
以下代码演示了如何在源向量上计算主成分,并使用它们将向量投影到低维空间中,同时保留相关的标签
有关 API 的详细信息,请参阅 PCA
Scala 文档。
import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
val data: RDD[LabeledPoint] = sc.parallelize(Seq(
new LabeledPoint(0, Vectors.dense(1, 0, 0, 0, 1)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 1, 0)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 0, 0)),
new LabeledPoint(0, Vectors.dense(1, 0, 0, 0, 0)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 0, 0))))
// Compute the top 5 principal components.
val pca = new PCA(5).fit(data.map(_.features))
// Project vectors to the linear space spanned by the top 5 principal
// components, keeping the label
val projected = data.map(p => p.copy(features = pca.transform(p.features)))
以下代码演示了如何在 RowMatrix
上计算主成分,并使用它们将向量投影到低维空间中。
有关 API 的详细信息,请参阅 RowMatrix
Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
List<Vector> data = Arrays.asList(
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);
JavaRDD<Vector> rows = jsc.parallelize(data);
// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Compute the top 4 principal components.
// Principal components are stored in a local dense matrix.
Matrix pc = mat.computePrincipalComponents(4);
// Project the rows to the linear space spanned by the top 4 principal components.
RowMatrix projected = mat.multiply(pc);