Friends, so far we have gone through Basic types of spark mllib data type. Spark mllib also supports distributed matrices that includes Row Matrix , IndexedRowMatrix, CoordinateMatrix, BlockMatrix.
A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive. Four types of distributed matrices have been implemented so far.
- RowMatrix :A RowMatrix is made up of collection of row which made up of local vector. Since each row is represented by a local vector, the number of columns is limited by the integer range but it should be much smaller in practice.
A
RowMatrix
can be created from anRDD[Vector]
instance. Then we can compute its column summary statistics and decompositions. QR decomposition is of the form A = QR where Q is an orthogonal matrix and R is an upper triangular matrix.Example :
-
val rows: RDD[Vector] = sc.parallelize(Seq(Vectors.dense(1.0, 0.0, 3.0),Vectors.dense(2.0,30.0,4.0))) O/p :rows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = ParallelCollectionRDD[2] at parallelize at <console>:26 val mat: RowMatrix = new RowMatrix(rows) // Get its size. val m = mat.numRows() val n = mat.numCols()
IndexedRowMatrix :
It is similar to RowMatrix where each row represented by it’s index & local vector
AnIndexedRowMatrix
can be created from anRDD[IndexedRow]
instance, whereIndexedRow
is a wrapper over(Long, Vector)
.Example :
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix} val rows: RDD[IndexedRow] = sc.parallelize(Seq(IndexedRow(0,Vectors.dense(1.0,2.0,3.0)),IndexedRow(1,Vectors.dense(4.0,5.0,6.0))) // Create an IndexedRowMatrix from an RDD[IndexedRow]. val mat: IndexedRowMatrix = new IndexedRowMatrix(rows) // Get its size. val m = mat.numRows() val n = mat.numCols() // Drop its row indices. val rowMat: RowMatrix = mat.toRowMatrix()
- Coordinate Matrix :
A coordinate matrix is backed by rowindices , columnindices & value in double & can be created from an RDD[MatrixEntry]. We can also convert Coordinate matrix to indexedRowMatrix by calling toIndexRowMatrixExample :
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} val entries: RDD[MatrixEntry] = sc.parallelize(Seq(MatrixEntry(0,1,1.2),MatrixEntry(0,2,1.5),MatrixEntry(0,3,2.5))) // Create a CoordinateMatrix from an RDD[MatrixEntry]. val mat: CoordinateMatrix = new CoordinateMatrix(entries) // Get its size. val m = mat.numRows() val n = mat.numCols() // Convert it to an IndexRowMatrix whose rows are sparse vectors. val indexedRowMatrix = mat.toIndexedRowMatrix()
- BlockMatrix :
When we want to perform multiple matrix addition & multiplication where each submatrix backed by it’s index we uses BlockMatrix.
A
BlockMatrix
can be most easily created from anIndexedRowMatrix
orCoordinateMatrix
by callingtoBlockMatrix which will create block size of 1024X1024
.Example :
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry} val entries: RDD[MatrixEntry] = sc.parallelize(Seq(MatrixEntry(0,1,1.2),MatrixEntry(0,2,1.5),MatrixEntry(0,3,2.5))) // Create a CoordinateMatrix from an RDD[MatrixEntry]. val coordMat: CoordinateMatrix = new CoordinateMatrix(entries) // Transform the CoordinateMatrix to a BlockMatrix val matA: BlockMatrix = coordMat.toBlockMatrix().cache() // Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid. // Nothing happens if it is valid. matA.validate() // Calculate A^T A. val ata = matA.transpose.multiply(matA)