数据类型 - 基于 RDD 的 API

MLlib 支持存储在单台机器上的本地向量和矩阵,以及由一个或多个 RDD 支持的分布式矩阵。本地向量和本地矩阵是充当公共接口的简单数据模型。底层线性代数运算由 Breeze 提供。MLlib 中将监督学习中使用的训练样本称为“标记点”。

本地向量

本地向量具有整数类型的基于 0 的索引和双精度类型的数值,存储在单台机器上。MLlib 支持两种类型的本地向量:密集向量和稀疏向量。密集向量由表示其条目值的双精度数组支持,而稀疏向量由两个并行数组支持:索引和数值。例如,向量 (1.0, 0.0, 3.0) 可以用密集格式表示为 [1.0, 0.0, 3.0],或者用稀疏格式表示为 (3, [0, 2], [1.0, 3.0]),其中 3 是向量的长度。

MLlib 将以下类型识别为密集向量

  • NumPy 的 array
  • Python 的列表,例如 [1, 2, 3]

并将以下类型识别为稀疏向量

为了提高效率,我们建议使用 NumPy 数组而不是列表,并使用 Vectors 中实现的工厂方法来创建稀疏向量。

有关 API 的更多详细信息,请参阅 Vectors Python 文档

import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors

# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1))

本地向量的基类是 Vector,我们提供了两种实现:DenseVectorSparseVector。我们建议使用 Vectors 中实现的工厂方法来创建本地向量。

有关 API 的详细信息,请参阅 Vector Scala 文档Vectors Scala 文档

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))

注意: Scala 默认导入 scala.collection.immutable.Vector,因此您必须显式导入 org.apache.spark.mllib.linalg.Vector 才能使用 MLlib 的 Vector

本地向量的基类是 Vector,我们提供了两种实现:DenseVectorSparseVector。我们建议使用 Vectors 中实现的工厂方法来创建本地向量。

有关 API 的详细信息,请参阅 Vector Java 文档Vectors Java 文档

import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Create a dense vector (1.0, 0.0, 3.0).
Vector dv = Vectors.dense(1.0, 0.0, 3.0);
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
Vector sv = Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0});

标记点

标记点是与标签/响应关联的本地向量,可以是密集的也可以是稀疏的。在 MLlib 中,标记点用于监督学习算法。我们使用双精度数来存储标签,因此我们可以在回归和分类中使用标记点。对于二元分类,标签应该是 0(负)或 1(正)。对于多类别分类,标签应该是从零开始的类别索引:0, 1, 2, ...

标记点由 LabeledPoint 表示。

有关 API 的更多详细信息,请参阅 LabeledPoint Python 文档

from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])

# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))

标记点由案例类 LabeledPoint 表示。

有关 API 的详细信息,请参阅 LabeledPoint Scala 文档

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

标记点由 LabeledPoint 表示。

有关 API 的详细信息,请参阅 LabeledPoint Java 文档

import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

// Create a labeled point with a positive label and a dense feature vector.
LabeledPoint pos = new LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0));

// Create a labeled point with a negative label and a sparse feature vector.
LabeledPoint neg = new LabeledPoint(0.0, Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0}));

稀疏数据

在实践中,拥有稀疏的训练数据是非常常见的。MLlib 支持读取以 LIBSVM 格式存储的训练样本,这是 LIBSVMLIBLINEAR 使用的默认格式。这是一种文本格式,其中每一行使用以下格式表示一个标记的稀疏特征向量

label index1:value1 index2:value2 ...

其中索引是从 1 开始的,并且按升序排列。加载后,特征索引将转换为从 0 开始。

MLUtils.loadLibSVMFile 读取以 LIBSVM 格式存储的训练样本。

有关 API 的更多详细信息,请参阅 MLUtils Python 文档

from pyspark.mllib.util import MLUtils

examples = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

MLUtils.loadLibSVMFile 读取以 LIBSVM 格式存储的训练样本。

有关 API 的详细信息,请参阅 MLUtils Scala 文档

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

MLUtils.loadLibSVMFile 读取以 LIBSVM 格式存储的训练样本。

有关 API 的更多详细信息,请参阅 MLUtils Java 文档

import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.api.java.JavaRDD;

