数据类型 - 基于 RDD 的 API
MLlib 支持存储在单台机器上的本地向量和矩阵,以及由一个或多个 RDD 支持的分布式矩阵。本地向量和本地矩阵是充当公共接口的简单数据模型。底层线性代数运算由 Breeze 提供。MLlib 中将监督学习中使用的训练样本称为“标记点”。
本地向量
本地向量具有整数类型的基于 0 的索引和双精度类型的数值,存储在单台机器上。MLlib 支持两种类型的本地向量:密集向量和稀疏向量。密集向量由表示其条目值的双精度数组支持,而稀疏向量由两个并行数组支持:索引和数值。例如,向量 (1.0, 0.0, 3.0)
可以用密集格式表示为 [1.0, 0.0, 3.0]
,或者用稀疏格式表示为 (3, [0, 2], [1.0, 3.0])
,其中 3
是向量的长度。
MLlib 将以下类型识别为密集向量
- NumPy 的
array
- Python 的列表,例如
[1, 2, 3]
并将以下类型识别为稀疏向量
- MLlib 的
SparseVector
。 - SciPy 的
csc_matrix
,只有一列
为了提高效率,我们建议使用 NumPy 数组而不是列表,并使用 Vectors
中实现的工厂方法来创建稀疏向量。
有关 API 的更多详细信息,请参阅 Vectors
Python 文档。
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors
# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1))
本地向量的基类是 Vector
,我们提供了两种实现:DenseVector
和 SparseVector
。我们建议使用 Vectors
中实现的工厂方法来创建本地向量。
有关 API 的详细信息,请参阅 Vector
Scala 文档 和 Vectors
Scala 文档。
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
注意: Scala 默认导入 scala.collection.immutable.Vector
,因此您必须显式导入 org.apache.spark.mllib.linalg.Vector
才能使用 MLlib 的 Vector
。
本地向量的基类是 Vector
,我们提供了两种实现:DenseVector
和 SparseVector
。我们建议使用 Vectors
中实现的工厂方法来创建本地向量。
有关 API 的详细信息,请参阅 Vector
Java 文档 和 Vectors
Java 文档。
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Create a dense vector (1.0, 0.0, 3.0).
Vector dv = Vectors.dense(1.0, 0.0, 3.0);
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
Vector sv = Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0});
标记点
标记点是与标签/响应关联的本地向量,可以是密集的也可以是稀疏的。在 MLlib 中,标记点用于监督学习算法。我们使用双精度数来存储标签,因此我们可以在回归和分类中使用标记点。对于二元分类,标签应该是 0
(负)或 1
(正)。对于多类别分类,标签应该是从零开始的类别索引:0, 1, 2, ...
。
标记点由 LabeledPoint
表示。
有关 API 的更多详细信息,请参阅 LabeledPoint
Python 文档。
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])
# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))
标记点由案例类 LabeledPoint
表示。
有关 API 的详细信息,请参阅 LabeledPoint
Scala 文档。
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
标记点由 LabeledPoint
表示。
有关 API 的详细信息,请参阅 LabeledPoint
Java 文档。
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
// Create a labeled point with a positive label and a dense feature vector.
LabeledPoint pos = new LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0));
// Create a labeled point with a negative label and a sparse feature vector.
LabeledPoint neg = new LabeledPoint(0.0, Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0}));
稀疏数据
在实践中,拥有稀疏的训练数据是非常常见的。MLlib 支持读取以 LIBSVM
格式存储的训练样本,这是 LIBSVM
和 LIBLINEAR
使用的默认格式。这是一种文本格式,其中每一行使用以下格式表示一个标记的稀疏特征向量
label index1:value1 index2:value2 ...
其中索引是从 1 开始的,并且按升序排列。加载后,特征索引将转换为从 0 开始。
MLUtils.loadLibSVMFile
读取以 LIBSVM 格式存储的训练样本。
有关 API 的更多详细信息,请参阅 MLUtils
Python 文档。
from pyspark.mllib.util import MLUtils
examples = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
MLUtils.loadLibSVMFile
读取以 LIBSVM 格式存储的训练样本。
有关 API 的详细信息,请参阅 MLUtils
Scala 文档。
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
MLUtils.loadLibSVMFile
读取以 LIBSVM 格式存储的训练样本。
有关 API 的更多详细信息,请参阅 MLUtils
Java 文档。
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.api.java.JavaRDD;
JavaRDD<LabeledPoint> examples =
MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt").toJavaRDD();
本地矩阵
本地矩阵具有整数类型的行索引和列索引以及双精度类型的数值,存储在单台机器上。MLlib 支持密集矩阵和稀疏矩阵,密集矩阵的条目值存储在按列优先顺序排列的单个双精度数组中,而稀疏矩阵的非零条目值存储在按列优先顺序排列的压缩稀疏列(CSC)格式中。例如,以下密集矩阵 \[ \begin{pmatrix} 1.0 & 2.0 \\ 3.0 & 4.0 \\ 5.0 & 6.0 \end{pmatrix} \]
存储在一个一维数组 [1.0, 3.0, 5.0, 2.0, 4.0, 6.0]
中,矩阵大小为 (3, 2)
。
本地矩阵的基类是 Matrix
,我们提供了两种实现:DenseMatrix
和 SparseMatrix
。我们建议使用 Matrices
中实现的工厂方法来创建本地矩阵。请记住,MLlib 中的本地矩阵以列优先顺序存储。
有关 API 的更多详细信息,请参阅 Matrix
Python 文档 和 Matrices
Python 文档。
from pyspark.mllib.linalg import Matrix, Matrices
# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6])
# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
本地矩阵的基类是 Matrix
,我们提供了两种实现:DenseMatrix
和 SparseMatrix
。我们建议使用 Matrices
中实现的工厂方法来创建本地矩阵。请记住,MLlib 中的本地矩阵以列优先顺序存储。
有关 API 的详细信息,请参阅 Matrix
Scala 文档 和 Matrices
Scala 文档。
import org.apache.spark.mllib.linalg.{Matrix, Matrices}
// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
本地矩阵的基类是 Matrix
,我们提供了两种实现:DenseMatrix
和 SparseMatrix
。我们建议使用 Matrices
中实现的工厂方法来创建本地矩阵。请记住,MLlib 中的本地矩阵以列优先顺序存储。
有关 API 的详细信息,请参阅 Matrix
Java 文档 和 Matrices
Java 文档。
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Matrices;
// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
Matrix dm = Matrices.dense(3, 2, new double[] {1.0, 3.0, 5.0, 2.0, 4.0, 6.0});
// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
Matrix sm = Matrices.sparse(3, 2, new int[] {0, 1, 3}, new int[] {0, 2, 1}, new double[] {9, 6, 8});
分布式矩阵
分布式矩阵具有长整型行索引和列索引以及双精度浮点型值,分布式存储在一个或多个 RDD 中。选择正确的格式来存储大型分布式矩阵非常重要。将分布式矩阵转换为不同的格式可能需要全局混洗,这是非常昂贵的操作。目前已经实现了四种类型的分布式矩阵。
基本类型称为 RowMatrix
。RowMatrix
是一个面向行的分布式矩阵,没有有意义的行索引,例如,特征向量集合。它由其行的 RDD 支持,其中每一行都是一个本地向量。我们假设 RowMatrix
的列数不是很大,以便单个本地向量可以合理地传送到驱动程序,并且可以使用单个节点存储/操作。 IndexedRowMatrix
类似于 RowMatrix
,但具有行索引,可用于标识行和执行连接。 CoordinateMatrix
是以 坐标列表 (COO) 格式存储的分布式矩阵,由其条目的 RDD 支持。 BlockMatrix
是由 MatrixBlock
的 RDD 支持的分布式矩阵,MatrixBlock
是 (Int, Int, Matrix)
的元组。
注意
分布式矩阵的基础 RDD 必须是确定性的,因为我们缓存了矩阵大小。通常,使用非确定性 RDD 会导致错误。
RowMatrix
RowMatrix
是一个面向行的分布式矩阵,没有有意义的行索引,由其行的 RDD 支持,其中每一行都是一个本地向量。由于每一行都由一个本地向量表示,因此列数受整数范围的限制,但在实践中应该小得多。
可以从向量的 RDD
创建 RowMatrix
。
有关 API 的更多详细信息,请参阅 RowMatrix
Python 文档。
from pyspark.mllib.linalg.distributed import RowMatrix
# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])
# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)
# Get its size.
m = mat.numRows() # 4
n = mat.numCols() # 3
# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows
可以从 RDD[Vector]
实例创建 RowMatrix
。然后我们可以计算其列汇总统计信息和分解。 QR 分解 形式为 A = QR,其中 Q 是正交矩阵,R 是上三角矩阵。有关 奇异值分解 (SVD) 和 主成分分析 (PCA),请参阅 降维。
有关 API 的详细信息,请参阅 RowMatrix
Scala 文档。
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val rows: RDD[Vector] = ... // an RDD of local vectors
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// QR decomposition
val qrResult = mat.tallSkinnyQR(true)
可以从 JavaRDD<Vector>
实例创建 RowMatrix
。然后我们可以计算其列汇总统计信息。
有关 API 的详细信息,请参阅 RowMatrix
Java 文档。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
JavaRDD<Vector> rows = ... // a JavaRDD of local vectors
// Create a RowMatrix from a JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Get its size.
long m = mat.numRows();
long n = mat.numCols();
// QR decomposition
QRDecomposition<RowMatrix, Matrix> result = mat.tallSkinnyQR(true);
IndexedRowMatrix
IndexedRowMatrix
类似于 RowMatrix
,但具有有意义的行索引。它由索引行的 RDD 支持,因此每一行都由其索引(长整型)和本地向量表示。
可以从 IndexedRow
的 RDD
创建 IndexedRowMatrix
,其中 IndexedRow
是 (long, vector)
的包装器。可以通过删除其行索引将 IndexedRowMatrix
转换为 RowMatrix
。
有关 API 的更多详细信息,请参阅 IndexedRowMatrix
Python 文档。
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
# Create an RDD of indexed rows.
# - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
IndexedRow(1, [4, 5, 6]),
IndexedRow(2, [7, 8, 9]),
IndexedRow(3, [10, 11, 12])])
# - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
(2, [7, 8, 9]), (3, [10, 11, 12])])
# Create an IndexedRowMatrix from an RDD of IndexedRows.
mat = IndexedRowMatrix(indexedRows)
# Get its size.
m = mat.numRows() # 4
n = mat.numCols() # 3
# Get the rows as an RDD of IndexedRows.
rowsRDD = mat.rows
# Convert to a RowMatrix by dropping the row indices.
rowMat = mat.toRowMatrix()
可以从 RDD[IndexedRow]
实例创建 IndexedRowMatrix
,其中 IndexedRow
是 (Long, Vector)
的包装器。可以通过删除其行索引将 IndexedRowMatrix
转换为 RowMatrix
。
有关 API 的详细信息,请参阅 IndexedRowMatrix
Scala 文档。
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()
可以从 JavaRDD<IndexedRow>
实例创建 IndexedRowMatrix
,其中 IndexedRow
是 (long, Vector)
的包装器。可以通过删除其行索引将 IndexedRowMatrix
转换为 RowMatrix
。
有关 API 的详细信息,请参阅 IndexedRowMatrix
Java 文档。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
JavaRDD<IndexedRow> rows = ... // a JavaRDD of indexed rows
// Create an IndexedRowMatrix from a JavaRDD<IndexedRow>.
IndexedRowMatrix mat = new IndexedRowMatrix(rows.rdd());
// Get its size.
long m = mat.numRows();
long n = mat.numCols();
// Drop its row indices.
RowMatrix rowMat = mat.toRowMatrix();
CoordinateMatrix
CoordinateMatrix
是由其条目的 RDD 支持的分布式矩阵。每个条目都是一个 (i: Long, j: Long, value: Double)
元组,其中 i
是行索引,j
是列索引,value
是条目值。仅当矩阵的两个维度都很大且矩阵非常稀疏时,才应使用 CoordinateMatrix
。
可以从 MatrixEntry
条目的 RDD
创建 CoordinateMatrix
,其中 MatrixEntry
是 (long, long, float)
的包装器。可以通过调用 toRowMatrix
将 CoordinateMatrix
转换为 RowMatrix
,或者通过调用 toIndexedRowMatrix
将其转换为具有稀疏行的 IndexedRowMatrix
。
有关 API 的更多详细信息,请参阅 CoordinateMatrix
Python 文档。
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
# Create an RDD of coordinate entries.
# - This can be done explicitly with the MatrixEntry class:
entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(2, 1, 3.7)])
# - or using (long, long, float) tuples:
entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)])
# Create a CoordinateMatrix from an RDD of MatrixEntries.
mat = CoordinateMatrix(entries)
# Get its size.
m = mat.numRows() # 3
n = mat.numCols() # 2
# Get the entries as an RDD of MatrixEntries.
entriesRDD = mat.entries
# Convert to a RowMatrix.
rowMat = mat.toRowMatrix()
# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()
# Convert to a BlockMatrix.
blockMat = mat.toBlockMatrix()
可以从 RDD[MatrixEntry]
实例创建 CoordinateMatrix
,其中 MatrixEntry
是 (Long, Long, Double)
的包装器。可以通过调用 toIndexedRowMatrix
将 CoordinateMatrix
转换为具有稀疏行的 IndexedRowMatrix
。当前不支持 CoordinateMatrix
的其他计算。
有关 API 的详细信息,请参阅 CoordinateMatrix
Scala 文档。
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// Convert it to an IndexRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()
可以从 JavaRDD<MatrixEntry>
实例创建 CoordinateMatrix
,其中 MatrixEntry
是 (long, long, double)
的包装器。可以通过调用 toIndexedRowMatrix
将 CoordinateMatrix
转换为具有稀疏行的 IndexedRowMatrix
。当前不支持 CoordinateMatrix
的其他计算。
有关 API 的详细信息,请参阅 CoordinateMatrix
Java 文档。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;
JavaRDD<MatrixEntry> entries = ... // a JavaRDD of matrix entries
// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
CoordinateMatrix mat = new CoordinateMatrix(entries.rdd());
// Get its size.
long m = mat.numRows();
long n = mat.numCols();
// Convert it to an IndexRowMatrix whose rows are sparse vectors.
IndexedRowMatrix indexedRowMatrix = mat.toIndexedRowMatrix();
BlockMatrix
BlockMatrix
是由 MatrixBlock
的 RDD 支持的分布式矩阵,其中 MatrixBlock
是 ((Int, Int), Matrix)
的元组,其中 (Int, Int)
是块的索引,Matrix
是给定索引处的子矩阵,大小为 rowsPerBlock
x colsPerBlock
。 BlockMatrix
支持与另一个 BlockMatrix
进行 add
和 multiply
等方法。 BlockMatrix
还有一个辅助函数 validate
,可用于检查 BlockMatrix
是否设置正确。
可以从子矩阵块的 RDD
创建 BlockMatrix
,其中子矩阵块是 ((blockRowIndex, blockColIndex), sub-matrix)
元组。
有关 API 的更多详细信息,请参阅 BlockMatrix
Python 文档。
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix
# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
# Create a BlockMatrix from an RDD of sub-matrix blocks.
mat = BlockMatrix(blocks, 3, 2)
# Get its size.
m = mat.numRows() # 6
n = mat.numCols() # 2
# Get the blocks as an RDD of sub-matrix blocks.
blocksRDD = mat.blocks
# Convert to a LocalMatrix.
localMat = mat.toLocalMatrix()
# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()
# Convert to a CoordinateMatrix.
coordinateMat = mat.toCoordinateMatrix()
最简单的创建 BlockMatrix
的方法是从 IndexedRowMatrix
或 CoordinateMatrix
调用 toBlockMatrix
。 toBlockMatrix
默认创建大小为 1024 x 1024 的块。用户可以通过 toBlockMatrix(rowsPerBlock, colsPerBlock)
提供值来更改块大小。
有关 API 的详细信息,请参阅 BlockMatrix
Scala 文档。
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Transform the CoordinateMatrix to a BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate()
// Calculate A^T A.
val ata = matA.transpose.multiply(matA)
可以通过调用 toBlockMatrix
方法,最容易地从 IndexedRowMatrix
或 CoordinateMatrix
创建 BlockMatrix
。默认情况下,toBlockMatrix
创建大小为 1024 x 1024 的块。用户可以通过 toBlockMatrix(rowsPerBlock, colsPerBlock)
提供值来更改块大小。
有关 API 的详细信息,请参阅 BlockMatrix
Java 文档。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
JavaRDD<MatrixEntry> entries = ... // a JavaRDD of (i, j, v) Matrix Entries
// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
CoordinateMatrix coordMat = new CoordinateMatrix(entries.rdd());
// Transform the CoordinateMatrix to a BlockMatrix
BlockMatrix matA = coordMat.toBlockMatrix().cache();
// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate();
// Calculate A^T A.
BlockMatrix ata = matA.transpose().multiply(matA);