数据类型 - 基于 RDD 的 API

MLlib 支持存储在单机上的局部向量和矩阵,以及由一个或多个 RDD 支持的分布式矩阵。局部向量和局部矩阵是作为公共接口的简单数据模型。底层的线性代数运算由 Breeze 提供。在监督学习中使用的训练样本在 MLlib 中被称为“标签点”。

局部向量

局部向量具有整数类型的 0-based 索引和双精度浮点类型的值,存储在单机上。MLlib 支持两种类型的局部向量:稠密向量(dense)和稀疏向量(sparse)。稠密向量由表示其条目值的双精度数组支持,而稀疏向量由两个并行数组(索引和值)支持。例如,向量 (1.0, 0.0, 3.0) 可以用稠密格式表示为 [1.0, 0.0, 3.0],或者用稀疏格式表示为 (3, [0, 2], [1.0, 3.0]),其中 3 是向量的大小。

MLlib 将以下类型识别为稠密向量

  • NumPy 的 array
  • Python 列表,例如 [1, 2, 3]

并将以下类型识别为稀疏向量

为了效率,我们建议使用 NumPy 数组而不是列表,并使用 Vectors 中实现的工厂方法来创建稀疏向量。

有关 API 的更多详细信息,请参阅 Vectors Python 文档

import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors

# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1))

局部向量的基类是 Vector,我们提供了两种实现:DenseVectorSparseVector。我们建议使用 Vectors 中实现的工厂方法来创建局部向量。

有关 API 的详细信息,请参阅 Vector Scala 文档Vectors Scala 文档

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))

注意: Scala 默认导入 scala.collection.immutable.Vector,因此您必须显式导入 org.apache.spark.mllib.linalg.Vector 才能使用 MLlib 的 Vector

局部向量的基类是 Vector,我们提供了两种实现:DenseVectorSparseVector。我们建议使用 Vectors 中实现的工厂方法来创建局部向量。

有关 API 的详细信息,请参阅 Vector Java 文档Vectors Java 文档

import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Create a dense vector (1.0, 0.0, 3.0).
Vector dv = Vectors.dense(1.0, 0.0, 3.0);
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
Vector sv = Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0});

标签点

标签点是局部向量,可以是稠密或稀疏的,与标签/响应相关联。在 MLlib 中,标签点用于监督学习算法。我们使用双精度浮点数存储标签,因此可以在回归和分类中都使用标签点。对于二元分类,标签应为 0(负类)或 1(正类)。对于多类分类,标签应为从零开始的类索引:0, 1, 2, ...

标签点由 LabeledPoint 表示。

有关 API 的更多详细信息,请参阅 LabeledPoint Python 文档

from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])

# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))

标签点由案例类 LabeledPoint 表示。

有关 API 的详细信息,请参阅 LabeledPoint Scala 文档

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

标签点由 LabeledPoint 表示。

有关 API 的详细信息,请参阅 LabeledPoint Java 文档

import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

// Create a labeled point with a positive label and a dense feature vector.
LabeledPoint pos = new LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0));

// Create a labeled point with a negative label and a sparse feature vector.
LabeledPoint neg = new LabeledPoint(0.0, Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0}));

稀疏数据

在实践中,拥有稀疏训练数据非常常见。MLlib 支持读取以 LIBSVM 格式存储的训练样本,该格式是 LIBSVMLIBLINEAR 使用的默认格式。它是一种文本格式,其中每一行使用以下格式表示一个带有标签的稀疏特征向量

label index1:value1 index2:value2 ...

其中索引是基于一的且按升序排列。加载后,特征索引将转换为基于零的。

MLUtils.loadLibSVMFile 读取以 LIBSVM 格式存储的训练样本。

有关 API 的更多详细信息,请参阅 MLUtils Python 文档

from pyspark.mllib.util import MLUtils

examples = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

MLUtils.loadLibSVMFile 读取以 LIBSVM 格式存储的训练样本。

有关 API 的详细信息,请参阅 MLUtils Scala 文档

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

MLUtils.loadLibSVMFile 读取以 LIBSVM 格式存储的训练样本。

有关 API 的详细信息,请参阅 MLUtils Java 文档

import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.api.java.JavaRDD;

JavaRDD<LabeledPoint> examples = 
  MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt").toJavaRDD();