JavaRDD<LabeledPoint> examples = 
  MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt").toJavaRDD();

本地矩阵

本地矩阵具有整数类型的行索引和列索引以及双精度类型的数值,存储在单台机器上。MLlib 支持密集矩阵和稀疏矩阵,密集矩阵的条目值存储在按列优先顺序排列的单个双精度数组中,而稀疏矩阵的非零条目值存储在按列优先顺序排列的压缩稀疏列(CSC)格式中。例如,以下密集矩阵 \[ \begin{pmatrix} 1.0 & 2.0 \\ 3.0 & 4.0 \\ 5.0 & 6.0 \end{pmatrix} \] 存储在一个一维数组 [1.0, 3.0, 5.0, 2.0, 4.0, 6.0] 中,矩阵大小为 (3, 2)

本地矩阵的基类是 Matrix,我们提供了两种实现:DenseMatrixSparseMatrix。我们建议使用 Matrices 中实现的工厂方法来创建本地矩阵。请记住,MLlib 中的本地矩阵以列优先顺序存储。

有关 API 的更多详细信息,请参阅 Matrix Python 文档Matrices Python 文档

from pyspark.mllib.linalg import Matrix, Matrices

# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6])

# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])

本地矩阵的基类是 Matrix,我们提供了两种实现:DenseMatrixSparseMatrix。我们建议使用 Matrices 中实现的工厂方法来创建本地矩阵。请记住,MLlib 中的本地矩阵以列优先顺序存储。

有关 API 的详细信息,请参阅 Matrix Scala 文档Matrices Scala 文档

import org.apache.spark.mllib.linalg.{Matrix, Matrices}

// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))

// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))

本地矩阵的基类是 Matrix,我们提供了两种实现:DenseMatrixSparseMatrix。我们建议使用 Matrices 中实现的工厂方法来创建本地矩阵。请记住,MLlib 中的本地矩阵以列优先顺序存储。

有关 API 的详细信息,请参阅 Matrix Java 文档Matrices Java 文档

import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Matrices;

// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
Matrix dm = Matrices.dense(3, 2, new double[] {1.0, 3.0, 5.0, 2.0, 4.0, 6.0});

// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
Matrix sm = Matrices.sparse(3, 2, new int[] {0, 1, 3}, new int[] {0, 2, 1}, new double[] {9, 6, 8});

分布式矩阵

分布式矩阵具有长整型行索引和列索引以及双精度浮点型值,分布式存储在一个或多个 RDD 中。选择正确的格式来存储大型分布式矩阵非常重要。将分布式矩阵转换为不同的格式可能需要全局混洗,这是非常昂贵的操作。目前已经实现了四种类型的分布式矩阵。

基本类型称为 RowMatrixRowMatrix 是一个面向行的分布式矩阵,没有有意义的行索引,例如,特征向量集合。它由其行的 RDD 支持,其中每一行都是一个本地向量。我们假设 RowMatrix 的列数不是很大,以便单个本地向量可以合理地传送到驱动程序,并且可以使用单个节点存储/操作。 IndexedRowMatrix 类似于 RowMatrix,但具有行索引,可用于标识行和执行连接。 CoordinateMatrix 是以 坐标列表 (COO) 格式存储的分布式矩阵,由其条目的 RDD 支持。 BlockMatrix 是由 MatrixBlock 的 RDD 支持的分布式矩阵,MatrixBlock(Int, Int, Matrix) 的元组。

注意

分布式矩阵的基础 RDD 必须是确定性的,因为我们缓存了矩阵大小。通常,使用非确定性 RDD 会导致错误。

RowMatrix

RowMatrix 是一个面向行的分布式矩阵,没有有意义的行索引,由其行的 RDD 支持,其中每一行都是一个本地向量。由于每一行都由一个本地向量表示,因此列数受整数范围的限制,但在实践中应该小得多。

可以从向量的 RDD 创建 RowMatrix

有关 API 的更多详细信息,请参阅 RowMatrix Python 文档

from pyspark.mllib.linalg.distributed import RowMatrix

# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])

# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)

# Get its size.
m = mat.numRows()  # 4
n = mat.numCols()  # 3

# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows

