Kafka – Insights (Managing under-replicated topic’s alert)

Apache Kafka is a distributed, fault-tolerant, queue-based messaging system. Its main components are producers, consumers, topics, ZooKeeper, and Kafka brokers. Today we will walk through the workflow of Kafka's publish-subscribe system and some tips and tricks for configuring it so that it stays healthy.

Now let's discuss how Kafka works internally.

  1. Producers send messages to a topic at regular intervals.
  2. The Kafka broker stores all messages in the partitions configured for that particular topic. Messages without a key are shared evenly between the partitions: if the producer sends two messages and there are two partitions, Kafka stores one message in the first partition and the other in the second.
    The number of partitions and the replication factor are usually set when the topic is created:
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
  3. A consumer subscribes to a specific topic.
  4. Once the consumer subscribes to a topic, Kafka provides the current offset of the topic to the consumer and also saves the offset in the ZooKeeper ensemble. (This describes the older ZooKeeper-based consumer; newer clients store offsets in Kafka itself.)
  5. The consumer requests new messages from Kafka at a regular interval (for example every 100 ms).
  6. Once Kafka has received messages from producers, it returns them to the consumers in response to those requests.
  7. The consumer receives the messages and processes them.
  8. Once the messages are processed, the consumer sends an acknowledgement to the Kafka broker.
  9. Once Kafka receives the acknowledgement, it changes the offset to the new value and updates it in ZooKeeper. Since offsets are maintained in ZooKeeper, the consumer can read the next message correctly even during server outages.
  10. The above flow repeats until the consumer stops requesting.
  11. The consumer has the option to rewind or skip to any desired offset of a topic at any time and read all the subsequent messages.
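
The numbered flow above can be sketched as a toy, in-memory model. This is only an illustration and not Kafka's API or implementation: `ToyTopic`, `send`, `poll`, `commit`, and `seek` are hypothetical names, and round-robin placement is assumed for messages that carry no key.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of one topic; every name here is hypothetical, not Kafka's API.
class ToyTopic {
    private final List<List<String>> partitions = new ArrayList<>();
    private final int[] committed;   // committed offset per partition, for a single consumer
    private int next = 0;            // round-robin cursor (assumes messages without keys)

    ToyTopic(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        committed = new int[numPartitions];
    }

    // Steps 1-2: the producer sends, the broker appends to the next partition.
    int send(String message) {
        int p = next;
        partitions.get(p).add(message);
        next = (next + 1) % partitions.size();
        return p;
    }

    // Step 5: the consumer polls for everything after its committed offset.
    List<String> poll(int partition) {
        List<String> log = partitions.get(partition);
        return new ArrayList<>(log.subList(committed[partition], log.size()));
    }

    // Steps 8-9: acknowledging processed messages advances the stored offset.
    void commit(int partition, int processedCount) {
        committed[partition] += processedCount;
    }

    // Step 11: rewind or skip to any offset.
    void seek(int partition, int offset) {
        committed[partition] = offset;
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic(2);
        topic.send("m1");
        topic.send("m2");
        topic.send("m3");
        List<String> batch = topic.poll(0);
        System.out.println(batch);          // messages stored in partition 0
        topic.commit(0, batch.size());
        System.out.println(topic.poll(0));  // nothing new after the commit
        topic.seek(0, 0);
        System.out.println(topic.poll(0));  // rewound: the same messages again
    }
}
```

The point of the sketch is that the log itself never changes when a consumer reads; only the stored offset moves, which is why rewinding is cheap.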

Now consider a Kafka cluster of 4 nodes and a topic with replication factor 3. Kafka maintains a list of in-sync replicas (ISR) based on 2 configuration settings:

  1. replica.lag.max.messages : If a replica falls behind the leader by more than N messages, Kafka removes that replica from the ISR and the topic is reported as under-replicated, because the replica is out of sync by N messages.
  2. replica.lag.time.max.ms : If a follower (replica) has not sent a fetch request to the leader within the configured time, that replica is removed from the ISR (in-sync replicas) and an under-replicated alert is raised for the topic.

In production we sometimes see a lot of these alerts, which is not a good sign. There are various reasons why a node or replica can be slow or down, and a burst of incoming messages alone can push a healthy replica past the message-count threshold.
So the ideal practice to minimize false alerts is to set only replica.lag.time.max.ms and avoid setting replica.lag.max.messages. Kafka 0.9 and later removed replica.lag.max.messages altogether.
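
To make the time-based rule concrete, here is a toy model of the check. It is an illustration under assumed names (`IsrSketch`, `inSync`, `lastCaughtUpMs`), not the broker's actual code: a follower stays in the ISR as long as the time since it last caught up with the leader does not exceed replica.lag.time.max.ms.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy ISR check; the class and method names are hypothetical.
class IsrSketch {
    // Returns the ids of followers whose lag time is within maxLagMs.
    static Set<Integer> inSync(Map<Integer, Long> lastCaughtUpMs, long nowMs, long maxLagMs) {
        Set<Integer> isr = new TreeSet<>();
        for (Map.Entry<Integer, Long> e : lastCaughtUpMs.entrySet()) {
            if (nowMs - e.getValue() <= maxLagMs) {
                isr.add(e.getKey());
            }
        }
        return isr;
    }

    public static void main(String[] args) {
        // Replication factor 3 means one leader plus two followers.
        Map<Integer, Long> lastCaughtUp = new LinkedHashMap<>();
        lastCaughtUp.put(1, 9_500L);   // follower 1 caught up 500 ms ago
        lastCaughtUp.put(2, 4_000L);   // follower 2 caught up 6000 ms ago
        Set<Integer> isr = inSync(lastCaughtUp, 10_000L, 5_000L);
        System.out.println("ISR followers: " + isr);
        // The partition is under-replicated when some follower dropped out.
        System.out.println("under-replicated: " + (isr.size() < lastCaughtUp.size()));
    }
}
```

Note that the rule never looks at how many messages a follower is behind, so a traffic spike by itself does not shrink the ISR.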

Thanks to Neha Narkhede (https://www.confluent.io/blog/author/neha-narkhede/) for her awesome blog.


Some logical questions and answers

  1. Find the 2 missing numbers, given an array holding N-2 of the numbers 1 to N

    Scala Code :

    def findMissingNums(datax: Array[Int], len: Int): (Int, Int) = {
      val total = datax.sum
      val actualTotal = (1 to len).sum
      // diff is the sum of the two missing numbers
      val diff = actualTotal - total
      val present = datax.toSet
      // the smallest value absent from the array is the first missing number
      val firstElem = (1 to len).find(i => !present.contains(i)).get
      // the second missing number follows from the sum
      val secondElem = diff - firstElem
      (firstElem, secondElem)
    }

    2. Swap 2 numbers without using a temp variable

    def swapNum(num1: Int, num2: Int): Unit = {
      var number1 = num1
      var number2 = num2
      number1 = number2 - number1
      number2 = number2 - number1
      number1 = number1 + number2
      println("num1 = " + number1 + ", num2 = " + number2)
    }


HBase shell commands

  1. describe 'tablename' :
    Displays the metadata of a particular table
    Example :

    describe 'employee' 
    {NAME => 'professional_info', BLOOMFILTER => 'ROW', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING =
  2. disable 'employee' :
    Always disable a table before performing any DDL operation on it
  3. alter 'employee', {NAME => 'col_fam1', COMPRESSION => 'GZ'}
    With the alter command you can add column families on the fly, as well as set parameters on a column family, such as storing it IN_MEMORY, setting its compression, setting the number of versions, etc.
  4. list :
    Lists all the tables stored in HBase
    Example :
    ["employee", "table1", "user_hcat_load_table"]
  5. enable_all 't.*' :
    Enables all the tables matching the regex
  6. exists 'student' :
    Checks whether the student table exists or not
  7. show_filters :
    Shows all the available filters in HBase
  8. alter_status :
    Gets the status of the alter command. Indicates the number of regions of the table that have received the updated schema. Pass the table name.
  9. alter_async :
    Alters a column family schema without waiting for all regions to receive the
    schema changes. Pass the table name and a dictionary specifying the new column
    family schema. Dictionaries are described in the main help command output.
    The dictionary must include the name of the column family to alter.
    To change or add the 'f1' column family in table 't1' from defaults
    to instead keep a maximum of 5 cell VERSIONS, do:

    hbase> alter_async 't1', NAME => 'f1', VERSIONS => 5

    To delete the 'f1' column family in table 't1', do:

    hbase> alter_async 't1', NAME => 'f1', METHOD => 'delete'

    or a shorter version:

    hbase> alter_async 't1', 'delete' => 'f1'

    You can also change table-scope attributes like MAX_FILESIZE.

    For example, to change the max size of a region to 128MB, do:

    hbase> alter 't1', METHOD => 'table_att', MAX_FILESIZE => '134217728'

    There could be more than one alteration in one command:

    hbase> alter 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}

    To check if all the regions have been updated, use alter_status <table_name>

  10. count :
    Counts the number of rows in a table. The running count is shown every 1000 rows by default; the interval can be changed with INTERVAL. Scan caching is enabled on count scans by default, and its size can be set with CACHE.
    hbase> count 't1', INTERVAL => 100000
    hbase> count 't1', CACHE => 1000
    hbase> count 't1', INTERVAL => 10, CACHE => 1000

  11. delete :
    Put a delete cell value at specified table/row/column and optionally
    timestamp coordinates. Deletes must match the deleted cell's
    coordinates exactly. When scanning, a delete cell suppresses older
    versions. To delete a cell from 't1' at row 'r1' under column 'c1'
    marked with the time 'ts1', do:
    hbase> delete 't1', 'r1', 'c1', ts1
  12. deleteall :
    Delete all cells in a given row; pass a table name, row, and optionally
    a column and timestamp. Examples:
    hbase> deleteall 't1', 'r1'
    hbase> deleteall 't1', 'r1', 'c1'
    hbase> deleteall 't1', 'r1', 'c1', ts1

  13. get :
    Get row or cell contents; pass table name, row, and optionally
    a dictionary of column(s), timestamp, timerange and versions.
    hbase> get 't1', 'r1'
    hbase> get 't1', 'r1', {TIMERANGE => [ts1, ts2]}
    hbase> get 't1', 'r1', {COLUMN => 'c1'}
    hbase> get 't1', 'r1', {COLUMN => ['c1', 'c2', 'c3']}
    hbase> get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => ts1}
    hbase> get 't1', 'r1', {COLUMN => 'c1', TIMERANGE => [ts1, ts2], VERSIONS => 4}
    hbase> get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => ts1, VERSIONS => 4}
    hbase> get 't1', 'r1', {FILTER => "ValueFilter(=, 'binary:abc')"}
    hbase> get 't1', 'r1', 'c1'
    hbase> get 't1', 'r1', 'c1', 'c2'
    hbase> get 't1', 'r1', ['c1', 'c2']
  14. put :
    Put a cell 'value' at specified table/row/column and optionally
    timestamp coordinates. To put a cell value into table 't1' at
    row 'r1' under column 'c1' marked with the time 'ts1', do:
    hbase> put 't1', 'r1', 'c1', 'value', ts1

  15. scan :
    Scan a table; pass table name and optionally a dictionary of scanner
    specifications. Scanner specifications may include one or more of:
    TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, TIMESTAMP, MAXLENGTH,
    COLUMNS, or CACHE.
    If no columns are specified, all columns will be scanned.
    To scan all members of a column family, leave the qualifier empty as in
    'col_family:'.
    The filter can be specified in two ways:
    1. Using a filterString – more information on this is available in the
    Filter Language document attached to the HBASE-4176 JIRA
    2. Using the entire package name of the filter.
    Some examples:
    hbase> scan '.META.'
    hbase> scan '.META.', {COLUMNS => 'info:regioninfo'}
    hbase> scan 't1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'}
    hbase> scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804, 1303668904]}
    hbase> scan 't1', {FILTER => "(PrefixFilter ('row2') AND
    (QualifierFilter (>=, 'binary:xyz'))) AND (TimestampsFilter ( 123, 456))"}
    hbase> scan 't1', {FILTER =>
    org.apache.hadoop.hbase.filter.ColumnPaginationFilter.new(1, 0)}
  16. truncate :
    Disables, drops and recreates the specified table.
    hbase> truncate 't1'




Data types of Spark MLlib – Part 2

Friends, so far we have gone through the basic Spark MLlib data types. Spark MLlib also supports distributed matrices: RowMatrix, IndexedRowMatrix, CoordinateMatrix, and BlockMatrix.

A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive. Four types of distributed matrices have been implemented so far.

  1. RowMatrix :
    A RowMatrix is a collection of rows, each of which is a local vector. Since each row is represented by a local vector, the number of columns is limited by the integer range, but it should be much smaller in practice.

    A RowMatrix can be created from an RDD[Vector] instance. Then we can compute its column summary statistics and decompositions. QR decomposition is of the form A = QR where Q is an orthogonal matrix and R is an upper triangular matrix.

    Example :

    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    import org.apache.spark.rdd.RDD
    val rows: RDD[Vector] = sc.parallelize(Seq(Vectors.dense(1.0, 0.0, 3.0), Vectors.dense(2.0, 30.0, 4.0)))
    O/p : rows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = ParallelCollectionRDD[2] at parallelize at <console>:26
    val mat: RowMatrix = new RowMatrix(rows)
    // Get its size.
    val m = mat.numRows()
    val n = mat.numCols()
  2. IndexedRowMatrix :

    It is similar to a RowMatrix, but each row is represented by its index and a local vector.
    An IndexedRowMatrix can be created from an RDD[IndexedRow] instance, where IndexedRow is a wrapper over (Long, Vector).

    Example :

    import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
    val rows: RDD[IndexedRow] = sc.parallelize(Seq(IndexedRow(0, Vectors.dense(1.0, 2.0, 3.0)), IndexedRow(1, Vectors.dense(4.0, 5.0, 6.0))))
    // Create an IndexedRowMatrix from an RDD[IndexedRow].
    val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
    // Get its size.
    val m = mat.numRows()
    val n = mat.numCols()
    // Drop its row indices.
    val rowMat: RowMatrix = mat.toRowMatrix()


  3. CoordinateMatrix :
    A CoordinateMatrix is backed by its entries, each a tuple of row index, column index (both Long), and a Double value, and it can be created from an RDD[MatrixEntry]. We can also convert a CoordinateMatrix to an IndexedRowMatrix by calling toIndexedRowMatrix.

    Example :

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
    val entries: RDD[MatrixEntry] = sc.parallelize(Seq(MatrixEntry(0,1,1.2),MatrixEntry(0,2,1.5),MatrixEntry(0,3,2.5)))
    // Create a CoordinateMatrix from an RDD[MatrixEntry].
    val mat: CoordinateMatrix = new CoordinateMatrix(entries)
    // Get its size.
    val m = mat.numRows()
    val n = mat.numCols()
    // Convert it to an IndexRowMatrix whose rows are sparse vectors.
    val indexedRowMatrix = mat.toIndexedRowMatrix()
  4. BlockMatrix :

    We use a BlockMatrix when we want to perform matrix addition and multiplication on large matrices. It is stored as sub-matrix blocks, each identified by its (row, column) block index.

    A BlockMatrix can be most easily created from an IndexedRowMatrix or CoordinateMatrix by calling toBlockMatrix, which creates blocks of size 1024 x 1024 by default.

    Example :

    import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
    val entries: RDD[MatrixEntry] = sc.parallelize(Seq(MatrixEntry(0,1,1.2),MatrixEntry(0,2,1.5),MatrixEntry(0,3,2.5)))
    // Create a CoordinateMatrix from an RDD[MatrixEntry].
    val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
    // Transform the CoordinateMatrix to a BlockMatrix
    val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
    // Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
    // Nothing happens if it is valid.
    matA.validate()
    // Calculate A^T A.
    val ata = matA.transpose.multiply(matA)

Data types of Spark MLlib – Part 1

Hello friends, Spark MLlib supports multiple data types in the form of vectors and matrices.

A local vector has integer-typed, 0-based indices and double-typed values. There are 2 types of local vectors:

  1. Dense vector :

A dense vector is backed by a double array representing its entry values

def dense(values: Array[Double]): Vector

Creates a dense vector from a double array.

Example : 
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
o/p : dv: org.apache.spark.mllib.linalg.Vector = [1.0,0.0,3.0]

  2. Sparse vector :

A sparse vector is backed by two parallel arrays: indices and values.

def sparse(size: Int, indices: Array[Int], values: Array[Double]):  Vector

Creates a sparse vector providing its index array and value array.

We can also declare a sparse vector another way, with the size as the 1st parameter and a Seq of (index, value) pairs as the 2nd:

def sparse(size: Int, elements: Seq[(Int, Double)]): Vector

Creates a sparse vector using unordered (index, value) pairs.

Example : 

val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
o/p : sv1: org.apache.spark.mllib.linalg.Vector = (3,[0,2],[1.0,3.0])

val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
o/p : sv2: org.apache.spark.mllib.linalg.Vector = (3,[0,2],[1.0,3.0])

3. Labeled Point :

A labeled point is a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms. We use a double to store a label, so we can use labeled points in both regression and classification. For binary classification, a label should be either 0 (negative) or 1 (positive). For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, ....

A labeled point is represented by the case class LabeledPoint.

val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))


Spark MLlib supports 2 types of matrices: 1. local matrix and 2. distributed matrix.

Local Matrix :

The base class of local matrices is Matrix, and MLlib provides two implementations: DenseMatrix and SparseMatrix. It is recommended to use the factory methods implemented in Matrices to create local matrices. Remember, local matrices in MLlib are stored in column-major order.

  1. DenseMatrix :

    def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix


    Creates a column-major dense matrix.

    Example :
    val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
    o/p : dm: org.apache.spark.mllib.linalg.Matrix = 
    1.0 2.0 
    3.0 4.0 
    5.0 6.0
  2. SparseMatrix :

    def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int], rowIndices: Array[Int], values:Array[Double]): Matrix

    Creates a column-major sparse matrix in Compressed Sparse Column (CSC) format.

    numRows -> number of rows in the matrix
    numCols -> number of columns in the matrix
    colPtrs -> for each column, the index in values where that column starts; the last entry is the total number of non-zero values
    rowIndices -> row index of each stored element, in column-major order
    values -> the non-zero values, as doubles, in column-major order

    Example : 
     1.0 0.0 4.0
     0.0 3.0 5.0
     2.0 0.0 6.0
    is stored as values = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], rowIndices = [0, 2, 1, 0, 1, 2], colPtrs = [0, 2, 3, 6].
    Another example :
    val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
    O/p :sm: org.apache.spark.mllib.linalg.Matrix = 
    3 x 2 CSCMatrix
    (0,0) 9.0
    (2,1) 6.0
    (1,1) 8.0
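
To see where the three CSC arrays come from, the following plain-Java sketch builds them from a dense matrix by walking it column by column. It does not use Spark; `CscSketch` is a hypothetical name used only for this illustration, and the input is the 3 x 3 matrix from the first example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that derives CSC arrays from a dense matrix; not part of MLlib.
class CscSketch {
    final int[] colPtrs;
    final int[] rowIndices;
    final double[] values;

    // dense is given row by row; the CSC arrays are filled column by column.
    CscSketch(double[][] dense) {
        int numCols = dense[0].length;
        List<Integer> rows = new ArrayList<>();
        List<Double> vals = new ArrayList<>();
        colPtrs = new int[numCols + 1];
        for (int c = 0; c < numCols; c++) {
            colPtrs[c] = vals.size();            // index in values where column c starts
            for (int r = 0; r < dense.length; r++) {
                if (dense[r][c] != 0.0) {        // only non-zero entries are stored
                    rows.add(r);
                    vals.add(dense[r][c]);
                }
            }
        }
        colPtrs[numCols] = vals.size();          // last entry: total number of non-zeros
        rowIndices = rows.stream().mapToInt(Integer::intValue).toArray();
        values = vals.stream().mapToDouble(Double::doubleValue).toArray();
    }

    public static void main(String[] args) {
        CscSketch m = new CscSketch(new double[][] {
            {1.0, 0.0, 4.0},
            {0.0, 3.0, 5.0},
            {2.0, 0.0, 6.0}
        });
        System.out.println("values     = " + Arrays.toString(m.values));
        System.out.println("rowIndices = " + Arrays.toString(m.rowIndices));
        System.out.println("colPtrs    = " + Arrays.toString(m.colPtrs));
    }
}
```

Column c occupies the positions colPtrs[c] up to (but not including) colPtrs[c + 1] in both values and rowIndices, which is how the matrix can be read back.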

    We will see distributed Matrix in next session.


Running Scala OOP files from the command line with spark-shell

If you write scripts in Scala but still want to follow object-oriented programming (most programmers come from an OOP background thanks to Java), you can still run your Spark scripts through spark-shell in an object-oriented way.

Here is a simple example built around the collectAsync() future method; it also demonstrates how to run a Scala script containing object-oriented code through spark-shell.

collectAsync() returns a FutureAction[Seq[Int]], a future that will eventually hold the collected elements. The call returns immediately, so other operations can proceed in parallel while the job runs, which supports non-blocking I/O.

Test.scala :

import org.apache.spark.FutureAction
import org.apache.spark.{SPARK_VERSION, SparkContext}
import scala.concurrent.Future

object Test {
  def main(args: Array[String]): Unit = {
    // Reuse the SparkContext (sc) provided by spark-shell; do not create a new one.
    //val sc1 = new SparkContext()
    val data = sc.parallelize(1 to 250, 1)
    // Returns immediately; the job runs in the background.
    val futureData: FutureAction[Seq[Int]] = data.collectAsync()
    println("Hello World")
    val itr: Iterator[Int] = data.toLocalIterator
  }
}
Test.main(null) /* This line is part of scala file Test.scala */

To run the above program, use the following command :

sudo spark-shell -i Test.scala

In this case spark-shell loads the Test.scala file and defines the Test object. After that, Test.main(null) gets called, which invokes the main method of Test.

One thing to make sure of: do not create another SparkContext in your code. You can reuse the SparkContext of the current spark-shell session.

Priority Queue – Data Structure

We know that a Queue follows the First-In-First-Out model, but sometimes we need to process the objects in the queue based on priority. That is when the Java PriorityQueue is used.

For example, let’s say we have an application that generates stocks reports for daily trading session. This application processes a lot of data and takes time to process it. So customers are sending request to the application that is actually getting queued but we want to process premium customers first and standard customers after them. So in this case PriorityQueue implementation in java can be really helpful.

PriorityQueue is an unbounded queue based on a priority heap and the elements of the priority queue are ordered by default in natural order. We can provide a Comparator for ordering at the time of instantiation of priority queue.

The Java PriorityQueue doesn't allow null values, and we can't create a PriorityQueue of objects that are non-comparable unless we supply a Comparator. We use Java's Comparable and Comparator for sorting objects, and the PriorityQueue uses them for priority processing of its elements.
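
As a small sketch of the stock-report scenario described above, the following uses java.util.PriorityQueue with a Comparator so that premium requests are served before standard ones. `ReportQueueSketch`, `Request`, and the `arrival` sequence number are names assumed for this illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical request queue: premium customers first, then arrival order.
class ReportQueueSketch {
    static class Request {
        final String customer;
        final boolean premium;
        final long arrival;   // arrival sequence number, used as a tie-breaker

        Request(String customer, boolean premium, long arrival) {
            this.customer = customer;
            this.premium = premium;
            this.arrival = arrival;
        }
    }

    // false sorts before true, so negating the flag puts premium requests first.
    static final Comparator<Request> ORDER =
        Comparator.comparing((Request r) -> !r.premium)
                  .thenComparingLong(r -> r.arrival);

    // Returns customer names in the order the queue would hand them out.
    static List<String> processingOrder(List<Request> requests) {
        PriorityQueue<Request> queue = new PriorityQueue<>(ORDER);
        queue.addAll(requests);
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().customer);
        }
        return order;
    }

    public static void main(String[] args) {
        List<Request> requests = Arrays.asList(
            new Request("standard-A", false, 1),
            new Request("premium-B", true, 2),
            new Request("standard-C", false, 3),
            new Request("premium-D", true, 4));
        System.out.println(processingOrder(requests));
    }
}
```

The arrival number is included because a PriorityQueue gives no ordering guarantee between elements that compare as equal; without it, two premium requests could come out in either order.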

The simplest way to implement a priority queue data type is to keep an associative array mapping each priority to a list of elements with that priority. If association lists or hash tables are used to implement the associative array, adding an element takes constant time but removing or peeking at the element of highest priority takes linear (O(n)) time, because we must search all keys for the largest one. If a self-balancing binary search tree is used, all three operations take O(log n) time; this is a popular solution in environments that already provide balanced trees but nothing more sophisticated.

There are a number of specialized heap data structures that either supply additional operations or outperform the above approaches. The binary heap uses O(log n) time for both insertion and removal, but allows peeking at the element of highest priority without removing it in constant time. Binomial heaps add several more operations, but require O(log n) time for peeking. Fibonacci heaps can insert elements, peek at the maximum priority element, and decrease an element's priority in amortized constant time (deletions are still O(log n)).


// BinaryHeap class
// CONSTRUCTION: empty or with initial array.
// ******************PUBLIC OPERATIONS*********************
// void insert( x )       --> Insert x
// Comparable deleteMin( )--> Return and remove smallest item
// Comparable findMin( )  --> Return smallest item
// boolean isEmpty( )     --> Return true if empty; else false
// void makeEmpty( )      --> Remove all items
// ******************ERRORS********************************
// Throws UnderflowException for findMin and deleteMin when empty

/**
 * Implements a binary heap.
 * Note that all "matching" is based on the compareTo method.
 * @author Bhavesh Gadoya
 */
public class BinaryHeap implements PriorityQueue {
    /**
     * Construct the binary heap.
     */
    public BinaryHeap( ) {
        currentSize = 0;
        array = new Comparable[ DEFAULT_CAPACITY + 1 ];
    }

    /**
     * Construct the binary heap from an array.
     * @param items the initial items in the binary heap.
     */
    public BinaryHeap( Comparable [ ] items ) {
        currentSize = items.length;
        array = new Comparable[ items.length + 1 ];

        for( int i = 0; i < items.length; i++ )
            array[ i + 1 ] = items[ i ];
        buildHeap( );
    }

    /**
     * Insert into the priority queue.
     * Duplicates are allowed.
     * @param x the item to insert.
     * @return null, signifying that decreaseKey cannot be used.
     */
    public PriorityQueue.Position insert( Comparable x ) {
        if( currentSize + 1 == array.length )
            doubleArray( );

        // Percolate up; array[ 0 ] holds x as a sentinel so the loop stops at the root
        int hole = ++currentSize;
        array[ 0 ] = x;

        for( ; x.compareTo( array[ hole / 2 ] ) < 0; hole /= 2 )
            array[ hole ] = array[ hole / 2 ];
        array[ hole ] = x;

        return null;
    }

    /**
     * @throws UnsupportedOperationException because no Positions are returned
     * by the insert method for BinaryHeap.
     */
    public void decreaseKey( PriorityQueue.Position p, Comparable newVal ) {
        throw new UnsupportedOperationException( "Cannot use decreaseKey for binary heap" );
    }

    /**
     * Find the smallest item in the priority queue.
     * @return the smallest item.
     * @throws UnderflowException if empty.
     */
    public Comparable findMin( ) {
        if( isEmpty( ) )
            throw new UnderflowException( "Empty binary heap" );
        return array[ 1 ];
    }

    /**
     * Remove the smallest item from the priority queue.
     * @return the smallest item.
     * @throws UnderflowException if empty.
     */
    public Comparable deleteMin( ) {
        Comparable minItem = findMin( );
        array[ 1 ] = array[ currentSize-- ];
        percolateDown( 1 );

        return minItem;
    }

    /**
     * Establish heap order property from an arbitrary
     * arrangement of items. Runs in linear time.
     */
    private void buildHeap( ) {
        for( int i = currentSize / 2; i > 0; i-- )
            percolateDown( i );
    }

    /**
     * Test if the priority queue is logically empty.
     * @return true if empty, false otherwise.
     */
    public boolean isEmpty( ) {
        return currentSize == 0;
    }

    /**
     * Returns size.
     * @return current size.
     */
    public int size( ) {
        return currentSize;
    }

    /**
     * Make the priority queue logically empty.
     */
    public void makeEmpty( ) {
        currentSize = 0;
    }

    private static final int DEFAULT_CAPACITY = 100;

    private int currentSize;      // Number of elements in heap
    private Comparable [ ] array; // The heap array

    /**
     * Internal method to percolate down in the heap.
     * @param hole the index at which the percolate begins.
     */
    private void percolateDown( int hole ) {
        int child;
        Comparable tmp = array[ hole ];

        for( ; hole * 2 <= currentSize; hole = child ) {
            child = hole * 2;
            // Pick the smaller of the two children
            if( child != currentSize &&
                    array[ child + 1 ].compareTo( array[ child ] ) < 0 )
                child++;
            if( array[ child ].compareTo( tmp ) < 0 )
                array[ hole ] = array[ child ];
            else
                break;
        }
        array[ hole ] = tmp;
    }

    /**
     * Internal method to extend array.
     */
    private void doubleArray( ) {
        Comparable [ ] newArray;

        newArray = new Comparable[ array.length * 2 ];
        for( int i = 0; i < array.length; i++ )
            newArray[ i ] = array[ i ];
        array = newArray;
    }

    // Test program
    public static void main( String [ ] args ) {
        int numItems = 10000;
        BinaryHeap h1 = new BinaryHeap( );
        Integer [ ] items = new Integer[ numItems - 1 ];

        int i = 37;
        int j;

        for( i = 37, j = 0; i != 0; i = ( i + 37 ) % numItems, j++ ) {
            h1.insert( Integer.valueOf( i ) );
            items[ j ] = Integer.valueOf( i );
        }

        for( i = 1; i < numItems; i++ )
            if( ((Integer)( h1.deleteMin( ) )).intValue( ) != i )
                System.out.println( "Oops! " + i );

        BinaryHeap h2 = new BinaryHeap( items );
        for( i = 1; i < numItems; i++ )
            if( ((Integer)( h2.deleteMin( ) )).intValue( ) != i )
                System.out.println( "Oops! " + i );
    }
}

// PriorityQueue interface
// ******************PUBLIC OPERATIONS*********************
// Position insert( x )   --> Insert x
// Comparable deleteMin( )--> Return and remove smallest item
// Comparable findMin( )  --> Return smallest item
// boolean isEmpty( )     --> Return true if empty; else false
// void makeEmpty( )      --> Remove all items
// int size( )            --> Return size
// void decreaseKey( p, v)--> Decrease value in p to v
// ******************ERRORS********************************
// Throws UnderflowException for findMin and deleteMin when empty

/**
 * PriorityQueue interface.
 * Some priority queues may support a decreaseKey operation,
 * but this is considered an advanced operation. If so,
 * a Position is returned by insert.
 * Note that all "matching" is based on the compareTo method.
 * @author Bhavesh Gadoya
 */
public interface PriorityQueue {
    /**
     * The Position interface represents a type that can
     * be used for the decreaseKey operation.
     */
    public interface Position {
        /**
         * Returns the value stored at this position.
         * @return the value stored at this position.
         */
        Comparable getValue( );
    }

    /**
     * Insert into the priority queue, maintaining heap order.
     * Duplicates are allowed.
     * @param x the item to insert.
     * @return may return a Position useful for decreaseKey.
     */
    Position insert( Comparable x );

    /**
     * Find the smallest item in the priority queue.
     * @return the smallest item.
     * @throws UnderflowException if empty.
     */
    Comparable findMin( );

    /**
     * Remove the smallest item from the priority queue.
     * @return the smallest item.
     * @throws UnderflowException if empty.
     */
    Comparable deleteMin( );

    /**
     * Test if the priority queue is logically empty.
     * @return true if empty, false otherwise.
     */
    boolean isEmpty( );

    /**
     * Make the priority queue logically empty.
     */
    void makeEmpty( );

    /**
     * Returns the size.
     * @return current size.
     */
    int size( );

    /**
     * Change the value of the item stored in the pairing heap.
     * This is considered an advanced operation and might not
     * be supported by all priority queues. A priority queue
     * will signal its intention to not support decreaseKey by
     * having insert return null consistently.
     * @param p any non-null Position returned by insert.
     * @param newVal the new value, which must be smaller
     *    than the currently stored value.
     * @throws IllegalArgumentException if p invalid.
     * @throws UnsupportedOperationException if appropriate.
     */
    void decreaseKey( Position p, Comparable newVal );
}

/**
 * Exception class for access in empty containers
 * such as stacks, queues, and priority queues.
 * @author Bhavesh Gadoya
 */
public class UnderflowException extends RuntimeException {
    /**
     * Construct this exception object.
     * @param message the error message.
     */
    public UnderflowException( String message ) {
        super( message );
    }
}


Built-in Java implementation of PriorityQueue :

package com.journaldev.collections;

public class Customer {
	private int id;
	private String name;

	public Customer(int i, String n){
		this.id = i;
		this.name = n;
	}

	public int getId() {
		return id;
	}

	public String getName() {
		return name;
	}
}

We will use Java random number generation to generate random Customer objects. For the natural-ordering example, I will use Integer, which is a Java wrapper class.

Here is our final test code that shows how to use priority queue in java.

package com.journaldev.collections;

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;

public class PriorityQueueExample {

	public static void main(String[] args) {
		//natural ordering example of priority queue
		Queue<Integer> integerPriorityQueue = new PriorityQueue<>(7);
		Random rand = new Random();
		for(int i=0;i<7;i++){
			integerPriorityQueue.add(rand.nextInt(100));
		}
		for(int i=0;i<7;i++){
			Integer in = integerPriorityQueue.poll();
			System.out.println("Processing Integer:"+in);
		}
		//PriorityQueue example with Comparator
		Queue<Customer> customerPriorityQueue = new PriorityQueue<>(7, idComparator);
		addDataToQueue(customerPriorityQueue);
		pollDataFromQueue(customerPriorityQueue);
	}

	//Comparator anonymous class implementation
	public static Comparator<Customer> idComparator = new Comparator<Customer>(){
		@Override
		public int compare(Customer c1, Customer c2) {
            return (int) (c1.getId() - c2.getId());
		}
	};

	//utility method to add random data to Queue
	private static void addDataToQueue(Queue<Customer> customerPriorityQueue) {
		Random rand = new Random();
		for(int i=0; i<7; i++){
			int id = rand.nextInt(100);
			customerPriorityQueue.add(new Customer(id, "Pankaj "+id));
		}
	}

	//utility method to poll data from queue
	private static void pollDataFromQueue(Queue<Customer> customerPriorityQueue) {
		while(true){
			Customer cust = customerPriorityQueue.poll();
			if(cust == null) break;
			System.out.println("Processing Customer with ID="+cust.getId());
		}
	}
}



Re-partitioning & partitions in Spark

  In Hadoop, partitioning data allows a huge volume of data to be processed in parallel, so that the entire dataset takes the minimum amount of time to process. Apache Spark decides partitioning based on several factors. Factors that decide the default partitioning:

  1. On Hadoop, the HDFS input splits (blocks).
  2. filter or map functions don't change the number of partitions.
  3. The number of CPU cores in the cluster when running in non-Hadoop mode.

Re-partitioning : changes (typically increases) the number of partitions, rebalances the data across partitions after a filter, and increases parallelism.

 You can define the number of partitions in Spark at the time of creating an RDD, as follows :

val users = sc.textFile("hdfs://at-r3p11:8020/project/users.csv", 1)

where the 2nd argument is the minimum number of partitions.

By default, when HDFS is not used, Spark creates partitions based on the number of cores. When an HDFS path is used, it creates partitions based on the input splits (by default, the HDFS block size).

To find the number of partitions, just enter in spark-shell :

users.partitions.size

Spark can only run 1 concurrent task for every partition of an RDD, up to the number of cores in your cluster. So if you have a cluster with 50 cores, you want your RDDs to at least have 50 partitions (and probably 2-3x times that).

As far as choosing a “good” number of partitions, you generally want at least as many as the number of executors for parallelism. You can get this computed value by calling sc.defaultParallelism.

Also, the number of partitions determines how many files get generated by actions that save RDDs to files.

The maximum size of a partition is ultimately limited by the available memory of an executor.

When an RDD is first created, e.g. by reading from a file using sc.textFile(path, partition), the partition count set there carries over to later narrow transformations and actions on this RDD, until a shuffle or an explicit repartition changes it.

When using textFile with compressed files (file.txt.gz not file.txt or similar), Spark disables splitting that makes for an RDD with only 1 partition (as reads against gzipped files cannot be parallelized). In this case, to change the number of partitions you should do repartitioning.

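Some operations, e.g. map and flatMap, don't preserve the partitioner, because they may change the keys; filter and mapValues do preserve it. None of them changes the number of partitions.

map, flatMap, and filter apply a function to the elements of every partition, so each partition is processed independently.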

rdd = sc.textFile('demo.gz')
rdd = rdd.repartition(100)

With the lines, you end up with rdd to be exactly 100 partitions of roughly equal in size.

  • rdd.repartition(N) does a shuffle to split data to match N

  • partitioning is done on round robin basis
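
To make the round-robin idea concrete, here is a minimal plain-Java sketch. It is not Spark's implementation and uses no Spark API; the class name RoundRobinSketch and the method partition are hypothetical names chosen for illustration. It only shows why dealing elements out in turn gives partitions of roughly equal size.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: plain Java, not Spark's implementation.
class RoundRobinSketch {

    // Deal the elements out to numPartitions buckets in turn (round robin).
    static <T> List<List<T>> partition(List<T> elements, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < elements.size(); i++) {
            // element i goes to bucket i mod numPartitions
            partitions.get(i % numPartitions).add(elements.get(i));
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i <= 10; i++) {
            data.add(i);
        }
        // 11 elements over 4 buckets: bucket sizes differ by at most one
        for (List<Integer> bucket : partition(data, 4)) {
            System.out.println(bucket);
        }
    }
}
```

Because each element simply goes to the next bucket in turn, no bucket can end up more than one element larger than another, regardless of the values themselves.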

Note:
If the default partitioning scheme doesn't work for you, you can write your own custom partitioner.

coalesce Transformation :

The coalesce transformation is used to change the number of partitions. It can trigger RDD shuffling depending on the shuffle flag (disabled by default, i.e. false).

In the following sample, you parallelize a local sequence of the numbers 0 to 10 into 8 partitions and coalesce it without shuffling (note the shuffle parameter being false).
scala> val rdd = sc.parallelize(0 to 10, 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.partitions.size
res0: Int = 8

scala> rdd.coalesce(numPartitions=8, shuffle=false)   (1)
res1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at <console>:27

  1. shuffle is false by default and it's explicitly used here for demo purposes. Note that the number of partitions remains the same as in the source RDD rdd.

Asynchronous processing in Java

Asynchronous programming is very popular these days, primarily because of its ability to improve the overall throughput on a multi-core system. Asynchronous programming is a programming paradigm that facilitates fast and responsive user interfaces. The asynchronous programming model in Java provides a consistent programming model to write programs that support asynchrony.

Asynchronous programming provides a non-blocking, event-driven programming model. This programming model leverages the multiple cores in your system to provide parallelization by using multiple CPU cores to execute the tasks, thus increasing the application’s throughput. Note that throughput is a measure of the amount of work done in unit time. In this programming paradigm, a unit of work would execute separately from the main application thread and notify the calling thread about its execution state: success, in progress or failure.

Asynchronous processing is useful when we want to execute multiple things in parallel without waiting for one task to finish, which increases the throughput of the system. Suppose we want to send email to 100k+ users and need to process other data at the same time: we don't want to wait for the email task to complete before proceeding.

Another good example of this can be logging frameworks: You typically would want to log exceptions and errors into your log targets; in other words, file, database, or something similar. There is no point for your application to wait till the logging tasks are over. In doing so, the application’s responsiveness would be affected. On the contrary, if the call to the logging framework can be made asynchronously, the application can proceed with other tasks concurrently, without having to wait. This is an example of a non-blocking mode of execution.
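
The logging case can be sketched in a few lines of plain Java. This is a hypothetical illustration, not a real logging framework: the class AsyncLogSketch, the method logAsync and the in-memory list standing in for a file or database are all assumptions. The caller hands each message to a background thread and moves on at once.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of non-blocking logging; not a real logging framework.
class AsyncLogSketch {

    // Submits every message to a background thread and returns what was written.
    static List<String> logAsync(List<String> messages) {
        // Stand-in for a slow log target such as a file or a database.
        List<String> logTarget = new CopyOnWriteArrayList<>();
        ExecutorService logger = Executors.newSingleThreadExecutor();
        for (String message : messages) {
            // submit() returns immediately; the write happens on the logger thread
            logger.submit(() -> {
                try {
                    Thread.sleep(5); // simulate a slow write
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                logTarget.add(message);
            });
            // the calling thread is free to do other work here
        }
        logger.shutdown(); // accept no new tasks, finish the queued ones
        try {
            logger.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return logTarget;
    }

    public static void main(String[] args) {
        System.out.println(logAsync(List.of("started", "processing", "finished")));
    }
}
```

A single-threaded executor is used here so the messages are written in the order they were submitted. The final wait exists only so the sketch can show the result; a real application would keep running instead.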

1. Future is a base interface and defines abstraction of an object which promises result to be available in future while FutureTask is an implementation of the Future interface.

2. Future is a parametric interface and type-safe written as Future<V>, where V denotes value.

3. Future provides get() method to get result, which is blocking method and blocks until result is available to Future.

4. Future interface also defines cancel() method to cancel task.

5. The isDone() and isCancelled() methods are used to query the state of a Future task. isDone() returns true if the task is completed and the result is available to the Future. If you call get() after isDone() has returned true, it should return immediately. On the other hand, isCancelled() returns true if the task was cancelled before its completion.

6. Future has four sub-interfaces, each with additional functionality: Response, RunnableFuture, RunnableScheduledFuture and ScheduledFuture. RunnableFuture also extends Runnable, and a successful finish of the run() method causes completion of this Future.

7. FutureTask and SwingWorker are two well known implementations of the Future interface. FutureTask also implements the RunnableFuture interface, which means it can be used as a Runnable and can be submitted to an ExecutorService for execution.

8. Most of the time ExecutorService creates the FutureTask for you, i.e. when you submit() a Callable or Runnable object, but you can also create it manually.

9. FutureTask is normally used to wrap Runnable or Callable object and submit them to ExecutorService for asynchronous execution.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Java program to show how to use Future in Java. Future allows to write
 * asynchronous code in Java, where Future promises result to be available in
 * future
 *
 * @author Javin
 */
public class FutureDemo {

    private static final ExecutorService threadpool = Executors.newFixedThreadPool(2);

    public static void main(String args[]) throws InterruptedException, ExecutionException {
        // 20! is the largest factorial that still fits in a long
        FactorialCalculator task = new FactorialCalculator(20);

        System.out.println("Submitting Task ...");
        Future<Long> future = threadpool.submit(task);
        System.out.println("Task is submitted");

        while (!future.isDone()) {
            System.out.println("Task is not completed yet....");
            Thread.sleep(1); // sleep for 1 millisecond before checking again
        }

        System.out.println("Task is completed, let's check result");
        long factorial = future.get();
        System.out.println("Factorial of 20 is : " + factorial);

        threadpool.shutdown(); // release the pool threads so the JVM can exit
    }

    private static class FactorialCalculator implements Callable<Long> {

        private final int number;

        public FactorialCalculator(int number) {
            this.number = number;
        }

        @Override
        public Long call() {
            long output = 0;
            try {
                output = factorial(number);
            } catch (InterruptedException ex) {
                Logger.getLogger(FutureDemo.class.getName()).log(Level.SEVERE, null, ex);
            }
            return output;
        }

        private long factorial(int number) throws InterruptedException {
            if (number < 0) {
                throw new IllegalArgumentException("Number must not be negative");
            }
            long result = 1;
            while (number > 0) {
                Thread.sleep(1); // adding delay for example
                result = result * number;
                number--; // move towards zero, otherwise the loop never ends
            }
            return result;
        }
    }
}
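
Points 4, 5 and 8 in the list above mention cancel(), isCancelled() and creating a FutureTask manually. The following is a minimal sketch of both, using only java.util.concurrent; the class name FutureTaskSketch and the methods sumTo and cancelBeforeRun are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class FutureTaskSketch {

    // Wraps a Callable in a FutureTask by hand and runs it on an executor.
    static long sumTo(int n) {
        Callable<Long> work = () -> {
            long sum = 0;
            for (int i = 1; i <= n; i++) {
                sum += i;
            }
            return sum;
        };
        FutureTask<Long> task = new FutureTask<>(work);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.execute(task); // FutureTask is a Runnable, so execute() accepts it
            return task.get();  // blocks until the result is available
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // Cancels a task that was never started and checks the reported state.
    static boolean cancelBeforeRun() {
        FutureTask<String> task = new FutureTask<String>(() -> "never computed");
        boolean cancelled = task.cancel(false);
        // a cancelled task counts as done, although it never produced a result
        return cancelled && task.isCancelled() && task.isDone();
    }

    public static void main(String[] args) {
        System.out.println("Sum of 1..100 = " + sumTo(100));
        System.out.println("Cancelled before run: " + cancelBeforeRun());
    }
}
```

Note that calling get() on a cancelled task throws CancellationException, so check isCancelled() first when cancellation is possible.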

Usage in the Spring framework is given in the link below: