1. Find the 2 missing numbers from an array of N elements, given N-2 elements
Scala Code :

def findMissingNums(datax: Array[Int], len: Int): Unit = {
  val total = datax.sum
  val actual_total = (1 to len).sum
  val diff = actual_total - total  // sum of the two missing numbers
  var i = diff
  var firstElem = 0
  var secondElem = 0
  while (i > 0) {
    if (!datax.contains(i) && diff < len) {
      firstElem = i
      secondElem = diff - firstElem
      i = 0
    } else if (!datax.contains(i) && diff > len) {
      if (i < diff) {
        firstElem = i
        secondElem = diff - firstElem
      }
    }
    i = i - 1
  }
  println(firstElem + "," + secondElem)
}
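For comparison, the same problem can be solved without repeated contains scans by using the sum and sum-of-squares identities. This is only a sketch; findMissingPair is a hypothetical helper name, not part of the original post:

```scala
// Find the two numbers missing from 1..n, given the n-2 remaining elements.
// Uses a + b (from the sum) and a^2 + b^2 (from the sum of squares).
def findMissingPair(data: Array[Int], n: Int): (Int, Int) = {
  val s  = (1 to n).sum.toLong - data.map(_.toLong).sum        // a + b
  val sq = (1 to n).map(x => x.toLong * x).sum -
           data.map(x => x.toLong * x).sum                     // a^2 + b^2
  val prod = (s * s - sq) / 2                                  // a * b
  val d = math.sqrt((s * s - 4 * prod).toDouble).toLong        // |a - b|
  val a = ((s - d) / 2).toInt
  (a, (s - a).toInt)
}
```

For example, findMissingPair(Array(1, 2, 4), 5) yields (3, 5).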
2. Swap 2 numbers without using a temp variable
def swapNum(num1: Int, num2: Int): Unit = {
  var number1 = num1
  var number2 = num2
  number1 = number2 - number1
  number2 = number2 - number1
  number1 = number1 + number2
  println("num1 = " + number1 + ", num2 = " + number2)
}
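To see why no temp variable is needed, trace the three assignments with concrete values (a quick sketch with num1 = 3 and num2 = 7):

```scala
// Trace of the subtraction-based swap with num1 = 3, num2 = 7.
var number1 = 3
var number2 = 7
number1 = number2 - number1  // number1 = 4 (the difference)
number2 = number2 - number1  // number2 = 3 (the original number1)
number1 = number1 + number2  // number1 = 7 (the original number2)
```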
Category: Scala
Data types of Spark MLlib – Part 2
Friends, so far we have gone through the basic data types of Spark MLlib. Spark MLlib also supports distributed matrices, which include RowMatrix, IndexedRowMatrix, CoordinateMatrix and BlockMatrix.
A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large distributed matrices: converting a distributed matrix to a different format may require a global shuffle, which is quite expensive. Four types of distributed matrices have been implemented so far.
1. RowMatrix :
A RowMatrix is a collection of rows, each of which is a local vector. Since each row is represented by a local vector, the number of columns is limited by the integer range, but it should be much smaller in practice.
A RowMatrix can be created from an RDD[Vector] instance. Then we can compute its column summary statistics and decompositions. QR decomposition is of the form A = QR, where Q is an orthogonal matrix and R is an upper triangular matrix.
Example :

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows: RDD[Vector] = sc.parallelize(Seq(Vectors.dense(1.0, 0.0, 3.0), Vectors.dense(2.0, 30.0, 4.0)))
// O/p : rows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = ParallelCollectionRDD[2] at parallelize at <console>:26

val mat: RowMatrix = new RowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()
2. IndexedRowMatrix :
It is similar to a RowMatrix, but each row is represented by its index and a local vector. An IndexedRowMatrix can be created from an RDD[IndexedRow] instance, where IndexedRow is a wrapper over (Long, Vector).
Example :
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

val rows: RDD[IndexedRow] = sc.parallelize(Seq(IndexedRow(0, Vectors.dense(1.0, 2.0, 3.0)), IndexedRow(1, Vectors.dense(4.0, 5.0, 6.0))))

// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()
3. CoordinateMatrix :
A CoordinateMatrix is backed by entries of row index, column index and a double value, and can be created from an RDD[MatrixEntry]. We can also convert a CoordinateMatrix to an IndexedRowMatrix by calling toIndexedRowMatrix.
Example :
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = sc.parallelize(Seq(MatrixEntry(0, 1, 1.2), MatrixEntry(0, 2, 1.5), MatrixEntry(0, 3, 2.5)))

// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Convert it to an IndexedRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()
4. BlockMatrix :
When we want to perform matrix addition and multiplication, where each sub-matrix is backed by its index, we use a BlockMatrix. A BlockMatrix can be most easily created from an IndexedRowMatrix or CoordinateMatrix by calling toBlockMatrix, which creates blocks of size 1024 x 1024 by default.
Example :
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = sc.parallelize(Seq(MatrixEntry(0, 1, 1.2), MatrixEntry(0, 2, 1.5), MatrixEntry(0, 3, 2.5)))

// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)

// Transform the CoordinateMatrix to a BlockMatrix.
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()

// Validate whether the BlockMatrix is set up properly. Throws an exception
// when it is not valid; nothing happens if it is valid.
matA.validate()

// Calculate A^T A.
val ata = matA.transpose.multiply(matA)
Data types of Spark MLlib – Part 1
Hello friends, Spark MLlib supports multiple data types in the form of vectors and matrices.
A local vector has integer-typed, 0-based indices and double-typed values, stored on a single machine. There are two types of local vectors: dense and sparse.
1. Dense vector :
A dense vector is backed by a double array representing its entry values.
def dense(values: Array[Double]): Vector
Creates a dense vector from a double array.
Example :
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
o/p : dv: org.apache.spark.mllib.linalg.Vector = [1.0,0.0,3.0]
2. Sparse vector :
A sparse vector is backed by two parallel arrays: indices and values.
def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector
Creates a sparse vector providing its index array and value array.
We can also declare a sparse vector another way, with a Seq of (index, value) pairs as the 2nd parameter and the size as the 1st:
def sparse(size: Int, elements: Seq[(Int, Double)]): Vector
Creates a sparse vector using unordered (index, value) pairs.
Example :
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
o/p : sv1: org.apache.spark.mllib.linalg.Vector = (3,[0,2],[1.0,3.0])
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
o/p : sv2: org.apache.spark.mllib.linalg.Vector = (3,[0,2],[1.0,3.0])
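To make the (size, indices, values) layout concrete, here is a small plain-Scala sketch that expands a sparse representation back into a dense array (toDense is a hypothetical helper for illustration, not part of the MLlib API):

```scala
// Expand a sparse representation (size, indices, values) into a dense array.
// Positions not listed in `indices` default to 0.0.
def toDense(size: Int, indices: Array[Int], values: Array[Double]): Array[Double] = {
  val out = Array.fill(size)(0.0)
  for ((i, v) <- indices.zip(values)) out(i) = v
  out
}
```

For example, toDense(3, Array(0, 2), Array(1.0, 3.0)) holds the same entries as Vectors.dense(1.0, 0.0, 3.0).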
3. Labeled point :
A labeled point is a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms. We use a double to store a label, so we can use labeled points in both regression and classification. For binary classification, a label should be either 0 (negative) or 1 (positive). For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, ...
A labeled point is represented by the case class LabeledPoint.
import org.apache.spark.mllib.regression.LabeledPoint

// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
Matrix
Spark MLlib supports two types of matrices: 1. local matrix and 2. distributed matrix.
Local matrix :
The base class of local matrices is Matrix, and there are two implementations: DenseMatrix and SparseMatrix. We recommend using the factory methods implemented in Matrices to create local matrices. Remember, local matrices in MLlib are stored in column-major order.
1. DenseMatrix :
def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix
Creates a column-major dense matrix.
Example :
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
o/p : dm: org.apache.spark.mllib.linalg.Matrix =
1.0 2.0
3.0 4.0
5.0 6.0
2. SparseMatrix :
def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int], rowIndices: Array[Int], values: Array[Double]): Matrix
Creates a column-major sparse matrix in Compressed Sparse Column (CSC) format.
numRows -> number of rows in the matrix
numCols -> number of columns in the matrix
colPtrs -> index corresponding to the start of each new column
rowIndices -> row index of each element, in column-major order
values -> values in double
Example :
1.0 0.0 4.0
0.0 3.0 5.0
2.0 0.0 6.0
is stored as values: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], rowIndices = [0, 2, 1, 0, 1, 2], colPointers = [0, 2, 3, 6].
Another example :
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9.0, 6.0, 8.0))
O/p : sm: org.apache.spark.mllib.linalg.Matrix =
3 x 2 CSCMatrix
(0,0) 9.0
(2,1) 6.0
(1,1) 8.0
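The colPtrs/rowIndices/values layout can be checked with a small plain-Scala lookup (cscApply is a hypothetical helper for illustration, not an MLlib method):

```scala
// Read entry (i, j) of a matrix stored in Compressed Sparse Column format.
// colPtrs(j) .. colPtrs(j + 1) delimit column j's run in rowIndices/values.
def cscApply(colPtrs: Array[Int], rowIndices: Array[Int],
             values: Array[Double], i: Int, j: Int): Double = {
  var k = colPtrs(j)
  while (k < colPtrs(j + 1)) {
    if (rowIndices(k) == i) return values(k)
    k += 1
  }
  0.0  // entry not stored, so it is an implicit zero
}
```

With the 3 x 3 example above (values [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], rowIndices [0, 2, 1, 0, 1, 2], colPointers [0, 2, 3, 6]), looking up (2, 0) returns 2.0 while the unstored entry (0, 1) returns 0.0.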
We will see distributed matrices in the next session.
Running Scala OOP files from the command-line spark-shell
When writing a script in Scala, you may still want to follow object-oriented programming, since most programmers come from an OOP background due to Java practice. You can still execute Spark scripts in an OOP style using the spark-shell.
Here I am giving a simple example of the collectAsync() future method, and also demonstrating how we can run Scala scripts containing OO code through the spark-shell.
collectAsync() returns a FutureAction that will deliver an object of type Seq[Int] in the future; it supports parallel programming by allowing multiple operations to run in parallel, enabling non-blocking IO.
Test.scala :
import org.apache.spark.FutureAction
import org.apache.spark.{SPARK_VERSION, SparkContext}
import scala.concurrent.Future

object Test {
  def main(args: Array[String]) {
    // val sc1 = new SparkContext()
    val data = sc.parallelize(1 to 250, 1)
    var futureData: FutureAction[Seq[Int]] = data.collectAsync()
    while (!futureData.isCompleted) {
      println("Hello World")
    }
    var itr: Iterator[Int] = data.toLocalIterator
    while (itr.hasNext) {
      println(itr.next())
    }
  }
}
Test.main(null) /* This line is part of the Scala file Test.scala */
Now, in order to run the above program, you have to issue the following command:
sudo spark-shell -i Test.scala
So in this case the spark-shell takes the Test.scala file and compiles it; after that, Test.main(null) gets called, which invokes the main method of the Test object.
One thing: make sure you are not creating another SparkContext in your code; you can reutilize the SparkContext (sc) of the present spark-shell session.
Scala immutable collections
Scala collections come in two categories: mutable and immutable collections. The collection framework is one of Scala's core strengths. Let us see its hierarchy diagram below.
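Since the diagram may not render here, a minimal code sketch of the mutable/immutable split:

```scala
import scala.collection.mutable

// Immutable collections (the default) return new collections
// instead of changing in place.
val xs = List(1, 2, 3)
val ys = 0 :: xs             // builds a new list; xs is untouched

// Mutable collections change in place.
val buf = mutable.ArrayBuffer(1, 2, 3)
buf += 4                     // buf now holds 1, 2, 3, 4
```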
Repartitioning & partitioning in Spark
In Hadoop, partitioning data allows processing of huge volumes of data in parallel, such that it takes the minimum amount of time to process the entire dataset. Apache Spark decides partitioning based on different factors. Factors that decide default partitioning:
- On Hadoop, partitions are created per HDFS input split.
- Filter or map functions don't change partitioning.
- The number of CPU cores in the cluster, when running in non-Hadoop mode.
Repartitioning increases the number of partitions; it rebalances the partitions after a filter and increases parallelism.
You can define partitioning in Spark at the time of creating an RDD, as follows:
val users = sc.textFile("hdfs://atr3p11:8020/project/users.csv", 1)
where the 2nd argument is nothing but the number of partitions.
By default, if HDFS is not used, Spark creates partitions based on the number of cores; if an HDFS path is used, it will create partitions based on input splits (the default block size of HDFS).
To know the number of partitions, just enter in the spark-shell:
users.partitions.size
Spark can only run one concurrent task for every partition of an RDD, up to the number of cores in your cluster. So if you have a cluster with 50 cores, you want your RDDs to have at least 50 partitions (and probably 2-3x that).
As far as choosing a "good" number of partitions, you generally want at least as many as the number of executors for parallelism. You can get this computed value by calling sc.defaultParallelism.
Also, the number of partitions determines how many files get generated by actions that save RDDs to files.
The maximum size of a partition is ultimately limited by the available memory of an executor.
In the first RDD transformation, e.g. reading from a file using sc.textFile(path, partition), the partition parameter will be applied to all further transformations and actions on this RDD.
When using textFile with compressed files (file.txt.gz, not file.txt or similar), Spark disables splitting, which makes for an RDD with only one partition (as reads against gzipped files cannot be parallelized). In this case, to change the number of partitions you should do repartitioning.
Some operations, e.g. map, flatMap and filter, don't preserve partitioning. map, flatMap and filter operations apply a function to every partition.
rdd = sc.textFile('demo.gz')
rdd = rdd.repartition(100)
With these lines, you end up with rdd having exactly 100 partitions of roughly equal size.

- rdd.repartition(N) does a shuffle to split data to match N.
- Partitioning is done on a round-robin basis.
Note: if this partitioning scheme doesn't work for you, you can write your own custom partitioner.
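The round-robin distribution mentioned above can be sketched in plain Scala (this is only a conceptual model for illustration, not Spark's actual implementation):

```scala
// Conceptual round-robin split of a sequence into n partitions:
// element i goes to partition i % n.
def roundRobin[A](data: Seq[A], n: Int): Seq[Seq[A]] =
  (0 until n).map(p => data.zipWithIndex.collect { case (x, i) if i % n == p => x })
```

For example, roundRobin(1 to 10, 3) produces three partitions of sizes 4, 3 and 3.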
coalesce transformation :
The coalesce transformation is used to change the number of partitions. It can trigger RDD shuffling depending on the shuffle flag (disabled by default, i.e. false).

shuffle is false by default and it's explicitly used here for demo purposes. Note that the number of partitions remains the same as the number of partitions in the source RDD rdd.
Parallelize a local 10-number sequence and coalesce it, first without and then with shuffling (note the shuffle parameter being false and true, respectively).