数据类型 - 基于 RDD 的 API
MLlib 支持存储在单机上的局部向量和矩阵,以及由一个或多个 RDD 支持的分布式矩阵。局部向量和局部矩阵是作为公共接口的简单数据模型。底层的线性代数运算由 Breeze 提供。在监督学习中使用的训练样本在 MLlib 中被称为“标签点”。
局部向量
局部向量具有整数类型的 0-based 索引和双精度浮点类型的值,存储在单机上。MLlib 支持两种类型的局部向量:稠密向量(dense)和稀疏向量(sparse)。稠密向量由表示其条目值的双精度数组支持,而稀疏向量由两个并行数组(索引和值)支持。例如,向量 (1.0, 0.0, 3.0)
可以用稠密格式表示为 [1.0, 0.0, 3.0]
,或者用稀疏格式表示为 (3, [0, 2], [1.0, 3.0])
,其中 3
是向量的大小。
MLlib 将以下类型识别为稠密向量
- NumPy 的
array
- Python 列表,例如
[1, 2, 3]
并将以下类型识别为稀疏向量
- MLlib 的
SparseVector
。 - SciPy 的单列
csc_matrix
为了效率,我们建议使用 NumPy 数组而不是列表,并使用 Vectors
中实现的工厂方法来创建稀疏向量。
有关 API 的更多详细信息,请参阅 Vectors
Python 文档。
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors
# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1))
局部向量的基类是 Vector
,我们提供了两种实现:DenseVector
和 SparseVector
。我们建议使用 Vectors
中实现的工厂方法来创建局部向量。
有关 API 的详细信息,请参阅 Vector
Scala 文档和 Vectors
Scala 文档。
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
注意: Scala 默认导入 scala.collection.immutable.Vector
,因此您必须显式导入 org.apache.spark.mllib.linalg.Vector
才能使用 MLlib 的 Vector
。
局部向量的基类是 Vector
,我们提供了两种实现:DenseVector
和 SparseVector
。我们建议使用 Vectors
中实现的工厂方法来创建局部向量。
有关 API 的详细信息,请参阅 Vector
Java 文档和 Vectors
Java 文档。
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Create a dense vector (1.0, 0.0, 3.0).
Vector dv = Vectors.dense(1.0, 0.0, 3.0);
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
Vector sv = Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0});
标签点
标签点是局部向量,可以是稠密或稀疏的,与标签/响应相关联。在 MLlib 中,标签点用于监督学习算法。我们使用双精度浮点数存储标签,因此可以在回归和分类中都使用标签点。对于二元分类,标签应为 0
(负类)或 1
(正类)。对于多类分类,标签应为从零开始的类索引:0, 1, 2, ...
。
标签点由 LabeledPoint
表示。
有关 API 的更多详细信息,请参阅 LabeledPoint
Python 文档。
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])
# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))
标签点由案例类 LabeledPoint
表示。
有关 API 的详细信息,请参阅 LabeledPoint
Scala 文档。
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
标签点由 LabeledPoint
表示。
有关 API 的详细信息,请参阅 LabeledPoint
Java 文档。
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
// Create a labeled point with a positive label and a dense feature vector.
LabeledPoint pos = new LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0));
// Create a labeled point with a negative label and a sparse feature vector.
LabeledPoint neg = new LabeledPoint(0.0, Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0}));
稀疏数据
在实践中,拥有稀疏训练数据非常常见。MLlib 支持读取以 LIBSVM
格式存储的训练样本,该格式是 LIBSVM
和 LIBLINEAR
使用的默认格式。它是一种文本格式,其中每一行使用以下格式表示一个带有标签的稀疏特征向量
label index1:value1 index2:value2 ...
其中索引是基于一的且按升序排列。加载后,特征索引将转换为基于零的。
MLUtils.loadLibSVMFile
读取以 LIBSVM 格式存储的训练样本。
有关 API 的更多详细信息,请参阅 MLUtils
Python 文档。
from pyspark.mllib.util import MLUtils
examples = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
MLUtils.loadLibSVMFile
读取以 LIBSVM 格式存储的训练样本。
有关 API 的详细信息,请参阅 MLUtils
Scala 文档。
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
MLUtils.loadLibSVMFile
读取以 LIBSVM 格式存储的训练样本。
有关 API 的详细信息,请参阅 MLUtils
Java 文档。
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.api.java.JavaRDD;
JavaRDD<LabeledPoint> examples =
MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt").toJavaRDD();
局部矩阵
局部矩阵具有整数类型的行和列索引以及双精度浮点类型的值,存储在单机上。MLlib 支持稠密矩阵和稀疏矩阵,其中稠密矩阵的条目值以列优先顺序存储在单个双精度数组中,稀疏矩阵的非零条目值以列优先顺序存储在压缩稀疏列(CSC)格式中。例如,以下稠密矩阵 \[ \begin{pmatrix} 1.0 & 2.0 \\ 3.0 & 4.0 \\ 5.0 & 6.0 \end{pmatrix} \]
以一维数组 [1.0, 3.0, 5.0, 2.0, 4.0, 6.0]
形式存储,矩阵大小为 (3, 2)
。
局部矩阵的基类是 Matrix
,我们提供了两种实现:DenseMatrix
和 SparseMatrix
。我们建议使用 Matrices
中实现的工厂方法来创建局部矩阵。请记住,MLlib 中的局部矩阵以列优先顺序存储。
有关 API 的更多详细信息,请参阅 Matrix
Python 文档和 Matrices
Python 文档。
from pyspark.mllib.linalg import Matrix, Matrices
# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6])
# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
局部矩阵的基类是 Matrix
,我们提供了两种实现:DenseMatrix
和 SparseMatrix
。我们建议使用 Matrices
中实现的工厂方法来创建局部矩阵。请记住,MLlib 中的局部矩阵以列优先顺序存储。
有关 API 的详细信息,请参阅 Matrix
Scala 文档和 Matrices
Scala 文档。
import org.apache.spark.mllib.linalg.{Matrix, Matrices}
// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
局部矩阵的基类是 Matrix
,我们提供了两种实现:DenseMatrix
和 SparseMatrix
。我们建议使用 Matrices
中实现的工厂方法来创建局部矩阵。请记住,MLlib 中的局部矩阵以列优先顺序存储。
有关 API 的详细信息,请参阅 Matrix
Java 文档和 Matrices
Java 文档。
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Matrices;
// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
Matrix dm = Matrices.dense(3, 2, new double[] {1.0, 3.0, 5.0, 2.0, 4.0, 6.0});
// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
Matrix sm = Matrices.sparse(3, 2, new int[] {0, 1, 3}, new int[] {0, 2, 1}, new double[] {9, 6, 8});
分布式矩阵
分布式矩阵具有长整型行和列索引以及双精度浮点类型的值,分布式存储在一个或多个 RDD 中。选择正确的格式来存储大型分布式矩阵非常重要。将分布式矩阵转换为不同格式可能需要进行全局混洗,这非常耗费资源。目前已实现了四种类型的分布式矩阵。
基本类型称为 RowMatrix
。RowMatrix
是一个面向行的分布式矩阵,没有有意义的行索引,例如,特征向量的集合。它由其行的 RDD 支持,其中每一行都是一个局部向量。我们假定 RowMatrix
的列数不大,以便单个局部向量可以合理地传输到驱动程序,并且也可以在单个节点上存储/操作。IndexedRowMatrix
类似于 RowMatrix
,但带有行索引,可用于标识行和执行连接。CoordinateMatrix
是一种分布式矩阵,以 坐标列表(COO) 格式存储,由其条目的 RDD 支持。BlockMatrix
是一个由 MatrixBlock
RDD 支持的分布式矩阵,其中 MatrixBlock
是一个 (Int, Int, Matrix)
的元组。
注意
分布式矩阵的底层 RDD 必须是确定性的,因为我们会缓存矩阵大小。通常,使用非确定性 RDD 可能会导致错误。
行矩阵 (RowMatrix)
一个 RowMatrix
是一个面向行的分布式矩阵,没有有意义的行索引,由其行的 RDD 支持,其中每一行都是一个局部向量。由于每行都由一个局部向量表示,因此列数受整数范围限制,但在实践中应该远小于此。
一个 RowMatrix
可以从向量的 RDD
创建。
有关 API 的更多详细信息,请参阅 RowMatrix
Python 文档。
from pyspark.mllib.linalg.distributed import RowMatrix
# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)
# Get its size.
m = mat.numRows() # 4
n = mat.numCols() # 3
# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows
一个 RowMatrix
可以从 RDD[Vector]
实例创建。然后我们可以计算其列的汇总统计量和分解。QR 分解 的形式为 A = QR,其中 Q 是一个正交矩阵,R 是一个上三角矩阵。关于 奇异值分解 (SVD) 和 主成分分析 (PCA),请参阅 降维。
有关 API 的详细信息,请参阅 RowMatrix
Scala 文档。
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val rows: RDD[Vector] = ... // an RDD of local vectors
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// QR decomposition
val qrResult = mat.tallSkinnyQR(true)
一个 RowMatrix
可以从 JavaRDD<Vector>
实例创建。然后我们可以计算其列的汇总统计量。
有关 API 的详细信息,请参阅 RowMatrix
Java 文档。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
JavaRDD<Vector> rows = ... // a JavaRDD of local vectors
// Create a RowMatrix from a JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Get its size.
long m = mat.numRows();
long n = mat.numCols();
// QR decomposition
QRDecomposition<RowMatrix, Matrix> result = mat.tallSkinnyQR(true);
带索引的行矩阵 (IndexedRowMatrix)
一个 IndexedRowMatrix
类似于 RowMatrix
,但具有有意义的行索引。它由带索引的行的 RDD 支持,因此每行都由其索引(长整型)和一个局部向量表示。
一个 IndexedRowMatrix
可以从 IndexedRow
的 RDD
创建,其中 IndexedRow
是 (long, vector)
的包装器。IndexedRowMatrix
可以通过删除其行索引转换为 RowMatrix
。
有关 API 的更多详细信息,请参阅 IndexedRowMatrix
Python 文档。
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
# Create an RDD of indexed rows.
# - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
IndexedRow(1, [4, 5, 6]),
IndexedRow(2, [7, 8, 9]),
IndexedRow(3, [10, 11, 12])])
# - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
(2, [7, 8, 9]), (3, [10, 11, 12])])
# Create an IndexedRowMatrix from an RDD of IndexedRows.
mat = IndexedRowMatrix(indexedRows)
# Get its size.
m = mat.numRows() # 4
n = mat.numCols() # 3
# Get the rows as an RDD of IndexedRows.
rowsRDD = mat.rows
# Convert to a RowMatrix by dropping the row indices.
rowMat = mat.toRowMatrix()
一个 IndexedRowMatrix
可以从 RDD[IndexedRow]
实例创建,其中 IndexedRow
是 (Long, Vector)
的包装器。IndexedRowMatrix
可以通过删除其行索引转换为 RowMatrix
。
有关 API 的详细信息,请参阅 IndexedRowMatrix
Scala 文档。
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()
一个 IndexedRowMatrix
可以从 JavaRDD<IndexedRow>
实例创建,其中 IndexedRow
是 (long, Vector)
的包装器。IndexedRowMatrix
可以通过删除其行索引转换为 RowMatrix
。
有关 API 的详细信息,请参阅 IndexedRowMatrix
Java 文档。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
JavaRDD<IndexedRow> rows = ... // a JavaRDD of indexed rows
// Create an IndexedRowMatrix from a JavaRDD<IndexedRow>.
IndexedRowMatrix mat = new IndexedRowMatrix(rows.rdd());
// Get its size.
long m = mat.numRows();
long n = mat.numCols();
// Drop its row indices.
RowMatrix rowMat = mat.toRowMatrix();
坐标矩阵 (CoordinateMatrix)
一个 CoordinateMatrix
是一个由其条目的 RDD 支持的分布式矩阵。每个条目是一个 (i: Long, j: Long, value: Double)
的元组,其中 i
是行索引,j
是列索引,value
是条目值。CoordinateMatrix
仅应在矩阵的两个维度都非常大且矩阵非常稀疏时使用。
一个 CoordinateMatrix
可以从 MatrixEntry
条目的 RDD
创建,其中 MatrixEntry
是 (long, long, float)
的包装器。CoordinateMatrix
可以通过调用 toRowMatrix
转换为 RowMatrix
,或者通过调用 toIndexedRowMatrix
转换为具有稀疏行的 IndexedRowMatrix
。
有关 API 的更多详细信息,请参阅 CoordinateMatrix
Python 文档。
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
# Create an RDD of coordinate entries.
# - This can be done explicitly with the MatrixEntry class:
entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(2, 1, 3.7)])
# - or using (long, long, float) tuples:
entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)])
# Create a CoordinateMatrix from an RDD of MatrixEntries.
mat = CoordinateMatrix(entries)
# Get its size.
m = mat.numRows() # 3
n = mat.numCols() # 2
# Get the entries as an RDD of MatrixEntries.
entriesRDD = mat.entries
# Convert to a RowMatrix.
rowMat = mat.toRowMatrix()
# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()
# Convert to a BlockMatrix.
blockMat = mat.toBlockMatrix()
一个 CoordinateMatrix
可以从 RDD[MatrixEntry]
实例创建,其中 MatrixEntry
是 (Long, Long, Double)
的包装器。CoordinateMatrix
可以通过调用 toIndexedRowMatrix
转换为具有稀疏行的 IndexedRowMatrix
。目前不支持 CoordinateMatrix
的其他计算。
有关 API 的详细信息,请参阅 CoordinateMatrix
Scala 文档。
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// Convert it to an IndexRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()
一个 CoordinateMatrix
可以从 JavaRDD<MatrixEntry>
实例创建,其中 MatrixEntry
是 (long, long, double)
的包装器。CoordinateMatrix
可以通过调用 toIndexedRowMatrix
转换为具有稀疏行的 IndexedRowMatrix
。目前不支持 CoordinateMatrix
的其他计算。
有关 API 的详细信息,请参阅 CoordinateMatrix
Java 文档。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;
JavaRDD<MatrixEntry> entries = ... // a JavaRDD of matrix entries
// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
CoordinateMatrix mat = new CoordinateMatrix(entries.rdd());
// Get its size.
long m = mat.numRows();
long n = mat.numCols();
// Convert it to an IndexRowMatrix whose rows are sparse vectors.
IndexedRowMatrix indexedRowMatrix = mat.toIndexedRowMatrix();
块矩阵 (BlockMatrix)
一个 BlockMatrix
是一个由 MatrixBlock
RDD 支持的分布式矩阵,其中 MatrixBlock
是一个 ((Int, Int), Matrix)
的元组,其中 (Int, Int)
是块的索引,Matrix
是给定索引处的子矩阵,大小为 rowsPerBlock
x colsPerBlock
。BlockMatrix
支持诸如与另一个 BlockMatrix
进行 add
和 multiply
等方法。BlockMatrix
还带有一个辅助函数 validate
,可用于检查 BlockMatrix
是否正确设置。
一个 BlockMatrix
可以从子矩阵块的 RDD
创建,其中子矩阵块是一个 ((blockRowIndex, blockColIndex), sub-matrix)
的元组。
有关 API 的更多详细信息,请参阅 BlockMatrix
Python 文档。
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix
# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
# Create a BlockMatrix from an RDD of sub-matrix blocks.
mat = BlockMatrix(blocks, 3, 2)
# Get its size.
m = mat.numRows() # 6
n = mat.numCols() # 2
# Get the blocks as an RDD of sub-matrix blocks.
blocksRDD = mat.blocks
# Convert to a LocalMatrix.
localMat = mat.toLocalMatrix()
# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()
# Convert to a CoordinateMatrix.
coordinateMat = mat.toCoordinateMatrix()
一个 BlockMatrix
最容易通过调用 toBlockMatrix
从 IndexedRowMatrix
或 CoordinateMatrix
创建。toBlockMatrix
默认创建大小为 1024 x 1024 的块。用户可以通过 toBlockMatrix(rowsPerBlock, colsPerBlock)
提供值来更改块大小。
有关 API 的详细信息,请参阅 BlockMatrix
Scala 文档。
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Transform the CoordinateMatrix to a BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate()
// Calculate A^T A.
val ata = matA.transpose.multiply(matA)
一个 BlockMatrix
最容易通过调用 toBlockMatrix
从 IndexedRowMatrix
或 CoordinateMatrix
创建。toBlockMatrix
默认创建大小为 1024 x 1024 的块。用户可以通过 toBlockMatrix(rowsPerBlock, colsPerBlock)
提供值来更改块大小。
有关 API 的详细信息,请参阅 BlockMatrix
Java 文档。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
JavaRDD<MatrixEntry> entries = ... // a JavaRDD of (i, j, v) Matrix Entries
// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
CoordinateMatrix coordMat = new CoordinateMatrix(entries.rdd());
// Transform the CoordinateMatrix to a BlockMatrix
BlockMatrix matA = coordMat.toBlockMatrix().cache();
// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate();
// Calculate A^T A.
BlockMatrix ata = matA.transpose().multiply(matA);