可以从 RDD[Vector] 实例创建 RowMatrix。然后我们可以计算其列汇总统计信息和分解。 QR 分解 形式为 A = QR,其中 Q 是正交矩阵,R 是上三角矩阵。有关 奇异值分解 (SVD)主成分分析 (PCA),请参阅 降维

有关 API 的详细信息,请参阅 RowMatrix Scala 文档

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows: RDD[Vector] = ... // an RDD of local vectors
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// QR decomposition 
val qrResult = mat.tallSkinnyQR(true)

可以从 JavaRDD<Vector> 实例创建 RowMatrix。然后我们可以计算其列汇总统计信息。

有关 API 的详细信息,请参阅 RowMatrix Java 文档

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;

JavaRDD<Vector> rows = ... // a JavaRDD of local vectors
// Create a RowMatrix from a JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();

// QR decomposition 
QRDecomposition<RowMatrix, Matrix> result = mat.tallSkinnyQR(true);

IndexedRowMatrix

IndexedRowMatrix 类似于 RowMatrix,但具有有意义的行索引。它由索引行的 RDD 支持,因此每一行都由其索引(长整型)和本地向量表示。

可以从 IndexedRowRDD 创建 IndexedRowMatrix,其中 IndexedRow(long, vector) 的包装器。可以通过删除其行索引将 IndexedRowMatrix 转换为 RowMatrix

有关 API 的更多详细信息,请参阅 IndexedRowMatrix Python 文档

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# Create an RDD of indexed rows.
#   - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
                              IndexedRow(1, [4, 5, 6]),
                              IndexedRow(2, [7, 8, 9]),
                              IndexedRow(3, [10, 11, 12])])
#   - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
                              (2, [7, 8, 9]), (3, [10, 11, 12])])

# Create an IndexedRowMatrix from an RDD of IndexedRows.
mat = IndexedRowMatrix(indexedRows)

# Get its size.
m = mat.numRows()  # 4
n = mat.numCols()  # 3

# Get the rows as an RDD of IndexedRows.
rowsRDD = mat.rows

# Convert to a RowMatrix by dropping the row indices.
rowMat = mat.toRowMatrix()

可以从 RDD[IndexedRow] 实例创建 IndexedRowMatrix,其中 IndexedRow(Long, Vector) 的包装器。可以通过删除其行索引将 IndexedRowMatrix 转换为 RowMatrix

有关 API 的详细信息,请参阅 IndexedRowMatrix Scala 文档

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()

可以从 JavaRDD<IndexedRow> 实例创建 IndexedRowMatrix,其中 IndexedRow(long, Vector) 的包装器。可以通过删除其行索引将 IndexedRowMatrix 转换为 RowMatrix

有关 API 的详细信息,请参阅 IndexedRowMatrix Java 文档

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;

JavaRDD<IndexedRow> rows = ... // a JavaRDD of indexed rows
// Create an IndexedRowMatrix from a JavaRDD<IndexedRow>.
IndexedRowMatrix mat = new IndexedRowMatrix(rows.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();

// Drop its row indices.
RowMatrix rowMat = mat.toRowMatrix();

CoordinateMatrix

CoordinateMatrix 是由其条目的 RDD 支持的分布式矩阵。每个条目都是一个 (i: Long, j: Long, value: Double) 元组,其中 i 是行索引,j 是列索引,value 是条目值。仅当矩阵的两个维度都很大且矩阵非常稀疏时,才应使用 CoordinateMatrix

可以从 MatrixEntry 条目的 RDD 创建 CoordinateMatrix,其中 MatrixEntry(long, long, float) 的包装器。可以通过调用 toRowMatrixCoordinateMatrix 转换为 RowMatrix,或者通过调用 toIndexedRowMatrix 将其转换为具有稀疏行的 IndexedRowMatrix

有关 API 的更多详细信息,请参阅 CoordinateMatrix Python 文档

from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# Create an RDD of coordinate entries.
#   - This can be done explicitly with the MatrixEntry class:
entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(2, 1, 3.7)])
#   - or using (long, long, float) tuples:
entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)])

# Create a CoordinateMatrix from an RDD of MatrixEntries.
mat = CoordinateMatrix(entries)

