降维 - 基于 RDD 的 API
降维 是减少所考虑变量数量的过程。它可以用来从原始和噪声特征中提取潜在特征,或在保持结构的同时压缩数据。 spark.mllib
提供了对 RowMatrix 类进行降维的支持。
奇异值分解 (SVD)
奇异值分解 (SVD) 将矩阵分解为三个矩阵:$U$、$\Sigma$ 和 $V$,使得
\[ A = U \Sigma V^T, \]
其中
- $U$ 是一个正交矩阵,其列称为左奇异向量,
- $\Sigma$ 是一个对角矩阵,其对角线是非负的,按降序排列,其对角线称为奇异值,
- $V$ 是一个正交矩阵,其列称为右奇异向量。
对于大型矩阵,通常我们不需要完整的分解,只需要前几个奇异值及其相关的奇异向量。这可以节省存储空间,去噪并恢复矩阵的低秩结构。
如果我们保留前 $k$ 个奇异值,那么得到的低秩矩阵的维度将为
$U$
:$m \times k$
,$\Sigma$
:$k \times k$
,$V$
:$n \times k$
.
性能
我们假设 $n$ 小于 $m$。奇异值和右奇异向量是从 Gramian 矩阵 $A^T A$ 的特征值和特征向量推导出来的。存储左奇异向量 $U$ 的矩阵是通过矩阵乘法计算的,即 $U = A (V S^{-1})$,如果用户通过 computeU 参数请求。实际使用的算法会根据计算成本自动确定。
- 如果 $n$ 很小 ($n < 100$) 或 $k$ 相对于 $n$ 很大 ($k > n / 2$),我们首先计算 Gramian 矩阵,然后在驱动程序上本地计算其前几个特征值和特征向量。这需要一次传递,每个执行器和驱动程序上的存储空间为 $O(n^2)$,驱动程序上的时间为 $O(n^2 k)$。
- 否则,我们以分布式方式计算 $(A^T A) v$,并将其发送到 ARPACK,以在驱动程序节点上计算 $(A^T A)$ 的前几个特征值和特征向量。这需要 $O(k)$ 次传递,每个执行器上的存储空间为 $O(n)$,驱动程序上的存储空间为 $O(n k)$。
SVD 示例
spark.mllib
为 RowMatrix 类提供的面向行的矩阵提供了 SVD 功能。
有关 API 的详细信息,请参阅 SingularValueDecomposition
Python 文档。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
mat = RowMatrix(rows)
# Compute the top 5 singular values and corresponding singular vectors.
svd = mat.computeSVD(5, computeU=True)
U = svd.U # The U factor is a RowMatrix.
s = svd.s # The singular values are stored in a local dense vector.
V = svd.V # The V factor is a local dense matrix.
如果 U
被定义为 IndexedRowMatrix
,则相同的代码适用于 IndexedRowMatrix
。
有关 API 的详细信息,请参阅 SingularValueDecomposition
Scala 文档。
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
val rows = sc.parallelize(data)
val mat: RowMatrix = new RowMatrix(rows)
// Compute the top 5 singular values and corresponding singular vectors.
val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(5, computeU = true)
val U: RowMatrix = svd.U // The U factor is a RowMatrix.
val s: Vector = svd.s // The singular values are stored in a local dense vector.
val V: Matrix = svd.V // The V factor is a local dense matrix.
如果 U
被定义为 IndexedRowMatrix
,则相同的代码适用于 IndexedRowMatrix
。
有关 API 的详细信息,请参阅 SingularValueDecomposition
Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.SingularValueDecomposition;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
List<Vector> data = Arrays.asList(
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);
JavaRDD<Vector> rows = jsc.parallelize(data);
// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Compute the top 5 singular values and corresponding singular vectors.
SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(5, true, 1.0E-9d);
RowMatrix U = svd.U(); // The U factor is a RowMatrix.
Vector s = svd.s(); // The singular values are stored in a local dense vector.
Matrix V = svd.V(); // The V factor is a local dense matrix.
如果 U
被定义为 IndexedRowMatrix
,则相同的代码适用于 IndexedRowMatrix
。
主成分分析 (PCA)
主成分分析 (PCA) 是一种统计方法,用于找到一个旋转,使得第一个坐标具有最大的可能方差,并且每个后续坐标依次具有最大的可能方差。旋转矩阵的列称为主成分。PCA 广泛用于降维。
spark.mllib
支持对以面向行格式存储的高瘦矩阵和任何向量进行 PCA。
以下代码演示了如何在 RowMatrix
上计算主成分,并使用它们将向量投影到低维空间。
有关 API 的详细信息,请参阅 RowMatrix
Python 文档。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
mat = RowMatrix(rows)
# Compute the top 4 principal components.
# Principal components are stored in a local dense matrix.
pc = mat.computePrincipalComponents(4)
# Project the rows to the linear space spanned by the top 4 principal components.
projected = mat.multiply(pc)
以下代码演示了如何在 RowMatrix
上计算主成分,并使用它们将向量投影到低维空间。
有关 API 的详细信息,请参阅 RowMatrix
Scala 文档。
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
val rows = sc.parallelize(data)
val mat: RowMatrix = new RowMatrix(rows)
// Compute the top 4 principal components.
// Principal components are stored in a local dense matrix.
val pc: Matrix = mat.computePrincipalComponents(4)
// Project the rows to the linear space spanned by the top 4 principal components.
val projected: RowMatrix = mat.multiply(pc)
以下代码演示了如何在源向量上计算主成分,并使用它们将向量投影到低维空间,同时保留关联的标签
有关 API 的详细信息,请参阅 PCA
Scala 文档。
import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
val data: RDD[LabeledPoint] = sc.parallelize(Seq(
new LabeledPoint(0, Vectors.dense(1, 0, 0, 0, 1)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 1, 0)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 0, 0)),
new LabeledPoint(0, Vectors.dense(1, 0, 0, 0, 0)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 0, 0))))
// Compute the top 5 principal components.
val pca = new PCA(5).fit(data.map(_.features))
// Project vectors to the linear space spanned by the top 5 principal
// components, keeping the label
val projected = data.map(p => p.copy(features = pca.transform(p.features)))
以下代码演示了如何在 RowMatrix
上计算主成分,并使用它们将向量投影到低维空间。
有关 API 的详细信息,请参阅 RowMatrix
Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
List<Vector> data = Arrays.asList(
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);
JavaRDD<Vector> rows = jsc.parallelize(data);
// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Compute the top 4 principal components.
// Principal components are stored in a local dense matrix.
Matrix pc = mat.computePrincipalComponents(4);
// Project the rows to the linear space spanned by the top 4 principal components.
RowMatrix projected = mat.multiply(pc);