Data types of Spark Mlib – Part 1

Hello friends Spark Mlib does support multiple data types in the form of vectors & matrices.

A local vector has 1st argument as indices that is integers in nature & 2nd argument as double type as values. There are 2 types of vectors

  1. Dense vector :

A dense vector is backed by a double array representing its entry values

def dense(values: Array[Double]): Vector

Creates a dense vector from a double array.

Example : 
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
o/p : dv: org.apache.spark.mllib.linalg.Vector = [1.0,0.0,3.0]

 2.A sparse vector is backed by two parallel arrays: indices and values.

def sparse(size: Int, indices: Array[Int], values: Array[Double]):  Vector

Creates a sparse vector providing its index array and value array.

We can declare sparse vector in other way to with Seq as 2nd  parameter & size as 1st

def sparse(size: Int, elements: Seq[(Int, Double)]): Vector

Creates a sparse vector using unordered (index, value) pairs.

Example : 

val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
o/p : sv1: org.apache.spark.mllib.linalg.Vector = (3,[0,2],[1.0,3.0])

val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
o/p : sv2: org.apache.spark.mllib.linalg.Vector = (3,[0,2],[1.0,3.0])

3. Labeled Point :

A labeled point is a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms. We use a double to store a label, so we can use labeled points in both regression and classification. For binary classification, a label should be either 0 (negative) or 1 (positive). For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, ....

A labeled point is represented by the case class LabeledPoint.

val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))


Spark Mlib supports 2 types of Matrices 1. Local matrix & 2. Distributed Matrix

Local Matrix :

The base class of local matrices is Matrix, and we provide two implementations: DenseMatrix, and SparseMatrix. We recommend using the factory methods implemented in Matrices to create local matrices. Remember, local matrices in MLlib are stored in column-major order.

  1. DenseMatrix :

    def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix


    Creates a column-major dense matrix.

    Example :
    val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
    o/p : dm: org.apache.spark.mllib.linalg.Matrix = 
    1.0 2.0 
    3.0 4.0 
    5.0 6.0
  2. SparseMatrix :

    def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int], rowIndices: Array[Int], values:Array[Double]): Matrix

    Creates a column-major sparse matrix in Compressed Sparse Column (CSC) format.

    numRows -> Describes number of rows in matrix
    numCols -> Describe number of columns in matrix
    colPtrs -> Describe index corrosponding to start of new column
    RowIndices -> Row index of element in column-major way.
    Values -> values in double

    Example : 
     1.0 0.0 4.0
     0.0 3.0 5.0
     2.0 0.0 6.0
    is stored as values: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], rowIndices=[0, 2, 1, 0, 1, 2], colPointers=[0, 2, 3, 6].
    Another example :
    val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
    O/p :sm: org.apache.spark.mllib.linalg.Matrix = 
    3 x 2 CSCMatrix
    (0,0) 9.0
    (2,1) 6.0
    (1,1) 8.0

    We will see distributed Matrix in next session.



Running scala oops files from command line- spark-shell

Writing a script in scala , but still want to follow object oriented programming as most of the programmer are from OOPs background due to java practice, can still execute the spark scripts using spark-shell in OOPs way.

Here I am giving a simple example of CollectAsync() future method , also demonstrate how we can run scala scripts through spark-shell that is having oo code.

CollectAsync() returns FutureAction that returns object of type Seq[Int] in future so does allow parallel programming by allowing multiple operation to perform in parallel to support non-blocking IO.

Test.scala :

import org.apache.spark.FutureAction
import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.apache.spark.FutureAction
import scala.concurrent.Future

object Test {
def main(args: Array[String]) {
//val sc1 = new SparkContext()
val data = sc.parallelize(1 to 250, 1)
var futureData: FutureAction[Seq[Int]] = data.collectAsync()
println("Hello WOrld")
var itr:Iterator[Int] = data.toLocalIterator
Test.main(null) /* This line is part of scala file Test.scala */

Now in order to run above program you have to write following command :

sudo spark-shell -i Test.scala

So in this case spark-shell is taking Test.scala file & create a module after that

Test.main(null) gets called that calls the main method of Test class.

1 thing make sure you are not creating another SparkContext in your code

you can re-utilize sparkContext of present spark session.