# Get its size.
m = mat.numRows()  # 3
n = mat.numCols()  # 2

# Get the entries as an RDD of MatrixEntries.
entriesRDD = mat.entries

# Convert to a RowMatrix.
rowMat = mat.toRowMatrix()

# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()

# Convert to a BlockMatrix.
blockMat = mat.toBlockMatrix()

可以从 RDD[MatrixEntry] 实例创建 CoordinateMatrix,其中 MatrixEntry(Long, Long, Double) 的包装器。可以通过调用 toIndexedRowMatrixCoordinateMatrix 转换为具有稀疏行的 IndexedRowMatrix。当前不支持 CoordinateMatrix 的其他计算。

有关 API 的详细信息,请参阅 CoordinateMatrix Scala 文档

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Convert it to an IndexRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()

可以从 JavaRDD<MatrixEntry> 实例创建 CoordinateMatrix,其中 MatrixEntry(long, long, double) 的包装器。可以通过调用 toIndexedRowMatrixCoordinateMatrix 转换为具有稀疏行的 IndexedRowMatrix。当前不支持 CoordinateMatrix 的其他计算。

有关 API 的详细信息,请参阅 CoordinateMatrix Java 文档

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;

JavaRDD<MatrixEntry> entries = ... // a JavaRDD of matrix entries
// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
CoordinateMatrix mat = new CoordinateMatrix(entries.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();

// Convert it to an IndexRowMatrix whose rows are sparse vectors.
IndexedRowMatrix indexedRowMatrix = mat.toIndexedRowMatrix();

BlockMatrix

BlockMatrix 是由 MatrixBlock 的 RDD 支持的分布式矩阵,其中 MatrixBlock((Int, Int), Matrix) 的元组,其中 (Int, Int) 是块的索引,Matrix 是给定索引处的子矩阵,大小为 rowsPerBlock x colsPerBlockBlockMatrix 支持与另一个 BlockMatrix 进行 addmultiply 等方法。 BlockMatrix 还有一个辅助函数 validate,可用于检查 BlockMatrix 是否设置正确。

可以从子矩阵块的 RDD 创建 BlockMatrix,其中子矩阵块是 ((blockRowIndex, blockColIndex), sub-matrix) 元组。

有关 API 的更多详细信息,请参阅 BlockMatrix Python 文档

from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix

# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
                         ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])

# Create a BlockMatrix from an RDD of sub-matrix blocks.
mat = BlockMatrix(blocks, 3, 2)

# Get its size.
m = mat.numRows()  # 6
n = mat.numCols()  # 2

# Get the blocks as an RDD of sub-matrix blocks.
blocksRDD = mat.blocks

# Convert to a LocalMatrix.
localMat = mat.toLocalMatrix()

# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()

# Convert to a CoordinateMatrix.
coordinateMat = mat.toCoordinateMatrix()

最简单的创建 BlockMatrix 的方法是从 IndexedRowMatrixCoordinateMatrix 调用 toBlockMatrixtoBlockMatrix 默认创建大小为 1024 x 1024 的块。用户可以通过 toBlockMatrix(rowsPerBlock, colsPerBlock) 提供值来更改块大小。

有关 API 的详细信息,请参阅 BlockMatrix Scala 文档

import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Transform the CoordinateMatrix to a BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate()

// Calculate A^T A.
val ata = matA.transpose.multiply(matA)

可以通过调用 toBlockMatrix 方法,最容易地从 IndexedRowMatrixCoordinateMatrix 创建 BlockMatrix。默认情况下,toBlockMatrix 创建大小为 1024 x 1024 的块。用户可以通过 toBlockMatrix(rowsPerBlock, colsPerBlock) 提供值来更改块大小。

有关 API 的详细信息,请参阅 BlockMatrix Java 文档

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;

JavaRDD<MatrixEntry> entries = ... // a JavaRDD of (i, j, v) Matrix Entries
// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
CoordinateMatrix coordMat = new CoordinateMatrix(entries.rdd());
// Transform the CoordinateMatrix to a BlockMatrix
BlockMatrix matA = coordMat.toBlockMatrix().cache();

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate();

// Calculate A^T A.
BlockMatrix ata = matA.transpose().multiply(matA);