Friends, so far we have gone through the basic data types of Spark MLlib. Spark MLlib also supports distributed matrices: RowMatrix, IndexedRowMatrix, CoordinateMatrix and BlockMatrix.
A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large and distributed matrices, because converting a distributed matrix to a different format may require a global shuffle, which is quite expensive. Four types of distributed matrices have been implemented so far.
RowMatrix :
A RowMatrix is a row-oriented distributed matrix without meaningful row indices: it is made up of a collection of rows, each of which is a local vector. Since each row is represented by a local vector, the number of columns is limited by the integer range, but in practice it should be much smaller.
A RowMatrix can be created from an RDD[Vector] instance. Then we can compute its column summary statistics and decompositions such as QR. A QR decomposition is of the form A = QR, where Q is an orthogonal matrix and R is an upper triangular matrix. Example :

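```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD

val rows: RDD[Vector] = sc.parallelize(Seq(Vectors.dense(1.0, 0.0, 3.0), Vectors.dense(2.0, 30.0, 4.0)))
// O/p :
// rows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = ParallelCollectionRDD[2] at parallelize at <console>:26

// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()
```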
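The paragraph above mentions column summary statistics and QR decomposition, but the example stops at the matrix size. The following is a minimal sketch of both, assuming the RDD-based MLlib API (org.apache.spark.mllib) is on the classpath. SparkContext.getOrCreate reuses the spark-shell sc if one is running and otherwise starts a local context; the app name and the 4 x 2 matrix used for QR are made up for illustration. A separate tall matrix is used because tallSkinnyQR is meant for matrices with many more rows than columns.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.stat.MultivariateStatisticalSummary
import org.apache.spark.rdd.RDD

// Reuse the shell's SparkContext if there is one, otherwise start a local one.
// (App name is illustrative only.)
val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("RowMatrixSketch"))

// Same two rows as in the example above.
val rows: RDD[Vector] = sc.parallelize(Seq(Vectors.dense(1.0, 0.0, 3.0), Vectors.dense(2.0, 30.0, 4.0)))
val mat = new RowMatrix(rows)

// Column-wise statistics, computed in one pass over the distributed rows.
val summary: MultivariateStatisticalSummary = mat.computeColumnSummaryStatistics()
println(summary.mean)        // mean of each column
println(summary.variance)    // sample variance of each column
println(summary.numNonzeros) // number of non-zeros in each column

// Illustrative tall-and-skinny 4 x 2 matrix for the QR decomposition.
val tall = new RowMatrix(sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0),
  Vectors.dense(3.0, 4.0),
  Vectors.dense(5.0, 6.0),
  Vectors.dense(7.0, 9.0)), numSlices = 2))

// computeQ = true also returns Q (Q is null when computeQ = false).
val qr = tall.tallSkinnyQR(computeQ = true)
val q: RowMatrix = qr.Q // distributed 4 x 2 matrix with orthonormal columns
val r: Matrix = qr.R    // local 2 x 2 upper triangular matrix

// Q^T Q should be the 2 x 2 identity, up to rounding error.
val qtq: Matrix = q.computeGramianMatrix()
println(r)
println(qtq)
```

Note that R comes back as a small local Matrix while Q stays distributed as a RowMatrix, since Q has as many rows as the input.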
IndexedRowMatrix :
An IndexedRowMatrix is similar to a RowMatrix, but it has meaningful row indices: each row is represented by its long-typed index and a local vector. An IndexedRowMatrix can be created from an RDD[IndexedRow] instance, where IndexedRow is a wrapper over (Long, Vector). Example :
```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
import org.apache.spark.rdd.RDD

val rows: RDD[IndexedRow] = sc.parallelize(Seq(
  IndexedRow(0, Vectors.dense(1.0, 2.0, 3.0)),
  IndexedRow(1, Vectors.dense(4.0, 5.0, 6.0))))

// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()
```
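To see why the row index matters, here is a minimal sketch, assuming the RDD-based MLlib API on the classpath, that uses two rows with non-contiguous indices (0 and 5, chosen only for illustration). When no dimensions are passed, the IndexedRowMatrix reports its row count as the largest index plus one, while the RowMatrix obtained by dropping the indices just counts the stored rows.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

// Reuse the shell's SparkContext if there is one, otherwise start a local one.
val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("IndexedRowMatrixSketch"))

// Two rows whose indices are not contiguous: row 0 and row 5.
val indexed = new IndexedRowMatrix(sc.parallelize(Seq(
  IndexedRow(0L, Vectors.dense(1.0, 2.0, 3.0)),
  IndexedRow(5L, Vectors.dense(4.0, 5.0, 6.0)))))

// Without explicit dimensions, the row count is the largest index + 1.
val indexedRows: Long = indexed.numRows()

// A row can be looked up by its index, whatever partition it sits in.
val row5 = indexed.rows.filter(_.index == 5L).first().vector

// Dropping the indices keeps only the stored vectors, so the
// resulting RowMatrix counts the rows it actually holds.
val plain: RowMatrix = indexed.toRowMatrix()
val plainRows: Long = plain.numRows()

println(s"IndexedRowMatrix rows: $indexedRows, RowMatrix rows: $plainRows") // IndexedRowMatrix rows: 6, RowMatrix rows: 2
println(row5)
```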
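CoordinateMatrix :
A CoordinateMatrix is backed by an RDD of its entries, where each entry holds a row index (Long), a column index (Long) and a value (Double). It can be created from an RDD[MatrixEntry], where MatrixEntry is a wrapper over (Long, Long, Double). We can also convert a CoordinateMatrix to an IndexedRowMatrix, whose rows are sparse vectors, by calling toIndexedRowMatrix. Example :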
```scala
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRowMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD

val entries: RDD[MatrixEntry] = sc.parallelize(Seq(
  MatrixEntry(0, 1, 1.2),
  MatrixEntry(0, 2, 1.5),
  MatrixEntry(0, 3, 2.5)))

// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Convert it to an IndexedRowMatrix whose rows are sparse vectors.
val indexedRowMatrix: IndexedRowMatrix = mat.toIndexedRowMatrix()
```
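The idea behind the coordinate format and its conversion to sparse rows can be modelled in plain Scala without Spark. This is a minimal sketch of the concept, not Spark's implementation: Entry, inferredSize and toSparseRows are hypothetical names made up for this illustration. When no size is given, the dimensions are inferred as the largest index seen plus one, and converting to rows amounts to grouping the entries by row index. Paste it into the Scala REPL or run it as a script.

```scala
// Hypothetical plain-Scala stand-in for MatrixEntry: (row, column, value).
final case class Entry(i: Long, j: Long, value: Double)

// Infer the matrix size as (largest row index + 1, largest column index + 1).
def inferredSize(entries: Seq[Entry]): (Long, Long) =
  (entries.map(_.i).max + 1, entries.map(_.j).max + 1)

// Group entries by row index; each row becomes a list of
// (columnIndex, value) pairs sorted by column, i.e. a sparse vector.
def toSparseRows(entries: Seq[Entry]): Map[Long, Seq[(Long, Double)]] =
  entries.groupBy(_.i).map { case (row, es) =>
    row -> es.map(e => (e.j, e.value)).sortBy(_._1)
  }

// Same three entries as in the Spark example above.
val entries = Seq(Entry(0, 1, 1.2), Entry(0, 2, 1.5), Entry(0, 3, 2.5))
val (nRows, nCols) = inferredSize(entries)
val sparseRows = toSparseRows(entries)

println(s"$nRows x $nCols") // 1 x 4
println(sparseRows)
```

Only the non-zero entries are stored, which is why this format pays off when the matrix is very sparse.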
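BlockMatrix :
A BlockMatrix is a distributed matrix backed by an RDD of MatrixBlocks, where a MatrixBlock is a tuple ((Int, Int), Matrix): the (Int, Int) pair is the index of the block and the Matrix is the local sub-matrix at that index. We use a BlockMatrix when we want to perform distributed matrix operations such as addition and multiplication, since it supports add and multiply with another BlockMatrix.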
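How an entry is assigned to a block can be shown with simple integer arithmetic. The following is a plain-Scala sketch of that arithmetic, not Spark code; blockIndex, offsetInBlock and gridSize are hypothetical helpers for illustration. With rowsPerBlock x colsPerBlock blocks, entry (i, j) lands in block (i / rowsPerBlock, j / colsPerBlock) at position (i % rowsPerBlock, j % colsPerBlock) inside that block, and the last row or column of blocks may be smaller than the others.

```scala
// Block that entry (i, j) falls into.
def blockIndex(i: Long, j: Long, rowsPerBlock: Int, colsPerBlock: Int): (Int, Int) =
  ((i / rowsPerBlock).toInt, (j / colsPerBlock).toInt)

// Position of entry (i, j) inside its block.
def offsetInBlock(i: Long, j: Long, rowsPerBlock: Int, colsPerBlock: Int): (Int, Int) =
  ((i % rowsPerBlock).toInt, (j % colsPerBlock).toInt)

// Number of block rows and block columns (ceiling division).
def gridSize(numRows: Long, numCols: Long, rowsPerBlock: Int, colsPerBlock: Int): (Int, Int) =
  (((numRows + rowsPerBlock - 1) / rowsPerBlock).toInt,
   ((numCols + colsPerBlock - 1) / colsPerBlock).toInt)

// With 1024 x 1024 blocks, entry (2500, 10) sits in block (2, 0).
println(blockIndex(2500L, 10L, 1024, 1024))    // (2,0)
println(offsetInBlock(2500L, 10L, 1024, 1024)) // (452,10)
println(gridSize(3000L, 500L, 1024, 1024))     // (3,1)
```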
A BlockMatrix can be most easily created from an IndexedRowMatrix or a CoordinateMatrix by calling toBlockMatrix, which creates blocks of size 1024 x 1024 by default; toBlockMatrix(rowsPerBlock, colsPerBlock) lets us choose a different block size. Example :
```scala
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD

val entries: RDD[MatrixEntry] = sc.parallelize(Seq(
  MatrixEntry(0, 1, 1.2),
  MatrixEntry(0, 2, 1.5),
  MatrixEntry(0, 3, 2.5)))

// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)

// Transform the CoordinateMatrix to a BlockMatrix (1024 x 1024 blocks by default).
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate()

// Calculate A^T A.
val ata: BlockMatrix = matA.transpose.multiply(matA)
```
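The example above uses the default block size and only multiplies. Here is a minimal sketch, assuming the RDD-based MLlib API on the classpath, that picks a small block size with toBlockMatrix(2, 2), adds and multiplies a BlockMatrix with itself, and pulls the small results back with toLocalMatrix so they can be checked by hand. The 3 x 3 matrix is made up for illustration; toLocalMatrix collects everything to the driver, so it only makes sense for small results like this one.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}

// Reuse the shell's SparkContext if there is one, otherwise start a local one.
val sc = SparkContext.getOrCreate(new SparkConf().setMaster("local[2]").setAppName("BlockMatrixSketch"))

// A = | 1 2 0 |
//     | 0 3 0 |
//     | 4 0 5 |   (only the non-zero entries are stored)
val entries = sc.parallelize(Seq(
  MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 2.0), MatrixEntry(1, 1, 3.0),
  MatrixEntry(2, 0, 4.0), MatrixEntry(2, 2, 5.0)))

// 2 x 2 blocks split the 3 x 3 matrix into a 2 x 2 grid of blocks;
// the blocks on the bottom and right edges are smaller.
val a: BlockMatrix = new CoordinateMatrix(entries).toBlockMatrix(2, 2).cache()
a.validate()

val sum: Matrix = a.add(a).toLocalMatrix()          // 2A
val product: Matrix = a.multiply(a).toLocalMatrix() // A * A

println(s"${a.numRowBlocks} x ${a.numColBlocks} blocks") // 2 x 2 blocks
println(sum)
println(product)
```

Both add and multiply need compatible block layouts: add expects the same block sizes on both sides, and multiply expects the left matrix's colsPerBlock to match the right matrix's rowsPerBlock, which holds here because the matrix is combined with itself.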