局部矩阵

局部矩阵具有整数类型的行和列索引以及双精度浮点类型的值,存储在单机上。MLlib 支持稠密矩阵和稀疏矩阵,其中稠密矩阵的条目值以列优先顺序存储在单个双精度数组中,稀疏矩阵的非零条目值以列优先顺序存储在压缩稀疏列(CSC)格式中。例如,以下稠密矩阵 \[ \begin{pmatrix} 1.0 & 2.0 \\ 3.0 & 4.0 \\ 5.0 & 6.0 \end{pmatrix} \] 以一维数组 [1.0, 3.0, 5.0, 2.0, 4.0, 6.0] 形式存储,矩阵大小为 (3, 2)

局部矩阵的基类是 Matrix,我们提供了两种实现:DenseMatrixSparseMatrix。我们建议使用 Matrices 中实现的工厂方法来创建局部矩阵。请记住,MLlib 中的局部矩阵以列优先顺序存储。

有关 API 的更多详细信息,请参阅 Matrix Python 文档Matrices Python 文档

from pyspark.mllib.linalg import Matrix, Matrices

# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6])

# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])

局部矩阵的基类是 Matrix,我们提供了两种实现:DenseMatrixSparseMatrix。我们建议使用 Matrices 中实现的工厂方法来创建局部矩阵。请记住,MLlib 中的局部矩阵以列优先顺序存储。

有关 API 的详细信息,请参阅 Matrix Scala 文档Matrices Scala 文档

import org.apache.spark.mllib.linalg.{Matrix, Matrices}

// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))

// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))

局部矩阵的基类是 Matrix,我们提供了两种实现:DenseMatrixSparseMatrix。我们建议使用 Matrices 中实现的工厂方法来创建局部矩阵。请记住,MLlib 中的局部矩阵以列优先顺序存储。

有关 API 的详细信息,请参阅 Matrix Java 文档Matrices Java 文档

import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Matrices;

// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
Matrix dm = Matrices.dense(3, 2, new double[] {1.0, 3.0, 5.0, 2.0, 4.0, 6.0});

// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
Matrix sm = Matrices.sparse(3, 2, new int[] {0, 1, 3}, new int[] {0, 2, 1}, new double[] {9, 6, 8});

分布式矩阵

分布式矩阵具有长整型行和列索引以及双精度浮点类型的值,分布式存储在一个或多个 RDD 中。选择正确的格式来存储大型分布式矩阵非常重要。将分布式矩阵转换为不同格式可能需要进行全局混洗,这非常耗费资源。目前已实现了四种类型的分布式矩阵。

基本类型称为 RowMatrixRowMatrix 是一个面向行的分布式矩阵,没有有意义的行索引,例如,特征向量的集合。它由其行的 RDD 支持,其中每一行都是一个局部向量。我们假定 RowMatrix 的列数不大,以便单个局部向量可以合理地传输到驱动程序,并且也可以在单个节点上存储/操作。IndexedRowMatrix 类似于 RowMatrix,但带有行索引,可用于标识行和执行连接。CoordinateMatrix 是一种分布式矩阵,以 坐标列表(COO) 格式存储,由其条目的 RDD 支持。BlockMatrix 是一个由 MatrixBlock RDD 支持的分布式矩阵,其中 MatrixBlock 是一个 (Int, Int, Matrix) 的元组。

注意

分布式矩阵的底层 RDD 必须是确定性的,因为我们会缓存矩阵大小。通常,使用非确定性 RDD 可能会导致错误。

行矩阵 (RowMatrix)

一个 RowMatrix 是一个面向行的分布式矩阵,没有有意义的行索引,由其行的 RDD 支持,其中每一行都是一个局部向量。由于每行都由一个局部向量表示,因此列数受整数范围限制,但在实践中应该远小于此。

一个 RowMatrix 可以从向量的 RDD 创建。

有关 API 的更多详细信息,请参阅 RowMatrix Python 文档

from pyspark.mllib.linalg.distributed import RowMatrix

# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])

# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)

# Get its size.
m = mat.numRows()  # 4
n = mat.numCols()  # 3

# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows

