Spark – Streaming

Streaming Fundamentals

  1. Streaming Context :
    – Consumes a stream of data in Spark
    – Registers an InputDStream to produce a Receiver object
    – It is the main entry point for Spark Streaming functionality
    – Spark provides a number of default implementations of sources like Twitter, Akka Actor and ZeroMQ that are accessible from the context
    – A StreamingContext object can be created from a SparkContext object
    – A SparkContext represents the connection to a Spark cluster and can be used to create RDDs, accumulators and broadcast variables on that cluster

    import org.apache.spark._
    import org.apache.spark.streaming._
    val ssc = new StreamingContext(sc, Seconds(1))
  2. DStream :
    – Discretized Stream (DStream) is the basic abstraction provided by Spark Streaming
    – It represents a continuous stream of data
    – It is either received from a source or is a processed data stream generated by transforming the input stream
    – Internally, a DStream is represented by a continuous series of RDDs, and each RDD contains data from a certain interval
    – Any operation applied on a DStream translates to operations on the underlying RDDs.
    – Input DStreams are DStreams representing the stream of input data received from streaming sources. There are 2 kinds of sources for a DStream
    A.) Basic sources include the file system & socket connections
    B.) Advanced sources include Kafka, Flume, Kinesis
    – Every input DStream (except a file stream) is associated with a Receiver object which receives the data from a source and stores it in Spark’s memory for processing
    – Transformation on DStream :
    Transformations allow the data from the input DStream to be modified. Similar to RDDs, DStreams support many of the transformations available on normal Spark RDDs, including map, flatMap, filter, reduce and groupByKey
    A.) map(func) : Returns a new DStream by passing each element of the source DStream through a function func
    B.) flatMap(func) : Similar to map(func), but each input item can be mapped to 0 or more output items
    C.) filter(func) : Returns a new DStream by selecting only the records of the source DStream for which func returns true
    D.) reduce(func) : Returns a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using func, which should be associative and commutative
    E.) groupByKey() : When called on a DStream of (K, V) pairs, returns a new DStream of (K, Iterable[V]) pairs, i.e. each key with the list of values in its group
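
    The idea that a DStream operation is applied batch by batch to the underlying RDDs can be pictured without a cluster. The following is only a minimal sketch in plain Scala: a Seq[Seq[String]] stands in for a DStream (one inner Seq per batch interval), and the object and value names are made up for illustration. None of it is Spark API.

```scala
// A plain-Scala model of a DStream: one inner Seq per batch interval.
// Illustration only; this does not use the Spark Streaming API.
object DStreamModel {
  // Three batches of lines, as if received in three consecutive intervals.
  val batches: Seq[Seq[String]] = Seq(
    Seq("spark streaming", "spark"),
    Seq("hbase shell"),
    Seq.empty
  )

  // flatMap: each line maps to zero or more words, batch by batch.
  val words: Seq[Seq[String]] = batches.map(_.flatMap(_.split(" ").toList))

  // filter: keep only the records that match the predicate.
  val sparkOnly: Seq[Seq[String]] = words.map(_.filter(_ == "spark"))

  // map + reduce: every batch collapses to at most one element,
  // mirroring the "single-element RDD" produced per batch by reduce.
  val countsPerBatch: Seq[Seq[Int]] =
    sparkOnly.map(_.map(_ => 1).reduceOption(_ + _).toList)

  def main(args: Array[String]): Unit =
    println(countsPerBatch)
}
```

    Each step is written as batches.map(...), which is the plain-collection counterpart of "an operation on a DStream becomes an operation on every underlying RDD".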

    – DStream window :
    Spark Streaming also provides windowed computations which allow us to apply transformations over a sliding window of data
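
    In Spark Streaming a window is described by a window length and a slide interval, both of which must be multiples of the batch interval. The sketch below models that with plain Scala collections. windowedSums is a hypothetical helper, not a Spark method, and as a simplification it only emits windows that are completely filled.

```scala
// Plain-Scala model of a sliding window over batches (not Spark API).
object WindowModel {
  // Word counts observed in five consecutive batch intervals.
  val countsPerBatch: Seq[Int] = Seq(3, 1, 4, 1, 5)

  // length and slide are measured in batches: every `slide` batches,
  // aggregate the last `length` batches. Partial windows are dropped.
  def windowedSums(batches: Seq[Int], length: Int, slide: Int): Seq[Int] =
    batches.sliding(length, slide).filter(_.size == length).map(_.sum).toList

  def main(args: Array[String]): Unit =
    println(windowedSums(countsPerBatch, length = 3, slide = 2))
}
```

    With a length of 3 and a slide of 2, the batch holding 4 is counted in both windows, which is exactly the overlap a sliding window introduces.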
    – Output operation on DStream :
    Output operations allow a DStream’s data to be pushed out to external systems like databases or file systems
    Output operations trigger the actual execution of all the DStream transformations
    The supported output operations are print(), saveAsTextFiles(prefix, [suffix]), saveAsObjectFiles(prefix, [suffix]), saveAsHadoopFiles(prefix, [suffix]) and foreachRDD(func)

    Example :
    dstream.foreachRDD(rdd => rdd.foreachPartition { pr =>
      val connection = ConnectionPool.getConnection() // ConnectionPool: user-supplied pool, not part of Spark
      pr.foreach(record => connection.send(record))
    })

  3. Caching and persistence :
    – DStreams allow developers to cache/persist the stream’s data in memory. This is useful if the data in the DStream will be computed multiple times
    – This can be done using the persist() method on a DStream
    – For input streams that receive data over the network (such as Kafka, Flume, sockets etc.) the default persistence level is set to replicate the data to two nodes for fault-tolerance.
  4. Accumulators, Broadcast variables & Checkpoints
    – Accumulators are variables that are only added to through an associative and commutative operation
    – They are used to implement counters or sums
    – Tracking accumulators in the UI can be useful for understanding the progress of running stages
    – Spark natively supports numeric accumulators. We can create named or unnamed accumulators
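
    The requirement that accumulator updates be associative and commutative exists because partial results from different tasks may be merged in any order. The following plain-Scala sketch illustrates this under made-up data; partitions, partials and merge are hypothetical names, and this is not the Spark accumulator API.

```scala
// Plain-Scala illustration of why accumulator updates must be
// associative and commutative (not the Spark accumulator API).
object AccumulatorModel {
  // Records split across three hypothetical partitions.
  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3), Seq(4, 5, 6))

  // Each task adds up its own partition locally...
  val partials: Seq[Int] = partitions.map(_.sum)

  // ...and the partial sums are merged in whatever order the tasks finish.
  def merge(order: Seq[Int]): Int =
    order.map(i => partials(i)).foldLeft(0)(_ + _)

  def main(args: Array[String]): Unit = {
    println(merge(Seq(0, 1, 2)))
    println(merge(Seq(2, 0, 1)))
  }
}
```

    Because addition is associative and commutative, both merge orders give the same total; an operation such as subtraction would not.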
    Broadcast variables :
    – Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks
    – They can be used to give every node a copy of a large input dataset in an efficient manner
    – Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost

HBase shell commands

  1. describe 'tablename' :
    Displays the metadata of a particular table
    Example :

    describe 'employee' 
    {NAME => 'professional_info', BLOOMFILTER => 'ROW', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING =
  2. disable 'employee' :
    Always disable a table before performing any DDL operation on it
  3. alter 'employee', {NAME => 'col_fam1', COMPRESSION => 'GZ'}
    With the alter command you can add column families on the fly as well as set parameters on a column family, such as keeping it IN_MEMORY, setting its compression, setting its number of versions etc.
  4. list :
    Lists all the tables stored in HBase
    Example :
    ["employee", "table1", "user_hcat_load_table"]
  5. enable_all 't.*' :
    Enables all the tables matching the regex
  6. exists 'student' :
    Checks whether the student table exists or not
  7. show_filters :
    Shows all the available filters in HBase
  8. alter_status :
    Gets the status of the alter command. Indicates the number of regions of the table that have received the updated schema. Pass the table name.
  9. alter_async :
    Alter column family schema without waiting for all regions to receive the
    schema changes. Pass table name and a dictionary specifying the new column
    family schema. Dictionaries are described on the main help command output.
    The dictionary must include the name of the column family to alter.
    To change or add the 'f1' column family in table 't1' from defaults
    to instead keep a maximum of 5 cell VERSIONS, do:

    hbase> alter_async 't1', NAME => 'f1', VERSIONS => 5

    To delete the 'f1' column family in table 't1', do:

    hbase> alter_async 't1', NAME => 'f1', METHOD => 'delete'

    or a shorter version:

    hbase> alter_async 't1', 'delete' => 'f1'

    You can also change table-scope attributes like MAX_FILESIZE.

    For example, to change the max size of a region to 128MB, do:

    hbase> alter 't1', METHOD => 'table_att', MAX_FILESIZE => '134217728'

    There could be more than one alteration in one command:

    hbase> alter 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}

    To check if all the regions have been updated, use alter_status <table_name>

  10. count :
    Counts the number of rows in a table. The current count is shown every 1000 rows by default; the interval can be changed with INTERVAL. Scan caching is enabled on count scans by default, and the cache size can be set with CACHE.
    hbase> count 't1', INTERVAL => 100000
    hbase> count 't1', CACHE => 1000
    hbase> count 't1', INTERVAL => 10, CACHE => 1000

  11. delete :
    Put a delete cell value at specified table/row/column and optionally
    timestamp coordinates. Deletes must match the deleted cell’s
    coordinates exactly. When scanning, a delete cell suppresses older
    versions. To delete a cell from 't1' at row 'r1' under column 'c1'
    marked with the time 'ts1', do:

    hbase> delete 't1', 'r1', 'c1', ts1
  12. deleteall :
    Delete all cells in a given row; pass a table name, row, and optionally
    a column and timestamp. Examples:

    hbase> deleteall 't1', 'r1'
    hbase> deleteall 't1', 'r1', 'c1'
    hbase> deleteall 't1', 'r1', 'c1', ts1

  13. get :
    Get row or cell contents; pass table name, row, and optionally
    a dictionary of column(s), timestamp, timerange and versions.
    hbase> get 't1', 'r1'
    hbase> get 't1', 'r1', {TIMERANGE => [ts1, ts2]}
    hbase> get 't1', 'r1', {COLUMN => 'c1'}
    hbase> get 't1', 'r1', {COLUMN => ['c1', 'c2', 'c3']}
    hbase> get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => ts1}
    hbase> get 't1', 'r1', {COLUMN => 'c1', TIMERANGE => [ts1, ts2], VERSIONS => 4}
    hbase> get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => ts1, VERSIONS => 4}
    hbase> get 't1', 'r1', {FILTER => "ValueFilter(=, 'binary:abc')"}
    hbase> get 't1', 'r1', 'c1'
    hbase> get 't1', 'r1', 'c1', 'c2'
    hbase> get 't1', 'r1', ['c1', 'c2']
  14. put :
    Put a cell 'value' at specified table/row/column and optionally
    timestamp coordinates. To put a cell value into table 't1' at
    row 'r1' under column 'c1' marked with the time 'ts1', do:

    hbase> put 't1', 'r1', 'c1', 'value', ts1

  15. scan :
    Scan a table; pass table name and optionally a dictionary of scanner
    specifications. Scanner specifications may include one or more of:
    TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, TIMESTAMP, MAXLENGTH,
    or COLUMNS, CACHE.

    If no columns are specified, all columns will be scanned.
    To scan all members of a column family, leave the qualifier empty as in
    'col_family:'.

    The filter can be specified in two ways:
    1. Using a filterString – more information on this is available in the
    Filter Language document attached to the HBASE-4176 JIRA
    2. Using the entire package name of the filter.

    Some examples:

    hbase> scan '.META.'
    hbase> scan '.META.', {COLUMNS => 'info:regioninfo'}
    hbase> scan 't1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'}
    hbase> scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804, 1303668904]}
    hbase> scan 't1', {FILTER => "(PrefixFilter ('row2') AND
    (QualifierFilter (>=, 'binary:xyz'))) AND (TimestampsFilter ( 123, 456))"}
    hbase> scan 't1', {FILTER =>, 0)}
  16. truncate :
    Disables, drops and recreates the specified table.
    hbase> truncate 't1'




Data types of Spark MLlib – Part 2

Friends, so far we have gone through the basic data types of Spark MLlib. Spark MLlib also supports distributed matrices, which include RowMatrix, IndexedRowMatrix, CoordinateMatrix and BlockMatrix.

A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive. Four types of distributed matrices have been implemented so far.

  1. RowMatrix :
    A RowMatrix is made up of a collection of rows, each of which is a local vector. Since each row is represented by a local vector, the number of columns is limited by the integer range, but it should be much smaller in practice.

    A RowMatrix can be created from an RDD[Vector] instance. Then we can compute its column summary statistics and decompositions. QR decomposition is of the form A = QR where Q is an orthogonal matrix and R is an upper triangular matrix.

    Example :

    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    import org.apache.spark.rdd.RDD
    val rows: RDD[Vector] = sc.parallelize(Seq(Vectors.dense(1.0, 0.0, 3.0), Vectors.dense(2.0, 30.0, 4.0)))
    O/p : rows: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = ParallelCollectionRDD[2] at parallelize at <console>:26
    val mat: RowMatrix = new RowMatrix(rows)
    // Get its size.
    val m = mat.numRows()
    val n = mat.numCols()
  2. IndexedRowMatrix :

    It is similar to a RowMatrix, except that each row is represented by its index & a local vector.
    An IndexedRowMatrix can be created from an RDD[IndexedRow] instance, where IndexedRow is a wrapper over (Long, Vector).

    Example :

    import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
    val rows: RDD[IndexedRow] = sc.parallelize(Seq(IndexedRow(0L, Vectors.dense(1.0, 2.0, 3.0)), IndexedRow(1L, Vectors.dense(4.0, 5.0, 6.0))))
    // Create an IndexedRowMatrix from an RDD[IndexedRow].
    val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
    // Get its size.
    val m = mat.numRows()
    val n = mat.numCols()
    // Drop its row indices.
    val rowMat: RowMatrix = mat.toRowMatrix()


  3. CoordinateMatrix :
    A CoordinateMatrix is backed by entries made of a row index, a column index & a double value, and can be created from an RDD[MatrixEntry]. We can also convert a CoordinateMatrix to an IndexedRowMatrix by calling toIndexedRowMatrix.

    Example :

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
    val entries: RDD[MatrixEntry] = sc.parallelize(Seq(MatrixEntry(0,1,1.2),MatrixEntry(0,2,1.5),MatrixEntry(0,3,2.5)))
    // Create a CoordinateMatrix from an RDD[MatrixEntry].
    val mat: CoordinateMatrix = new CoordinateMatrix(entries)
    // Get its size.
    val m = mat.numRows()
    val n = mat.numCols()
    // Convert it to an IndexedRowMatrix whose rows are sparse vectors.
    val indexedRowMatrix = mat.toIndexedRowMatrix()
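
    To picture what the coordinate format holds and what a conversion to indexed rows has to do, here is a rough plain-Scala sketch. Entry and the values below are illustrative stand-ins rather than the MLlib classes, and the sketch assumes the matrix size is taken as the largest index plus one.

```scala
// Plain-Scala model of grouping coordinate entries into indexed sparse rows.
// Entry is an illustrative stand-in, not MLlib's MatrixEntry.
object CoordinateModel {
  final case class Entry(i: Long, j: Long, value: Double)

  val entries: Seq[Entry] =
    Seq(Entry(0, 1, 1.2), Entry(0, 2, 1.5), Entry(0, 3, 2.5))

  // Assumption: the size is inferred from the largest indices, plus one.
  val numRows: Long = entries.map(_.i).max + 1
  val numCols: Long = entries.map(_.j).max + 1

  // Group entries by row index; each row keeps its (column, value) pairs
  // sorted by column, which is the content of a sparse row vector.
  val indexedRows: Map[Long, Seq[(Long, Double)]] =
    entries.groupBy(_.i).map { case (row, es) =>
      row -> es.map(e => (e.j, e.value)).sortBy(_._1)
    }

  def main(args: Array[String]): Unit =
    println(s"$numRows x $numCols: $indexedRows")
}
```

    Grouping by row index touches every entry, which hints at why converting between distributed formats can require a shuffle.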
  4. BlockMatrix :

    A BlockMatrix stores the matrix as a grid of sub-matrices, each identified by its (row, column) block index. We use it when we want to perform matrix additions & multiplications on a distributed matrix.

    A BlockMatrix can be most easily created from an IndexedRowMatrix or CoordinateMatrix by calling toBlockMatrix, which creates blocks of size 1024 x 1024 by default.

    Example :

    import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
    val entries: RDD[MatrixEntry] = sc.parallelize(Seq(MatrixEntry(0,1,1.2),MatrixEntry(0,2,1.5),MatrixEntry(0,3,2.5)))
    // Create a CoordinateMatrix from an RDD[MatrixEntry].
    val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
    // Transform the CoordinateMatrix to a BlockMatrix
    val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
    // Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
    // Nothing happens if it is valid.
    matA.validate()
    // Calculate A^T A.
    val ata = matA.transpose.multiply(matA)
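
    How an entry ends up in a particular block can be sketched with integer division. The helper below is hypothetical (locate is not an MLlib method) and assumes the straightforward layout in which entry (i, j) belongs to block (i / rowsPerBlock, j / colsPerBlock).

```scala
// Plain-Scala sketch of block addressing; locate is a made-up helper.
object BlockModel {
  // Returns ((blockRow, blockCol), (rowInsideBlock, colInsideBlock)).
  def locate(i: Long, j: Long, rowsPerBlock: Int, colsPerBlock: Int): ((Int, Int), (Int, Int)) = {
    val block  = ((i / rowsPerBlock).toInt, (j / colsPerBlock).toInt)
    val offset = ((i % rowsPerBlock).toInt, (j % colsPerBlock).toInt)
    (block, offset)
  }

  def main(args: Array[String]): Unit = {
    println(locate(0L, 3L, 1024, 1024))       // a small matrix fits in block (0, 0)
    println(locate(2050L, 1024L, 1024, 1024)) // a large index falls into a later block
  }
}
```

    With 1024 x 1024 blocks, the small example matrix above fits entirely into a single block.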

Data types of Spark MLlib – Part 1

Hello friends, Spark MLlib supports multiple data types in the form of vectors & matrices.

A local vector has integer-typed, 0-based indices and double-typed values, and is stored on a single machine. There are 2 types of local vectors :

  1. Dense vector :

A dense vector is backed by a double array representing its entry values

def dense(values: Array[Double]): Vector

Creates a dense vector from a double array.

Example : 
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
o/p : dv: org.apache.spark.mllib.linalg.Vector = [1.0,0.0,3.0]

  2. Sparse vector :

A sparse vector is backed by two parallel arrays: indices and values.

def sparse(size: Int, indices: Array[Int], values: Array[Double]):  Vector

Creates a sparse vector providing its index array and value array.

We can also declare a sparse vector in another way, with the size as the 1st parameter & a Seq of (index, value) pairs as the 2nd

def sparse(size: Int, elements: Seq[(Int, Double)]): Vector

Creates a sparse vector using unordered (index, value) pairs.

Example : 

val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
o/p : sv1: org.apache.spark.mllib.linalg.Vector = (3,[0,2],[1.0,3.0])

val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
o/p : sv2: org.apache.spark.mllib.linalg.Vector = (3,[0,2],[1.0,3.0])
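
The relation between the two representations can be shown by expanding the parallel arrays of a sparse vector into a dense array. This is a small plain-Scala sketch; toDense is a hypothetical helper written for illustration and not the MLlib method.

```scala
// Plain-Scala sketch: expand (size, indices, values) into a dense array.
object SparseVectorModel {
  def toDense(size: Int, indices: Array[Int], values: Array[Double]): Array[Double] = {
    require(indices.length == values.length, "indices and values must have equal length")
    val dense = Array.fill(size)(0.0)                            // positions not listed stay 0.0
    indices.zip(values).foreach { case (i, v) => dense(i) = v }
    dense
  }

  def main(args: Array[String]): Unit =
    println(toDense(3, Array(0, 2), Array(1.0, 3.0)).mkString("[", ",", "]"))
}
```

Expanding the sparse vector (3,[0,2],[1.0,3.0]) this way gives back the entries of the dense example vector, which is why sv1, sv2 and dv describe the same data.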

3. Labeled Point :

A labeled point is a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms. We use a double to store a label, so we can use labeled points in both regression and classification. For binary classification, a label should be either 0 (negative) or 1 (positive). For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, ....

A labeled point is represented by the case class LabeledPoint.

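import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))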

// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))


Spark MLlib supports 2 types of matrices: 1. Local matrix & 2. Distributed matrix

Local Matrix :

The base class of local matrices is Matrix, and we provide two implementations: DenseMatrix, and SparseMatrix. We recommend using the factory methods implemented in Matrices to create local matrices. Remember, local matrices in MLlib are stored in column-major order.

  1. DenseMatrix :

    def dense(numRows: Int, numCols: Int, values: Array[Double]): Matrix


    Creates a column-major dense matrix.

    Example :
    val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
    o/p : dm: org.apache.spark.mllib.linalg.Matrix = 
    1.0 2.0 
    3.0 4.0 
    5.0 6.0
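
    Column-major order means the values array lists the first column top to bottom, then the second column, and so on. A minimal plain-Scala sketch of the index arithmetic follows; at and rows are illustrative names, not MLlib members.

```scala
// Plain-Scala sketch of column-major indexing.
object ColumnMajorModel {
  // Entry (i, j) of a column-major matrix lives at values(i + j * numRows).
  def at(numRows: Int, values: Array[Double])(i: Int, j: Int): Double =
    values(i + j * numRows)

  // The same array that is passed to Matrices.dense(3, 2, ...) in the example.
  val values: Array[Double] = Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)

  // Rebuild the 3 x 2 matrix row by row.
  val rows: Seq[Seq[Double]] =
    (0 until 3).map(i => (0 until 2).map(j => at(3, values)(i, j)))

  def main(args: Array[String]): Unit =
    rows.foreach(r => println(r.mkString(" ")))
}
```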
  2. SparseMatrix :

    def sparse(numRows: Int, numCols: Int, colPtrs: Array[Int], rowIndices: Array[Int], values:Array[Double]): Matrix

    Creates a column-major sparse matrix in Compressed Sparse Column (CSC) format.

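    numRows -> Number of rows in the matrix
    numCols -> Number of columns in the matrix
    colPtrs -> For each column, the index into rowIndices/values where that column starts; it has numCols + 1 entries, the last one being the total number of stored values
    rowIndices -> Row index of each stored element, listed in column-major order
    values -> The stored (non-zero) values as doubles, in the same order as rowIndices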

    Example : 
     1.0 0.0 4.0
     0.0 3.0 5.0
     2.0 0.0 6.0
    is stored as values: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], rowIndices=[0, 2, 1, 0, 1, 2], colPointers=[0, 2, 3, 6].
    Another example :
    val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
    O/p :sm: org.apache.spark.mllib.linalg.Matrix = 
    3 x 2 CSCMatrix
    (0,0) 9.0
    (2,1) 6.0
    (1,1) 8.0
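
    Reading a CSC matrix back into dense form makes the role of the three arrays concrete. The following is a plain-Scala sketch under the definitions above; toDenseRows is a hypothetical helper, not part of MLlib.

```scala
// Plain-Scala sketch: expand a CSC (Compressed Sparse Column) matrix into dense rows.
object CscModel {
  def toDenseRows(numRows: Int, numCols: Int, colPtrs: Array[Int],
                  rowIndices: Array[Int], values: Array[Double]): List[List[Double]] = {
    val dense = Array.fill(numRows, numCols)(0.0)
    // Column j owns the stored entries colPtrs(j) until colPtrs(j + 1).
    for (j <- 0 until numCols; k <- colPtrs(j) until colPtrs(j + 1))
      dense(rowIndices(k))(j) = values(k)
    dense.map(_.toList).toList
  }

  def main(args: Array[String]): Unit =
    toDenseRows(3, 3, Array(0, 2, 3, 6), Array(0, 2, 1, 0, 1, 2),
                Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
      .foreach(r => println(r.mkString(" ")))
}
```

    Feeding it the arrays of the first example reproduces the 3 x 3 matrix shown there, and the arrays of the second example give a 3 x 2 matrix with 9.0 at (0,0), 8.0 at (1,1) and 6.0 at (2,1).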

    We will look at distributed matrices in the next part.


Running Scala OOP files from the command line with spark-shell

Even when writing a Spark script in Scala, many programmers prefer an object-oriented style because of their Java background. Such object-oriented scripts can still be executed through spark-shell.

Here is a simple example built around the collectAsync() method, which also demonstrates how to run a Scala script containing object-oriented code through spark-shell.

collectAsync() returns a FutureAction, which here will yield a Seq[Int] once the job completes. Because the call does not block, the driver can carry on with other work while the collect runs in the background.
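
The non-blocking behaviour can be tried outside Spark with scala.concurrent.Future, which FutureAction extends. The sketch below is only an analogy in plain Scala: collectLater is a made-up function standing in for collectAsync(), and it uses the global execution context instead of a Spark job.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Plain-Scala analogy for a non-blocking collect (no Spark involved).
object FutureModel {
  // Starts the work in the background; the call itself returns immediately.
  def collectLater(data: Seq[Int]): Future[Seq[Int]] = Future(data.map(_ * 2))

  def main(args: Array[String]): Unit = {
    val pending = collectLater(1 to 5)
    println("Hello World") // runs without waiting for the result
    // Block only at the point where the result is actually needed.
    println(Await.result(pending, 10.seconds))
  }
}
```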

Test.scala :

import org.apache.spark.FutureAction
import org.apache.spark.{SPARK_VERSION, SparkContext}
import scala.concurrent.Future

object Test {
  def main(args: Array[String]): Unit = {
    // Do not create a new SparkContext here; reuse `sc` from the spark-shell session
    //val sc1 = new SparkContext()
    val data = sc.parallelize(1 to 250, 1)
    // collectAsync() returns immediately with a handle to the pending result
    val futureData: FutureAction[Seq[Int]] = data.collectAsync()
    println("Hello World")
    val itr: Iterator[Int] = data.toLocalIterator
  }
}
Test.main(null) /* This line is part of scala file Test.scala */

Now, in order to run the above program, use the following command :

sudo spark-shell -i Test.scala

In this case spark-shell loads the Test.scala file and defines the Test object. After that, Test.main(null) gets called, which runs the main method of the Test object.

Make sure you are not creating another SparkContext in your code; you can reuse the SparkContext (sc) of the current spark-shell session.

Apache GraphX

GraphX is a new component in Spark for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators.

Spark GraphX is a graph processing framework built on top of Spark.

GraphX models graphs as property graphs where vertices and edges can have properties.

GraphX comes with its own package org.apache.spark.graphx.


Graph abstract class represents a collection of vertices and edges.

abstract class Graph[VD: ClassTag, ED: ClassTag]

vertices attribute is of type VertexRDD while edges is of type EdgeRDD.

Standard GraphX API

The Graph class comes with a small set of APIs.

  • Transformations

    • mapVertices

    • mapEdges

    • mapTriplets

    • reverse

    • subgraph

    • mask
    • groupEdges

  • Joins

    • outerJoinVertices

  • Computation

    • aggregateMessages

Creating Graphs (Graph object)

Graph object comes with the following factory methods to create instances of Graph:

Main classes & interfaces in GraphX :
Class -> Description
Edge -> A single directed edge consisting of a source id, target id, and the data associated with the edge.
EdgeContext -> Represents an edge along with its neighboring vertices and allows sending messages along the edge.
EdgeDirection -> The direction of a directed edge relative to a vertex.
EdgeRDD -> EdgeRDD[ED, VD] extends RDD[Edge[ED]] by storing the edges in columnar format on each partition for performance.
EdgeTriplet -> An edge triplet represents an edge along with the vertex attributes of its neighboring vertices.
Graph -> The Graph abstractly represents a graph with arbitrary objects associated with vertices and edges.
GraphKryoRegistrator -> Registers GraphX classes with Kryo for improved performance.
GraphLoader -> Provides utilities for loading Graphs from files.
GraphOps -> Contains additional functionality for Graph.
PartitionStrategy.CanonicalRandomVertexCut -> Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical direction, resulting in a random vertex cut that colocates all edges between two vertices, regardless of direction.
PartitionStrategy.EdgePartition1D -> Assigns edges to partitions using only the source vertex ID, colocating edges with the same source.
PartitionStrategy.EdgePartition2D -> Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix, guaranteeing a 2 * sqrt(numParts) - 1 bound on vertex replication.
PartitionStrategy.RandomVertexCut -> Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a random vertex cut that colocates all same-direction edges between two vertices.
Pregel -> Implements a Pregel-like bulk-synchronous message-passing API.
TripletFields -> Represents a subset of the fields of an EdgeTriplet or EdgeContext.
VertexRDD -> Extends RDD[(VertexId, VD)] by ensuring that there is only one entry for each vertex and by pre-indexing the entries for fast, efficient joins.

Example Property Graph

Suppose we want to construct a property graph consisting of the various collaborators on the GraphX project. The vertex property might contain the username and occupation. We could annotate edges with a string describing the relationships between collaborators:

[Figure: The Property Graph]

The resulting graph would have the type signature:

val userGraph: Graph[(String, String), String]

There are numerous ways to construct a property graph from raw files, RDDs, and even synthetic generators and these are discussed in more detail in the section on graph builders. Probably the most general method is to use the Graph object. For example the following code constructs a graph from a collection of RDDs:

// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

In the above example we make use of the Edge case class. Edges have a srcId and a dstId corresponding to the source and destination vertex identifiers. In addition, the Edge class has an attr member which stores the edge property.

We can deconstruct a graph into the respective vertex and edge views by using the graph.vertices and graph.edges members respectively.

val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count

Note that graph.vertices returns a VertexRDD[(String, String)] which extends RDD[(VertexId, (String, String))] and so we use the Scala case expression to deconstruct the tuple. On the other hand, graph.edges returns an EdgeRDD containing Edge[String] objects. We could have also used the case class type constructor as in the following:

graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count

In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view. The triplet view logically joins the vertex and edge properties yielding an RDD[EdgeTriplet[VD, ED]] containing instances of the EdgeTriplet class. This join can be expressed in the following SQL expression:

SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id

or graphically as:

[Figure: Edge Triplet]

The EdgeTriplet class extends the Edge class by adding the srcAttr and dstAttr members which contain the source and destination properties respectively. We can use the triplet view of a graph to render a collection of strings describing relationships between users.

val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))
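
The logical join behind the triplet view can be reproduced on plain Scala collections. The sketch below reuses the example users and relationships, but Edge, users and facts here are local stand-ins defined for illustration, not the GraphX types.

```scala
// Plain-Scala model of the triplet view: join every edge with the
// attributes of its source and destination vertices (not GraphX API).
object TripletModel {
  type VertexId = Long
  final case class Edge(srcId: VertexId, dstId: VertexId, attr: String)

  val users: Map[VertexId, (String, String)] = Map(
    3L -> ("rxin", "student"), 7L -> ("jgonzal", "postdoc"),
    5L -> ("franklin", "prof"), 2L -> ("istoica", "prof"))

  val relationships: Seq[Edge] = Seq(
    Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
    Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"))

  // users(e.srcId) and users(e.dstId) play the role of srcAttr and dstAttr.
  val facts: Seq[String] = relationships.map { e =>
    users(e.srcId)._1 + " is the " + e.attr + " of " + users(e.dstId)._1
  }

  def main(args: Array[String]): Unit = facts.foreach(println)
}
```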

Graph Operators

Just as RDDs have basic operations like map, filter, and reduceByKey, property graphs also have a collection of basic operators that take user defined functions and produce new graphs with transformed properties and structure. The core operators that have optimized implementations are defined in Graph, and convenient operators that are expressed as compositions of the core operators are defined in GraphOps. However, thanks to Scala implicits, the operators in GraphOps are automatically available as members of Graph. For example, we can compute the in-degree of each vertex (defined in GraphOps) by the following:

val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees

The reason for differentiating between core graph operations and GraphOps is to be able to support different graph representations in the future. Each graph representation must provide implementations of the core operations and reuse many of the useful operations defined in GraphOps.
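
What an in-degree computation amounts to can be sketched on a plain list of edges. This is an illustration in plain Scala using the four edges of the example graph; edges and inDegrees here are local values, not the GraphX members of the same name.

```scala
// Plain-Scala sketch of an in-degree computation (not GraphX API).
object DegreeModel {
  // (srcId, dstId) pairs for the four edges of the example graph.
  val edges: Seq[(Long, Long)] = Seq((3L, 7L), (5L, 3L), (2L, 5L), (5L, 7L))

  // In-degree: count how many edges point at each destination vertex.
  // Vertices without incoming edges simply do not appear in the result.
  val inDegrees: Map[Long, Int] =
    edges.groupBy(_._2).map { case (dst, es) => dst -> es.size }

  def main(args: Array[String]): Unit = println(inDegrees)
}
```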

Summary List of Operators

The following is a quick summary of the functionality defined in both Graph and GraphOps but presented as members of Graph for simplicity. Note that some function signatures have been simplified (e.g., default arguments and type constraints removed) and some more advanced functionality has been removed so please consult the API docs for the official list of operations.

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

Property Operators

Like the RDD map operator, the property graph contains the following:

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

Each of these operators yields a new graph with the vertex or edge properties modified by the user defined map function.

Note that in each case the graph structure is unaffected. This is a key feature of these operators which allows the resulting graph to reuse the structural indices of the original graph. The following snippets are logically equivalent, but the first one does not preserve the structural indices and would not benefit from the GraphX system optimizations:

val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)

Instead, use mapVertices to preserve the indices:

val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))

These operators are often used to initialize the graph for a particular computation or project away unnecessary properties. For example, given a graph with the out degrees as the vertex properties (we describe how to construct such a graph later), we initialize it for PageRank:

// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
  graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
  inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)

Structural Operators

Currently GraphX supports only a simple set of commonly used structural operators and we expect to add more in the future. The following is a list of the basic structural operators.

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

The reverse operator returns a new graph with all the edge directions reversed. This can be useful when, for example, trying to compute the inverse PageRank. Because the reverse operation does not modify vertex or edge properties or change the number of edges, it can be implemented efficiently without data movement or duplication.

The subgraph operator takes vertex and edge predicates and returns the graph containing only the vertices that satisfy the vertex predicate (evaluate to true) and the edges that satisfy the edge predicate and connect vertices that satisfy the vertex predicate. The subgraph operator can be used in a number of situations to restrict the graph to the vertices and edges of interest or to eliminate broken links. For example, in the following code we remove broken links:

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationships with a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))

Note that in the above example only the vertex predicate is provided. The subgraph operator defaults to true if the vertex or edge predicates are not provided.
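The filtering rule can be checked without a cluster. The following is a minimal sketch in plain Scala collections, not the GraphX API: SimpleGraph, Edge and this subgraph method are hypothetical stand-ins, and the edge predicate here sees only the edge rather than a full EdgeTriplet. It reuses the users and relationships from the example above.

```scala
object SubgraphSketch {
  final case class Edge(src: Long, dst: Long, attr: String)

  // Hypothetical in-memory stand-in for a property graph (not GraphX's Graph).
  final case class SimpleGraph(vertices: Map[Long, (String, String)], edges: Seq[Edge]) {
    // Both predicates default to "accept everything", like the defaults described above.
    def subgraph(
        epred: Edge => Boolean = _ => true,
        vpred: (Long, (String, String)) => Boolean = (_, _) => true): SimpleGraph = {
      val keptVertices = vertices.filter { case (id, attr) => vpred(id, attr) }
      // An edge survives only if it passes epred and both of its endpoints survive.
      val keptEdges = edges.filter { e =>
        epred(e) && keptVertices.contains(e.src) && keptVertices.contains(e.dst)
      }
      SimpleGraph(keptVertices, keptEdges)
    }
  }

  // Same data as the example above; vertex 0 carries the default ("Missing") user.
  val graph = SimpleGraph(
    Map(
      (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
      (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
      (4L, ("peter", "student")), (0L, ("John Doe", "Missing"))),
    Seq(
      Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
      Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
      Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))

  // Only the vertex predicate is given; the edge predicate keeps its default.
  val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
}
```

In this model, removing vertex 0 also removes the two edges that pointed to it, which is the "broken link" clean-up described above.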

The mask operator constructs a subgraph by returning a graph that contains the vertices and edges that are also found in the input graph. This can be used in conjunction with the subgraph operator to restrict a graph based on the properties in another related graph. For example, we might run connected components using the graph with missing vertices and then restrict the answer to the valid subgraph.

// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

The groupEdges operator merges parallel edges (i.e., duplicate edges between pairs of vertices) in the multigraph. In many numerical applications, parallel edges can be added (their weights combined) into a single edge thereby reducing the size of the graph.
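As an illustration of what merging parallel edges means, here is a small sketch using plain Scala collections rather than GraphX. The Edge case class and this groupEdges function are hypothetical; in GraphX itself, the API documentation notes that the graph must first be partitioned with partitionBy for groupEdges to give correct results.

```scala
object GroupEdgesSketch {
  final case class Edge(src: Long, dst: Long, weight: Double)

  // Merge all edges that share the same (src, dst) pair into one edge,
  // combining their weights with the user-supplied merge function.
  def groupEdges(edges: Seq[Edge])(merge: (Double, Double) => Double): Seq[Edge] =
    edges.groupBy(e => (e.src, e.dst)).toSeq
      .map { case ((s, d), es) => Edge(s, d, es.map(_.weight).reduce(merge)) }
      .sortBy(e => (e.src, e.dst)) // sorted only to make the result deterministic

  // A multigraph with two parallel edges from vertex 1 to vertex 2.
  val multigraph = Seq(Edge(1L, 2L, 0.5), Edge(1L, 2L, 1.5), Edge(2L, 3L, 1.0))

  // Adding the weights leaves one edge 1 -> 2 with weight 2.0.
  val merged = groupEdges(multigraph)(_ + _)
}
```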

Join Operators

In many cases it is necessary to join data from external collections (RDDs) with graphs. For example, we might have extra user properties that we want to merge with an existing graph or we might want to pull vertex properties from one graph into another. These tasks can be accomplished using the join operators. Below we list the key join operators:

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

The joinVertices operator joins the vertices with the input RDD and returns a new graph with the vertex properties obtained by applying the user defined map function to the result of the joined vertices. Vertices without a matching value in the RDD retain their original value.

Note that if the RDD contains more than one value for a given vertex only one will be used. It is therefore recommended that the input RDD be made unique using the following which will also pre-index the resulting values to substantially accelerate the subsequent join.

val nonUniqueCosts: RDD[(VertexId, Double)]
val uniqueCosts: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex(nonUniqueCosts, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
  (id, oldCost, extraCost) => oldCost + extraCost)

The more general outerJoinVertices behaves similarly to joinVertices except that the user defined map function is applied to all vertices and can change the vertex property type. Because not all vertices may have a matching value in the input RDD, the map function takes an Option type. For example, we can set up a graph for PageRank by initializing vertex properties with their outDegree.

val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
  outDegOpt match {
    case Some(outDeg) => outDeg
    case None => 0 // No outDegree means zero outDegree
  }
}

You may have noticed the multiple parameter lists (e.g., f(a)(b)) curried function pattern used in the above examples. While we could have equally written f(a)(b) as f(a,b) this would mean that type inference on b would not depend on a. As a consequence, the user would need to provide type annotation for the user defined function:

val joinedGraph = graph.joinVertices(uniqueCosts,
  (id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)
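The inference difference can be reproduced outside GraphX. The sketch below is plain Scala; joinCurried and joinFlat are hypothetical helpers that operate on a Map instead of a graph, with the same "keep the old value when there is no match" behaviour described for joinVertices. The annotated call compiles in any Scala version; the need for the annotations is how Scala 2 behaves, and newer compilers may infer more.

```scala
object CurrySketch {
  // Curried form: VD and U are fixed by the first parameter list, so the
  // lambda in the second list needs no type annotations.
  def joinCurried[VD, U](attrs: Map[Long, VD], table: Map[Long, U])(
      f: (Long, VD, U) => VD): Map[Long, VD] =
    attrs.map { case (id, attr) =>
      // Ids without a match in `table` keep their original value.
      id -> table.get(id).map(u => f(id, attr, u)).getOrElse(attr)
    }

  // Single parameter list: the function is typed together with the data,
  // so the caller annotates the lambda parameters.
  def joinFlat[VD, U](attrs: Map[Long, VD], table: Map[Long, U],
      f: (Long, VD, U) => VD): Map[Long, VD] =
    joinCurried(attrs, table)(f)

  val costs = Map(1L -> 1.0, 2L -> 2.0)
  val extra = Map(1L -> 0.5)

  val viaCurried = joinCurried(costs, extra)((id, oldCost, extraCost) => oldCost + extraCost)
  val viaFlat = joinFlat(costs, extra,
    (id: Long, oldCost: Double, extraCost: Double) => oldCost + extraCost)
}
```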

Neighborhood Aggregation

A key step in many graph analytics tasks is aggregating information about the neighborhood of each vertex. For example, we might want to know the number of followers each user has or the average age of the followers of each user. Many iterative graph algorithms (e.g., PageRank, Shortest Path, and connected components) repeatedly aggregate properties of neighboring vertices (e.g., current PageRank Value, shortest path to the source, and smallest reachable vertex id).

To improve performance the primary aggregation operator changed from graph.mapReduceTriplets to the new graph.aggregateMessages. While the changes in the API are relatively small, we provide a transition guide below.

Aggregate Messages (aggregateMessages)

The core aggregation operation in GraphX is aggregateMessages. This operator applies a user defined sendMsg function to each edge triplet in the graph and then uses the mergeMsg function to aggregate those messages at their destination vertex.

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}

The user defined sendMsg function takes an EdgeContext, which exposes the source and destination attributes along with the edge attribute and functions (sendToSrc and sendToDst) to send messages to the source and destination attributes. Think of sendMsg as the map function in map-reduce. The user defined mergeMsg function takes two messages destined to the same vertex and yields a single message. Think of mergeMsg as the reduce function in map-reduce. The aggregateMessages operator returns a VertexRDD[Msg] containing the aggregate message (of type Msg) destined to each vertex. Vertices that did not receive a message are not included in the returned VertexRDD.

In addition, aggregateMessages takes an optional tripletFields argument which indicates what data is accessed in the EdgeContext (e.g., the source vertex attribute but not the destination vertex attribute). The possible options for tripletFields are defined in TripletFields and the default value is TripletFields.All, which indicates that the user defined sendMsg function may access any of the fields in the EdgeContext. The tripletFields argument can be used to notify GraphX that only part of the EdgeContext will be needed, allowing GraphX to select an optimized join strategy. For example, if we are computing the average age of the followers of each user we would only require the source field, and so we would use TripletFields.Src to indicate that we only require the source field.

In earlier versions of GraphX we used bytecode inspection to infer the TripletFields; however, we have found bytecode inspection to be slightly unreliable and instead opted for more explicit user control.

In the following example we use the aggregateMessages operator to compute the average age of the more senior followers of each user.

import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators

// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      triplet.sendToDst((1, triplet.srcAttr))
    }
  },
  // Add counter and age
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) =>
    value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))

Find full example code at “examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala” in the Spark repo.

The aggregateMessages operation performs optimally when the messages (and the sums of messages) are constant sized (e.g., floats and addition instead of lists and concatenation).
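To make the map/reduce analogy concrete, the following sketch models the same "average age of older followers" computation with plain Scala collections. It is not the GraphX API: the aggregate helper and Edge case class are hypothetical, each edge sends at most one message to one vertex, and the three-vertex graph is made up for illustration.

```scala
object AggregateSketch {
  final case class Edge(src: Long, dst: Long)

  // Model of the pattern: `send` plays the role of sendMsg (at most one
  // (target vertex, message) pair per edge here), `merge` plays mergeMsg.
  def aggregate[Msg](edges: Seq[Edge])(send: Edge => Option[(Long, Msg)])(
      merge: (Msg, Msg) => Msg): Map[Long, Msg] =
    edges.flatMap(e => send(e).toList)
      .groupBy(_._1)
      .map { case (id, msgs) => id -> msgs.map(_._2).reduce(merge) }

  // Hypothetical ages; Edge(src, dst) means "src follows dst".
  val ages: Map[Long, Double] = Map(1L -> 30.0, 2L -> 50.0, 3L -> 40.0)
  val follows = Seq(Edge(2L, 1L), Edge(3L, 1L), Edge(1L, 3L))

  // Count and total age of followers that are older than the followed user.
  val olderFollowers: Map[Long, (Int, Double)] =
    aggregate[(Int, Double)](follows) { e =>
      if (ages(e.src) > ages(e.dst)) Some((e.dst, (1, ages(e.src)))) else None
    } { (a, b) => (a._1 + b._1, a._2 + b._2) }

  val avgAgeOfOlderFollowers: Map[Long, Double] =
    olderFollowers.map { case (id, (count, total)) => id -> total / count }
}
```

As in GraphX, a vertex that receives no message (users 2 and 3 here) simply does not appear in the result.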

Map Reduce Triplets Transition Guide (Legacy)

In earlier versions of GraphX neighborhood aggregation was accomplished using the mapReduceTriplets operator:

class Graph[VD, ED] {
  def mapReduceTriplets[Msg](
      map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
      reduce: (Msg, Msg) => Msg)
    : VertexRDD[Msg]
}

The mapReduceTriplets operator takes a user defined map function which is applied to each triplet and can yield messages which are aggregated using the user defined reduce function. However, we found the use of the returned iterator to be expensive and it inhibited our ability to apply additional optimizations (e.g., local vertex renumbering). In aggregateMessages we introduced the EdgeContext which exposes the triplet fields and also functions to explicitly send messages to the source and destination vertex. Furthermore we removed bytecode inspection and instead require the user to indicate what fields in the triplet are actually required.

The following code block using mapReduceTriplets:

val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeTriplet[Int, Float]): Iterator[(VertexId, String)] = {
  Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)

can be rewritten using aggregateMessages as:

val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]): Unit = {
  triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)

Computing Degree Information

A common aggregation task is computing the degree of each vertex: the number of edges adjacent to each vertex. In the context of directed graphs it is often necessary to know the in-degree, out-degree, and the total degree of each vertex. The GraphOps class contains a collection of operators to compute the degrees of each vertex. For example in the following we compute the max in, out, and total degrees:

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)
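The three degree notions can be illustrated on a tiny edge list. This is a sketch with plain Scala collections and a made-up five-edge graph, not the GraphOps implementation; the count helper is hypothetical, while max is the same reduce function as above with Long in place of VertexId.

```scala
object DegreeSketch {
  // Directed edges as (src, dst) pairs; a hypothetical five-edge graph.
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (1L, 3L), (2L, 3L), (4L, 3L), (3L, 1L))

  // Count how often each vertex id occurs in a list of ids.
  def count(ids: Seq[Long]): Map[Long, Int] =
    ids.groupBy(identity).map { case (id, xs) => id -> xs.size }

  val outDegrees: Map[Long, Int] = count(edges.map(_._1))
  val inDegrees: Map[Long, Int] = count(edges.map(_._2))
  // Total degree counts an edge once for each of its endpoints.
  val degrees: Map[Long, Int] = count(edges.flatMap { case (s, d) => Seq(s, d) })

  // Keep the (vertex, degree) pair with the higher degree.
  def max(a: (Long, Int), b: (Long, Int)): (Long, Int) = if (a._2 > b._2) a else b

  val maxInDegree: (Long, Int) = inDegrees.reduce(max)
  val maxOutDegree: (Long, Int) = outDegrees.reduce(max)
  val maxDegrees: (Long, Int) = degrees.reduce(max)
}
```

Vertex 4 has no incoming edge, so it is absent from inDegrees in this model rather than present with a zero.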

Collecting Neighbors

In some cases it may be easier to express computation by collecting neighboring vertices and their attributes at each vertex. This can be easily accomplished using the collectNeighborIds and the collectNeighbors operators.

class GraphOps[VD, ED] {
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}

These operators can be quite costly as they duplicate information and require substantial communication. If possible try expressing the same computation using the aggregateMessages operator directly.

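GraphX is generally faster than naive Spark code (plain RDD operations) when graph computation is needed, because its operators can reuse the structural indices of the graph.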

CueSheet – Easy spark application deployment guide

CueSheet is a framework for writing Apache Spark 2.x applications more conveniently, designed to neatly separate the concerns of the business logic and the deployment environment, as well as to minimize the usage of shell scripts which are inconvenient to write and do not support validation. To jump-start, check out cuesheet-starter-kit which provides the skeleton for building CueSheet applications. CueSheet is featured in Spark Summit East 2017.

An example of a CueSheet application is shown below. Any Scala object extending CueSheet becomes a CueSheet application; the object body can then use the variables like sc, sqlContext, and spark to write the business logic, as if it is inside spark-shell:

import com.kakao.cuesheet.CueSheet

object Example extends CueSheet {{
  val rdd = sc.parallelize(1 to 100)
  println(s"sum = ${rdd.sum()}")
  println(s"sum2 = ${rdd.map(_ + 1).sum()}")
}}

CueSheet will take care of creating SparkContext or SparkSession according to the configuration given in a separate file, so that your application code can contain just the business logic. Furthermore, CueSheet will launch the application locally or to a YARN cluster by simply running your object as a Java application, eliminating the need to use spark-submit and accompanying shell scripts.

CueSheet also supports Spark Streaming applications, via ssc. When it is used in the object body, it automatically becomes a Spark Streaming application, and ssc provides access to the StreamingContext.

Importing CueSheet

libraryDependencies += "com.kakao.cuesheet" %% "cuesheet" % "0.10.0"

CueSheet can be used in Scala projects by configuring SBT as above. Note that this dependency is not specified as "provided", which makes it possible to launch the application right in the IDE, and even debug using breakpoints in driver code when launched in client mode.


Configurations for your CueSheet application, including Spark configurations and the arguments in spark-submit, are specified using the HOCON format. It is by default application.conf in your classpath root, but an alternate configuration file can be specified using -Dconfig.resource or -Dconfig.file. Below is an example configuration file.

spark {
  master = "yarn:classpath:com.kakao.cuesheet.launcher.test"
  deploy.mode = cluster

  hadoop.user.name = "cloudera"

  executor.instances = 2
  executor.memory = 1g
  driver.memory = 1g

  streaming.blockInterval = 10000
  eventLog.enabled = false
  eventLog.dir = "hdfs:///user/spark/applicationHistory"
  yarn.historyServer.address = "http://history.server:18088"

  driver.extraJavaOptions = "-XX:MaxPermSize=512m"
}

Unlike the standard Spark configuration, spark.master for YARN should include an indicator for finding YARN/Hive/Hadoop configurations. It is easiest to put the XML files inside your classpath, usually by putting them under src/main/resources, and specify the package classpath as above. Alternatively, spark.master can contain a URL to download the configuration in a ZIP file, e.g. yarn:http://cloudera.manager/hive/, copied from Cloudera Manager’s ‘Download Client Configuration’ link. The usual local or local[8] can also be used as spark.master.

deploy.mode can be either client or cluster. The Hadoop user setting (set to "cloudera" in the example above) should be the username to be used as the Hadoop user. CueSheet assumes that this user has write permission to its home directory.

Using HDFS

While submitting an application to YARN, CueSheet will copy Spark and CueSheet’s dependency jars to HDFS. This way, the next time you submit your application, CueSheet will analyze your classpath to find and assemble only the classes that are not part of the already installed jars.

One-Liner for Easy Deployment

When given a tag name as the system property cuesheet.install, CueSheet will print a rather long shell command which can launch your application from anywhere the hdfs command is available. Below is an example of the one-liner shell command that CueSheet produces when given -Dcuesheet.install=v0.0.1 as a JVM argument.

rm -rf SimpleExample_2.10-v0.0.1 && mkdir SimpleExample_2.10-v0.0.1 && cd SimpleExample_2.10-v0.0.1 &&
echo '<configuration><property><name>dfs.ha.automatic-failover.enabled</name><value>false</value></property><property><name>fs.defaultFS</name><value>hdfs://quickstart.cloudera:8020</value></property></configuration>' > core-site.xml &&
hdfs --config . dfs -get hdfs:///user/cloudera/.cuesheet/applications/com.kakao.cuesheet.SimpleExample/v0.0.1/SimpleExample_2.10.jar \!SimpleExample_2.10.jar &&
hdfs --config . dfs -get hdfs:///user/cloudera/.cuesheet/lib/0.10.0-SNAPSHOT-scala-2.10-spark-2.1.0/*.jar &&
java -classpath "*" com.kakao.cuesheet.SimpleExample "hello" "world" && cd .. && rm -rf SimpleExample_2.10-v0.0.1

What this command does is download the CueSheet and Spark jars as well as your application assembly from HDFS, and launch the application in the same environment that was launched in the IDE. This way, it is not required to have HADOOP_CONF_DIR or SPARK_HOME properly installed and set on every node, making it much easier to use it in distributed schedulers like Marathon, Chronos, or Aurora. These schedulers typically allow a single-line shell command as their job specification, so you can simply paste what CueSheet gives you in the scheduler’s Web UI.

Additional Features

Being started as a library of reusable Spark functions, CueSheet contains a number of additional features, not in an extremely coherent manner. Many parts of CueSheet including these features are powered by Mango library, another open-source project by Kakao.

One additional quirk is the “stop” tab that CueSheet adds to the Spark UI. It features three buttons with an increasing degree of seriousness. To stop a Spark Streaming application, possibly to trigger a restart by a scheduler like Marathon, one of the left two buttons will do the job. If you need to halt a Spark application ASAP, the red button will immediately kill the Spark driver.

Re-partitioning & partition in spark

  In Hadoop, partitioning data allows a huge volume of data to be processed in parallel, so that the entire dataset is processed in the minimum amount of time. Apache Spark decides the partitioning based on several factors. The factors that decide the default partitioning are:

  1. On Hadoop, the input is split by HDFS input splits (blocks).
  2. filter and map functions don’t change the number of partitions.
  3. The number of CPU cores in the cluster when running in non-Hadoop mode.

Re-partitioning: increases the number of partitions, re-balances the partitions after a filter, and increases parallelism.

You can define the number of partitions in Spark at the time of creating an RDD as follows:

val users = sc.textFile("hdfs://at-r3p11:8020/project/users.csv", 1)

where the second argument is the (minimum) number of partitions.

By default, if HDFS is not used, Spark creates partitions based on the number of cores; if an HDFS path is used, it creates partitions based on the input splits (by default the HDFS block size).

To find the number of partitions of an RDD, enter the following in spark-shell:

users.partitions.size

Spark can only run 1 concurrent task for every partition of an RDD, up to the number of cores in your cluster. So if you have a cluster with 50 cores, you want your RDDs to have at least 50 partitions (and probably 2-3 times that).

As far as choosing a “good” number of partitions, you generally want at least as many as the number of executors for parallelism. You can get this computed value by calling sc.defaultParallelism.

Also, the number of partitions determines how many files get generated by actions that save RDDs to files.

The maximum size of a partition is ultimately limited by the available memory of an executor.

In the first RDD transformation, e.g. reading from a file using sc.textFile(path, partition), the partition parameter will be applied to all further transformations and actions on this RDD.

When using textFile with compressed files (file.txt.gz rather than file.txt or similar), Spark disables splitting, which makes for an RDD with only 1 partition (as reads against gzipped files cannot be parallelized). In this case, to change the number of partitions you should repartition.

Some operations, e.g. map and flatMap, don’t preserve the partitioner, because they may change the keys; filter keeps it.

map, flatMap, filter operations apply a function to every partition.

rdd = sc.textFile('demo.gz')
rdd = rdd.repartition(100)

With these lines, you end up with rdd having exactly 100 partitions of roughly equal size.

  • rdd.repartition(N) does a shuffle to split data to match N

  • partitioning is done on round robin basis
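Why round-robin assignment yields partitions of roughly equal size can be seen in a few lines. The sketch below is plain Scala with a hypothetical roundRobin helper; it only models the assignment rule and does not reproduce Spark's shuffle or the way Spark picks its starting positions.

```scala
object RoundRobinSketch {
  // Assign the i-th element to partition i % n.
  def roundRobin[A](items: Seq[A], n: Int): Vector[Vector[A]] = {
    val indexed = items.zipWithIndex
    Vector.tabulate(n) { p =>
      indexed.collect { case (x, i) if i % n == p => x }.toVector
    }
  }

  // Eleven numbers spread over four partitions: sizes differ by at most one.
  val parts: Vector[Vector[Int]] = roundRobin(0 to 10, 4)
  val sizes: Vector[Int] = parts.map(_.size)
}
```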

Note :
If the default partitioning scheme doesn’t work for you, you can write your own custom partitioner.

coalesce Transformation :

The coalesce transformation is used to change the number of partitions. It can trigger RDD shuffling depending on the shuffle flag (disabled by default, i.e. false).

In the following sample, you parallelize a local 11-number sequence and coalesce it first without and then with shuffling (note the shuffle parameter being false and true, respectively).
scala> val rdd = sc.parallelize(0 to 10, 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.partitions.size
res0: Int = 8

scala> rdd.coalesce(numPartitions=8, shuffle=false)   (1)
res1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at <console>:27

  1. shuffle is false by default and it’s explicitly used here for demo purposes. Note that the number of partitions remains the same as the number of partitions in the source RDD rdd.

Apache Oozie – A Scheduling System


In a batch processing system, we have to schedule jobs that run periodically. This creates a lot of overhead in the deployment and maintenance of the system. As a solution, Oozie provides workflows in XML format, with which we can combine multiple Map/Reduce jobs into a logical unit of work that accomplishes the larger task [4].

Workflows work perfectly when invoked on demand or manually. But for achieving higher level of automation and effectiveness, it becomes necessary to run them based on one or more of the following parameters: regular time intervals, data availability or external events. Then, we need more functionality than provided by Oozie workflows.

In this paper, Oozie Coordinator Jobs will be discussed which provide options to embed workflows and trigger them on regular time-intervals or on basis of data availability.

The Oozie coordinator allows expressing the conditions that trigger execution of a workflow in the form of predicates [1]. These predicates are conditional statements on parameters like time, data and external events. The workflow job/action is started only if the predicate is satisfied.

Oozie Coordinator System

As stated on the Oozie documentation page [1], “Oozie is a Java Web-Application that runs in a Java servlet-container”. It uses XML to take configuration input from the user and a database (Derby by default, but MySQL, HSQLDB or any other RDBMS can also be used) to store:


  • Definitions of Workflow and Coordinator
  • Currently running workflow and Coordinator instances, including instance states, configuration variables and parameters.

Oozie Coordinator is a collection of predicates (conditional statements based on time-frequency and data availability) and actions (i.e. Hadoop Map/Reduce jobs, Hadoop file system, Hadoop Streaming, Pig, Java and Oozie sub-workflow). Actions are recurrent workflow jobs invoked each time predicate returns true.

Oozie version 2 and higher supports Coordinator Jobs. Coordinator Job is defined in the XML Process Definition Language.

Predicates are conditional statements, defined using attributes “interval, start-time and end-time” for time-based triggering and xml-tags “dataset and input-events” for data-availability based triggering of workflows.
Actions are the mechanism by which a workflow is triggered for the execution of a computation/processing task. Action contains description of one or more workflows to be executed.

Oozie is lightweight as it uses existing Hadoop Map/Reduce framework for executing all tasks in a workflow. This approach allows it to leverage existing Hadoop installation for providing scalability, reliability, parallelism, etc.
On the basis of functionality, Coordinator can be sub-divided into two major groups [2]:

1. Time-Based Coordinator: This type of Coordinator definition is used for invoking the workflow repeatedly at a fixed interval within a specified period of time.

2. File-Based Coordinator: This type of Coordinator definition is used for invoking the workflow on the basis of data availability and data polling.

2.1 Simple File-Based Coordinator: The action is invoked whenever the data-availability predicate is true.

2.2 Sliding Window-Based Coordinator: It is invoked frequently and data is aggregated over multiple overlapping previous instances. For example, invoking it at a frequency of 5 minutes and running the action on the aggregated previous 4 instances of 15-minute data.

2.3 Rollups-Based Coordinator: It is invoked after a long period of time and data is aggregated over multiple previous instances since the last invocation. For example, it will run once a day and trigger a workflow that aggregates 24 instances of hourly data.
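The difference between the sliding-window and rollup cases comes down to which dataset instances a given run aggregates. The sketch below is a simplified model in Scala using java.time, not Oozie's implementation: current and window are hypothetical helpers, and the assumption is that dataset instances are aligned to an initial instance and that a run takes the most recent instances at or before its nominal time.

```scala
import java.time.{Duration, Instant}

object WindowSketch {
  // Latest dataset instance at or before `nominal`, shifted by n dataset periods
  // (n = 0 is the latest instance, n = -1 the one before it, and so on).
  def current(initial: Instant, freq: Duration, nominal: Instant, n: Int = 0): Instant = {
    val steps = Math.floorDiv(Duration.between(initial, nominal).toMinutes, freq.toMinutes)
    initial.plus(freq.multipliedBy(steps + n))
  }

  // The `count` most recent instances for a run at `nominal`, oldest first.
  def window(initial: Instant, freq: Duration, nominal: Instant, count: Int): Seq[Instant] =
    (-(count - 1) to 0).map(n => current(initial, freq, nominal, n))

  val initial: Instant = Instant.parse("2012-01-01T00:00:00Z")

  // Rollup: a daily run aggregating 24 instances of hourly data.
  val rollup: Seq[Instant] =
    window(initial, Duration.ofHours(1), Instant.parse("2012-01-02T00:00:00Z"), 24)

  // Sliding window: runs over the previous 4 instances of 15-minute data.
  val slidingA: Seq[Instant] =
    window(initial, Duration.ofMinutes(15), Instant.parse("2012-01-01T01:05:00Z"), 4)
  val slidingB: Seq[Instant] =
    window(initial, Duration.ofMinutes(15), Instant.parse("2012-01-01T01:20:00Z"), 4)
}
```

In this model the two sliding runs share three of their four instances, which is the overlap the sliding-window case relies on, while consecutive daily rollups would not share any.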

Oozie Coordinator Components and Variables

  • Coordinator-App: It is a wrapper component that defines the attributes of a coordinator and includes all other components.

Attributes are:

      • start, end: describe the start and end time in yyyy-mm-ddThh:mmZ format.
      • Time zone: describes the time zone (the value of Z in the above time format), like UTC.

  • Controls: It contains parameters like timeout, concurrency, etc. to configure the execution of the coordinator job.
  • Datasets: It contains the definition of multiple data sources and the frequency of data polling.

Attributes are:

      • Frequency: interval of time at which data polling is done.
      • Initial-Instance: start time of data polling in yyyy-mm-ddThh:mmZ format.
      • Uri-Template: URI of the data source. Expression language can be used. For example, ${YEAR} corresponds to the current year. It helps in the dynamic selection of data source directories.
      • Done-flag: This flag denotes the success of data polling. It can be a file, in which case the presence of the file is checked before calling the action. Otherwise it can be left empty for an implicit success message.

  • Input-Events: denotes the processing of the input data before running the action.
      • Data-in: denotes the aggregated output data of the input event.
      • Start-instance and end-instance: boundaries of the data instances that need to be aggregated.
  • Output-Events: denotes the processing of the output data after running the action.
      • Data-out: denotes the output dataset.
      • Instance: instance of the dataset that is to be used as the sink for output.
  • Action: It includes the path of the workflow that has to be invoked when the predicate returns true.

It could also be configured to record the events required to evaluate SLA compliance.

Oozie Coordinator Lifecycle Operations


The lifecycle operations of a coordinator are similar to those of an Oozie workflow, except for the start operation: “Start” is not applicable to coordinators.

  • Submit/Run: Both operations submit the coordinator job to Oozie. The job will be in the PREP state until the specified start time of the coordinator.
  • Suspend: Suspends (pauses) the coordinator job.
  • Resume: Resumes the execution of the coordinator job.
  • Kill: Kills the coordinator job and ends its execution.
  • reRun: Re-submits the coordinator job/actions with new parameters.

Oozie Coordinator Example   

In this section, we will see how to use oozie coordinator for scheduling and triggering of the workflows.


1. A Sample Workflow: First of all, we need an Oozie workflow job. As an example, I have taken the simple wordcount example provided by the Apache Hadoop distribution in hadoop-examples-0.20.2-cdh3u0.jar [6].

The workflow for wordcount is:

<workflow-app xmlns='uri:oozie:workflow:0.1' name='java-main-wf'>
  <start to='mapreduce-wordcount-example' />
  <action name='mapreduce-wordcount-example'>
    <ok to="end" />
    <error to="fail" />
  </action>
  <kill name="fail">
    <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>
  <end name='end' />
</workflow-app>

Once the workflow is created, it has to be deployed correctly. A typical Oozie deployment is an HDFS directory containing workflow.xml and a lib subdirectory with the jar files of the classes used by workflow actions.
For example, the directory structure in Hadoop will be as shown below (assuming the user is training).

[training@localhost ~]$ hadoop dfs -ls /user/training/oozie/workflow/wordcount
Found 2 items
drwxr-xr-x   - training supergroup          0 2012-09-18 12:05 /user/training/oozie/workflow/wordcount/lib
-rw-r--r--   1 training supergroup        918 2012-09-18 11:47 /user/training/oozie/workflow/wordcount/workflow.xml

The file will have the following properties:


With the job properties in place, this workflow can be invoked manually using the oozie job submit command from the command line.

[training@localhost Desktop]$ oozie job -oozie=http://localhost:11000/oozie/ -config oozie/wordcount-demo/workflow/ -run;

job: 0000000-120918134457517-oozie-oozi-W

2. Oozie Coordinator Definition: As discussed above, coordinator definitions differ for the different kinds of triggering and scheduling.

So, we will take each kind of Coordinator one by one and schedule the wordcount example on that basis.
Moreover, Oozie coordinators can be parameterized using variables like ${inputDir}, ${startTime}, etc. within the coordinator definition. When submitting a coordinator job, values for the parameters must be provided as input. As parameters are key-value pairs, they can be written in a properties file or an XML file. Parameters can also be provided in the form of a Java Map object if the Java API is used to invoke a coordinator job.

  • Time-Based Coordinator

The generic definition for this kind of coordinator is

<coordinator-app name="coordinator1" frequency="${frequency}" start="${startTime}" end="${endTime}" timezone="${timezone}" xmlns="uri:oozie:coordinator:0.1">

Save the file as coordinator.xml in an HDFS directory. (Please note that coordinator.xml is the only name that can be given to the file, as Oozie uses this default name when reading the file from the HDFS directory.)

The job parameters file can be defined as




The coordinator application path must be specified in the file with the oozie.coord.application.path property. Specified path must be an HDFS path.

  • File-Based Coordinator
<coordinator-app name="coordinator1" frequency="${frequency}" start="${startTime}" end="${endTime}" timezone="UTC" xmlns="uri:oozie:coordinator:0.1">
<dataset name="input1" frequency="${datasetfrequency}" initial-instance="${datasetinitialinstance}"
<done-flag> </done-flag>
<data-in name="coordInput1" dataset="input1">

Save the file as coordinator.xml in an HDFS directory. (Please note that coordinator.xml is the only name that can be given to the file, as Oozie uses this default name when reading the file from the HDFS directory.)

The job parameters file can be defined as



inputDir=${coord:dataIn('coordInput1')}


The coordinator application path must be specified in the properties file with the oozie.coord.application.path property. The specified path must be an HDFS path.

  • Sliding-Window Based Coordinator

This is a specific use case of the File-Based Coordinator in which the coordinator is invoked frequently and data is aggregated over multiple overlapping previous instances.
The rule for this can be generalized as
Coordinator-frequency < DataSet-Frequency
Here each frequency is the interval given in the frequency attribute, so a smaller value means more frequent runs.


  • Rollups Based Coordinator

This is a specific use case of the File-Based Coordinator in which the coordinator is invoked after a long period of time and data is aggregated over the multiple previous instances produced since the last invocation.

The rule for this can be generalized as
Coordinator-frequency > DataSet-Frequency
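
The difference between the two rules is easier to see with numbers. The Python sketch below is a simplified model, not Oozie's actual materialization logic: times are plain minutes from an arbitrary origin, a dataset instance appears at every multiple of its interval, and each run aggregates the instances of the last `window` minutes. All names and values are assumptions made for illustration.

```python
def instances_for_run(run_time, dataset_freq, window):
    """Dataset instance times t with run_time - window < t <= run_time."""
    first = ((run_time - window) // dataset_freq + 1) * dataset_freq
    return list(range(max(first, 0), run_time + 1, dataset_freq))


def schedule(coord_freq, dataset_freq, window, first_run, runs):
    """Map each coordinator run time to the dataset instances it aggregates."""
    result = {}
    for k in range(runs):
        run_time = first_run + k * coord_freq
        result[run_time] = instances_for_run(run_time, dataset_freq, window)
    return result


# Sliding window: coordinator interval (30) < dataset interval (60).
sliding = schedule(coord_freq=30, dataset_freq=60, window=120, first_run=120, runs=3)

# Rollup: coordinator interval (120) > dataset interval (60).
rollup = schedule(coord_freq=120, dataset_freq=60, window=120, first_run=120, runs=3)

print(sliding)  # consecutive runs share dataset instances
print(rollup)   # every run sees only instances produced since the previous run
```

In the sliding-window case the same dataset instance shows up in more than one run, while in the rollup case the runs never overlap.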


Running Coordinator Example from Command line

  • Submitting/Running the coordinator job
$ oozie job -oozie http://localhost:11000/oozie -config [-submit][-run]
job: 0000672-120823182447665-oozie-hado-C

The parameters for the job must be provided in a file, either a Java Properties file (.properties) or a Hadoop XML Configuration file (.xml). This file must be specified with the -config option.
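
Before submitting, it can help to check that the properties file really carries the application path. The following Python sketch assumes a simple key=value file (the real Java Properties format has more rules, such as escapes and ':' separators, which are ignored here). The sample host, port and paths are hypothetical.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def coordinator_path(props):
    """Return the coordinator application path or fail if it is missing."""
    path = props.get("oozie.coord.application.path")
    if not path:
        raise ValueError("oozie.coord.application.path is not set")
    return path


# Hypothetical file content, for illustration only.
sample = """
# coordinator job parameters
oozie.coord.application.path=hdfs://localhost:8020/user/training/coordinator
frequency=60
timezone=UTC
"""

print(coordinator_path(parse_properties(sample)))
```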

  • Suspending the coordinator job
$ oozie job -oozie http://localhost:11000/oozie -suspend 0000673-120823182447665-oozie-hado-C
  • Resuming a Coordinator Job
$ oozie job -oozie http://localhost:11000/oozie -resume 0000673-120823182447665-oozie-hado-C
  • Killing a Coordinator Job
$ oozie job -oozie http://localhost:11000/oozie -kill 0000673-120823182447665-oozie-hado-C
  • Rerunning a Coordinator Action or Multiple Actions
$ oozie job -rerun 0000673-120823182447665-oozie-hado-C [-nocleanup]
[-refresh][-action 1,3-5] [-date 2012-01-01T01:00Z::2012-05-31T23:59Z, 2012-11-10T01:00Z, 2012-12-31T22:00Z]

Either -action or -date is required to rerun. If neither -action nor -date is given, an exception is thrown.
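
The -action value in the command above mixes single numbers and ranges. As a small illustration of how such a list reads, the Python sketch below expands it into individual action numbers. This is only a reading aid written for this post, not the parser Oozie uses.

```python
def expand_actions(spec):
    """Expand a list such as '1,3-5' into [1, 3, 4, 5]."""
    actions = []
    for part in spec.split(","):
        part = part.strip()
        if "-" in part:
            # A range such as 3-5 includes both ends.
            start, end = (int(x) for x in part.split("-", 1))
            actions.extend(range(start, end + 1))
        else:
            actions.append(int(part))
    return actions


print(expand_actions("1,3-5"))
```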

  • Checking the Status of a Coordinator/Workflow job or a Coordinator Action

$ oozie job -oozie http://localhost:11000/oozie -info 0000673-120823182447665-oozie-hado-C

The info option can display information about a workflow job or coordinator job or coordinator action.

Invoking Coordinator Jobs from Java Client

Oozie exposes a Java API for invoking and controlling workflows programmatically. The same API also applies to coordinators, with some changes, since coordinators and workflows differ in how they function.


import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;

// The service for executing coordinators on Oozie
public class CoordinatorOozieService {

    // Oozie client
    private final OozieClient oozieClient;

    public CoordinatorOozieService(String url) {
        oozieClient = new OozieClient(url);
    }

    // To submit the coordinator job on Oozie
    public String submitJob(String jobPropertyFilePath) throws OozieClientException, IOException {
        // create an empty coordinator job configuration object
        // with just the USER_NAME set to the JVM user name
        Properties conf = oozieClient.createConfiguration();

        // the property key is blank in the source listing; OozieClient.USER_NAME is assumed
        conf.setProperty(OozieClient.USER_NAME, "training");

        // set the coordinator properties
        try (FileInputStream in = new FileInputStream(jobPropertyFilePath)) {
            conf.load(in);
        }

        // submit the coordinator job
        return oozieClient.submit(conf);
    }

    // To submit the coordinator job on Oozie
    public String submitJob(Properties workflowProperties) throws OozieClientException, IOException {
        // create an empty coordinator job configuration object
        // with just the USER_NAME set to the JVM user name
        Properties conf = oozieClient.createConfiguration();

        // set the coordinator properties
        conf.putAll(workflowProperties);

        // the property key is blank in the source listing; OozieClient.USER_NAME is assumed
        conf.setProperty(OozieClient.USER_NAME, "training");

        // submit the coordinator job
        return oozieClient.submit(conf);
    }

    // To run (submit and start) the coordinator job on Oozie
    public String runJob(String jobPropertyFilePath) throws OozieClientException, IOException {
        // create an empty coordinator job configuration object
        // with just the USER_NAME set to the JVM user name
        Properties conf = oozieClient.createConfiguration();

        // the property key is blank in the source listing; OozieClient.USER_NAME is assumed
        conf.setProperty(OozieClient.USER_NAME, "training");

        // set the coordinator properties
        try (FileInputStream in = new FileInputStream(jobPropertyFilePath)) {
            conf.load(in);
        }

        // submit and start the coordinator job
        return oozieClient.run(conf);
    }

    // To suspend the coordinator job on Oozie
    public void suspendJob(String jobId) throws OozieClientException {
        // suspend the coordinator job
        oozieClient.suspend(jobId);
    }

    // To resume the coordinator job on Oozie
    public void resumeJob(String jobId) throws OozieClientException {
        // resume the coordinator job
        oozieClient.resume(jobId);
    }

    // To kill the coordinator job on Oozie
    public void killJob(String jobId) throws OozieClientException {
        // kill the coordinator job
        oozieClient.kill(jobId);
    }

    // To get the status of the coordinator job with id <jobID>
    public CoordinatorJob.Status getJobStatus(String jobID) throws OozieClientException {
        CoordinatorJob job = oozieClient.getCoordJobInfo(jobID);
        return job.getStatus();
    }
}


The Oozie Coordinator can be used for efficient scheduling of Hadoop-related workflows. It also helps in triggering them on the basis of data availability or external events. Moreover, it provides a lot of configurable and pluggable components, which help in easy and effective deployment and maintenance of Oozie workflow jobs.
As the coordinator is specified in XML, it is easy to integrate with J2EE applications. Invoking coordinator jobs through Java has already been explained above.


Oozie provides a new component, “Bundle”, in its latest version 3. It provides a higher-level abstraction that groups a set of coordinator applications, often called a Data Pipeline. Data dependencies can be inserted between multiple coordinator jobs to create an implicit data application pipeline. Oozie lifecycle operations (start/stop/suspend/resume/rerun) can also be applied at the bundle level, which gives better and easier operational control.

How to generate graphs in python

Friends, Python is a language which is very easy and rich, with tons of libraries for various purposes. Today I am going to show you how to use the matplotlib library for plotting graphs as well as saving the graph in a particular directory.

import numpy as np
import matplotlib.pyplot as plt
import os

N = 5
menMeans = (20, 35, 30, 35, 27)
menStd = (2, 3, 4, 1, 2)

ind = np.arange(N)  # the x locations for the groups
width = 0.35        # the width of the bars

fig, ax = plt.subplots()
rects1 = ax.bar(ind, menMeans, width, color='r', yerr=menStd)

womenMeans = (25, 32, 34, 20, 25)
womenStd = (3, 5, 2, 3, 3)
rects2 = ax.bar(ind + width, womenMeans, width, color='y', yerr=womenStd)

# add some text for labels, title and axes ticks
ax.set_title('Scores by group and gender')
# bars are centre-aligned in matplotlib 2.0+, so the middle of each group is ind + width / 2
ax.set_xticks(ind + width / 2)
ax.set_xticklabels(('G1', 'G2', 'G3', 'G4', 'G5'))

ax.legend((rects1[0], rects2[0]), ('Men', 'Women'))

def autolabel(rects):
    # attach a text label with the bar height above each bar
    for rect in rects:
        height = rect.get_height()
        ax.text(rect.get_x() + rect.get_width() / 2., 1.05 * height,
                '%d' % int(height),
                ha='center', va='bottom')

autolabel(rects1)
autolabel(rects2)

def save(path, ext='png', close=True, verbose=True):
    # Extract the directory and filename from the given path
    directory = os.path.split(path)[0]
    filename = "%s.%s" % (os.path.split(path)[1], ext)
    if directory == '':
        directory = '.'
    # If the directory does not exist, create it
    if not os.path.exists(directory):
        os.makedirs(directory)
    # The final path to save to
    savepath = os.path.join(directory, filename)
    if verbose:
        print("Saving figure to '%s'..." % savepath)
    # Actually save the figure
    plt.savefig(savepath)
    # Close it
    if close:
        plt.close()
    if verbose:
        print("Done")

save("/path/to/file", ext="png", close=False, verbose=True)