维度约减 - 基于 RDD 的 API
维度约减是减少所考虑变量数量的过程。它可用于从原始和嘈杂的特征中提取潜在特征,或在保持结构的同时压缩数据。spark.mllib
为 RowMatrix 类提供了维度约减支持。
奇异值分解 (SVD)
奇异值分解 (SVD) 将一个矩阵分解为三个矩阵:$U$、$Σ$ 和 $V$,使得
\[ A = U \Sigma V^T, \]
其中
- $U$ 是一个正交矩阵,其列称为左奇异向量,
- $\Sigma$ 是一个对角矩阵,其非负对角线元素按降序排列,其对角线元素称为奇异值,
- $V$ 是一个正交矩阵,其列称为右奇异向量。
对于大型矩阵,通常我们不需要完整的分解,而只需要顶部奇异值及其相关的奇异向量。这可以节省存储空间、去噪并恢复矩阵的低秩结构。
如果我们保留前 $k$ 个奇异值,那么生成的低秩矩阵的维度将是
$U$
:$m \times k$
,$\Sigma$
:$k \times k$
,$V$
:$n \times k$
。
性能
我们假设 $n$ 小于 $m$。奇异值和右奇异向量是从格拉姆矩阵 $A^T A$ 的特征值和特征向量中导出的。存储左奇异向量 $U$ 的矩阵,如果用户通过 `computeU` 参数请求,则通过矩阵乘法 $U = A (V S^{-1})$ 计算。实际使用的方法是根据计算成本自动确定的
- 如果 $n$ 较小 ($n < 100$) 或 $k$ 相对于 $n$ 较大 ($k > n / 2$),我们首先计算格拉姆矩阵,然后在驱动器上本地计算其顶部特征值和特征向量。这需要在每个执行器和驱动器上进行单次遍历,存储空间为 $O(n^2)$,并且在驱动器上耗时 $O(n^2 k)$。
- 否则,我们以分布式方式计算 $(A^T A) v$,并将其发送到 ARPACK 以在驱动节点上计算 $(A^T A)$ 的顶部特征值和特征向量。这需要 $O(k)$ 次遍历,每个执行器上的存储空间为 $O(n)$,以及驱动器上的存储空间为 $O(n k)$。
SVD 示例
spark.mllib
为行式矩阵(由 RowMatrix 类提供)提供了 SVD 功能。
有关 API 的详细信息,请参阅 SingularValueDecomposition
Python 文档。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
mat = RowMatrix(rows)
# Compute the top 5 singular values and corresponding singular vectors.
svd = mat.computeSVD(5, computeU=True)
U = svd.U # The U factor is a RowMatrix.
s = svd.s # The singular values are stored in a local dense vector.
V = svd.V # The V factor is a local dense matrix.
如果 U
被定义为 IndexedRowMatrix
,则相同的代码也适用于 IndexedRowMatrix
。
有关 API 的详细信息,请参阅 SingularValueDecomposition
Scala 文档。
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
val rows = sc.parallelize(immutable.ArraySeq.unsafeWrapArray(data))
val mat: RowMatrix = new RowMatrix(rows)
// Compute the top 5 singular values and corresponding singular vectors.
val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(5, computeU = true)
val U: RowMatrix = svd.U // The U factor is a RowMatrix.
val s: Vector = svd.s // The singular values are stored in a local dense vector.
val V: Matrix = svd.V // The V factor is a local dense matrix.
如果 U
被定义为 IndexedRowMatrix
,则相同的代码也适用于 IndexedRowMatrix
。
有关 API 的详细信息,请参阅 SingularValueDecomposition
Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.SingularValueDecomposition;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
List<Vector> data = Arrays.asList(
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);
JavaRDD<Vector> rows = jsc.parallelize(data);
// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Compute the top 5 singular values and corresponding singular vectors.
SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(5, true, 1.0E-9d);
RowMatrix U = svd.U(); // The U factor is a RowMatrix.
Vector s = svd.s(); // The singular values are stored in a local dense vector.
Matrix V = svd.V(); // The V factor is a local dense matrix.
如果 U
被定义为 IndexedRowMatrix
,则相同的代码也适用于 IndexedRowMatrix
。
主成分分析 (PCA)
主成分分析 (PCA) 是一种统计方法,用于寻找一种旋转,使得第一个坐标具有最大的可能方差,并且每个后续坐标依次也具有最大的可能方差。旋转矩阵的列称为主成分。PCA 广泛用于维度约减。
spark.mllib
支持对以行式存储的高瘦矩阵和任何向量进行 PCA。
以下代码演示了如何在 RowMatrix
上计算主成分,并使用它们将向量投影到低维空间。
有关 API 的详细信息,请参阅 RowMatrix
Python 文档。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
mat = RowMatrix(rows)
# Compute the top 4 principal components.
# Principal components are stored in a local dense matrix.
pc = mat.computePrincipalComponents(4)
# Project the rows to the linear space spanned by the top 4 principal components.
projected = mat.multiply(pc)
以下代码演示了如何在 RowMatrix
上计算主成分,并使用它们将向量投影到低维空间。
有关 API 的详细信息,请参阅 RowMatrix
Scala 文档。
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
val rows = sc.parallelize(immutable.ArraySeq.unsafeWrapArray(data))
val mat: RowMatrix = new RowMatrix(rows)
// Compute the top 4 principal components.
// Principal components are stored in a local dense matrix.
val pc: Matrix = mat.computePrincipalComponents(4)
// Project the rows to the linear space spanned by the top 4 principal components.
val projected: RowMatrix = mat.multiply(pc)
以下代码演示了如何在源向量上计算主成分,并使用它们将向量投影到低维空间,同时保留相关标签
有关 API 的详细信息,请参阅 PCA
Scala 文档。
import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
val data: RDD[LabeledPoint] = sc.parallelize(Seq(
new LabeledPoint(0, Vectors.dense(1, 0, 0, 0, 1)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 1, 0)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 0, 0)),
new LabeledPoint(0, Vectors.dense(1, 0, 0, 0, 0)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 0, 0))))
// Compute the top 5 principal components.
val pca = new PCA(5).fit(data.map(_.features))
// Project vectors to the linear space spanned by the top 5 principal
// components, keeping the label
val projected = data.map(p => p.copy(features = pca.transform(p.features)))
以下代码演示了如何在 RowMatrix
上计算主成分,并使用它们将向量投影到低维空间。
有关 API 的详细信息,请参阅 RowMatrix
Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
List<Vector> data = Arrays.asList(
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);
JavaRDD<Vector> rows = jsc.parallelize(data);
// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Compute the top 4 principal components.
// Principal components are stored in a local dense matrix.
Matrix pc = mat.computePrincipalComponents(4);
// Project the rows to the linear space spanned by the top 4 principal components.
RowMatrix projected = mat.multiply(pc);