一个 RowMatrix 可以从 RDD[Vector] 实例创建。然后我们可以计算其列的汇总统计量和分解。QR 分解 的形式为 A = QR,其中 Q 是一个正交矩阵,R 是一个上三角矩阵。关于 奇异值分解 (SVD)主成分分析 (PCA),请参阅 降维

有关 API 的详细信息,请参阅 RowMatrix Scala 文档

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows: RDD[Vector] = ... // an RDD of local vectors
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// QR decomposition 
val qrResult = mat.tallSkinnyQR(true)

一个 RowMatrix 可以从 JavaRDD<Vector> 实例创建。然后我们可以计算其列的汇总统计量。

有关 API 的详细信息,请参阅 RowMatrix Java 文档

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;

JavaRDD<Vector> rows = ... // a JavaRDD of local vectors
// Create a RowMatrix from a JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();

// QR decomposition 
QRDecomposition<RowMatrix, Matrix> result = mat.tallSkinnyQR(true);

带索引的行矩阵 (IndexedRowMatrix)

一个 IndexedRowMatrix 类似于 RowMatrix,但具有有意义的行索引。它由带索引的行的 RDD 支持,因此每行都由其索引(长整型)和一个局部向量表示。

一个 IndexedRowMatrix 可以从 IndexedRowRDD 创建,其中 IndexedRow(long, vector) 的包装器。IndexedRowMatrix 可以通过删除其行索引转换为 RowMatrix

有关 API 的更多详细信息,请参阅 IndexedRowMatrix Python 文档

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# Create an RDD of indexed rows.
#   - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
                              IndexedRow(1, [4, 5, 6]),
                              IndexedRow(2, [7, 8, 9]),
                              IndexedRow(3, [10, 11, 12])])
#   - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
                              (2, [7, 8, 9]), (3, [10, 11, 12])])

# Create an IndexedRowMatrix from an RDD of IndexedRows.
mat = IndexedRowMatrix(indexedRows)

# Get its size.
m = mat.numRows()  # 4
n = mat.numCols()  # 3

# Get the rows as an RDD of IndexedRows.
rowsRDD = mat.rows

# Convert to a RowMatrix by dropping the row indices.
rowMat = mat.toRowMatrix()

一个 IndexedRowMatrix 可以从 RDD[IndexedRow] 实例创建,其中 IndexedRow(Long, Vector) 的包装器。IndexedRowMatrix 可以通过删除其行索引转换为 RowMatrix

有关 API 的详细信息,请参阅 IndexedRowMatrix Scala 文档

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()

一个 IndexedRowMatrix 可以从 JavaRDD<IndexedRow> 实例创建,其中 IndexedRow(long, Vector) 的包装器。IndexedRowMatrix 可以通过删除其行索引转换为 RowMatrix

有关 API 的详细信息,请参阅 IndexedRowMatrix Java 文档

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;

JavaRDD<IndexedRow> rows = ... // a JavaRDD of indexed rows
// Create an IndexedRowMatrix from a JavaRDD<IndexedRow>.
IndexedRowMatrix mat = new IndexedRowMatrix(rows.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();

// Drop its row indices.
RowMatrix rowMat = mat.toRowMatrix();

坐标矩阵 (CoordinateMatrix)

一个 CoordinateMatrix 是一个由其条目的 RDD 支持的分布式矩阵。每个条目是一个 (i: Long, j: Long, value: Double) 的元组,其中 i 是行索引,j 是列索引,value 是条目值。CoordinateMatrix 仅应在矩阵的两个维度都非常大且矩阵非常稀疏时使用。

一个 CoordinateMatrix 可以从 MatrixEntry 条目的 RDD 创建,其中 MatrixEntry(long, long, float) 的包装器。CoordinateMatrix 可以通过调用 toRowMatrix 转换为 RowMatrix,或者通过调用 toIndexedRowMatrix 转换为具有稀疏行的 IndexedRowMatrix

有关 API 的更多详细信息,请参阅 CoordinateMatrix Python 文档

from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# Create an RDD of coordinate entries.
#   - This can be done explicitly with the MatrixEntry class:
entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(2, 1, 3.7)])
#   - or using (long, long, float) tuples:
entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)])

# Create a CoordinateMatrix from an RDD of MatrixEntries.
mat = CoordinateMatrix(entries)

# Get its size.
m = mat.numRows()  # 3
n = mat.numCols()  # 2

# Get the entries as an RDD of MatrixEntries.
entriesRDD = mat.entries

# Convert to a RowMatrix.
rowMat = mat.toRowMatrix()

# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()

# Convert to a BlockMatrix.
blockMat = mat.toBlockMatrix()

一个 CoordinateMatrix 可以从 RDD[MatrixEntry] 实例创建,其中 MatrixEntry(Long, Long, Double) 的包装器。CoordinateMatrix 可以通过调用 toIndexedRowMatrix 转换为具有稀疏行的 IndexedRowMatrix。目前不支持 CoordinateMatrix 的其他计算。

有关 API 的详细信息,请参阅 CoordinateMatrix Scala 文档

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Convert it to an IndexRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()

一个 CoordinateMatrix 可以从 JavaRDD<MatrixEntry> 实例创建,其中 MatrixEntry(long, long, double) 的包装器。CoordinateMatrix 可以通过调用 toIndexedRowMatrix 转换为具有稀疏行的 IndexedRowMatrix。目前不支持 CoordinateMatrix 的其他计算。

有关 API 的详细信息,请参阅 CoordinateMatrix Java 文档

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;

JavaRDD<MatrixEntry> entries = ... // a JavaRDD of matrix entries
// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
CoordinateMatrix mat = new CoordinateMatrix(entries.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();

// Convert it to an IndexRowMatrix whose rows are sparse vectors.
IndexedRowMatrix indexedRowMatrix = mat.toIndexedRowMatrix();

块矩阵 (BlockMatrix)

一个 BlockMatrix 是一个由 MatrixBlock RDD 支持的分布式矩阵,其中 MatrixBlock 是一个 ((Int, Int), Matrix) 的元组,其中 (Int, Int) 是块的索引,Matrix 是给定索引处的子矩阵,大小为 rowsPerBlock x colsPerBlockBlockMatrix 支持诸如与另一个 BlockMatrix 进行 addmultiply 等方法。BlockMatrix 还带有一个辅助函数 validate,可用于检查 BlockMatrix 是否正确设置。

一个 BlockMatrix 可以从子矩阵块的 RDD 创建,其中子矩阵块是一个 ((blockRowIndex, blockColIndex), sub-matrix) 的元组。

有关 API 的更多详细信息,请参阅 BlockMatrix Python 文档

from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix

# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
                         ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])

# Create a BlockMatrix from an RDD of sub-matrix blocks.
mat = BlockMatrix(blocks, 3, 2)

# Get its size.
m = mat.numRows()  # 6
n = mat.numCols()  # 2

# Get the blocks as an RDD of sub-matrix blocks.
blocksRDD = mat.blocks

# Convert to a LocalMatrix.
localMat = mat.toLocalMatrix()

# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()

# Convert to a CoordinateMatrix.
coordinateMat = mat.toCoordinateMatrix()

一个 BlockMatrix 最容易通过调用 toBlockMatrixIndexedRowMatrixCoordinateMatrix 创建。toBlockMatrix 默认创建大小为 1024 x 1024 的块。用户可以通过 toBlockMatrix(rowsPerBlock, colsPerBlock) 提供值来更改块大小。

有关 API 的详细信息,请参阅 BlockMatrix Scala 文档

import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Transform the CoordinateMatrix to a BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate()

// Calculate A^T A.
val ata = matA.transpose.multiply(matA)

一个 BlockMatrix 最容易通过调用 toBlockMatrixIndexedRowMatrixCoordinateMatrix 创建。toBlockMatrix 默认创建大小为 1024 x 1024 的块。用户可以通过 toBlockMatrix(rowsPerBlock, colsPerBlock) 提供值来更改块大小。

有关 API 的详细信息,请参阅 BlockMatrix Java 文档

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;

JavaRDD<MatrixEntry> entries = ... // a JavaRDD of (i, j, v) Matrix Entries
// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
CoordinateMatrix coordMat = new CoordinateMatrix(entries.rdd());
// Transform the CoordinateMatrix to a BlockMatrix
BlockMatrix matA = coordMat.toBlockMatrix().cache();

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate();

// Calculate A^T A.
BlockMatrix ata = matA.transpose().multiply(matA);